This can be easy achived with reminders.
Since the actor need to be called first, you can do this.
The create method will set the connections string , topic name, subscription name and create them if needed. The reminder will check if the Subscription client its not null and if it is will create it. The reminder will always execute on failure, like this you will be able to control the failures and restart it on crush.
https://github.com/Huachao/azure-content/blob/master/articles/service-fabric/service-fabric-reliable-actors-timers-reminders.md
public async Task<bool> CreateAsync(BusOptions options, CancellationToken cancellationToken)
{
if (options?.ConnectionString == null)
{
return false;
}
await StateManager.AddOrUpdateStateAsync("Options", options,(k,v) => v != options? options:v, cancellationToken);
var client = new ManagementClient(options.ConnectionString);
try
{
var exist = await client.TopicExistsAsync(options.TopicName, cancellationToken);
if (!exist)
{
await client.CreateTopicAsync(options.TopicName, cancellationToken);
}
exist = await client.SubscriptionExistsAsync(options.TopicName, options.SubscriptionName, cancellationToken);
if (!exist)
{
await client.CreateSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
}
var rules =await client.GetRulesAsync(options.TopicName,options.SubscriptionName,cancellationToken: cancellationToken);
if(rules.FirstOrDefault(x=>x.Name == options.RuleName) == null)
{
SqlFilter filter = new SqlFilter(options.RuleFilterSqlValue);
await client.CreateRuleAsync(options.TopicName, options.SubscriptionName, new RuleDescription(options.RuleName, filter));
}
}
catch (Exception ex)
{
ActorEventSource.Current.ActorMessage(this, ex.Message);
}
return true;
}
public async Task DeleteAsync(BusOptions options, CancellationToken cancellationToken)
{
var client = new ManagementClient(options.ConnectionString);
try
{
await client.DeleteRuleAsync(options.TopicName, options.SubscriptionName, options.RuleName, cancellationToken);
await client.DeleteSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
}
catch (Exception ex)
{
ActorEventSource.Current.ActorMessage(this, ex.Message);
}
}
private ISubscriptionClient subscriptionClient;
public async Task<bool> SendAsync(SendMessage message, CancellationToken cancellationToken)
{
var options =await StateManager.TryGetStateAsync<BusOptions>("Options");
if (!options.HasValue)
{
ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
return false;
}
var client = new TopicClient(options.Value.ConnectionString,options.Value.TopicName);
var msg = new Message(message.Body);
if(message.UserProperties != null)
{
foreach (var item in message.UserProperties)
{
msg.UserProperties.Add(item);
}
}
msg.Label = message.Label;
await client.SendAsync(msg);
await StateManager.AddOrUpdateStateAsync("Messages_Send", 1, (key, value) => 1 > value ? 1 : value, cancellationToken);
return true;
}
void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
{
ActorEventSource.Current.ActorMessage(this, message.Label);
await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}
Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
ActorEventSource.Current.ActorMessage(this,
string.Format("Exception context for troubleshooting: - Endpoint: {0}- Entity Path: {1}- Executing Action: {2} - MEssage: {3}",
context.Endpoint,context.EntityPath,context,exceptionReceivedEventArgs.Exception.Message));
return Task.CompletedTask;
}
protected override async Task OnActivateAsync()
{
ActorEventSource.Current.ActorMessage(this, $"Actor '{Id.GetStringId()}' activated.");
IActorReminder Recieve_Message = await this.RegisterReminderAsync(
"Recieve_Message",
null,
TimeSpan.FromSeconds(1), //The amount of time to delay before firing the reminder
TimeSpan.FromSeconds(1));
}
public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
{
if (reminderName.Equals("Recieve_Message"))
{
if(subscriptionClient == null)
{
var options = await StateManager.TryGetStateAsync<BusOptions>("Options");
if (!options.HasValue)
{
ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
return;
}
var conn = new ServiceBusConnectionStringBuilder(options.Value.ConnectionString);
subscriptionClient = new SubscriptionClient(options.Value.ConnectionString, options.Value.TopicName, options.Value.SubscriptionName);
RegisterOnMessageHandlerAndReceiveMessages();
}
}
}