Event Bus 用来完成父子进程之间的消息传递,包括同步的调用,异步的发送,以及广播的能力,支持进程、线程、同一进程内调用三种模式。
基础使用,在不同的进程间引入。
如下,创建 Thread 模式的 EventBus。
/**
 * in main
 */
import { ThreadEventBus } from '@midwayjs/event-bus';
// 创建一个 bus
const bus = new ThreadEventBus({
  isWorker: false,
});
// 添加一个 worker
const worker = new Worker();
bus.addWorker(worker);
// 等待连接和启动
await bus.start();
// 发布消息
bus.publish('hello world');
/**
 * in worker
 */
import { ThreadEventBus } from '@midwayjs/event-bus';
// 创建 bus 客户端
const bus = new ThreadEventBus({
  isWorker: true,
});
// 启动客户端,自动向 master 发送消息
await bus.start();
// 订阅消息
bus.subscribe(message => {
  console.log(message.body);
  // => 'hello world'
});可以创建 Cluster 模式的 EventBus,接口相同。
import { ChildProcessEventBus } from '@midwayjs/event-bus';
// in main
const bus = new ChildProcessEventBus({
  isWorker: false,
});
await bus.start();
// in worker
const bus = new ChildProcessEventBus({
  isWorker: true,
});
// 启动客户端,自动向 master 发送消息
await bus.start();为了方便同进程研发,也提供了相同接口的 LocalEventBus。
import { LocalEventBus } from '@midwayjs/event-bus';
// in main
const bus = new LocalEventBus({
  isWorker: false,
});
await bus.start();
// in worker
const bus = new LocalEventBus({
  isWorker: true,
});
await bus.start();注意,本地模式下,需要在不同文件中,且 worker 只能有一个。
start
bus 启动后,需要调用此方法,才能正常使用。
main 调用后会等待 worker 启动,worker 调用后会向 main 发送消息。
// in main & worker
await bus.start();如果 worker 启动错误,可以传递异常给 main。
// worker
await bus.start(new Error('worker error'));
// main
bus.onError(error => {
  console.log(error.message);
  // => 'worker error'
});Publish & Subscribe
异步的发送和监听消息,支持 main 和 worker 双向发送, API 一致。
bus.subscribe(message => {
  // message.body === {data: 'abc'}
});
bus.publish({
  data: 'abc'
});指定某个 topic 发送消息。
bus.subscribe(message => {
  // message.body === {data: 'abc'}
}, {
  topic: 'test'
});
bus.publish({
  data: 'abc'
}, {
  topic: 'test'
});publish 错误。
const err = new Error('test');
bus.publish(err);publishAsync
同步的发送消息,会等待订阅方返回,包含超时参数,默认 5s。支持 main <-> worker 双向异步通信。
从 main 发送到 worker:
// in worker
bus.subscribe((message, responder) => {
  // message.body === {data: 'abc'}
  responder?.send({
    data: 'hello world',
  });
  // send error
  responder.error(new Error('test'));
});
// in main
const result = await bus.publishAsync({
  data: 'abc'
}, {
  timeout: 5000,
});
// result => {data: 'hello world'}从 worker 发送到 main:
// in main
bus.subscribe((message, responder) => {
  // message.body === {data: 'abc'}
  responder?.send({
    data: 'hello world',
  });
});
// in worker
const result = await bus.publishAsync({
  data: 'abc'
}, {
  timeout: 5000,
});
// result => {data: 'hello world'}使用 try/catch 捕获错误:
try {
  await bus.publishAsync({
    data: 'abc'
  });
} catch (err) {
  // err.message => 'test'
}publishChunk
发送消息,会等待订阅方返回多次,包含超时参数,默认 5s。
此 API 仅限于 main 向 worker 发送消息。
publishChunk 方法返回的是一个异步迭代器。
// subscribe
bus.subscribe((message, responder) => {
  responder.send('hello');
  responder.send(' world');
  responder.end();
});
// invoke
const iterator = bus.publishChunk({
  data: 'abc'
}, {
  timeout: 5000,
});
let result = '';
for await (const data of iterator) {
  result += data;
}使用 try/catch 捕获错误。
const iterator = bus.publishChunk({
  data: 'abc'
});
try {
  let result = '';
  for await (const data of iterator) {
    result += data;
  }
} catch (err) {
  // err.message => 'test'
}broadcast
广播消息(一对多)。
// worker
bus.subscribe(message => {
  // message.body === {data: 'abc'}
});
// main
bus.broadcast({
  data: 'abc'
});也支持 worker 向其他 main 或者 worker 广播。
// worker
bus.broadcast('hello world');默认情况下,worker 广播之后,main 和 当前 worker 都不会收到这条消息
可以通过设置属性,让当前 Worker 也同时接收这条消息。
// worker
bus.broadcast('hello world', {
  includeSelfFromWorker: true,
});可以通过设置属性,让 main 也同时接收这条消息。
// worker
bus.broadcast('hello world', {
  includeMainFromWorker: true,
});包含以下属性
messageId用来确定消息的唯一 idworkerId消息发送的 worker idtype消息发送的类型,包括 inited(初始化),request(main 向 worker 发送),response(worker 向 main 发送),invoke(同步的 main 向 worker 发送),broadcast(广播)几种类型error意外的报错信息messageOptions在发送消息时额外的参数,比如超时时间等,具体看类型定义
使用环境变量 NODE_DEBUG=midway:event-bus* 开启调试输出。
所有的消息必须可序列化,不能传递复杂对象,类,方法等。
MIT