I am having an issue while trying out Akka.NET routers. My round-robin router does not terminate after I broadcast a PoisonPill message if a routee has thrown an exception that was handled by a supervisor strategy. If the exception is not thrown, or is handled with a try/catch inside the routee, the router actor terminates just fine. Is there anything I am missing in my approach?
Sample code to reproduce the issue:
using System;
using System.IO;
using Akka.Actor;
using Akka.Event;
using Akka.Routing;
using NLog;
using NLog.Config;
using NLog.Targets;
using LogLevel = NLog.LogLevel;
namespace AkkaTest
{
/// <summary>
/// Console entry point: configures NLog, starts the actor system, floods the
/// coordinator with work items and finally asks it to dispose everything.
/// </summary>
class Program
{
    /// <summary>
    /// Creates the actor system and a <see cref="Coordinator"/>, sends it
    /// 1000 <see cref="ChildActor.ProcessData"/> messages (payloads 0..999)
    /// followed by a single <see cref="Coordinator.DisposeAll"/>, then blocks
    /// on the console so the asynchronous processing can be observed.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        const int messageCount = 1000;

        InitNlog();

        var system = ActorSystem.Create("ActorSystem");
        var coordinator = system.ActorOf(Props.Create(() => new Coordinator()));

        for (var index = 0; index < messageCount; index++)
        {
            coordinator.Tell(new ChildActor.ProcessData(index));
        }

        // Queued behind all the work items above.
        coordinator.Tell(new Coordinator.DisposeAll());

        Console.ReadLine();
    }

    /// <summary>
    /// Builds an NLog configuration with a single colored console target that
    /// receives every logger at Debug level and above, then activates it.
    /// </summary>
    static void InitNlog()
    {
        var console = new ColoredConsoleTarget
        {
            Layout = @"${date:format=HH\:mm\:ss} ${logger} ${message}"
        };

        var configuration = new LoggingConfiguration();
        configuration.AddTarget("console", console);
        configuration.LoggingRules.Add(new LoggingRule("*", LogLevel.Debug, console));

        LogManager.Configuration = configuration;
    }
}
/// <summary>
/// Root actor of the sample. It owns a single <see cref="Consumer"/> child
/// named "Consumer", relays every work item and the dispose request to it, and
/// decides what happens when that child fails.
/// </summary>
public class Coordinator : ReceiveActor
{
    /// <summary>
    /// Marker message asking the pipeline to shut its workers down.
    /// </summary>
    public class DisposeAll
    {
    }

    private const string ConsumerName = "Consumer";

    private IActorRef _consumer;

    /// <summary>
    /// Registers the handlers: both message types are passed on to the
    /// consumer child with <c>Tell</c>.
    /// </summary>
    public Coordinator()
    {
        Receive<ChildActor.ProcessData>(x => { _consumer.Tell(x); });
        Receive<DisposeAll>(x => { _consumer.Tell(x); });
    }

    /// <summary>
    /// Ensures <c>_consumer</c> refers to the "Consumer" child, creating the
    /// child when it does not exist yet.
    /// </summary>
    protected override void PreStart()
    {
        // Adopt an already existing child instead of leaving the field null;
        // otherwise the handlers above would dereference a null reference.
        var existing = Context.Child(ConsumerName);
        _consumer = existing.Equals(ActorRefs.Nobody)
            ? Context.ActorOf(Props.Create(() => new Consumer()), ConsumerName)
            : existing;
    }

    /// <summary>
    /// Supervision policy for the consumer child: an
    /// <see cref="InvalidDataException"/> resumes the child, any other
    /// exception stops it.
    /// </summary>
    /// <returns>A one-for-one strategy implementing the policy above.</returns>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            ex => ex is InvalidDataException ? Directive.Resume : Directive.Stop);
    }
}
/// <summary>
/// Owns a round-robin pool of <see cref="ChildActor"/> routees named
/// "ChildRouter". It forwards work items to the pool, counts them, watches the
/// pool for termination and broadcasts a poison pill on request.
/// </summary>
public class Consumer : ReceiveActor
{
    private const string RouterName = "ChildRouter";
    private const int RouteeCount = 100;

    private readonly ILoggingAdapter _logger = Context.GetLogger();
    private IActorRef _childRouter;
    private int _progress;

    /// <summary>
    /// Registers the handlers for work items, router termination and the
    /// dispose request.
    /// </summary>
    public Consumer()
    {
        Receive<ChildActor.ProcessData>(x =>
        {
            _progress++;
            if (_progress % 100 == 0)
            {
                _logger.Info("{0} items pushed to router", _progress);
            }

            _childRouter.Forward(x);
        });

        Receive<Terminated>(x =>
        {
            _logger.Error("Child Router terminated.");
        });

        // Wrap the poison pill in a Broadcast and hand it to the router.
        Receive<Coordinator.DisposeAll>(x =>
        {
            _childRouter.Forward(new Broadcast(PoisonPill.Instance));
        });
    }

    /// <summary>
    /// Ensures <c>_childRouter</c> refers to the "ChildRouter" child and that
    /// it is watched. The pool is created with an always-escalate supervisor
    /// strategy when the child does not exist yet.
    /// </summary>
    protected override void PreStart()
    {
        var existing = Context.Child(RouterName);
        if (existing.Equals(ActorRefs.Nobody))
        {
            var routerProps = Props.Create(() => new ChildActor())
                .WithRouter(new RoundRobinPool(RouteeCount))
                .WithSupervisorStrategy(new OneForOneStrategy(ex => Directive.Escalate));

            _childRouter = Context.ActorOf(routerProps, RouterName);
        }
        else
        {
            // Adopt the existing child instead of leaving the field null and
            // the router unwatched.
            _childRouter = existing;
        }

        Context.Watch(_childRouter);
    }

    /// <summary>
    /// Supervision policy for this actor's children: every failure is
    /// escalated.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the strategy passed to the router props escalates as
    /// well, so the directive for a routee exception is chosen by this actor's
    /// parent - confirm that is intended.
    /// </remarks>
    /// <returns>A one-for-one strategy that always escalates.</returns>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(ex => Directive.Escalate);
    }
}
/// <summary>
/// Worker actor used as a routee. It logs payloads that are multiples of five
/// and fails with an <see cref="InvalidDataException"/> for everything else.
/// </summary>
public class ChildActor : ReceiveActor
{
    /// <summary>
    /// Work item carrying a single integer payload.
    /// </summary>
    public class ProcessData
    {
        /// <summary>Creates a work item for the given value.</summary>
        /// <param name="data">The integer payload to process.</param>
        public ProcessData(int data)
        {
            Data = data;
        }

        /// <summary>The integer payload supplied at construction.</summary>
        public int Data { get; private set; }
    }

    private readonly ILoggingAdapter _logger = Context.GetLogger();

    /// <summary>
    /// Registers the handler for <see cref="ProcessData"/> messages.
    /// </summary>
    public ChildActor()
    {
        Receive<ProcessData>(message => Handle(message));
    }

    /// <summary>
    /// Processes one work item.
    /// </summary>
    /// <param name="message">The work item to process.</param>
    /// <exception cref="InvalidDataException">
    /// Thrown when the payload is not a multiple of five.
    /// </exception>
    private void Handle(ProcessData message)
    {
        var divisibleByFive = message.Data % 5 == 0;
        if (!divisibleByFive)
        {
            // With this throw commented out, the router terminates just fine.
            throw new InvalidDataException("Error while processing.");
        }

        _logger.Info("{0} is Divisible by 5", message.Data);
    }
}
}