Spring Boot 实现 SSE 服务端推送事件
Spring Boot HTTP SSE About 6,816 wordsSSE
Sever Send Event
,是HTTP
协议中的一种,Content-Type
为text/event-stream
,能够保持长连接。
示例代码
Spring Boot
@CrossOrigin
@RestController
public class SSEController {
private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
@PostConstruct
private void init() {
Executors.newScheduledThreadPool(10).scheduleWithFixedDelay(() -> {
System.out.println("sseEmitterMap#" + sseEmitterMap);
sseEmitterMap.forEach((s, sseEmitter) -> {
try {
sseEmitter.send(SseEmitter.event()
.id(UUID.randomUUID().toString())
.data(MyEvent.builder().code(222).msg(s + "#" +
LocalDateTime.now() + "#" + Thread.currentThread().getName() + "#"
+ Thread.currentThread().getState()).build())
.reconnectTime(3000L)
.comment("this is comment")
);
if (LocalDateTime.now().getSecond() % 2 == 0) {
sseEmitter.send(
SseEmitter.event()
.id("1")
.name("customEventName")
.data("customData")
);
}
} catch (IOException e) {
System.out.println("Error#" + e);
// sseEmitter.completeWithError(e);
}
});
}, 3, 3, TimeUnit.SECONDS);
}
@GetMapping("/test/sse")
public SseEmitter sseEmitter(@RequestParam("uid") String uid) throws IOException {
SseEmitter sseEmitter = new SseEmitter(-1L);
// SseEmitter sseEmitter = new SseEmitter(5L);
sseEmitter.send(SseEmitter.event().id("1").name("Connected").data(LocalDateTime.now()).reconnectTime(3000));
sseEmitterMap.put(uid, sseEmitter);
sseEmitter.onCompletion(() -> {
System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on completion");
sseEmitterMap.remove(uid);
});
sseEmitter.onTimeout(() -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on timeout#" + sseEmitter.getTimeout()));
sseEmitter.onError(throwable -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on error#" + throwable.toString()));
return sseEmitter;
}
}
JavaScript
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script>
window.onload = function () {
let connectBtn = document.getElementById("connectSSE");
let disconnectBtn = document.getElementById("disconnectSSE");
let userIdElement = document.getElementById("userId");
let userIdInfoElement = document.getElementById("userIdInfo");
let sse;
connectBtn.onclick = function () {
if (!userIdElement.value) {
userIdInfoElement.innerText = "userId is empty";
console.log("userId is empty")
return;
}
userIdInfoElement.innerText = userIdElement.value;
const eventSource = new EventSource('http://localhost:18080/test/sse?uid=' + userIdElement.value);
eventSource.onopen = (event) => {
console.log("onopen", event.readyState, event.target);
sse = event.target;
let element = document.getElementById("onOpenInfo");
element.innerText = JSON.stringify(event.target);
};
eventSource.onmessage = (event) => {
let element = document.getElementById("onMessageInfo");
element.innerText = event.data;
};
eventSource.onerror = (event) => {
console.log("onerror", event);
if (event.readyState === EventSource.CLOSED) {
console.log('connection is closed');
} else {
console.log("Error occured", event);
}
event.target.close();
let element = document.getElementById("onErrorInfo");
element.innerText = JSON.stringify(event);
};
eventSource.addEventListener("customEventName", (event) => {
console.log("Message id is " + event.lastEventId);
});
};
disconnectBtn.onclick = function () {
if (sse) {
sse.close();
}
};
};
</script>
</head>
<body>
<div>
<input id="userId" type="text">
<button id="connectSSE">Connect</button>
<button id="disconnectSSE">Disconnect</button>
</div>
<div>
userId: <span id="userIdInfo"></span>
</div>
<div>
onOpen: <span id="onOpenInfo"></span>
</div>
<div>
onMessage: <span id="onMessageInfo"></span>
</div>
<div>
onError: <span id="onErrorInfo"></span>
</div>
</body>
</html>
SseEmitter
SpringMVC
封装的SSE
实现,Controller
中直接返回SseEmitter
,不调用complete()
方法,即可保持长链接。
超时时间
SseEmitter()
:无参构造,默认超时时间依赖于Web
容器,容器为Tomcat
则超时时间为30
秒。
SseEmitter(Long timeout)
:有参构造,设置超时时间。传入-1L
表示没有超时时间。
无参构造可通过配置mvc
属性来设置超时时间,单位毫秒:
spring:
mvc:
async:
request-timeout: 15000
注意
客户端关闭了连接,不管是调用了event.target.close()
还是关闭了网页,服务端不会触发任何回调。直到服务端调用send
后才会触发onError
和onCompletion
回调。
服务端触发了onCompletion
回调后,连接就断开了。
重试
浏览器会保持连接一直打开。服务端可以通过调用complete
和completeWithError
方法关闭连接,这两个事件会触发客户端的error
回调。
当服务端关闭连接或网络错误时,如果客户端不调用event.target.close()
关闭连接的话,浏览器会发起重新连接。
浏览器默认会等待3
秒再尝试重新建立连接,并且浏览器会保持重试知道获得HTTP
请求返回的200
状态码。
服务端可以通过发送retry
标志位更改默认3
秒的等待时间。服务端可以设置标志位为0
,表示连接关闭则立即发起重试,没有等待时间。
限制
- 只能发送文本(可通过
base64
等方法简单加密)。 - 很多浏览器(包括
Chrome
)限制同一端口最多开启的SSE
连接数,最多为6
个,即每个端口最多可开启6
个连接。超出6
个连接后进入pending
状态。
PS C:\> netstat -an | findstr 18080 | findstr ESTABLISHED
TCP [::1]:18080 [::1]:52001 ESTABLISHED
TCP [::1]:18080 [::1]:52987 ESTABLISHED
TCP [::1]:18080 [::1]:52997 ESTABLISHED
TCP [::1]:18080 [::1]:53006 ESTABLISHED
TCP [::1]:18080 [::1]:53007 ESTABLISHED
TCP [::1]:18080 [::1]:53029 ESTABLISHED
Spring Flux SSE
https://www.baeldung.com/spring-server-sent-events
参考
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb/LiteNote扫描下方二维码关注公众号和小程序↓↓↓