package org.apache.shenyu.protocol.tcp.connection;

import com.google.common.eventbus.Subscribe;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;

/* loaded from: input_file:org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.class */
public class ActivityConnectionObserver implements ConnectionObserver {
    private static final Logger LOG = LoggerFactory.getLogger(ActivityConnectionObserver.class);
    private final Map<Connection, ConnectionObserver.State> cache = new ConcurrentHashMap();
    private final String name;

    public ActivityConnectionObserver(String str) {
        this.name = str;
    }

    public void onStateChange(Connection connection, ConnectionObserver.State state) {
        if (state == ConnectionObserver.State.CONNECTED) {
            this.cache.put(connection, state);
            LOG.info("{} add connection into cache ={}", this.name, connection);
        } else if (state == ConnectionObserver.State.DISCONNECTING || state == ConnectionObserver.State.RELEASED) {
            this.cache.remove(connection);
            LOG.info("{} remove connection into cache ={}", this.name, connection);
        } else if (this.cache.containsKey(connection)) {
            this.cache.put(connection, state);
        }
    }

    @Subscribe
    public void onRemove(List<DiscoveryUpstreamData> list) {
        LOG.info("shenyu {} ConnectionObserver  do on remove upstreams", this.name);
        for (Connection connection : this.cache.keySet()) {
            if (in(list, connection.channel().remoteAddress())) {
                LOG.info("shenyu dispose {} connection ", connection);
                connection.disposeNow();
            }
        }
    }

    private boolean in(List<DiscoveryUpstreamData> list, SocketAddress socketAddress) {
        return list.stream().anyMatch(discoveryUpstreamData -> {
            String substring = socketAddress.toString().substring(1);
            String url = discoveryUpstreamData.getUrl();
            LOG.info("compare {} , {}", substring, url);
            return StringUtils.equals(substring, url);
        });
    }
}
