package org.apache.shenyu.plugin.response.strategy;

import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.shenyu.common.enums.RpcTypeEnum;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.Connection;

/* loaded from: input_file:org/apache/shenyu/plugin/response/strategy/NettyClientMessageWriter.class */
public class NettyClientMessageWriter implements MessageWriter {
    private final List<MediaType> streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_NDJSON);

    @Override // org.apache.shenyu.plugin.response.strategy.MessageWriter
    public Mono<Void> writeWith(ServerWebExchange serverWebExchange, ShenyuPluginChain shenyuPluginChain) {
        return shenyuPluginChain.execute(serverWebExchange).doOnError(th -> {
            cleanup(serverWebExchange);
        }).then(Mono.defer(() -> {
            Connection connection = (Connection) serverWebExchange.getAttribute("nettyClientResponseConnection");
            if (Objects.isNull(connection)) {
                return WebFluxResultUtils.result(serverWebExchange, ShenyuResultWrap.error(serverWebExchange, ShenyuResultEnum.SERVICE_RESULT_ERROR));
            }
            ServerHttpResponse response = serverWebExchange.getResponse();
            NettyDataBufferFactory bufferFactory = response.bufferFactory();
            ByteBufFlux retain = connection.inbound().receive().retain();
            Objects.requireNonNull(bufferFactory);
            Flux map = retain.map(bufferFactory::wrap);
            Mono writeAndFlushWith = isStreamingMediaType(response.getHeaders().getContentType()) ? response.writeAndFlushWith(map.map((v0) -> {
                return Flux.just(v0);
            })) : response.writeWith(map);
            serverWebExchange.getAttributes().put("RESPONSE_MONO", writeAndFlushWith);
            Optional.ofNullable((Consumer) serverWebExchange.getAttribute("WATCHER_HTTP_STATUS")).ifPresent(consumer -> {
                consumer.accept(response.getStatusCode());
            });
            return writeAndFlushWith.onErrorResume(th2 -> {
                return releaseIfNotConsumed(map, th2);
            });
        })).doOnCancel(() -> {
            cleanup(serverWebExchange);
        });
    }

    @Override // org.apache.shenyu.plugin.response.strategy.MessageWriter
    public List<String> supportTypes() {
        return Lists.newArrayList(new String[]{RpcTypeEnum.HTTP.getName(), RpcTypeEnum.SPRING_CLOUD.getName(), RpcTypeEnum.WEB_SOCKET.getName()});
    }

    private void cleanup(ServerWebExchange serverWebExchange) {
        Connection connection = (Connection) serverWebExchange.getAttribute("nettyClientResponseConnection");
        if (Objects.nonNull(connection)) {
            connection.dispose();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> Mono<T> releaseIfNotConsumed(Flux<NettyDataBuffer> flux, Throwable th) {
        return flux != null ? flux.map((v0) -> {
            return DataBufferUtils.release(v0);
        }).then(Mono.error(th)) : Mono.error(th);
    }

    private boolean isStreamingMediaType(@Nullable MediaType mediaType) {
        if (Objects.nonNull(mediaType)) {
            Stream<MediaType> stream = this.streamingMediaTypes.stream();
            Objects.requireNonNull(mediaType);
            if (stream.anyMatch(mediaType::isCompatibleWith)) {
                return true;
            }
        }
        return false;
    }
}
