I'm implementing a reactive project with Spring Boot 2.3.1, WebFlux, Spring Data with the reactive MongoDB driver, and Amazon SDK 2.14.6.
I have a CRUD service that persists an entity in MongoDB and must upload a file to S3. I'm using the SDK's reactive method s3AsyncClient.putObject,
and I'm facing some issues. The CompletableFuture throws the following exception:
java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.ApiCallTimeoutException: Client execution did not complete before the specified timeout configuration: 60000 millis
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[na:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
reactor.core.publisher.Mono.map(Mono.java:3054)
br.com.wareline.waredrive.service.S3Service.uploadFile(S3Service.java:94)
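For context on what the exception means: ApiCallTimeoutException indicates that the future returned by the SDK did not complete within the configured apiCallTimeout, whatever the underlying reason for the stall. A JDK-only analogy (nothing AWS-specific here; the never-completing future is a hypothetical stand-in for a stalled upload):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {
    public static void main(String[] args) {
        // A future that never completes stands in for a stalled S3 upload.
        CompletableFuture<String> slow = new CompletableFuture<>();

        // orTimeout (Java 9+) is the JDK analogue of the SDK's apiCallTimeout:
        // after the deadline the future completes exceptionally with
        // TimeoutException, regardless of why the work stalled.
        CompletableFuture<String> bounded = slow.orTimeout(100, TimeUnit.MILLISECONDS);
        try {
            bounded.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause() instanceof TimeoutException); // prints "true"
        }
    }
}
```

The point of the analogy is that the timeout is a symptom, not the cause: something inside the call never finished.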
The file I'm trying to upload is about 34 kB; it is a simple text file.
The upload method is in my S3Service.java class, which is autowired in DocumentoService.java:
@Component
public class S3Service {

    private final ConfiguracaoService configuracaoService;

    // Constructor injection: @Autowired field injection does not work on a final field.
    @Autowired
    public S3Service(final ConfiguracaoService configuracaoService) {
        this.configuracaoService = configuracaoService;
    }

    public Mono<PutObjectResponse> uploadFile(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final String cliente) {
        return configuracaoService.findByClienteId(cliente)
            .switchIfEmpty(Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, String.format("Configuração com id %s não encontrada", cliente))))
            .map(configuracao -> uploadFileToS3(headers, body, fileKey, configuracao))
            .doOnSuccess(this::checkResult);
    }
    private PutObjectResponse uploadFileToS3(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final Configuracao configuracao) {
        final long length = headers.getContentLength();
        if (length < 0) {
            throw new UploadFailedException(HttpStatus.BAD_REQUEST.value(), Optional.of("required header missing: Content-Length"));
        }
        final Map<String, String> metadata = new HashMap<>();
        final MediaType mediaType = headers.getContentType() != null ? headers.getContentType() : MediaType.APPLICATION_OCTET_STREAM;
        final S3AsyncClient s3AsyncClient = getS3AsyncClient(configuracao);
        return s3AsyncClient.putObject(
                PutObjectRequest.builder()
                    .bucket(configuracao.getBucket())
                    .contentLength(length)
                    .key(fileKey)
                    .contentType(mediaType.toString()) // contentType takes a String, not a MediaType
                    .metadata(metadata)
                    .build(),
                AsyncRequestBody.fromPublisher(body))
            .whenComplete((resp, err) -> s3AsyncClient.close())
            .join();
    }
    public S3AsyncClient getS3AsyncClient(final Configuracao s3Props) {
        final SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
            .readTimeout(Duration.ofMinutes(1))
            .writeTimeout(Duration.ofMinutes(1))
            .connectionTimeout(Duration.ofMinutes(1))
            .maxConcurrency(64)
            .build();
        final S3Configuration serviceConfiguration = S3Configuration.builder()
            .checksumValidationEnabled(false)
            .chunkedEncodingEnabled(true)
            .build();
        return S3AsyncClient.builder()
            .httpClient(httpClient)
            .region(Region.of(s3Props.getRegion()))
            .credentialsProvider(() -> AwsBasicCredentials.create(s3Props.getAccessKey(), s3Props.getSecretKey()))
            .serviceConfiguration(serviceConfiguration)
            .overrideConfiguration(builder -> builder.apiCallTimeout(Duration.ofMinutes(1)).apiCallAttemptTimeout(Duration.ofMinutes(1)))
            .build();
    }
}
I based my implementation on the Amazon SDK documentation and the code examples at https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/s3/src/main/java/com/example/s3/S3AsyncOps.java
I can't figure out the cause of the async client timeout. The weird thing is that when I use the same S3AsyncClient to download files from the bucket, it works. I tried increasing the timeout in the S3AsyncClient to about 5 minutes without success. I don't know what I'm doing wrong.
Comments:
Try removing .join(), wrapping the CompletableFuture with Mono.fromFuture, returning it, and calling the uploadFileToS3 method from the flatMap operator. – Martin Tarjányi
I tried Mono.fromFuture as you suggested but I got the same error. – Ciro Anacleto
I would check the Flux<ByteBuffer> body, whether it is actually consumed by the AWS SDK or just hanging there. Also check that you don't subscribe to the same flux before/after uploading; that could cause a similar issue. – Martin Tarjányi
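For reference, the non-blocking composition Martin suggests (return the CompletableFuture instead of joining it, then chain) can be sketched with plain JDK futures. findConfig and putObjectAsync below are hypothetical stand-ins for configuracaoService.findByClienteId and s3AsyncClient.putObject, not real AWS SDK or Reactor calls:

```java
import java.util.concurrent.CompletableFuture;

public class ComposeDemo {
    // Hypothetical stand-in for configuracaoService.findByClienteId.
    static CompletableFuture<String> findConfig(String cliente) {
        return CompletableFuture.supplyAsync(() -> "config-" + cliente);
    }

    // Hypothetical stand-in for s3AsyncClient.putObject.
    static CompletableFuture<String> putObjectAsync(String config, String key) {
        return CompletableFuture.supplyAsync(() -> "uploaded " + key + " with " + config);
    }

    public static void main(String[] args) {
        // Blocking style (what .join() inside map() amounts to): the calling
        // thread stalls waiting for each step before the pipeline continues.
        String blocking = putObjectAsync(findConfig("42").join(), "file.txt").join();

        // Composed style (what flatMap + Mono.fromFuture amounts to): the
        // futures are chained and no intermediate thread blocks.
        String composed = findConfig("42")
                .thenCompose(cfg -> putObjectAsync(cfg, "file.txt"))
                .join(); // join only at the outer edge, e.g. in a test or main()

        System.out.println(blocking.equals(composed)); // prints "true"
    }
}
```

In the real service this would correspond to uploadFileToS3 returning the CompletableFuture<PutObjectResponse> and uploadFile chaining it with .flatMap(configuracao -> Mono.fromFuture(...)), though per the comments above that change alone did not resolve the timeout.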