主要涉及到的内容是
RxJS
、StompJS
、SockJS
。推荐两个部分的参考文章,可以更加深刻理解原理。
Rxjs
简介
在Rxjs
官网的定义是:RxJS
是一个库,它通过使用 observable
序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer
、 Schedulers
、 Subjects
) 和受 [Array#extras
] 启发的操作符 (map
、filter
、reduce
、every
, 等等),这些数组操作符可以把异步事件作为集合来处理。
可以把
RxJS
当做是用来处理事件的 Lodash 。
其主要可以解决:
- 如何控制大量代码的复杂度。
- 如何保持代码的可读性。
- 如何处理异步操作。
官方概念
-
Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
-
Observer (观察者): 一个回调函数的集合,它知道如何去监听由
Observable
提供的值。 -
Subscription (订阅): 表示
Observable
的执行,主要用于取消Observable
的执行。 -
Operators (操作符): 采用函数式编程风格的纯函数 (
pure function
),使用像map
、filter
、concat
、flatMap
等这样的操作符来处理集合。 -
Subject (主体): 相当于
EventEmitter
,并且是将值或事件多路推送给多个Observer
的唯一方式。 -
Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如
setTimeout
或requestAnimationFrame
或其他。
Observable 和 Observer
-
Observable
即“可以被观察的东西”,可以理解为数据源。 -
Observer
即“观察者”,可以理解为对数据元进行操作的角色。 -
两者通过
Observable
的subscribe
函数进行连接。
简单的 Observable
import { Observable } from "rxjs";
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
});
const observer = {
next: (x) => console.log("got value " + x),
};
observable.subscribe(observer);
import { Observable } from "rxjs";
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
});
observable.subscribe((x) =>
console.log("got value " + x)
);
这是一个经典的观察者模式的实现,observable
是通过Rxjs
的Observable
实例化而来,在其回调参数里面连续调用三次next
输出3
个值。
下面observer
是观察者对象,里面定义了next
方法,next
只是log
输入值。
很明显这里的next
函数是observable
回调中连续调用的next
。并且通过observable
的subscribe
方法将observable
与observer
。
代码依次输出
got value 1
got value 2
got value 3
Subject
Subject
是多播的,普通的Observables
是单播的(已订阅的观察者都拥有 Observable
的独立执行)
每个 Subject 都是 Observable(生产者)
对于 Subject
,你可以提供一个观察者并使用 subscribe
方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable
执行是来自普通的 Observable
还是 Subject
。在 Subject
的内部,subscribe
不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener
的工作方式。
每个 Subject 都是观察者(消费者)
Subject
是一个有如下方法的对象: next(v)
、error(e)
和 complete()
。要给 Subject
提供新值,只要调用 next(theValue)
,它会将值多播给已注册监听该 Subject
的观察者们。
import { Subject } from "rxjs";
// 实例化Subject。
var subject = new Subject();
// 添加了两个观察者。
subject.subscribe({
next: (v) => console.log("subjectA: " + v),
});
subject.subscribe({
next: (v) => console.log("subjectB: " + v),
});
// 给观察者下发一些值。
subject.next(Math.random());
subject.next(Math.random());
输出结果:
subjectA: 0.09127189658320534
subjectB: 0.09127189658320534
subjectA: 0.11287039832667811
subjectB: 0.11287039832667811
很明显多播的是同一个值。
BehaviorSubject
BehaviorSubject
接收不到以前的消息,只能接受当前最新的订阅消息。
官方定义:
Subject
的其中一个变体就是BehaviorSubject
,它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从BehaviorSubject
那接收到“当前值”。BehaviorSubjects
适合用来表示“随时间推移的值”。举例来说,生日的流是一个Subject
,但年龄的流应该是一个BehaviorSubject
。
import { BehaviorSubject } from "rxjs";
let obs = new BehaviorSubject(0);
obs.subscribe((res) => {
console.log("subscription a :", res);
});
obs.next(Math.random());
obs.next(Math.random());
obs.subscribe((res) => {
console.log("subscription b :", res);
});
obs.next(Math.random());
输出结果:
subscription a : 0
subscription a : 0.22790040632502007
subscription a : 0.6354154698314571
subscription b : 0.6354154698314571
subscription a : 0.7149988636217499
subscription b : 0.7149988636217499
BehaviorSubject
使用值0
进行初始化,当第一个观察者订阅时会得到0
。第二个观察者订阅时会得到值就会使最新的订阅值。
ReplaySubject
replay
译为“回访”,顾名思义可以拿到以前的值,在实例化时,可以通过传入参数来规定可以记录前多少个值。
官方定义:
ReplaySubject
类似于BehaviorSubject
,它可以发送旧值给新的订阅者,但它还可以记录Observable
执行的一部分。
import { ReplaySubject } from "rxjs";
var subject = new ReplaySubject(3); // 为新的订阅者缓冲3个值
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
subject.next(5);
输出结果:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
实例
这里以弹窗为例子。
import { BehaviorSubject } from "rxjs";
// 弹窗中心-总线
export const governPopEvents =
new BehaviorSubject();
在popEvent.vue
中写出弹窗的布局,slot
插槽等一些内容,以及订阅governPopEvents
。
addPopEvent() {
let tempPopEventsListener = governPopEvents.subscribe(data => {
// 判断打开或者关闭弹窗。
// 分别处理一些事件。
});
// 添加到监听池里去。
// 为了销毁的时候全部unsubscribe。
this.popEventsArr.push(tempPopEventsListener);
// 或是其他的业务操作。
}
在popEvent.js
中主要有两个函数,派发打开事件和派发关闭事件。
dispatchPopOpen(popData) {
// 处理需要传入弹窗的数据
// 派发事件
governPopEvents.next({
type: 'OPEN',
popData: popData
});
},
dispatchPopClose(popData) {
// 处理一些附加数据
// 派发事件
governPopEvents.next({
type: 'CLOSE',
popData: popData,
});
}
当然,最好也有处理弹窗关闭的相关事件,每次订阅就把其放进一个数组,当需要销毁时,就依次执行关闭窗口的回调函数,再取消订阅。
在需要调用弹窗的时候,只需引入popEvent.js
,定义一些数据之后调用 this.dispatchPopOpen(data)
传入相关数据即可启动调用弹窗。并且在created
阶段的时候,可以调用弹窗关闭相关事件addPopCloseEvents(() => {})
,并且设置回调函数,完成关闭弹窗的一些回调。
参考文章
WebSocket
对于 WebSocket
应该多多少少都知道些东西,在计算机网络当中详细介绍了 Socket
相关的内容,并且做了相关的实验,最经典的就是聊天室的实验,而 WebSocket
的出现,使得浏览器具备了实时双向通信的能力,对于 WebSocket
最主要的便是如何建立连接、交换数据、以及数据帧的相关格式,当然还有最重要的协议的安全性,它使用 TCP
作为传输层协议和使用 HTTP Upgrade
机制来握手。其优点不言而喻,与 HTTP/HTTPS
协议比起来,双向通信使得其更加的高效、灵活。
建立 WebSocket
在建立连接开始之前,浏览器会先发起 HTTP
请求,通过请求头(header
)携带参数过去,请求建立 WebSocket
连接。
GET / HTTP/1.1
Host: localhost:8080
Origin: http://127.0.0.1:3000
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==
Origin
—— 客户端页面的源,WebSocket
对象是原生支持跨源的。没有特殊的header
或其他限制。旧的服务器无法处理WebSocket
,因此不存在兼容性问题。但Origin header
很重要,因为它允许服务器决定是否使用WebSocket
与该网站通信。Connection: Upgrade
—— 表示客户端想要更改协议。Upgrade: websocket
—— 请求的协议是“websocket
”。Sec-WebSocket-Key
—— 浏览器随机生成的安全密钥。Sec-WebSocket-Version
——WebSocket
协议版本,当前为13
。如果服务器同意切换为WebSocket
协议,服务器返回响应码101
:
HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: websocket
Sec-WebSocket-Accept:Oy4NRAQ13jhfONC7bP8dTKb4PTU=
这里 Sec-WebSocket-Accept
是 Sec-WebSocket-Key
,是使用特殊的算法重新编码的。浏览器使用它来确保响应与请求相对应。
StompJS
STOMP
Simple Text Oriented Message Protocol
——面向消息的简单文本协议。
介绍
This library allows you to connect to a STOMP broker over WebSocket. This library supports full STOMP specifications and all current protocol variants. Most popular messaging brokers support STOMP and STOMP over WebSockets either natively or using plugins. In general JavaScript engines in browsers are not friendly to binary protocols, so using STOMP is a better option because it is a text oriented protocol.
Regular Web Socket
通过官方的文档可以了解到,STOMP JavaScript
客户端可以通过使用 ws://
的 URL
与 STOMP
服务端进行连接。
var url = "ws://localhost:61614/stomp";
var client = Stomp.client(url);
Stomp.client(url, protocols)
也可以用来覆盖库中(['v10.stomp', 'v11.stomp]'
)提供的默认子协议(subprotocols
),第二个参数可以是一个字符串或一个字符串数组去指定多个 subprotocols
。
Custom Web Socket
浏览器提供了不同版本的 WebSocket
的协议,一些老的浏览器不支持 WebSocket
或者使用别的名字。默认下,stomp.js
使用浏览器原生的 WebSocket class
去创建 WebSocket
。
However it is possible to use other type of WebSockets by using the Stomp.over(ws) method. This method expects an object that conforms to the WebSocket definition.
从这里可以看出,虽然在许多老的浏览器上不支持 WebSocket
但是可以通过 Stomp.over(ws)
方法使用其他类型的 WebSocket
,例如,可以使用由 SockJS
实现的WebSocket
。
如果使用原生的 Websocket
就使用 Stomp.client(url)
,如果需要使用其他类型的 Websocket
(例如由 SockJS
包装的 Websocket
)就使用 Stomp.over(ws)
。除了初始化有差别,Stomp API
在这两种方式下是相同的。
其实还有在
Node
环境当中,如何建立连接,这写的话,可以参考下下面的文章:STOMP Over WebSocket。
连接服务器
一旦 Stomp
客户端建立了,必须调用它的 connect()
方法去连接 Stomp
服务端进行验证。这个方法需要两个参数,用户的登录和密码凭证。这种情况下,客户端会使用 Websocket
打开连接,并发送一个 CONNECT frame
。
这个连接是异步进行的,你无法保证这个方法时是有效连接的,因此为了知道连接的结果,还需要一个回调函数。
var connect_callback = function () {
// called back after the client is connected and authenticated to the STOMP server
};
当连接发生错误的时候,也可以添加一个处理错误的回调,当然大多数时候,connect()
方法可接受不同数量的参数来提供简单的 API:
client.connect(login, passcode, connectCallback);
client.connect(
login,
passcode,
connectCallback,
errorCallback
);
client.connect(
login,
passcode,
connectCallback,
errorCallback,
host
);
上面的参数从名称上来看,其实很简单,login
和 passcode
是 strings
,connectCallback
和 errorCallback
则是 functions
,有些代理(brokers
)还需要传递一个 host 参数。
除此之外如果需要附加一个 headers
,connect
方法还可以接收另外两种参数方式。
client.connect(headers, connectCallback);
client.connect(
headers,
connectCallback,
errorCallback
);
如果采用 header
进行连接,这时候需要自行将 login
、password
(有些时候还有 host
)添加到 headers
当中。
var headers = {
login: "mylogin",
passcode: "mypasscode",
// additional header
"client-id": "my-client-id",
};
client.connect(headers, connectCallback);
断开连接时,调用 disconnect
方法,这个方法也是异步的,当断开成功后会接收一个额外的回调函数的参数。
Heart-beating 和发送消息
如果 STOMP broker
(代理)接收 STOMP 1.1
版本的帧,heart-beating
是默认启用的。
heart-beating
(心跳)也就是频率,incoming
是接收频率,outgoing
是发送频率。通过改变这两个配置项,更改到客户端的心跳。
连接之后,发送消息则很简单,只需要通过调用 send
方法设置两个可选的参数:headers
、body
。
client.send(
"/queue/test",
{ priority: 9 },
"Hello, STOMP"
);
// client会发送一个STOMP发送帧给/queue/test,这个帧包含一个设置了priority为9的header和内容为“Hello, STOMP”的body。
订阅和接收消息
这一部分应该算是 STOMP
的主要作用,为了在浏览器中接手消息,STOMP
客户端必须先订阅一个目标频道。可以使用 subscribe()
方法去订阅,这个方法有两个必须存在的参数,destination
,callback
,以及一个可选的参数 headers
。相应的对应是目标频道,回调函数。每次服务端向客户端发送消息时,客户端都会轮流调用回调函数,参数为对应消息的 STOMP
帧对象(Frame object
)。
其他
STOMP
消息的 body
必须为字符串。如果你需要发送/接收 JSON
对象,你可以使用 JSON.stringify()
和 JSON.parse()
去转换 JSON 对象。当然,还有事务、调试等一些其他的使用场景,这些都可以参考 STOMP
的官方文档进行设置。
实例
在 StompJS
的官方文档中给出了详细的实例,因此这里简单描述一下项目中所采用的 StompJS
、SockJS
以及 RxJS
是如何配合工作的。
- 通过
RxJS
创建socketEvent
。 - 使用
socket.vue
时,模块会进行订阅socketEvent
,当收到不同的广播信号时,执行不同的操作,这里以初始化为例。 - 首先是
SockJS
创建出Custom WebSocket
(并非原生的WebSocket
,前面文章提到过)。 - 再使用
Stomp.over
创建出Stomp
客户端对象stompClient
。 - 通过
stompClient.connet(_headers, cb, errorCb)
连接服务端。 - 再第五步的回调函数中,
stompClient.subscribe(url, cb)
订阅频道(可能是RabbitMQ
,可能是其他的消息队列,取决于后端使用什么)。 - 此时就完成了所有流程,当监听到后端往消息队列中添加数据时,调用第六步的回调函数去拿取数据。此时通过
socketEvent.next
将数据多播到订阅模块。 - 后面就是一些处理数据,以及进行一些定义的补偿机制,不做过多的简述。