
I'm implementing a reactive project with Spring Boot 2.3.1, WebFlux, Spring Data with the reactive MongoDB driver, and AWS SDK 2.14.6.

I have a CRUD that persists an entity in MongoDB and must upload a file to S3. I'm using the SDK's reactive method s3AsyncClient.putObject and I'm facing some issues. The CompletableFuture throws the following exception:

java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.ApiCallTimeoutException: Client execution did not complete before the specified timeout configuration: 60000 millis
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[na:na]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
    reactor.core.publisher.Mono.map(Mono.java:3054)
    br.com.wareline.waredrive.service.S3Service.uploadFile(S3Service.java:94)

The file I'm trying to upload is about 34 KB; it is a simple text file.

The upload method is in my S3Service.java class, which is autowired into DocumentoService.java:

@Component
public class S3Service {

    @Autowired
    private ConfiguracaoService configuracaoService;

    public Mono<PutObjectResponse> uploadFile(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final String cliente) {
        return configuracaoService.findByClienteId(cliente)
                .switchIfEmpty(Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, String.format("Configuração com id %s não encontrada", cliente))))
                .map(configuracao -> uploadFileToS3(headers, body, fileKey, configuracao))
                .doOnSuccess(response -> {
                    checkResult(response);
                });
    }

    private PutObjectResponse uploadFileToS3(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final Configuracao configuracao) {

        final long length = headers.getContentLength();
        if (length < 0) {
            throw new UploadFailedException(HttpStatus.BAD_REQUEST.value(), Optional.of("required header missing: Content-Length"));
        }
        final Map<String, String> metadata = new HashMap<>();
        final MediaType mediaType = headers.getContentType() != null ? headers.getContentType() : MediaType.APPLICATION_OCTET_STREAM;

        final S3AsyncClient s3AsyncClient = getS3AsyncClient(configuracao);

        return s3AsyncClient.putObject(
                PutObjectRequest.builder()
                        .bucket(configuracao.getBucket())
                        .contentLength(length)
                        .key(fileKey)
                        .contentType(mediaType.toString())
                        .metadata(metadata)
                        .build(),
                AsyncRequestBody.fromPublisher(body))
                .whenComplete((resp, err) -> s3AsyncClient.close())
                .join();
    }

    public S3AsyncClient getS3AsyncClient(final Configuracao s3Props) {

        final SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
            .readTimeout(Duration.ofMinutes(1))
            .writeTimeout(Duration.ofMinutes(1))
            .connectionTimeout(Duration.ofMinutes(1))
            .maxConcurrency(64)
            .build();

        final S3Configuration serviceConfiguration = S3Configuration.builder().checksumValidationEnabled(false).chunkedEncodingEnabled(true).build();

        return S3AsyncClient.builder()
            .httpClient(httpClient)
            .region(Region.of(s3Props.getRegion()))
            .credentialsProvider(() -> AwsBasicCredentials.create(s3Props.getAccessKey(), s3Props.getSecretKey()))
            .serviceConfiguration(serviceConfiguration)
            .overrideConfiguration(builder -> builder.apiCallTimeout(Duration.ofMinutes(1)).apiCallAttemptTimeout(Duration.ofMinutes(1)))
            .build();

    }
}

I based my implementation on the Amazon SDK documentation and the code examples at https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/s3/src/main/java/com/example/s3/S3AsyncOps.java

I can't figure out the cause of the async client timeout. The weird thing is that when I use the same S3AsyncClient to download files from the bucket, it works. I tried increasing the timeouts on the S3AsyncClient to about 5 minutes, without success. I don't know what I'm doing wrong.

Not sure if that's the issue, but you aren't using the AWS SDK reactively. When you call join you are effectively blocking a thread. Instead you should wrap the CompletableFuture with Mono.fromFuture, return it, and call the uploadFileToS3 method from a flatMap operator. – Martin Tarjányi
I already tried wrapping the CompletableFuture in Mono.fromFuture as you suggested, but I got the same error. – Ciro Anacleto
Then as a next step I'd check whether the Flux&lt;ByteBuffer&gt; body is actually consumed by the AWS SDK or is just hanging there. Also check that you don't subscribe to the same Flux before/after uploading; that could cause a similar issue. – Martin Tarjányi
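The point of the first comment, reduced to plain CompletableFuture, can be sketched without Reactor or AWS types. fakePutObject below is a made-up stand-in for s3AsyncClient.putObject; the contrast is between parking the calling thread with join() and composing the future so it can be returned (with Reactor, that composed future is what you'd hand to Mono.fromFuture inside flatMap):

```java
import java.util.concurrent.CompletableFuture;

public class JoinVsCompose {

    // Stand-in for s3AsyncClient.putObject(...): completes later, on another thread
    static CompletableFuture<String> fakePutObject() {
        return CompletableFuture.supplyAsync(() -> "uploaded");
    }

    // Blocking style, as in the question: join() parks the calling thread
    // until the upload finishes
    static String uploadBlocking() {
        return fakePutObject().join();
    }

    // Non-blocking style, as the comment suggests: compose and return the
    // future instead of waiting on it
    static CompletableFuture<String> uploadAsync() {
        return fakePutObject().thenApply(response -> response);
    }

    public static void main(String[] args) {
        System.out.println(uploadBlocking());
        System.out.println(uploadAsync().join()); // join only at the program edge
    }
}
```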

1 Answer


I found the error. When defining the contentLength in PutObjectRequest.builder().contentLength(length), I used headers.getContentLength(), which is the size of the whole request. Other information was sent in my request as well, making the content length greater than the real file length.

I found this in the Amazon documentation:

The number of bytes set in the "Content-Length" header is more than the actual file size

When you send an HTTP request to Amazon S3, Amazon S3 expects to receive the amount of data specified in the Content-Length header. If the expected amount of data isn't received by Amazon S3, and the connection is idle for 20 seconds or longer, then the connection is closed. Be sure to verify that the actual file size that you're sending to Amazon S3 aligns with the file size that is specified in the Content-Length header.

https://aws.amazon.com/pt/premiumsupport/knowledge-center/s3-socket-connection-timeout-error/

The timeout occurred because S3 waits until the number of bytes received matches the content length declared by the client. The file finishes transmitting before the declared content length is reached, so the connection sits idle and S3 closes the socket.

I changed the content length to the real file size and the upload was successful.