rabbit.js:在Node.JS中使用RabbitMQ操作消息队列详解
rabbit.js 是一个专门为 RabbitMQ 所设计的 JavaScript API 函数库,可以让你在 Node.js 中很轻松的操作各种类型的信息队列(Message Queue)。

rabbit.js 以 amqplib 为基础,将原本复杂的设置又再简化,让一般性的用户更方便,如果是使用一般常见的模式(pattern),只要几行源程序就可以运作了。
安装
如果要在 Node.js 中使用 RabbitMQ,首先就是要将基本的 Node.js 与 RabbitMQ 服务先安装好,安装说明请参考:
在 Windows、Mac OS X 与 Linux 中安装 Node.js 网页应用程序开发环境
在 Ubuntu Linux 中安装与使用 RabbitMQ 信息伫列
接着再使用 Node.js 的 npm 套件管理程序安装 rabbot.js:
npm install rabbit.js
使用 rabbit.js
rabbit.js 在使用上比 RabbitMQ 官方所提供的教学范例还要简略,只要建立 RabbitMQ 的连线与对应类型的 socket 即可,以下是使用的方式教学。
建立连线
首先以 createContext() 指定 RabbitMQ 的 URL 地址,建立一个新的连线:
- var context = require('rabbit.js').createContext('amqp://localhost');
在连线建立之后,context 会送出 'ready' 这个事件(event)。如果连线发生问题时,则会送出 'error' 事件,并且附带一个 Error 对象,设计者可以过这些事件判断连线是否正常。
建立 Socket
建立连线之后,还要再建立指定类型的 socket,而要使用哪一种类型就要看自己的需求而定。这里以 Publish/Subscribe 这个模型为范例,在一个 JavaScript 程序中同时操作 publish 与 subscribe:
- var pub = context.socket('PUBLISH');
- var sub = context.socket('SUBSCRIBE');
这里建立两个 socket,一个作为 publish(pub),另一个作为 subscribe(sub)。
接下来,让两个 socket 连接到同一个 exchange:
- pub.connect('alerts');
- sub.connect('alerts');
socket 实际上就是 Stream,所以当它的缓冲区有数据可以读取时,你可以使用 read() 函数或是通过 'data' 事件来读取,而若要写入数据,则可使用 write() 函数。
如果传送的数据都是字符串,则可以使用 setEncoding() 来设置字符串的编码:
- sub.setEncoding('utf8');
- sub.on('data', function(note) { console.log("Alarum! %s", note); });
或是在写入数据时加上一个指定编码的参数:
- pub.write("Emergency. There's an emergency going on", 'utf8');
Stream 所提供的 pipe() 函数可以很方便的将输出导向至其他的串流:
- sub.pipe(process.stdout);
一个 socket 可以同时连接到多个 exchange,例如:
- var sub2 = context.socket('SUBSCRIBE');
- sub2.connect('system');
- sub2.connect('notifications');
这里的 sub2 会同时接收来自于 system 与 notifications 这两个 exchange 的所有信息。
如果一个 socket 同时连接多个 exchange 时,无法辨识收到的信息是来自于哪一个 exchange,如果需要区分信息来源,就要改用多个 socket。
如果要关闭 socket,可以调用 close() 函数,它会清理配置给该 socket 的所有资源,并在完成时送出 'close' 事件。若 socket 的类型是属于可以写入数据的,则也可以使用 end([chunk [, encoding]]) 函数来写入最后一笔数据,当数据写入后,就会自动关闭 socket,如果调用 end() 不加上任何参数,它的效果就跟 close() 函数相同。
Socket 类型
在使用 Context.socket() 建立 socket 时,第一个参数的作用是指定 socket 的类型,以下是所有支援的类型:
- PUBLISH / SUBSCRIBE(PUB / SUB)
PUSH / PULL(使用范例请参考 net)
REQUEST / REPLY(REQ / REP)(使用范例请参考 ordering)
PUSH / WORKER
Topic
PUB 与 SUB 这两个类型的 socket 可以依据指定的 topic 来传送与接收信息。
PUB socket 可以使用 setsockopt('topic', string) 来设置 socket 的 topic,或是在发送信息时,使用 publish(topic, message, [encoding]) 直接指定该信息的 topic。
SUB socket 则可以在调用 connect() 函数时,在第二个参数上指定 topic。
指定完 topic 之后,还要设置 'routing' 的类型,才能让他运作,可用的 'routing' 类型如下:
'fanout':这是默认的选项,不管 topic 是什么,将所有的信息配送至所有的 SUB socket。
'direct':只配送完全符合 topic 名称的信息。
'topic':依据 AMQP 的万用字符规则来比对 topic 名称,比对方法请参考 RabbitMQ 的 Topics。
Socket 设置
若要设置或更改 socket 的设置,可以通过 Socket.setsockopt() 函数,或是在调用 Context.socket() 时,将选项的设置值放在第二个参数上。
以下是一些可用的 socket 选项:
routing
适用于 PUB 与 SUB 这两个类型的 socket,跟 topic 配合之后,可以决定如何配送信息。routing 在 socket 建立的时候就要指定好,连接至相同位置的 sockets 其 routing 也必须吻合。
topic
用于设置 PUB socket 所发送信息的 topic。
expiration
专门用于可写入的 socket(如 PUB、PUSH、REQ、REP),设置信息的有效期间,单位为千分之一秒,例如:
pub.setsockopt('expiration', 60 * 1000)
这样由 pub 所发送的信息如果在 60 秒内没有被接收,那么服务器就会将此信息丢弃。信息有效期间的功能在 RabbitMQ 3.0.0 以后才有被支援。
prefetch
适用于 WORKER 与 REP socket,设置 RabbitMQ 在信息处理完成之前,只能配送多少笔信息给 socket。例如:
var worker = ctx.socket('WORKER', {prefetch: 1});
这样 RabbitMQ 就会一次只配送一个工作给 worker,等待工作处理完成并调用 ack() 之后,才会再继续配送下一个工作。 如果 prefetch 设置为 0(默认的选项),RabbitMQ 就会不设置任何限制,将所有的信息都配送给 socket。
persistent
设置信息在 RabbitMQ 服务器重新启动之后,是否还要保存,可用的值为 true 与 false。在 REQ / REP 的状况下,系统只会保存 requests,而在 PUB / SUB 的状况则没有作用。
本文来源 我爱IT技术网 http://www.52ij.com/jishu/12413.html 转载请保留链接。
- 评论列表(网友评论仅供网友表达个人看法,并不表明本站同意其观点或证实其描述)
-
