According to the Akka.NET documentation, PersistentView is deprecated and PersistenceQuery should be used instead. In an ASP.NET Core 2.0 Web API application I am using Akka.NET with event sourcing. I am using the SQL Server plugin for persistence, with events and snapshots. For persistent views I want to start using PersistenceQuery. When the application starts, the events are replayed to restore the state of the actors.

I have implemented a journal reader that receives events and uses them to compose a view. The question is: how can I tell that the last replayed event has arrived, so that the composed view can be saved (as a sort of snapshot)? I don't want to save the view after every event during the restore phase.

The journal reader is now started when the ActorSystem is initialised (called via Startup.cs). The code looks like this:

private static void InitialiseJournalReader()
{
    // Obtain read journal by plugin id.
    var readJournal = PersistenceQuery.Get(ActorSystem).ReadJournalFor<SqlReadJournal>("akka.persistence.query.myjournal");

    // Materialize stream, consuming events.
    var materializer = ActorMaterializer.Create(ActorSystem);

    var writer = ActorSystem.ActorOf(CreateViewsActor.GetActorProps(), CreateViewsActor.GetActorName());

    // issue query to journal
    Source<EventEnvelope, NotUsed> source = readJournal.CurrentEventsByTag("MyEvents");
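    // Fire-and-forget: Tell avoids the discarded Task and temporary actor that Ask would create per event.
    source.RunForeach(envelope => writer.Tell(envelope.Event), materializer);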
}

CreateViewsActor is an actor that uses the messages to create one or more views. It also has to save these views (currently in JSON format, in a SQL Server table).

Unfortunately, I have not yet found a working example of creating persistent views through a journal reader, though I may have been looking in the wrong places. My questions are:

  1. Are there any working examples of creating persistent views through a journal reader?
  2. How can the CreateViewsActor (or any code responsible for creating and saving the views) know that all recovery messages have been processed?
  3. What is the best place to initialise the journal readers?

1 Answer

Read journals can be used for multiple purposes, most commonly for generating specialized read views from events. This doesn't have to involve actors, however: you could just as easily transform the events into updates to database tables, producing a materialized view.
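As a minimal sketch of the actor-free approach: the stream can be consumed directly, and the Task returned by RunForeach completes once the stream does. The class name ViewProjection and the callbacks applyToView and saveView are hypothetical placeholders for your own view-building and storage code; the plugin id is simply an example, so use whatever id your configuration defines.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Persistence.Query;
using Akka.Persistence.Query.Sql;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class ViewProjection
{
    // applyToView and saveView are hypothetical callbacks supplied by the caller:
    // the first folds one event into the in-memory view, the second persists it.
    public static async Task RebuildAsync(
        ActorSystem system,
        IMaterializer materializer,
        Action<object> applyToView,
        Action saveView)
    {
        var readJournal = PersistenceQuery.Get(system)
            .ReadJournalFor<SqlReadJournal>("akka.persistence.query.my-read-journal");

        // A "current" query completes once the events stored so far have been emitted,
        // so awaiting RunForeach returns only after the last replayed event was applied.
        await readJournal
            .CurrentEventsByTag("MyEvents")
            .RunForeach(envelope => applyToView(envelope.Event), materializer);

        // Save once, after the replay, instead of after every event.
        saveView();
    }
}
```

The materializer passed in should outlive the returned Task, since shutting it down also stops the stream.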

If you want to combine actors with streams, you can use either Sink.ActorRef or Sink.ActorRefWithAck, depending on whether you want the actor to apply backpressure or to receive events at full speed. Example:

// Keep the materializer alive for as long as the stream runs: disposing it
// (for example by leaving a using block) shuts down the streams it materialized.
var materializer = system.Materializer();

var readJournal = PersistenceQuery.Get(system)
    .ReadJournalFor<SqlReadJournal>("akka.persistence.query.my-read-journal");

var writer = system.ActorOf(CreateViewsActor.Props(), CreateViewsActor.GetActorName());

readJournal
    .CurrentEventsByTag("MyEvents")
    // Keep only events of type MyEvent; anything else casts to null and is dropped.
    .Collect(envelope => envelope.Event as MyEvent)
    .RunWith(Sink.ActorRefWithAck<MyEvent>(writer,
        onInitMessage: CreateViewsActor.Init.Instance,
        ackMessage: CreateViewsActor.Ack.Instance,
        onCompleteMessage: CreateViewsActor.Done.Instance), materializer);

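Here the stream first sends the init message to the actor. The actor replies with the ack message to signal that it is ready for the first element, and replies with it again after each element to request the next one (when available). The last sink parameter, the on-complete message, is sent to the actor once the stream has completed. Because CurrentEventsByTag completes after emitting the events stored at the time of the query, that message tells the actor that the replay is finished and the view can be saved.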
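The receiving side of this protocol could look roughly like the sketch below. It is an illustration only: MyEvent, ApplyToView and SaveView are hypothetical placeholders, the Init, Ack and Done message classes are assumed to be simple singletons matching the names used above, and the Props and GetActorName factories are omitted for brevity.

```csharp
using Akka.Actor;

// Hypothetical event type; replace with the events actually tagged "MyEvents".
public sealed class MyEvent { }

public class CreateViewsActor : ReceiveActor
{
    // Protocol messages exchanged with Sink.ActorRefWithAck (assumed singletons).
    public sealed class Init { public static readonly Init Instance = new Init(); private Init() { } }
    public sealed class Ack { public static readonly Ack Instance = new Ack(); private Ack() { } }
    public sealed class Done { public static readonly Done Instance = new Done(); private Done() { } }

    public CreateViewsActor()
    {
        // The stream announces itself; acknowledging requests the first element.
        Receive<Init>(_ => Sender.Tell(Ack.Instance, Self));

        // Apply one event, then acknowledge so the stream emits the next one.
        Receive<MyEvent>(evt =>
        {
            ApplyToView(evt);
            Sender.Tell(Ack.Instance, Self);
        });

        // The stream has completed: all replayed events were applied, so save once.
        Receive<Done>(_ => SaveView());
    }

    // Hypothetical: fold the event into the in-memory view.
    private void ApplyToView(MyEvent evt) { }

    // Hypothetical: serialize the view (e.g. to JSON) and write it to the database.
    private void SaveView() { }
}
```

The important detail is that every element, as well as the init message, is answered with the ack message; if the actor forgets to acknowledge, the stream stops emitting.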

If in doubt, you can always look at the official Akka.NET tests (see: 1 and 2).

Regarding journal reader initialization: the read journal is an object whose lifetime is bound to an actor system, so it can be initialized in, and made available from, the same place as your actor system.