
How can you use Akka to perform a database operation with Hibernate without blocking the web client?

UPDATE:

Turns out the error was caused by the dao.get() method. I changed the start() method to take the actual object instead of its database id, and now I get no error, but nothing happens (it gets stuck on em.merge(), as I said previously).

public CompletionStage<Result> start(SomeObject object) {
    ExecutionContext ec = Akka.system().dispatchers().lookup("akka.actor.db-context");
    return CompletableFuture.supplyAsync(() -> doStuff(object), play.libs.concurrent.HttpExecution.fromThread(ec))
            .thenApply(i -> ok("Got result: " + i));
}

OUTDATED:

If I try something like:

@Transactional
public CompletionStage<Result> start(Long id) {
    ExecutionContext ec = Akka.system().dispatchers().lookup("akka.actor.db-context");
    return CompletableFuture.supplyAsync(() -> doStuff(dao.get(id)), play.libs.concurrent.HttpExecution.fromThread(ec))
            .thenApply(i -> ok("Got result: " + i));
}

Where doStuff only performs an entityManager.merge(), I get:

[CompletionException: org.hibernate.SessionException: Session is closed!]

or

[CompletionException: java.lang.IllegalStateException: EntityManager is closed]

When I use the code below to start the process above:

@Transactional
public Result mainMethod() {
    List<SomeObject> allObjects = dao.getAll();
    int size = allObjects.size();
    for (int i = 0; i < size; i++) {
        start(allObjects.get(i).getId());
    }
    return ok("Started");
}

Then the newly created threads (actors) get into an infinite loop when attempting a database operation.

Thanks!

Complete stack trace:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[CompletionException: org.hibernate.SessionException: Session is closed!]]
    at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:280)
    at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:206)
    at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
    at play.api.DefaultGlobal$.onError(GlobalSettings.scala:188)
    at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:98)
    at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
    at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
    at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:344)
    at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:343)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at play.api.libs.iteratee.Execution$trampoline$.executeScheduled(Execution.scala:109)
    at play.api.libs.iteratee.Execution$trampoline$.execute(Execution.scala:71)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.java8.FuturesConvertersImpl$P.accept(FutureConvertersImpl.scala:94)
    at scala.concurrent.java8.FuturesConvertersImpl$P.accept(FutureConvertersImpl.scala:89)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
    at play.core.j.HttpExecutionContext$$anon$2.run(HttpExecutionContext.scala:56)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:405)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.hibernate.SessionException: Session is closed!
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    ... 7 common frames omitted
Caused by: org.hibernate.SessionException: Session is closed!
    at org.hibernate.internal.AbstractSessionImpl.errorIfClosed(AbstractSessionImpl.java:133)
    at org.hibernate.internal.SessionImpl.setCacheMode(SessionImpl.java:1455)
    at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1144)
    at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1068)
    at daos.dao.get(dao.java:45)
    at controllers.DemoController.lambda$start$0(DemoController.java:195)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 7 common frames omitted

Later edit:

Tried with:

ExecutionContext ec = Akka.system().dispatchers().lookup("akka.actor.db-context");
return CompletableFuture.supplyAsync(() -> jpa.withTransaction("default", true, ()-> doStuff(dao.get(id))), play.libs.concurrent.HttpExecution.fromThread(ec))
                .thenApply(i -> ok("Got result: " + i));

And I get the same error.

Maybe I'm doing it wrong, so here is my simpler question: how do you run database operations with Play's Akka actors without blocking the web client? For example, start an update operation that sets some fields to certain values without making the client that triggered the controller action wait for the result (as it is not relevant to the user).
- taviss

2 Answers

0
votes

I solved it by using what @asch suggested:

Controller:

    @Inject
    private JPAApi jpa;
    [...]
    return CompletableFuture.supplyAsync(() -> jpa.withTransaction("default", false, ()-> doStuff(objectDAO.get(id))), play.libs.concurrent.HttpExecution.fromThread(ec))
                .thenApply(i -> ok("Got result: " + i));

And in the class that contains the doStuff method:

    private static final JPAApi jpaApi = Play.current().injector().instanceOf(JPAApi.class);
    [...]
    jpaApi.withTransaction(() -> {
        EntityManager em = jpaApi.em();
        em.merge(object);
    });
    [...]
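
For the fire-and-forget case raised in the question (start the updates, answer the client right away), here is a minimal sketch using only the JDK. The names FireAndForgetDemo, startAll, dbPool and update are hypothetical and not part of Play or of the code above; the update callback stands in for whatever opens the transaction and calls em.merge().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical helper, JDK only: schedules one update per id on a separate pool.
class FireAndForgetDemo {

    // Returns immediately; the returned future completes once every update has
    // finished (normally or with a logged failure). A caller that does not
    // care about the outcome can simply ignore it.
    static CompletableFuture<Void> startAll(List<Long> ids, Executor dbPool, Consumer<Long> update) {
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (Long id : ids) {
            tasks.add(CompletableFuture
                .runAsync(() -> update.accept(id), dbPool)
                // Log failures here, since nobody will join on the task.
                .exceptionally(t -> {
                    System.err.println("Update failed for id " + id + ": " + t);
                    return null;
                }));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
    }
}
```

In a controller you would call startAll(...) and return ok("Started") without joining the returned future. The transaction should be opened inside update (for example with jpa.withTransaction), so that it lives on the pool thread and not on the request thread.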
-1
votes

The @Transactional annotation is on the synchronous method, so I guess you need a transaction inside the async block. Instead of the annotation, you can use JPAApi's withTransaction method. Its signature (from the Play Java API):

/**
 * Run a block of code in a JPA transaction.
 *
 * @param name The persistence unit name
 * @param readOnly Is the transaction read-only?
 * @param block Block of code to execute
 * @param <T> type of result
 * @return code execution result
 */
public <T> T withTransaction(String name, boolean readOnly, Supplier<T> block);

It should be something like this:

  1. Inject JPAApi in the controller:

    @Inject
    JPAApi jpa;
    
  2. Implement async executor with call to jpa:

     return CompletableFuture.supplyAsync(() -> jpa.withTransaction("default", true, () -> doStuff(dao.get(id))), play.libs.concurrent.HttpExecution.fromThread(ec))
        .thenApply(i -> ok("Got result: " + i));
    

    Note that the second argument, true, in the call to withTransaction marks the transaction as read-only. Because doStuff calls merge(), which writes, you probably want false here.

  3. Change the DAO class to use the injected JPAApi as well.
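
To illustrate why the transaction has to be opened inside the async block, here is a small JDK-only sketch. It assumes the EntityManager is tied to the thread that opened the transaction, and models that with a ThreadLocal. ThreadBoundSessionDemo, withSession and merge are hypothetical stand-ins, not Play or Hibernate APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical model of a thread-bound session; not Play or Hibernate code.
class ThreadBoundSessionDemo {
    // The "session" is only visible on the thread that opened it.
    private static final ThreadLocal<String> SESSION = new ThreadLocal<>();

    // Plays the role of withTransaction: open, run the block, always close.
    static <T> T withSession(Supplier<T> block) {
        SESSION.set("open");
        try {
            return block.get();
        } finally {
            SESSION.remove();
        }
    }

    // Plays the role of em.merge(): fails if no session is bound to this thread.
    static String merge(String entity) {
        if (SESSION.get() == null) {
            throw new IllegalStateException("Session is closed!");
        }
        return "merged " + entity;
    }

    // Failures thrown inside supplyAsync usually arrive wrapped in a CompletionException.
    private static String rootMessage(Throwable t) {
        Throwable cause = t.getCause() != null ? t.getCause() : t;
        return cause.getMessage();
    }

    // Session opened on the calling thread, work done on a pool thread: fails.
    static String sessionOutside(ExecutorService pool) {
        return withSession(() ->
            CompletableFuture.supplyAsync(() -> merge("a"), pool)
                .exceptionally(t -> "failed: " + rootMessage(t))
                .join());
    }

    // Session opened inside the async block, on the pool thread itself: works.
    static String sessionInside(ExecutorService pool) {
        return CompletableFuture
            .supplyAsync(() -> withSession(() -> merge("a")), pool)
            .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(sessionOutside(pool));
            System.out.println(sessionInside(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The first call mirrors @Transactional on the controller method with the work done on another thread. The second mirrors calling withTransaction inside supplyAsync.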