0
votes

I am having an issue while trying akka.net routers. My round robin router does not terminate after broadcasting PoisonPill message if there is an Exception in routee which is handled using supervisorstrategy. If the exception is not throw or handled with try catch in the routee, router actor terminates just fine. Is there anything I am missing with my approach?

sample code to reproduce the issue:

using System;
using System.IO;
using Akka.Actor;
using Akka.Event;
using Akka.Routing;
using NLog;
using NLog.Config;
using NLog.Targets;
using LogLevel = NLog.LogLevel;

namespace AkkaTest
{
    class Program
    {
        static void Main(string[] args)
        {
            InitNlog();

            var actorSystem = ActorSystem.Create("ActorSystem");

            IActorRef coordinator = actorSystem.ActorOf(Props.Create(() => new Coordinator()));

            for (int i = 0; i < 1000; i++)
            {
                ChildActor.ProcessData processData = new ChildActor.ProcessData(i);

                coordinator.Tell(processData);
            }

            coordinator.Tell(new Coordinator.DisposeAll());

            Console.ReadLine();

        }

        static void InitNlog()
        {
            // Step 1. Create configuration object 
            var config = new LoggingConfiguration();

            // Step 2. Create targets and add them to the configuration 
            var consoleTarget = new ColoredConsoleTarget();
            config.AddTarget("console", consoleTarget);

            // Step 3. Set target properties 
            consoleTarget.Layout = @"${date:format=HH\:mm\:ss} ${logger} ${message}";

            // Step 4. Define rules
            var rule1 = new LoggingRule("*", LogLevel.Debug, consoleTarget);
            config.LoggingRules.Add(rule1);

            // Step 5. Activate the configuration
            LogManager.Configuration = config;
        }
    }

    public class Coordinator : ReceiveActor
    {
        public class DisposeAll
        {

        }


        private readonly ILoggingAdapter _logger = Context.GetLogger();

        private IActorRef _consumer;

        public Coordinator()
        {
            Receive<ChildActor.ProcessData>(x => { _consumer.Tell(x); });

            Receive<DisposeAll>(x => { _consumer.Tell(x); });
        }

        protected override void PreStart()
        {
            if (Context.Child("Consumer").Equals(ActorRefs.Nobody))
            {
                _consumer = Context.ActorOf(
                    Props.Create(() => new Consumer())
                    , "Consumer");               
            }
        }

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(ex =>
            {
                if (ex is InvalidDataException)
                {
                    return Directive.Resume;
                }

                return Directive.Stop;
            });
        }
    }

    public class Consumer : ReceiveActor
    {
        private readonly ILoggingAdapter _logger = Context.GetLogger();

        private IActorRef _childRouter;

        private int _progress;

        public Consumer()
        {
            Receive<ChildActor.ProcessData>(x =>
            {
                _progress++;

                if(_progress%100==0) _logger.Info("{0} items pushed to router", _progress);

                _childRouter.Forward(x);
            });

            Receive<Terminated>(x =>
            {                
                _logger.Error("Child Router terminated.");
            });

            Receive<Coordinator.DisposeAll>(x => { _childRouter.Forward(new Broadcast(PoisonPill.Instance)); });
        }

        protected override void PreStart()
        {
            if (Context.Child("ChildRouter").Equals(ActorRefs.Nobody))
            {
                _childRouter =
                    Context.ActorOf(
                        Props.Create(() => new ChildActor())
                            .WithRouter(new RoundRobinPool(100))
                            .WithSupervisorStrategy(new OneForOneStrategy(ex => Directive.Escalate)), "ChildRouter");

                Context.Watch(_childRouter);
            }
        }

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(ex => Directive.Escalate);
        }

    }



    public class ChildActor : ReceiveActor
    {

        public class ProcessData
        {
            public  int Data { get; private set; }

            public ProcessData(int data)
            {
                Data = data;
            }
        }

        private readonly ILoggingAdapter _logger = Context.GetLogger();
        public ChildActor()
        {
            Receive<ProcessData>(x =>
            {
                if (x.Data % 5 == 0)
                {
                    _logger.Info("{0} is Divisible by 5", x.Data);
                }
                else
                {
                   //if this line is commented, router terminates just fine
                   throw new InvalidDataException("Error while processing.");
                }
            });
        }
    }
}
1

1 Answers

0
votes

Broadcast message is used to send a given message to all children of a given router. This also exactly what's happening in your case - you're sending poison pill stop children of the router not the router itself.

If you want to kill router using PoisonPill, send it to the router directly.