feat: sse 示例

This commit is contained in:
dunwu 2024-04-24 07:33:33 +08:00
parent 6b003ceac2
commit 060d1972ff
9 changed files with 282 additions and 0 deletions

View File

@ -26,6 +26,7 @@
<module>https</module>
<module>connections</module>
<module>websocket</module>
<module>sse</module>
<module>fastjson</module>
<module>view</module>
<module>client</module>

42
codes/web/sse/pom.xml Normal file
View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7</version>
</parent>
<groupId>io.github.dunwu.spring</groupId>
<artifactId>spring-web-sse</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>Spring::Web::SSE</name>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,39 @@
package example.spring.web.sse;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-04-16
*/
@CrossOrigin
@RestController
@RequestMapping("/sse")
public class SseController {
public static final String PREFIX = "user:";
public static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");
@GetMapping(value = "/connect/{userId}", produces = "text/event-stream;charset=UTF-8")
public SseEmitter connect(@PathVariable String userId) {
return SseUtil.connect(PREFIX + userId);
}
@GetMapping("/close/{userId}")
public boolean close(@PathVariable String userId) {
return SseUtil.close(PREFIX + userId);
}
@GetMapping("/send/{userId}")
public boolean send(@PathVariable String userId, @RequestParam("msg") String msg) {
SseUtil.send(PREFIX + userId, "收到消息:" + msg);
return true;
}
}

View File

@ -0,0 +1,91 @@
package example.spring.web.sse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-04-16
*/
@Slf4j
public class SseUtil {
public static final long SSE_TIMEOUT = 30000L;
private static final AtomicInteger COUNT = new AtomicInteger(0);
private static final Map<String, SseEmitter> SSE_MAP = new ConcurrentHashMap<>();
public static synchronized SseEmitter connect(String key) {
if (SSE_MAP.containsKey(key)) {
return SSE_MAP.get(key);
}
try {
SseEmitter sseEmitter = new SseEmitter(SSE_TIMEOUT);
sseEmitter.onCompletion(handleCompletion(key));
sseEmitter.onError(handleError(key));
sseEmitter.onTimeout(handleTimeout(key));
SSE_MAP.put(key, sseEmitter);
COUNT.getAndIncrement();
log.info("【SSE】创建连接成功key: {}, 当前连接数:{}", key, COUNT.get());
return sseEmitter;
} catch (Exception e) {
log.error("【SSE】创建连接异常key: {}", key, e);
return null;
}
}
public static synchronized boolean close(String key) {
SseEmitter sseEmitter = SSE_MAP.get(key);
if (sseEmitter == null) {
return false;
}
sseEmitter.complete();
SSE_MAP.remove(key);
COUNT.getAndDecrement();
log.info("【SSE】key: {} 断开连接!当前连接数:{}", key, COUNT.get());
return true;
}
private static Runnable handleCompletion(String key) {
return () -> {
log.info("【SSE】连接结束key: {}", key);
close(key);
};
}
private static Consumer<Throwable> handleError(String key) {
return t -> {
log.warn("【SSE】连接异常key: {}", key, t);
close(key);
};
}
private static Runnable handleTimeout(String key) {
return () -> {
log.info("【SSE】连接超时key: {}", key);
close(key);
};
}
public static void send(String key, Object message) {
if (SSE_MAP.containsKey(key)) {
try {
SseEmitter sseEmitter = SSE_MAP.get(key);
sseEmitter.send(message);
} catch (Exception e) {
log.error("【SSE】发送消息异常key: {}, message: {}", key, message, e);
close(key);
}
} else {
log.warn("【SSE】发送消息失败key: {}, message: {}", key, message);
}
}
}

View File

@ -0,0 +1,15 @@
package example.spring.web.sse;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.CrossOrigin;
@CrossOrigin
@SpringBootApplication
public class WebSseApplication {
public static void main(String[] args) {
SpringApplication.run(WebSseApplication.class, args);
}
}

View File

@ -0,0 +1 @@
spring.mvc.async.request-timeout = 30000

View File

@ -0,0 +1,12 @@
${AnsiColor.BRIGHT_YELLOW}${AnsiStyle.BOLD}
________ ___ ___ ________ ___ __ ___ ___
|\ ___ \|\ \|\ \|\ ___ \|\ \ |\ \|\ \|\ \
\ \ \_|\ \ \ \\\ \ \ \\ \ \ \ \ \ \ \ \ \\\ \
\ \ \ \\ \ \ \\\ \ \ \\ \ \ \ \ __\ \ \ \ \\\ \
\ \ \_\\ \ \ \\\ \ \ \\ \ \ \ \|\__\_\ \ \ \\\ \
\ \_______\ \_______\ \__\\ \__\ \____________\ \_______\
\|_______|\|_______|\|__| \|__|\|____________|\|_______|
${AnsiColor.CYAN}${AnsiStyle.BOLD}
:: Java :: (v${java.version})
:: Spring Boot :: (v${spring-boot.version})
${AnsiStyle.NORMAL}

View File

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8" ?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%boldYellow(%thread)] [%highlight(%-5level)] %boldGreen(%c{36}.%M) - %boldBlue(%m%n)
</pattern>
</encoder>
</appender>
<logger name="example.spring" level="INFO" />
<root level="WARN">
<appender-ref ref="CONSOLE" />
</root>
</configuration>

View File

@ -0,0 +1,66 @@
<!DOCTYPE html>
<html lang='en'>
<head>
<title>SSE 示例</title>
<meta charset='UTF-8'>
</head>
<body>
<h1>SSE 示例</h1>
<div>
userId: <input type='text' id='userId' value=''>
<button id='connectBtn' onclick='connect()'>connect</button>
<br />
msg: <input type='text' id='msg'> <br />
<button id='sendBtn' onclick='send()'>send</button>
<button id='closeBtn' onclick='disconnect()'>close</button>
</div>
<div id='result'></div>
</body>
<script>
let eventSource
const connect = () => {
let userId = document.getElementById('userId').value
eventSource = new EventSource(`/sse/connect/${userId}`)
eventSource.onmessage = function(event) {
console.log('msg', event.data)
document.getElementById('result').innerHTML += '<span>' + event.data + '</span><br />'
}
eventSource.onopen = function(event) {
console.log('onopen', eventSource.readyState)
document.getElementById('result').innerHTML = ''
}
eventSource.onerror = function(error) {
console.error('onerror', error)
}
}
const send = async () => {
let userId = document.getElementById('userId').value
let msg = document.getElementById('msg').value
const response = await fetch(`/sse/send/${userId}?msg=${msg}`)
response.text().then((data) => {
if (data !== 'true') {
console.error('发送失败')
}
})
}
const disconnect = () => {
eventSource.close()
console.log('连接关闭')
}
</script>
</html>