I have one publisher and one hundred subscribers. All subscribers are bound to a single queue. The RabbitMQ guide says that RabbitMQ creates a single thread for each queue. I want to send all messages to the subscribers through one queue, and keep every undelivered message in that same queue if its destination subscriber is offline. I have two solutions:
- I can send all messages to all subscribers through one queue by declaring an exchange of type "Direct" and binding it to a single queue name. But if the routing key of a published message does not equal the queue name, the message is not saved in the queue when the target consumer is offline.
Subscriber's code:
// Subscriber demo entry point: creates one consumer right away, schedules a
// second consumer (with a different routing key) ten seconds later on a
// background task, and then blocks until Enter is pressed.
static void Main(string[] args)
{
    const string exchangeName = "topic_logs";
    //string exchangeKind = ExchangeType.Topic;
    string exchangeKind = ExchangeType.Direct;

    var redKeys = new List<string> { "qwerty.red" };
    var greenKeys = new List<string> { "asdfgh.green" };

    var firstConsumer = new MqDll.MqConsumer(exchangeName, exchangeKind, redKeys);

    // The second consumer joins late so that messages published in the
    // meantime can be observed without it being connected.
    MqDll.MqConsumer secondConsumer;
    Task.Run(() =>
    {
        Thread.Sleep(TimeSpan.FromSeconds(10));
        secondConsumer = new MqDll.MqConsumer(exchangeName, exchangeKind, greenKeys);
    });

    // Keep the process alive so the consumers can receive messages.
    Console.ReadLine();
}
/// <summary>
/// Connects to the broker on localhost, declares an exchange and a durable
/// queue, binds the queue with the given routing keys and prints every
/// message delivered to that queue.
/// </summary>
/// <remarks>
/// Every consumer created with the same <c>queueName</c> shares that queue.
/// Pass a distinct queue name per subscriber to give each subscriber its own
/// durable queue. Dispose the instance to close the channel and connection.
/// </remarks>
public class MqConsumer : IDisposable
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private bool disposed;

    /// <summary>
    /// Opens the connection, declares the exchange and queue, creates the
    /// bindings and starts consuming.
    /// </summary>
    /// <param name="exchange">Name of the exchange to declare and bind to.</param>
    /// <param name="direction">Exchange type, e.g. <c>ExchangeType.Direct</c> or <c>ExchangeType.Fanout</c>.</param>
    /// <param name="severities">
    /// Routing keys to bind the queue with. When null, a single binding with
    /// an empty routing key is created.
    /// </param>
    /// <param name="queueName">
    /// Name of the durable queue to declare and consume from. Defaults to
    /// "task_queue", the name that was previously hard-coded.
    /// </param>
    public MqConsumer(string exchange, string direction, List<string> severities = null, string queueName = "task_queue")
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        try
        {
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            channel.ExchangeDeclare(exchange, direction);
            channel.QueueDeclare(queue: queueName,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            Bind(queueName, exchange, severities);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += MsgReceived;
            // NOTE(review): autoAck: true means a delivery is acknowledged
            // without the handler confirming it; switch to manual acks if a
            // failure in the handler must not lose the message.
            channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
            Console.WriteLine("Consumer created");
        }
        catch
        {
            // Do not leak a half-open connection when setup fails.
            ReleaseResources();
            throw;
        }
    }

    /// <summary>
    /// Closes the channel and the connection. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the channel and connection when <paramref name="disposing"/> is true.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            ReleaseResources();
        }
    }

    // Closes channel and connection once; failures are reported, not thrown,
    // because cleanup must not raise.
    private void ReleaseResources()
    {
        if (disposed)
        {
            return;
        }

        disposed = true;

        try
        {
            if (channel != null)
            {
                if (channel.IsOpen)
                {
                    channel.Close();
                }

                channel.Dispose();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed to close channel: {ex.Message}");
        }

        try
        {
            if (connection != null)
            {
                if (connection.IsOpen)
                {
                    connection.Close();
                }

                connection.Dispose();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed to close connection: {ex.Message}");
        }
    }

    // Binds the queue to the exchange once per routing key, or once with an
    // empty routing key when no key list was supplied.
    private void Bind(string queuename, string exchange, List<string> severities)
    {
        if (severities == null)
        {
            channel.QueueBind(queue: queuename,
                              exchange: exchange,
                              routingKey: "");
            return;
        }

        foreach (var routingKey in severities)
        {
            channel.QueueBind(queue: queuename,
                              exchange: exchange,
                              routingKey: routingKey);
        }
    }

    // Prints the UTF-8 decoded body and routing key of each delivery,
    // followed by a timestamp and this instance's hash code so output from
    // different consumer instances can be told apart.
    private void MsgReceived(object model, BasicDeliverEventArgs ea)
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        var rkey = ea.RoutingKey;
        Console.WriteLine($" [x] Received {message} {rkey}");
        Console.WriteLine($" [x] Done {DateTime.Now} | {this.GetHashCode()}");
    }
}
Publisher's code:
// Publisher demo entry point: every line entered on the console (until "q")
// triggers one publish per routing key to the "topic_logs" exchange.
static void Main(string[] args)
{
    const string exchangeName = "topic_logs";
    //string exchangeKind = ExchangeType.Topic;
    string exchangeKind = ExchangeType.Direct;
    var routingKeys = new List<string> { "qwerty.red", "asdfgh.green" };

    var publisher = new MqDll.MqPublisher(exchangeName, exchangeKind);
    Console.WriteLine("Publisher created");

    // The typed text only acts as a trigger; the payload is fixed.
    for (var input = Console.ReadLine(); input != "q"; input = Console.ReadLine())
    {
        foreach (var routingKey in routingKeys)
        {
            publisher.Publish("SomeMsg..", exchangeName, routingKey);
        }
    }
}
/// <summary>
/// Connects to the broker on localhost, declares a durable queue and an
/// exchange, and publishes UTF-8 text messages to an exchange.
/// Dispose the instance to close the channel and connection.
/// </summary>
public class MqPublisher : IDisposable
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private bool disposed;

    /// <summary>
    /// Opens the connection and declares the queue and the exchange.
    /// </summary>
    /// <param name="exchange">Name of the exchange to declare.</param>
    /// <param name="type">Exchange type, e.g. <c>ExchangeType.Direct</c>.</param>
    /// <param name="queueName">
    /// Name of the durable queue to declare up front. Defaults to
    /// "task_queue", the name that was previously hard-coded. Pass null to
    /// skip the queue declaration.
    /// </param>
    public MqPublisher(string exchange, string type, string queueName = "task_queue")
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        try
        {
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            if (queueName != null)
            {
                channel.QueueDeclare(queue: queueName,
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
            }

            channel.ExchangeDeclare(exchange, type);
        }
        catch
        {
            // Do not leak a half-open connection when setup fails.
            ReleaseResources();
            throw;
        }
    }

    /// <summary>
    /// Closes the channel and the connection. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the channel and connection when <paramref name="disposing"/> is true.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            ReleaseResources();
        }
    }

    // Closes channel and connection once; failures are reported, not thrown,
    // because cleanup must not raise.
    private void ReleaseResources()
    {
        if (disposed)
        {
            return;
        }

        disposed = true;

        try
        {
            if (channel != null)
            {
                if (channel.IsOpen)
                {
                    channel.Close();
                }

                channel.Dispose();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed to close channel: {ex.Message}");
        }

        try
        {
            if (connection != null)
            {
                if (connection.IsOpen)
                {
                    connection.Close();
                }

                connection.Dispose();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed to close connection: {ex.Message}");
        }
    }

    /// <summary>
    /// Publishes <paramref name="msg"/> as UTF-8 bytes to the given exchange
    /// and prints a confirmation line to the console.
    /// </summary>
    /// <param name="msg">Text payload to send.</param>
    /// <param name="exchange">Exchange to publish to.</param>
    /// <param name="severity">Routing key attached to the message.</param>
    /// <param name="isPersistent">Value assigned to the message's <c>Persistent</c> property.</param>
    public void Publish(string msg, string exchange = "logs", string severity = "", bool isPersistent = true)
    {
        var properties = channel.CreateBasicProperties();
        properties.Persistent = isPersistent;
        channel.BasicPublish(exchange: exchange,
                             routingKey: severity,
                             basicProperties: properties,
                             body: Encoding.UTF8.GetBytes(msg));
        Console.WriteLine(" [x] Sent {0}", msg);
    }
}
- Or I can create a queue for each subscriber and remove the binding. The source code is simple and is the same as the code in the guide.
Is there a way to combine these two solutions: use one queue for all subscribers, bind each subscriber to its own unique routing key, and still save messages if the target subscriber (the one bound to that routing key) is offline?