1
votes

I'm new to Spring Webflux/Reactive Programming in Spring and I have some problem with the scheduler task:

The scheduler (executor thread) doesn't terminate after an error occured, e.g. when trying to save an entity to the database which is offline using Schedulers.parallel() threads.

My code actually looks like this:

public Mono<Void> addCatalogEvent(Mono<CatalogEvent> catalogWriteEvent) {
    return catalogWriteEvent
            .filter(event -> event.getStoreId() != null && event.getCatalogEventType() != null && event.getProductId() != null && event.getUnitPrice() != null)
            .switchIfEmpty(Mono.error(new BadRequestException("Not all required fields have been provided for the event!")))
            .publishOn(Schedulers.parallel())
            .map(event -> {
                catalogEventRepository.save(event);
                return event;
            }).log()
            .then()
            .onErrorResume(e -> {
                Schedulers.shutdownNow();
                return Mono.error(new PersistenceException("Persistence failed:  " + e.getMessage()));
            });
}

However, when the database goes offline, the I'm getting error messages printed out to the console but the scheduler/executor doesn't stop trying to persist the entity to the database. The process keeps going on and the error messages are printed out to the console again and again...

How can I tell the scheduler/executor to stop if an error occurs so that these threads won't run forever when an error occured?

And how can I automatically restart these scheduler/executor threads as soon as the database is online again?

I just added the .log() operator after the .map() operator which executes the persistence operation. It seems that the .log() operation isn't executed again after Mono.error() has been thrown:

2018-09-21 12:17:12.942  INFO 28234 --- [ctor-http-nio-8] reactor.Mono.Map.2                       : onSubscribe(FluxMap.MapSubscriber)
2018-09-21 12:17:12.942  INFO 28234 --- [ctor-http-nio-8] reactor.Mono.Map.2                       : request(unbounded)
2018-09-21 12:17:12.965  WARN 28234 --- [     parallel-3] org.postgresql.jdbc.PgConnection         : Validating connection.

org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:333) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:155) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:132) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.jdbc.PgConnection.isValid(PgConnection.java:1364) ~[postgresql-42.2.2.jar:42.2.2]
    at com.zaxxer.hikari.pool.PoolBase.isConnectionAlive(PoolBase.java:150) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:172) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) [HikariCP-2.7.9.jar:na]
    at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) [hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) [hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) [hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) [hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getConnectionForTransactionManagement(LogicalConnectionManagedImpl.java:254) [hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.begin(LogicalConnectionManagedImpl.java:262) [hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.begin(JdbcResourceLocalTransactionCoordinatorImpl.java:214) [hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.engine.transaction.internal.TransactionImpl.begin(TransactionImpl.java:56) [hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:164) [spring-orm-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) [spring-orm-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) [spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) [spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) [spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) [spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) [spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:135) [spring-data-jpa-2.0.8.RELEASE.jar:2.0.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) [spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.data.repository.core.support.SurroundingTransactionDetectorMethodInterceptor.invoke(SurroundingTransactionDetectorMethodInterceptor.java:61) [spring-data-commons-2.0.8.RELEASE.jar:2.0.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at com.sun.proxy.$Proxy92.save(Unknown Source) [na:na]
    at com.catalog.api.reactive.service.CatalogEventService.lambda$addCatalogEvent$1(CatalogEventService.java:33) [classes/:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
    at reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:178) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]
Caused by: java.io.EOFException: null
    at org.postgresql.core.PGStream.receiveChar(PGStream.java:295) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1947) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:306) ~[postgresql-42.2.2.jar:42.2.2]
    ... 45 common frames omitted

2018-09-21 12:17:12.966  WARN 28234 --- [     parallel-3] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection org.postgresql.jdbc.PgConnection@74772caf (This connection has been closed.)
2018-09-21 12:17:12.969  WARN 28234 --- [     parallel-3] org.postgresql.jdbc.PgConnection         : Validating connection.


2018-09-21 12:17:12.970  WARN 28234 --- [     parallel-3] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection org.postgresql.jdbc.PgConnection@5b22b00 (This connection has been closed.)
2018-09-21 12:17:12.972  WARN 28234 --- [     parallel-3] org.postgresql.jdbc.PgConnection         : Validating connection.

org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
 at [...]
Caused by: java.io.EOFException: null
 at [...]
2018-09-21 12:17:12.973  WARN 28234 --- [     parallel-3] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection org.postgresql.jdbc.PgConnection@48ed6d4d (This connection has been closed.)
2018-09-21 12:17:12.975  WARN 28234 --- [onnection adder] unknown.jul.logger                       : ConnectException occurred while connecting to localhost:5432

java.net.ConnectException: Connection refused (Connection refused)
 at [...]

2018-09-21 12:17:12.975  WARN 28234 --- [     parallel-3] org.postgresql.jdbc.PgConnection         : Validating connection.

org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
 at [...]
Caused by: java.io.EOFException: null
 at [...]
2018-09-21 12:17:12.976  WARN 28234 --- [     parallel-3] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection org.postgresql.jdbc.PgConnection@4f66a7fc (This connection has been closed.)
2018-09-21 12:17:12.977 ERROR 28234 --- [onnection adder] org.postgresql.Driver                    : Connection error: 
org.postgresql.util.PSQLException: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
 at [...]
Caused by: java.net.ConnectException: Connection refused (Connection refused)
 at [...]

2018-09-21 12:17:12.978  WARN 28234 --- [     parallel-3] org.postgresql.jdbc.PgConnection         : Validating connection.

[SAME ERROR is repeating again until Mono.error() is thrown which results in the following]

2018-09-21 12:17:42.955  WARN 28234 --- [     parallel-3] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: 08001
2018-09-21 12:17:42.956 ERROR 28234 --- [     parallel-3] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30005ms.
2018-09-21 12:17:42.956  WARN 28234 --- [     parallel-3] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: 08001
2018-09-21 12:17:42.956 ERROR 28234 --- [     parallel-3] o.h.engine.jdbc.spi.SqlExceptionHelper   : Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
2018-09-21 12:17:42.969 ERROR 28234 --- [     parallel-3] reactor.Mono.Map.2                       : onError(org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection)
2018-09-21 12:17:42.970 ERROR 28234 --- [     parallel-3] reactor.Mono.Map.2                       : 

org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:450) ~[spring-orm-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:135) ~[spring-data-jpa-2.0.8.RELEASE.jar:2.0.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.data.repository.core.support.SurroundingTransactionDetectorMethodInterceptor.invoke(SurroundingTransactionDetectorMethodInterceptor.java:61) ~[spring-data-commons-2.0.8.RELEASE.jar:2.0.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at com.sun.proxy.$Proxy92.save(Unknown Source) ~[na:na]
    at com.catalog.api.reactive.service.CatalogEventService.lambda$addCatalogEvent$1(CatalogEventService.java:33) ~[classes/:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
    at reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:178) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]
Caused by: org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
    at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:48) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:111) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:97) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:109) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getConnectionForTransactionManagement(LogicalConnectionManagedImpl.java:254) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.begin(LogicalConnectionManagedImpl.java:262) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.begin(JdbcResourceLocalTransactionCoordinatorImpl.java:214) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.engine.transaction.internal.TransactionImpl.begin(TransactionImpl.java:56) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:164) ~[spring-orm-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    ... 26 common frames omitted
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30005ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:669) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.9.jar:na]
    at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
    ... 33 common frames omitted
Caused by: org.postgresql.util.PSQLException: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:245) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:195) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.Driver.makeConnection(Driver.java:452) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.Driver.connect(Driver.java:254) ~[postgresql-42.2.2.jar:42.2.2]
    at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:117) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:123) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:365) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:194) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:460) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.access$100(HikariPool.java:71) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool$PoolEntryCreator.call(HikariPool.java:699) ~[HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool$PoolEntryCreator.call(HikariPool.java:685) ~[HikariCP-2.7.9.jar:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    ... 3 common frames omitted
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:400) ~[na:na]
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:243) ~[na:na]
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:225) ~[na:na]
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:402) ~[na:na]
    at java.base/java.net.Socket.connect(Socket.java:591) ~[na:na]
    at org.postgresql.core.PGStream.<init>(PGStream.java:69) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:156) ~[postgresql-42.2.2.jar:42.2.2]
    ... 17 common frames omitted

2018-09-21 12:17:51.326  WARN 28234 --- [onnection adder] unknown.jul.logger                       : ConnectException occurred while connecting to localhost:5432

java.net.ConnectException: Connection refused (Connection refused)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:400) ~[na:na]
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:243) ~[na:na]
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:225) ~[na:na]
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:402) ~[na:na]
    at java.base/java.net.Socket.connect(Socket.java:591) ~[na:na]
    at org.postgresql.core.PGStream.<init>(PGStream.java:69) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:156) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49) [postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:195) [postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.Driver.makeConnection(Driver.java:452) [postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.Driver.connect(Driver.java:254) [postgresql-42.2.2.jar:42.2.2]
    at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:117) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:123) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:365) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:194) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:460) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.access$100(HikariPool.java:71) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool$PoolEntryCreator.call(HikariPool.java:699) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool$PoolEntryCreator.call(HikariPool.java:685) [HikariCP-2.7.9.jar:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [na:na]
    at java.base/java.lang.Thread.run(Thread.java:844) [na:na]

2018-09-21 12:17:51.326 ERROR 28234 --- [onnection adder] org.postgresql.Driver                    : Connection error: 

org.postgresql.util.PSQLException: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:245) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:195) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.Driver.makeConnection(Driver.java:452) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.Driver.connect(Driver.java:254) ~[postgresql-42.2.2.jar:42.2.2]
    at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:117) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:123) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:365) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:194) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:460) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.access$100(HikariPool.java:71) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool$PoolEntryCreator.call(HikariPool.java:699) [HikariCP-2.7.9.jar:na]
    at com.zaxxer.hikari.pool.HikariPool$PoolEntryCreator.call(HikariPool.java:685) [HikariCP-2.7.9.jar:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [na:na]
    at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:400) ~[na:na]
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:243) ~[na:na]
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:225) ~[na:na]
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:402) ~[na:na]
    at java.base/java.net.Socket.connect(Socket.java:591) ~[na:na]
    at org.postgresql.core.PGStream.<init>(PGStream.java:69) ~[postgresql-42.2.2.jar:42.2.2]
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:156) ~[postgresql-42.2.2.jar:42.2.2]

[REPEATED FOREVER]

UPDATE

It seems that the Hikari threads won't stop establishing a connection if my database goes down. However, when I restart my database again, the error logging seems to stop and the persistence operation is not executed.

Can I stop the reestablishing of a connection process while my database is offline to stop wasting resources?

1

1 Answers

0
votes

With project reactor, doOn*** operators are "side effects" operators and you should not implement business logic or I/O operations within those.

In this case, I believe that an exception thrown by your repository call is not translated into an error signal in the reactive chain. This would explain why this is not behaving how you'd expect it to. You should probably use something else, like a map operator.

public Mono<Void> addEvent(Mono<Event> event){
    return event
        .filter(event -> event.getID()!= null)
        .switchIfEmpty(Mono.error(new BadRequestException("ID shouldn't be null!")))
        .publishOn(Schedulers.parallel())
        .map(event -> {
            // if this throws an exception, this will be turned into an error message
            eventRepository.save(event); 
            return event;
        })
        .then();

Now, we you're mentioning that:

the scheduler/executor doesn't stop trying to persist the entity to the database

Looking at your logs, it seems that the Hikari pool is retrying to connect to the database. Once a connection timeout is hit, an error is thrown by the pool and the Mono.error is triggered. On the background, it seems Hikari is still trying to reconnect (possibly with an exponential backoff), because that's its job. The threads you're looking at aren't the ones from the Scheduler, but the ones from the connection pool.

A few other notes about your code snippet:

  • You should avoid at all cost using blocking libraries in a reactive application. You're pretty much losing all the benefits of using reactive programming.
  • switchIfEmpty(Mono.error(new BadRequestException("ID shouldn't be null!"))) effectively translates an empty Mono (in case the id is null here), into an error signal. When an error signal flows through the chain, doOnNext won't be called.
  • doOnError(e -> Mono.error(new PersistenceException("Persistence failed...."))) won't do what you think it should; again, this is a side-effect method, so this won't translate an error into a persistence exception. In there, you should just log the error, but not act on it.
  • onErrorResume is meant to switch to a fallback method in case of an error