1
votes

I am thinking about building a system that requires Actors to create a subscription to an Azure Service Bus topic with a filter specific to the Actor instance. My question is, if the Actor (that has the subscription to the Topic) has been deactivated in Service Fabric will it be (re-)activated by a new message being sent by Azure Service Bus?

Thank you

3

3 Answers

3
votes

Your Actor will not be activated by receiving a message. It is activated by remoting calls and reminders only. So this approach won't work.

What you can do is receive messages in a Service, and forward them to an Actor instance. Calling an Actor creates the instance(s) on the fly, if needed.

1
votes

Based on the Actor lifecycle, an actor has to be activated explicitly. An Azure Service Bus message arriving on a topic will not activate an actor. Instead, you'd need a supervisor process that would do so. Messages could contain a property that represents the required actor ID. This would also simplify your Azure Service Bus topology, since you would need only a single topic and a scaled-out supervisor.

1
votes

This can easily be achieved with reminders. Since the actor needs to be called first, you can do the following.

The create method stores the connection string, topic name and subscription name, and creates the topic, subscription and rule if needed. The reminder checks whether the subscription client is null and, if it is, creates it. The reminder keeps firing even after a failure, so you can control failures and restart the client after a crash.

https://github.com/Huachao/azure-content/blob/master/articles/service-fabric/service-fabric-reliable-actors-timers-reminders.md

/// <summary>
    /// Stores <paramref name="options"/> in actor state and makes sure the topic, the
    /// subscription and the named SQL-filter rule it describes exist in the Service Bus namespace.
    /// </summary>
    /// <param name="options">Connection string plus topic, subscription and rule settings.</param>
    /// <param name="cancellationToken">Token forwarded to the state manager and to every management call.</param>
    /// <returns>
    /// <c>true</c> when every entity exists afterwards; <c>false</c> when no connection string was
    /// supplied or when a management call failed (the failure message is logged).
    /// </returns>
    public async Task<bool> CreateAsync(BusOptions options, CancellationToken cancellationToken)
    {
        if (options?.ConnectionString == null)
        {
            return false;
        }

        // The latest options always replace whatever was stored before.
        await StateManager.AddOrUpdateStateAsync("Options", options, (key, current) => options, cancellationToken);

        var client = new ManagementClient(options.ConnectionString);
        try
        {
            if (!await client.TopicExistsAsync(options.TopicName, cancellationToken))
            {
                await client.CreateTopicAsync(options.TopicName, cancellationToken);
            }

            if (!await client.SubscriptionExistsAsync(options.TopicName, options.SubscriptionName, cancellationToken))
            {
                await client.CreateSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
            }

            var rules = await client.GetRulesAsync(options.TopicName, options.SubscriptionName, cancellationToken: cancellationToken);
            if (!rules.Any(rule => rule.Name == options.RuleName))
            {
                var filter = new SqlFilter(options.RuleFilterSqlValue);
                await client.CreateRuleAsync(
                    options.TopicName,
                    options.SubscriptionName,
                    new RuleDescription(options.RuleName, filter),
                    cancellationToken);
            }

            return true;
        }
        catch (Exception ex)
        {
            // Report the failure to the caller instead of claiming success.
            ActorEventSource.Current.ActorMessage(this, ex.Message);
            return false;
        }
        finally
        {
            // The management client is only needed for this call; release it.
            await client.CloseAsync();
        }
    }
    /// <summary>
    /// Removes the rule and then the subscription described by <paramref name="options"/>.
    /// Failures are logged and not rethrown.
    /// </summary>
    /// <param name="options">Connection string plus topic, subscription and rule names.</param>
    /// <param name="cancellationToken">Token forwarded to both management calls.</param>
    public async Task DeleteAsync(BusOptions options, CancellationToken cancellationToken)
    {
        if (options?.ConnectionString == null)
        {
            // Same guard as CreateAsync: without a connection string there is nothing to talk to.
            ActorEventSource.Current.ActorMessage(this, "DeleteAsync skipped. No connection string set.");
            return;
        }

        var client = new ManagementClient(options.ConnectionString);
        try
        {
            await client.DeleteRuleAsync(options.TopicName, options.SubscriptionName, options.RuleName, cancellationToken);
            await client.DeleteSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
        }
        catch (Exception ex)
        {
            ActorEventSource.Current.ActorMessage(this, ex.Message);
        }
        finally
        {
            // The management client is only needed for this call; release it.
            await client.CloseAsync();
        }
    }
    // Subscription client used to receive messages. It stays null until
    // ReceiveReminderAsync creates it from the options stored in actor state.
    private ISubscriptionClient subscriptionClient;       
    /// <summary>
    /// Publishes <paramref name="message"/> to the topic named in the stored bus options.
    /// </summary>
    /// <param name="message">Body, label and optional user properties to send.</param>
    /// <param name="cancellationToken">Token forwarded to the state manager calls.</param>
    /// <returns>
    /// <c>true</c> once the message has been sent; <c>false</c> when no options have been stored yet.
    /// </returns>
    public async Task<bool> SendAsync(SendMessage message, CancellationToken cancellationToken)
    {
        var options = await StateManager.TryGetStateAsync<BusOptions>("Options", cancellationToken);
        if (!options.HasValue)
        {
            ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
            return false;
        }

        // Build the outgoing message before opening a connection, so a bad
        // input cannot leave an open client behind.
        var msg = new Message(message.Body)
        {
            Label = message.Label
        };
        if (message.UserProperties != null)
        {
            foreach (var item in message.UserProperties)
            {
                msg.UserProperties.Add(item);
            }
        }

        var client = new TopicClient(options.Value.ConnectionString, options.Value.TopicName);
        try
        {
            await client.SendAsync(msg);
        }
        finally
        {
            // A client is created per call, so it must be closed per call.
            await client.CloseAsync();
        }

        // NOTE(review): this updater keeps the stored value at max(1, value), so it never
        // counts past the first send. If a running count is intended, confirm and change it.
        await StateManager.AddOrUpdateStateAsync("Messages_Send", 1, (key, value) => 1 > value ? 1 : value, cancellationToken);

        return true;
    }
    /// <summary>
    /// Hooks <c>ProcessMessagesAsync</c> up to the subscription client, handling one message
    /// at a time with automatic completion switched off.
    /// </summary>
    void RegisterOnMessageHandlerAndReceiveMessages()
    {
        var handlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler);
        handlerOptions.MaxConcurrentCalls = 1;
        // Completion is done explicitly by the message callback.
        handlerOptions.AutoComplete = false;

        subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, handlerOptions);
    }
    /// <summary>
    /// Message callback: logs the label of the received message and then completes it
    /// on the subscription using its lock token.
    /// </summary>
    /// <param name="message">The message delivered by the subscription client.</param>
    /// <param name="cancellationToken">Supplied by the message pump; not used here.</param>
    async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
    {
        var label = message.Label;
        ActorEventSource.Current.ActorMessage(this, label);

        var lockToken = message.SystemProperties.LockToken;
        await subscriptionClient.CompleteAsync(lockToken);
    }
    /// <summary>
    /// Error callback for the message handler: logs the endpoint, entity path, executing action
    /// and exception message taken from <paramref name="exceptionReceivedEventArgs"/>.
    /// </summary>
    /// <param name="exceptionReceivedEventArgs">Exception and context supplied by the message handler.</param>
    /// <returns>A completed task; the handler only logs.</returns>
    Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

        // Placeholder {2} is the executing action, so pass context.Action rather than
        // the context object itself (which would only print its type name).
        ActorEventSource.Current.ActorMessage(this,
            string.Format("Exception context for troubleshooting: - Endpoint: {0}- Entity Path: {1}- Executing Action: {2} - MEssage: {3}",
            context.Endpoint, context.EntityPath, context.Action, exceptionReceivedEventArgs.Exception.Message));
        return Task.CompletedTask;
    }
    /// <summary>
    /// Runs when the actor is activated: logs the activation and registers the
    /// "Recieve_Message" reminder with a one-second due time and a one-second period.
    /// </summary>
    protected override async Task OnActivateAsync()
    {
        ActorEventSource.Current.ActorMessage(this, $"Actor '{Id.GetStringId()}' activated.");

        // Delay before the reminder first fires, and the interval between firings.
        var dueTime = TimeSpan.FromSeconds(1);
        var period = TimeSpan.FromSeconds(1);

        await this.RegisterReminderAsync("Recieve_Message", null, dueTime, period);
    }
    /// <summary>
    /// Reminder callback. For the "Recieve_Message" reminder it creates the subscription client
    /// from the stored bus options, if it does not exist yet, and registers the message handler.
    /// </summary>
    /// <param name="reminderName">Name of the reminder that fired.</param>
    /// <param name="state">Reminder state; not used.</param>
    /// <param name="dueTime">Reminder due time; not used.</param>
    /// <param name="period">Reminder period; not used.</param>
    public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
    {
        // Nothing to do for other reminders or when the client is already set up.
        if (!reminderName.Equals("Recieve_Message") || subscriptionClient != null)
        {
            return;
        }

        var options = await StateManager.TryGetStateAsync<BusOptions>("Options");
        if (!options.HasValue)
        {
            ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
            return;
        }

        subscriptionClient = new SubscriptionClient(options.Value.ConnectionString, options.Value.TopicName, options.Value.SubscriptionName);
        try
        {
            RegisterOnMessageHandlerAndReceiveMessages();
        }
        catch
        {
            // Reset the field so the next reminder tick retries, instead of leaving a
            // client that has no handler registered.
            var failedClient = subscriptionClient;
            subscriptionClient = null;
            await failedClient.CloseAsync();
            throw;
        }
    }