package com.odianyun.crm.socket.handler;

import com.alibaba.fastjson.JSON;
import com.odianyun.architecture.caddy.SystemContext;
import com.odianyun.crm.model.guide.event.WechatSendMessageEvent;
import com.odianyun.crm.model.guide.po.WechatSendMessagePO;
import com.odianyun.crm.socket.dto.WebSocketMessage;
import com.odianyun.crm.socket.enums.MessageType;
import com.odianyun.crm.socket.handler.messageHandler.BaseSendMessageHandler;
import com.odianyun.crm.socket.handler.messageHandler.MessageHandler;
import com.odianyun.exception.factory.OdyExceptionFactory;
import com.odianyun.mq.common.netty.component.DefaultThreadFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PingMessage;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

@Component
/* loaded from: input_file:WEB-INF/lib/crm-web-starter-web-jzt-2.10.0-test-20230704.025205-25.jar:com/odianyun/crm/socket/handler/CrmWebSocketHandler.class */
public class CrmWebSocketHandler extends AbstractWebSocketHandler implements ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CrmWebSocketHandler.class);
    protected static final Map<String, WebSocketSession> SESSIONID_TO_SESSION = Maps.newHashMap();
    protected static final Map<String, Long> SESSIONID_TO_PING_TIME = Maps.newHashMap();
    private static final Map<MessageType, MessageHandler> MESSAGE_HANDLERS = Maps.newHashMap();

    @Value("${crm.websocket.timeout:60000}")
    public Long timeout = 60000L;

    @Value("${crm.websocket.pingInterval:10000}")
    public Long pingInterval = 10000L;
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory());

    @PostConstruct
    public void createAutoCloseSessionThread() {
        this.executorService.scheduleWithFixedDelay(() -> {
            SESSIONID_TO_SESSION.forEach((str, webSocketSession) -> {
                try {
                    webSocketSession.sendMessage(new PingMessage());
                } catch (IOException e) {
                    OdyExceptionFactory.log(e);
                    removeSession(webSocketSession.getId());
                    logger.error("发送ping消息失败", (Throwable) e);
                }
            });
        }, this.pingInterval.longValue(), this.pingInterval.longValue(), TimeUnit.MILLISECONDS);
        this.executorService.scheduleWithFixedDelay(() -> {
            Long valueOf = Long.valueOf(System.currentTimeMillis());
            SESSIONID_TO_SESSION.forEach((str, webSocketSession) -> {
                if (valueOf.longValue() - SESSIONID_TO_PING_TIME.getOrDefault(str, valueOf).longValue() > this.timeout.longValue()) {
                    removeSession(str);
                }
            });
        }, this.pingInterval.longValue(), this.pingInterval.longValue(), TimeUnit.MILLISECONDS);
    }

    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        for (MessageHandler messageHandler : applicationContext.getBeansOfType(MessageHandler.class).values()) {
            MESSAGE_HANDLERS.put(messageHandler.getType(), messageHandler);
        }
    }

    @Override // org.springframework.web.socket.handler.AbstractWebSocketHandler, org.springframework.web.socket.WebSocketHandler
    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
        SESSIONID_TO_SESSION.put(webSocketSession.getId(), webSocketSession);
        SESSIONID_TO_PING_TIME.put(webSocketSession.getId(), Long.valueOf(System.currentTimeMillis()));
    }

    @Override // org.springframework.web.socket.handler.AbstractWebSocketHandler
    protected void handleTextMessage(WebSocketSession webSocketSession, TextMessage textMessage) throws Exception {
        SystemContext.setContextMap((Map) webSocketSession.getAttributes().get("context"));
        String payload = textMessage.getPayload();
        if (logger.isDebugEnabled()) {
            logger.debug("收到websocket的消息：{}", payload);
        }
        WebSocketMessage webSocketMessage = (WebSocketMessage) JSON.parseObject(payload, WebSocketMessage.class);
        if (webSocketMessage == null) {
            sendMessage(webSocketSession, "请求消息不能为空");
            return;
        }
        MessageType of = MessageType.of(webSocketMessage.getType());
        if (of == null) {
            String format = String.format("消息类型不存在，类型：%s，内容：%s", webSocketMessage.getType(), payload);
            webSocketMessage.setMessage(format);
            sendMessage(webSocketSession, JSON.toJSONString(webSocketMessage));
            logger.warn(format);
            return;
        }
        MessageHandler messageHandler = MESSAGE_HANDLERS.get(of);
        if (messageHandler != null) {
            try {
                messageHandler.processMessage(webSocketSession, webSocketMessage);
            } catch (Exception e) {
                OdyExceptionFactory.log(e);
                webSocketMessage.setMessage(e.getMessage());
                sendMessage(webSocketSession, JSON.toJSONString(webSocketMessage));
                logger.error("处理消息异常", (Throwable) e);
            }
        }
    }

    @Override // org.springframework.web.socket.handler.AbstractWebSocketHandler, org.springframework.web.socket.WebSocketHandler
    public void handleTransportError(WebSocketSession webSocketSession, Throwable th) throws Exception {
        logger.error("连接出现错误", th);
        removeSession(webSocketSession.getId());
    }

    @Override // org.springframework.web.socket.handler.AbstractWebSocketHandler, org.springframework.web.socket.WebSocketHandler
    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
        removeSession(webSocketSession.getId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.web.socket.handler.AbstractWebSocketHandler
    public void handlePongMessage(WebSocketSession webSocketSession, PongMessage pongMessage) throws Exception {
        super.handlePongMessage(webSocketSession, pongMessage);
        SESSIONID_TO_PING_TIME.put(webSocketSession.getId(), Long.valueOf(System.currentTimeMillis()));
    }

    @Override // org.springframework.web.socket.handler.AbstractWebSocketHandler, org.springframework.web.socket.WebSocketHandler
    public boolean supportsPartialMessages() {
        return false;
    }

    private void sendMessage(WebSocketSession webSocketSession, String str) {
        try {
            webSocketSession.sendMessage(new TextMessage(str));
        } catch (IOException e) {
            OdyExceptionFactory.log(e);
            removeSession(webSocketSession.getId());
        }
    }

    private void removeSession(String str) {
        WebSocketSession remove = SESSIONID_TO_SESSION.remove(str);
        SESSIONID_TO_PING_TIME.remove(str);
        MESSAGE_HANDLERS.values().forEach(messageHandler -> {
            messageHandler.remove(str);
        });
        if (remove == null || !remove.isOpen()) {
            return;
        }
        try {
            remove.close();
        } catch (IOException e) {
            OdyExceptionFactory.log(e);
        }
    }

    public void processWechatSendMessageEvent(WechatSendMessageEvent wechatSendMessageEvent) throws Exception {
        WechatSendMessagePO data = wechatSendMessageEvent.getData();
        MESSAGE_HANDLERS.values().forEach(messageHandler -> {
            if (messageHandler instanceof BaseSendMessageHandler) {
                for (String str : ((BaseSendMessageHandler) messageHandler).getNeedSendMessageSessionId(data)) {
                    WebSocketSession webSocketSession = SESSIONID_TO_SESSION.get(str);
                    if (webSocketSession == null) {
                        removeSession(str);
                        return;
                    }
                    WebSocketMessage webSocketMessage = new WebSocketMessage();
                    webSocketMessage.setType(messageHandler.getType().toString());
                    webSocketMessage.setData(data);
                    sendMessage(webSocketSession, JSON.toJSONString(webSocketMessage));
                }
            }
        });
    }
}
