I am using Akka.NET and want to implement a reactive equivalent of a 'DDD repository', based on what I have seen at http://qnalist.com/questions/5585484/ddd-eventsourcing-with-akka-persistence and https://gitter.im/petabridge/akka-bootcamp/archives/2015/06/25

I understand the idea of having a coordinator that keeps a number of actors in memory, unloading them based on a live in-memory count or on how long they have been idle.

As a summary (based on the links above) I am trying to:

  1. Create an aggregate coordinator (one per actor type) that returns aggregates on request.
  2. Each aggregate calls Context.SetReceiveTimeout to detect when it has not been used for some period of time. If so, it receives a ReceiveTimeout message.
  3. On receipt of the timeout message, the child sends a Passivate message back to the coordinator, which in turn shuts the child down.
  4. While the child is being shut down, all messages to the child are intercepted by the coordinator and buffered.
  5. Once shutdown of the child has been confirmed (in the coordinator), if there are buffered messages for that child, it is recreated and all buffered messages are flushed through to the recreated child.

How would one intercept the messages that are being sent to the child (step 4) and route them to the parent instead? In other words, at the point of sending the Passivate message, I want the child to also say "hey, don't send me any more messages, send them to my parent instead".

This would save me routing everything through the coordinator. Or am I going about it the wrong way: is message interception impossible, so that I should instead proxy everything through the parent?

I have my message contracts:

public class GetActor
{
    public readonly string Identity;

    public GetActor(string identity)
    {
        Identity = identity;
    }
}

public class GetActorReply
{
    public readonly IActorRef ActorRef;

    public GetActorReply(IActorRef actorRef)
    {
        ActorRef = actorRef;
    }
}

public class Passivate // sent from child aggregate to parent coordinator
{
}

The coordinator class, of which there is one instance per aggregate type:

public class ActorLifetimeCoordinator<T> : ReceiveActor where T : ActorBase
{
    protected Dictionary<Identity,IActorRef> Actors = new Dictionary<Identity, IActorRef>();
    protected Dictionary<Identity, List<object>> BufferedMsgs = new Dictionary<Identity, List<object>>();

    public ActorLifetimeCoordinator()
    {
        Receive<GetActor>(message =>
        {
            var actor = GetActor(message.Identity);
            Sender.Tell(new GetActorReply(actor), Self); // reply with the retrieved actor
        });

        Receive<Passivate>(message =>
        {
            var actorToUnload = Context.Sender;
            var task = actorToUnload.GracefulStop(TimeSpan.FromSeconds(10));

            // Between the line above and the line below, we need to intercept messages sent to the child
            // that is being removed from memory - how to do this?

            task.Wait(); // don't block the thread; use PipeTo instead?
        });
    }

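    protected IActorRef GetActor(string identity)
    {
        IActorRef value;
        if (Actors.TryGetValue(identity, out value))
            return value;

        // Create the aggregate as a child of this coordinator (Context.ActorOf rather than
        // Context.System.ActorOf) so that Context.Parent in the aggregate is the coordinator,
        // and cache the reference so later lookups return the same actor.
        value = Context.ActorOf(Props.Create<T>(identity));
        Actors[identity] = value;
        return value;
    }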
}

Aggregate base class from which all aggregates derive:

public abstract class AggregateRoot : ReceivePersistentActor
{
    private readonly DispatchByReflectionStrategy _dispatchStrategy
        = new DispatchByReflectionStrategy("When");      

    protected AggregateRoot(Identity identity)
    {
        PersistenceId = Context.Parent.Path.Name + "/" + Self.Path.Name + "/" + identity;

        Recover((Action<IDomainEvent>)Dispatch);

        Command<ReceiveTimeout>(message =>
        {
            Context.Parent.Tell(new Passivate());    
        });

        Context.SetReceiveTimeout(TimeSpan.FromMinutes(5));
    }

    public override string PersistenceId { get; }

    private void Dispatch(IDomainEvent domainEvent)
    {
        _dispatchStrategy.Dispatch(this, domainEvent);
    }

    protected void Emit(IDomainEvent domainEvent)
    {
        Persist(domainEvent, success =>
        {
            Dispatch(domainEvent);
        });
    }
}

1 Answer

The easiest (but not simplest) option here is to use the Akka.Cluster.Sharding module, which covers the coordinator pattern and adds support for distributing and balancing actors across the cluster.
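
As a rough sketch of what that could look like: the AggregateEnvelope, AggregateMessageExtractor, OrderAggregate and ShardingSetup names below are made up for illustration, and the ActorSystem is assumed to be already configured for clustering and sharding (cluster provider, seed nodes, sharding state storage), which is not shown here. Treat it as a starting point rather than a complete setup.

```csharp
using System;
using Akka.Actor;
using Akka.Cluster.Sharding;

// Hypothetical envelope: carries the aggregate identity alongside the payload.
public sealed class AggregateEnvelope
{
    public AggregateEnvelope(string aggregateId, object payload)
    {
        AggregateId = aggregateId;
        Payload = payload;
    }

    public string AggregateId { get; }
    public object Payload { get; }
}

// Tells the shard region which entity a message targets and what to deliver to it.
public sealed class AggregateMessageExtractor : HashCodeMessageExtractor
{
    public AggregateMessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards) { }

    public override string EntityId(object message)
        => (message as AggregateEnvelope)?.AggregateId;

    public override object EntityMessage(object message)
        => (message as AggregateEnvelope)?.Payload ?? message;
}

// Hypothetical entity; a real aggregate would derive from a persistent actor.
public class OrderAggregate : ReceiveActor
{
    public OrderAggregate()
    {
        Context.SetReceiveTimeout(TimeSpan.FromMinutes(5));

        // When idle, ask the parent shard to passivate this entity. The shard buffers
        // incoming messages for the entity while it stops and recreates it if needed.
        Receive<ReceiveTimeout>(_ =>
            Context.Parent.Tell(new Passivate(PoisonPill.Instance)));

        // Under sharding, the actor name is the entity id.
        ReceiveAny(msg => Sender.Tell(Self.Path.Name + " handled " + msg));
    }
}

public static class ShardingSetup
{
    // Assumes 'system' is already a cluster member with sharding configured.
    public static IActorRef StartRegion(ActorSystem system)
    {
        return ClusterSharding.Get(system).Start(
            "order",                                  // type name (assumption)
            Props.Create<OrderAggregate>(),
            ClusterShardingSettings.Create(system),
            new AggregateMessageExtractor(100));      // shard count is an arbitrary choice
    }
}
```

Callers would then send everything to the region, e.g. region.Tell(new AggregateEnvelope("order-42", command)), and never hold a direct reference to an aggregate.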

If you decide you don't need it, you will unfortunately have to pass messages through the coordinator: each message must carry an identifier that is used to determine the recipient. Otherwise, you may end up sending messages to a dead actor.
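
A minimal sketch of such a coordinator, under some assumptions: AggregateMessage and BufferingCoordinator are hypothetical names, T is assumed to have a constructor taking the string identity, and children send the parameterless Passivate message from the question. The coordinator answers Passivate with a PoisonPill rather than stopping the child directly, so that anything already in the child's mailbox is processed first. Messages arriving after that are buffered until Terminated confirms the stop.

```csharp
using System.Collections.Generic;
using Akka.Actor;

// Hypothetical envelope: every command names the aggregate it targets.
public sealed class AggregateMessage
{
    public AggregateMessage(string identity, object payload)
    {
        Identity = identity;
        Payload = payload;
    }

    public string Identity { get; }
    public object Payload { get; }
}

public sealed class Passivate { }

public class BufferingCoordinator<T> : ReceiveActor where T : ActorBase
{
    private readonly Dictionary<string, IActorRef> _live =
        new Dictionary<string, IActorRef>();
    private readonly Dictionary<IActorRef, string> _identityOf =
        new Dictionary<IActorRef, string>();
    // Present only for identities whose child is currently stopping.
    private readonly Dictionary<string, List<(object Payload, IActorRef Sender)>> _buffers =
        new Dictionary<string, List<(object Payload, IActorRef Sender)>>();

    public BufferingCoordinator()
    {
        Receive<AggregateMessage>(m =>
        {
            if (_buffers.TryGetValue(m.Identity, out var buffer))
                buffer.Add((m.Payload, Sender));             // child is stopping: hold the message
            else
                GetOrCreate(m.Identity).Forward(m.Payload);  // Forward keeps the original sender
        });

        Receive<Passivate>(_ =>
        {
            if (!_identityOf.TryGetValue(Sender, out var identity)) return;
            if (_buffers.ContainsKey(identity)) return;      // already stopping
            _live.Remove(identity);
            _buffers[identity] = new List<(object Payload, IActorRef Sender)>();
            Sender.Tell(PoisonPill.Instance, Self);          // stops after queued messages
        });

        Receive<Terminated>(t =>
        {
            if (!_identityOf.TryGetValue(t.ActorRef, out var identity)) return;
            _identityOf.Remove(t.ActorRef);
            _live.Remove(identity);
            if (!_buffers.TryGetValue(identity, out var buffer)) return;
            _buffers.Remove(identity);
            if (buffer.Count == 0) return;

            // Messages arrived during shutdown: recreate the child and replay them in order.
            var child = GetOrCreate(identity);
            foreach (var (payload, sender) in buffer)
                child.Tell(payload, sender);
        });
    }

    private IActorRef GetOrCreate(string identity)
    {
        if (_live.TryGetValue(identity, out var existing)) return existing;
        // Assumption: T exposes a constructor that accepts the identity string.
        var child = Context.ActorOf(Props.Create<T>(identity));
        Context.Watch(child);                                // ensures Terminated is delivered
        _live[identity] = child;
        _identityOf[child] = identity;
        return child;
    }
}
```

With this shape, callers only ever hold the coordinator's IActorRef and send new AggregateMessage(identity, command) to it, so there is no stale child reference that could point at a dead actor.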