增加 websocket相关代码

master
bynt 3 years ago
parent 36811b937d
commit c0e303dbf0

@ -24,7 +24,7 @@ public class SellerUserHandlerMethodArgumentResolver implements HandlerMethodArg
}
@Override
public Object resolveArgument(MethodParameter parameter, ModelAndViewContainer container,
public Object resolveArgument(MethodParameter methodParameter, ModelAndViewContainer container,
NativeWebRequest request, WebDataBinderFactory factory) {
// session解析账号
SellerAccount sellerAccount = (SellerAccount) request.getAttribute(AuthConstants.SELLER_ACCOUNT, DefaultConstants.ZERO_NUMBER);

@ -1,5 +1,6 @@
package com.hchbox.hander;
import com.hchbox.hander.message.TaoBaoRefundMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
@ -7,6 +8,7 @@ import org.springframework.stereotype.Service;
*
* @author t102 2015-8-26 3:30:06
*/
@RequiredArgsConstructor
@Service("rateRefundSuccessHandler")
public class RateRefundSuccessHandler implements TaoBaoHandler<TaoBaoRefundMessage> {

@ -24,6 +24,12 @@
<artifactId>trade-service-wwsend-task</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
<build>

@ -43,6 +43,10 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
</dependency>
</dependencies>

@ -0,0 +1,67 @@
package com.hchbox.config;
import com.hchbox.exception.TaoMiCommException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
/**
* @author Enzo
* @date : 2021/7/5
*/
@Slf4j
@Component
public class HttpAuthHandler extends TextWebSocketHandler {
/**
* socket
*
* @param session session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Object token = session.getAttributes().get("nickname");
if (token != null) {
// 用户连接成功,放入在线用户缓存
WsSessionManager.add(token.toString(), session);
} else {
throw new TaoMiCommException("用户登录已经失效!");
}
}
/**
*
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
super.handleTextMessage(session, message);
}
/**
* socket
*
* @param session
* @param status
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
Object sessionKey = session.getAttributes().get("nickname");
if (sessionKey != null) {
// 用户退出,移除缓存
WsSessionManager.remove(sessionKey.toString());
}
}
}

@ -0,0 +1,20 @@
package com.hchbox.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author Enzo
* @date : 2021/7/5
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter () {
return new ServerEndpointExporter();
}
}

@ -0,0 +1,88 @@
package com.hchbox.config;
import com.hchbox.exception.TaoMiCommException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Enzo
* @date : 2021/7/5
*/
@Slf4j
public class WsSessionManager {
private WsSessionManager() {
}
/**
* session
*/
private static final ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
/**
* session
*
* @param key key
*/
public static void add(String key, WebSocketSession session) {
// 添加 session
SESSION_POOL.put(key, session);
}
/**
* session, session
*
* @param key key
* @return
*/
public static WebSocketSession remove(String key) {
// 删除 session
return SESSION_POOL.remove(key);
}
/**
*
*
* @param key key
*/
public static void removeAndClose(String key) {
WebSocketSession session = remove(key);
if (session != null) {
try {
// 关闭连接
session.close();
} catch (IOException e) {
log.error("close link exception key as {}", key);
throw new TaoMiCommException("the remove session error");
}
}
}
/**
* session
*
* @param key key
* @return session
*/
public static WebSocketSession get(String key) {
// 获得 session
return SESSION_POOL.get(key);
}
public static void sendMessageToUser(String nickname, TextMessage message) {
WebSocketSession session = SESSION_POOL.get(nickname);
if (session != null){
try {
session.sendMessage(message);
} catch (IOException e) {
log.error("the send message error nickname as {}",nickname);
throw new TaoMiCommException("the send message error");
}
}
}
}

@ -0,0 +1,130 @@
package com.hchbox.socket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Enzo
* @date : 2021/7/5
*/
@Slf4j
@Component
@ServerEndpoint(value = "/prosperous/")
public class WebSocketServer {
@PostConstruct
public void init() {
log.info("websocket 加载");
}
private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
/**
* concurrent线SetSession
*/
private static final CopyOnWriteArraySet<Session> SESSIONS = new CopyOnWriteArraySet<>();
/**
*
*/
@OnOpen
public void onOpen(Session session) {
SESSIONS.add(session);
// 在线数加1
int cnt = ONLINE_COUNT.incrementAndGet();
log.info("有连接加入,当前连接数为:{}", cnt);
sendMessage(session, "连接成功");
}
/**
*
*/
@OnClose
public void onClose(Session session) {
SESSIONS.remove(session);
int cnt = ONLINE_COUNT.decrementAndGet();
log.info("有连接关闭,当前连接数为:{}", cnt);
}
/**
*
*
* @param message
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("来自客户端的消息:{}", message);
sendMessage(session, "收到消息,消息内容:" + message);
}
/**
*
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误:{}Session ID {}", error.getMessage(), session.getId());
error.printStackTrace();
}
/**
* session
*
* @param session
* @param message
*/
public static void sendMessage(Session session, String message) {
try {
session.getBasicRemote().sendText(String.format("%s (From ServerSession ID=%s)", message, session.getId()));
} catch (IOException e) {
log.error("发送消息出错:{}", e.getMessage());
e.printStackTrace();
}
}
/**
*
*
* @param message
* @throws IOException
*/
public static void BroadCastInfo(String message) throws IOException {
for (Session session : SESSIONS) {
if (session.isOpen()) {
sendMessage(session, message);
}
}
}
/**
* Session
*
* @param sessionId
* @param message
* @throws IOException
*/
public static void sendMessage(String message, String sessionId) throws IOException {
Session session = null;
for (Session s : SESSIONS) {
if (s.getId().equals(sessionId)) {
session = s;
break;
}
}
if (session != null) {
sendMessage(session, message);
} else {
log.warn("没有找到你指定ID的会话{}", sessionId);
}
}
}
Loading…
Cancel
Save