博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ --- Publish/Subscribe(发布/订阅)
阅读量:4330 次
发布时间:2019-06-06

本文共 6736 字,大约阅读时间需要 22 分钟。

目录

前言

在第二篇文章中介绍了 Work Queues(工作队列),它适用于把一个消息发送给一个 Consumer 去处理的场景,也就是说每个 Consumer 都是处理不同的消息,如果某个 Consumer 挂了才会把未完成或没有处理的消息转发给其它的 Consumer 去处理。假如要把一个消息发送给多个 Consumer 去处理,也就是说每个 Consumer 都是处理相同的消息,这种场景就需要用到发布/订阅(Publish/Subscribe)模式,即 Producer 为发布者,Consumer 为订阅者。

匿名交换机(Nameless exchange

Exchange 被称为交换机,担任中转站的职责,其工作原理是 Producer 先把消息发送给 Exchange,然后再由 Exchange 指定消息按照路由规则进入队列转发给 Consumer 去处理。在前两篇文章的示例中其实已经用到了默认的 Exchange,即匿名交换机(Nameless exchange)。

/*exchange:指定空字符串表示默认的Exchange标识 | routingKey:指定消息发送到哪个队列*/channel.BasicPublish(exchange, routingKey, basicProperties, body)

交换机的类型(Exchange types

Exchange 必须要知道如何处理它接收到的每个消息,是否让消息进入指定的队列?是否让消息进入多个队列?是否直接忽略消息?这些都取决于它的四种类型。

1.Direct:处理路由键。需要将一个队列绑定到 Exchange 上,只允许该消息与一个特定的路由键名称完整匹配才能被转发给队列。

/*Producer*///定义交换机channel.ExchangeDeclare(   exchange: "Tua",   type: ExchangeType.Direct//类型);channel.BasicPublish(   exchange: "Tua",   routingKey: "Mr.Tua",//路由键名称   basicProperties: null,   body: body);
/*Consumer*///定义交换机channel.ExchangeDeclare(   exchange: "Tua",   type: ExchangeType.Direct);//绑定交换机和队列,指明如何转发消息给队列channel.QueueBind(   queue: queueName,   exchange: "Tua",   routingKey: "Mr.Tua"//此处和生产者的路由键作比较,相同则转发消息,反之则不转发);

2.Fanout:不处理路由键。只要将队列绑定到 Exchange 上,Producer 发送到 Exchange 的消息都会被广播到所有的队列上。 

/*Producer*/channel.ExchangeDeclare(   exchange: "Tua",   type: ExchangeType.Fanout);channel.BasicPublish(   exchange: "Tua",   routingKey: string.Empty,//此处若有值则会忽略   basicProperties: null,   body: body);
/*Consumer*/channel.ExchangeDeclare(   exchange: "Tua",   type: ExchangeType.Fanout);//绑定交换机和队列,指明如何转发消息给队列channel.QueueBind(   queue: queueName,   exchange: "Tua",   routingKey: string.Empty//此处若有值则会忽略);

3.Topic:将路由键和某种命名规则进行匹配,只允许匹配上的消息才能被转发给队列。符号“#”匹配一个或多个词,符号“*”匹配一个词。

/*Producer*/channel.ExchangeDeclare(   exchange: "Tua",   type: ExchangeType.Topic);channel.BasicPublish(   exchange: "Tua",   routingKey: "Mr.Tua.Tua"   basicProperties: null,   body: body);
/*Consumer*/channel.ExchangeDeclare(   exchange: "Tua",   type: ExchangeType.Topic);//绑定交换机和队列,指明如何转发消息给队列channel.QueueBind(   queue: queueName,   exchange: "Tua",   routingKey: "Mr.#"//符号#可以匹配上Mr.Tua.Tua,符号*只能匹配上Mr.Tua);

4.Headers:它不依赖路由键,而是在接收方绑定队列和 Exchange 时指定键值对和发送方 headers 属性中的键值对进行匹配,键值对的值可以是任何类型,只允许匹配上的消息才能被转发给队列。匹配规则 x-match 有两种类型:all 表示所有的键值对匹配;any 表示至少有一个键值对匹配。

/*Producer*/channel.ExchangeDeclare(   exchange: "Tua",   type: ExchangeType.Headers);var headers = new Dictionary
();//定义键值对集合headers.Add("Name", "Mr.Tua");headers.Add("Age", 102);var basicProperties = channel.CreateBasicProperties();basicProperties.Headers = headers;//绑定消息中的headers属性值channel.BasicPublish( exchange: "Tua", routingKey: string.Empty, basicProperties: basicProperties, body: body);
/*Consumer*/channel.ExchangeDeclare(   exchange: "Tua",   type: ExchangeType.Headers);var headers = new Dictionary
();//定义键值对集合headers.Add("x-match", "any");//定义匹配规则headers.Add("Name", "Mr.Tua.Tua");headers.Add("Age", 102);channel.QueueBind( queue: queueName, exchange: "Tua", routingKey: string.Empty, arguments: headers//指定键值对);

 临时队列(Temporary queues

当希望在生产者和消费者之间共享队列时,为队列命名是很重要的。如果希望接收所有最新的消息时,那么就可以不用关心队列名称了,需要一个新的空队列,而且当 Consumer 和 Rabbit 服务连接断开时需要自动删除该队列,这就是临时队列(Temporary queues)的作用。

string queueName = channel.QueueDeclare().QueueName;//创建一个随机命名的唯一非持久可以自动删除的临时队列

完整示例

现在修改上一章的代码,通过 Fanout Exchange 使两个 Consumer 接收相同的消息:

using RabbitMQ.Client;using System;using System.Text;namespace Producer{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory            {                HostName = "10.202.228.107",                UserName = "Tua",                Password = "Tua",                Port = 5672            };            using (var connection = factory.CreateConnection())            {                using (var channel = connection.CreateModel())                {                    channel.ExchangeDeclare                    (                        exchange: "Tua",                        type: ExchangeType.Fanout                    );                    for (int m = 0; m < 10; m++)                    {                        string marks = string.Empty;                        for (int n = 0; n <= m; n++)                        {                            marks += ">";                        }                        string msg = "Mr.Tua" + marks + marks.Length + "s";                        var body = Encoding.UTF8.GetBytes(msg);                        channel.BasicPublish                        (                            exchange: "Tua",                            routingKey: "",                            basicProperties: null,                            body: body                        );                        Console.WriteLine("Producer sent message: {0}", msg);                    }                    Console.ReadLine();                }            }        }    }}
Producer
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Linq;using System.Text;using System.Threading;namespace Consumer{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory            {                HostName = "localhost"            };            using (var connection = factory.CreateConnection())            {                using (var channel = connection.CreateModel())                {                    channel.ExchangeDeclare                    (                        exchange: "Tua",                        type: ExchangeType.Fanout                    );                    string queueName = channel.QueueDeclare().QueueName;                    channel.QueueBind                    (                        queue: queueName,                        exchange: "Tua",                        routingKey: ""                    );                    var consumer = new EventingBasicConsumer(channel);                    consumer.Received += (sender, e) =>                    {                        var body = e.Body;                        var msg = Encoding.UTF8.GetString(body);                        int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count();                        Console.WriteLine("Consumer received message: {0}", msg);                        Thread.Sleep(marks * 1000);                        Console.WriteLine("OK");                    };                    channel.BasicConsume                    (                        queue: queueName,                        noAck: true,                        consumer: consumer                    );                    Console.ReadLine();                }            }        }    }}
Consumer

 

转载于:https://www.cnblogs.com/poepoe/p/7340374.html

你可能感兴趣的文章
用深度学习做命名实体识别(一):什么是命名实体识别?
查看>>
手把手教你用深度学习做物体检测(二):数据标注
查看>>
一个困惑已久的问题...
查看>>
手把手教你用深度学习做物体检测(三):模型训练
查看>>
手把手教你用深度学习做物体检测(四):模型使用
查看>>
用深度学习做命名实体识别(二):文本标注工具brat
查看>>
公司开始抓考勤了,就算你是程序员也必须每天准点到!
查看>>
手把手教你用深度学习做物体检测(六):YOLOv2介绍
查看>>
用深度学习做命名实体识别(四)——模型训练
查看>>
用深度学习做命名实体识别(五)-模型使用
查看>>
使用pycharm或idea提交项目到github
查看>>
想研究BERT模型?先看看这篇文章吧!
查看>>
还再@微信官方要国旗?这才是正确的打开方式~
查看>>
机器学习常用性能度量中的Accuracy、Precision、Recall、ROC、F score等都是些什么东西?...
查看>>
目标检测中常提到的IoU和mAP究竟是什么?
查看>>
eclipse运行mapreduce的wordcount
查看>>
linux命令帮助 man bash
查看>>
springmvc 参数解析绑定原理
查看>>
java成神之——集合框架之Array
查看>>
IIS7开启目录浏览功能
查看>>