<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Autowired private MyWsHandler myWsHandler; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry .addHandler(myWsHandler, "myWs") //允许跨域 .setAllowedOrigins("*"); } }
/** * ws消息处理类 */ @Component @Slf4j public class MyWsHandler extends AbstractWebSocketHandler { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { log.info("建立ws连接"); WsSessionManager.add(session.getId(),session); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { log.info("发送文本消息"); // 获得客户端传来的消息 String payload = message.getPayload(); log.info("server 接收到消息 " + payload); session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString())); } @Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { log.info("发送二进制消息"); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.error("异常处理"); WsSessionManager.removeAndClose(session.getId()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { log.info("关闭ws连接"); WsSessionManager.removeAndClose(session.getId()); } }
/** * 消息生成job */ @Slf4j @Component public class MessageJob { @Autowired WsService wsService; /** * 每5s发送 */ @Scheduled(cron = "0/5 * * * * *") public void run(){ try { wsService.broadcastMsg("自动生成消息 " + LocalDateTime.now().toString()); } catch (IOException e) { e.printStackTrace(); } } }
/** * ws操作相关服务 */ @Service @Slf4j public class WsService { /** * 发送消息 * @param session * @param text * @return * @throws IOException */ public void sendMsg(WebSocketSession session, String text) throws IOException { session.sendMessage(new TextMessage(text)); } /** * 广播消息 * @param text * @return * @throws IOException */ public void broadcastMsg(String text) throws IOException { for (WebSocketSession session: WsSessionManager.SESSION_POOL.values()) { session.sendMessage(new TextMessage(text)); } } }
@Slf4j public class WsSessionManager { /** * 保存连接 session 的地方 */ public static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>(); /** * 添加 session * * @param key */ public static void add(String key, WebSocketSession session) { // 添加 session SESSION_POOL.put(key, session); } /** * 删除 session,会返回删除的 session * * @param key * @return */ public static WebSocketSession remove(String key) { // 删除 session return SESSION_POOL.remove(key); } /** * 删除并同步关闭连接 * * @param key */ public static void removeAndClose(String key) { WebSocketSession session = remove(key); if (session != null) { try { // 关闭连接 session.close(); } catch (IOException e) { // todo: 关闭出现异常处理 e.printStackTrace(); } } } /** * 获得 session * * @param key * @return */ public static WebSocketSession get(String key) { // 获得 session return SESSION_POOL.get(key); } }
@SpringBootApplication @EnableScheduling public class SpringWsApplication { public static void main(String[] args) { SpringApplication.run(SpringWsApplication.class, args); } }
<!DOCTYPE HTML> <html> <head> <title>My WebSocket</title> </head> <body> <input id="text" type="text" /> <button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button> <div id="message"></div> </body> <script type="text/javascript"> let ws = null; //判断当前浏览器是否支持WebSocket if ('WebSocket' in window) { ws = new WebSocket("ws://localhost:8333/myWs"); } else { alert('当前浏览器 Not support websocket') } //连接发生错误的回调方法 ws.onerror = function () { setMessageInnerHTML("WebSocket连接发生错误"); }; //连接成功建立的回调方法 ws.onopen = function(event) { console.log("ws调用连接成功回调方法") //ws.send("") } //接收到消息的回调方法 ws.onmessage = function(message) { console.log("接收消息:" + message.data); if (typeof(message.data) == 'string') { setMessageInnerHTML(message.data); } } //ws连接断开的回调方法 ws.onclose = function(e) { console.log("ws连接断开") //console.log(e) setMessageInnerHTML("ws close"); } //将消息显示在网页上 function setMessageInnerHTML(innerHTML) { console.log(innerHTML) document.getElementById('message').innerHTML += '接收的消息:' + innerHTML + '<br/>'; } //关闭连接 function closeWebSocket() { ws.close(); } //发送消息 function send(msg) { if(!msg){ msg = document.getElementById('text').value; document.getElementById('message').innerHTML += "发送的消息:" + msg + '<br/>'; ws.send(msg); } } </script> </html>