zvvq技术分享网

对于NodeJS如何操作消息队列RabbitMQ的分析(node 事

作者:zvvq博客网
导读这篇文章主要介绍了关于对 nodejs 如何操作消息队列rabbitmq的分析,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下 一. 什么是消息队列? 消息(Message)是指在应用

这篇文章主要介绍了关于对nodejs如何操作消息队列rabbitmq的分析,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下

zvvq.cn

一. 什么是消息队列?

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

内容来自samhan

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。 zvvq.cn

二. 常用的消息队列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。 内容来自samhan666

甚至现在部分NoSQL也可做消息队列,如Redis。 内容来自samhan

三. 消息队列的使用场景?

异步处理 应用解耦 流量削峰

四. 使用案例

上规模的公司都会有自己的日志分析系统,日志系统是怎么实现的呢?

内容来自zvvq,别采集哟

zvvq好,好zvvq

图解:用户在访问应用的时候,我们要记录下用户的操作记录和系统的异常日志,常规的做法是将系统产生的日志保存到服务器磁盘,在服务器中开启定时任务,定时将磁盘的日志信息传入mq中(生产者),也定时将mq中的消息取出并存到相应的数据库,如ElasticSearch或Hive中。

本文来自zvvq

五. 如何安装RabbitMQ?

上面的案例介绍了MQ的一个使用场景,我这里是用RabbitMQ举例,现实项目中可能用到的是Kafka。 内容来自samhan

首先安装brew(mac为例)

内容来自zvvq,别采集哟

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

zvvq好,好zvvq

 

安装RabbitMQ zvvq.cn

brew install rabbitmq 内容来自samhan666

 

运行RabbitMQ 内容来自zvvq,别采集哟

进入到 /usr/local/Cellar/rabbitmq/3.7.7,执行 内容来自zvvq

sbin/rabbitmq-server

内容来自samhan666

 

启动插件

zvvq

进入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin zvvq

./rabbitmq-plugins enable rabbitmq_management zvvq.cn

 

登陆管理界面 内容来自zvvq,别采集哟

打开浏览器输入:http://localhost:15672,RabbitMQ默认15672端口六. Nodejs操作RabbitMQ zvvq.cn

内容来自zvvq,别采集哟

网上可以找到好几个相应的Node SDK,这里推荐amqplib zvvq

1.生产者

本文来自zvvq

/** zvvq

* 对RabbitMQ的封装 内容来自zvvq

*/ zvvq.cn

let amqp = require('amqplib');

内容来自zvvq

class RabbitMQ {

zvvq好,好zvvq

constructor() {

内容来自zvvq,别采集哟

this.hosts = [];

本文来自zvvq

this.index = 0;

内容来自zvvq,别采集哟

this.length = this.hosts.length; 内容来自samhan

this.open = amqp.connect(this.hosts[this.index]);

本文来自zvvq

}

内容来自samhan666

sendQueueMsg(queueName, msg, errCallBack) {

内容来自zvvq,别采集哟

let self = this;

copyright zvvq

self.open

内容来自samhan666

.then(function (conn) {

zvvq.cn

return conn.createChannel();

本文来自zvvq

})

zvvq好,好zvvq

.then(function (channel) { 内容来自zvvq

return channel.assertQueue(queueName).then(function (ok) { 内容来自samhan666

return channel.sendToQueue(queueName, new Buffer(msg), {

zvvq.cn

persistent: true copyright zvvq

});

内容来自zvvq,别采集哟

})

本文来自zvvq

.then(function (data) { 内容来自samhan

if (data) { 本文来自zvvq

errCallBack && errCallBack("success"); 内容来自samhan

channel.close();

zvvq.cn

}

zvvq好,好zvvq

})

本文来自zvvq

.catch(function () {

内容来自samhan666

setTimeout(() => { zvvq

if (channel) {

内容来自zvvq

channel.close();

本文来自zvvq

} 内容来自samhan

}, 500) 内容来自zvvq

});

内容来自zvvq

})

内容来自samhan

.catch(function () { 内容来自samhan

let num = self.index++; 内容来自samhan666

if (num <= self.length - 1) { 内容来自zvvq,别采集哟

self.open = amqp.connect(self.hosts[num]);

zvvq.cn

} else { 内容来自samhan666

self.index == 0; zvvq

} zvvq.cn

});

zvvq

}

内容来自samhan

} copyright zvvq

 

2. 消费者

本文来自zvvq

/**

内容来自samhan

* 对RabbitMQ的封装 zvvq好,好zvvq

*/

copyright zvvq

let amqp = require(&#39;amqplib&#39;);

zvvq好,好zvvq

class RabbitMQ { 内容来自samhan666

constructor() {

zvvq好,好zvvq

this.open = amqp.connect(this.hosts[this.index]); 内容来自samhan

}

zvvq.cn

receiveQueueMsg(queueName, receiveCallBack, errCallBack) {

内容来自zvvq

let self = this;

zvvq好,好zvvq

self.open 内容来自samhan

.then(function (conn) {

本文来自zvvq

return conn.createChannel();

zvvq.cn

}) 本文来自zvvq

.then(function (channel) { zvvq.cn

return channel.assertQueue(queueName)

内容来自zvvq

.then(function (ok) {

zvvq好,好zvvq

return channel.consume(queueName, function (msg) {

内容来自samhan

if (msg !== null) {

内容来自zvvq

let data = msg.content.toString();

copyright zvvq

channel.ack(msg);

内容来自zvvq,别采集哟

receiveCallBack && receiveCallBack(data);

zvvq

}

内容来自samhan666

}) 内容来自samhan

.finally(function () { 内容来自samhan666

setTimeout(() => {

内容来自zvvq

if (channel) {

zvvq

channel.close();

zvvq

}

本文来自zvvq

}, 500) zvvq.cn

}); 内容来自samhan

})

zvvq

}) 内容来自zvvq

.catch(function () { copyright zvvq

let num = self.index++;

zvvq好,好zvvq

if (num <= self.length - 1) {

本文来自zvvq

self.open = amqp.connect(self.hosts[num]); 内容来自zvvq,别采集哟

} else {

zvvq.cn

self.index = 0; 内容来自samhan

self.open = amqp.connect(self.hosts[0]); 内容来自zvvq,别采集哟

}

zvvq.cn

});

内容来自zvvq

}

本文来自zvvq

 

3. 通过生产者向MQ发送一个消息,并创建队列 内容来自zvvq,别采集哟

1

内容来自zvvq

2

内容来自samhan666

3

内容来自zvvq

4 copyright zvvq

let mq = new RabbitMQ(); 内容来自samhan666

mq.sendQueueMsg(&#39;testQueue&#39;, &#39;my first message&#39;, (error) => { 内容来自zvvq,别采集哟

console.log(error)

内容来自zvvq

})

内容来自samhan666

 

执行之后,我们打开管理平台,发现RabbbitMQ已经接受到了一条消息: 内容来自zvvq

zvvq

并且RabbbitMQ新增了一个队列testQueue 本文来自zvvq

zvvq.cn

4. 获取指定队列的消息

zvvq

1 本文来自zvvq

2 本文来自zvvq

3 内容来自zvvq,别采集哟

4

内容来自samhan666

let mq = new RabbitMQ();

内容来自samhan

mq.receiveQueueMsg(&#39;testQueue&#39;,(msg) => {    内容来自zvvq

console.log(msg)

内容来自zvvq

})// 输出结果:my first message复制代码

本文来自zvvq

 

此时打开RabbitMQ管理平台,消息数量已经变为0

copyright zvvq

本文来自zvvq

综上:我们简单讲述了消息队列及RabbitMQ相关的一些知识,以及我们如何通过nodejs来生产与消费消息。

zvvq

以上就是本文的全部内容,希望对大家的学习有所帮助,更多相关内容请关注ZVVQ博客分享网!

zvvq.cn

以上就是对于NodeJS如何操作消息队列RabbitMQ的分析的详细内容,更多请关注其它相关文章!

内容来自samhan