package org.apache.camel.component.kafka;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.URISupport;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/KafkaProducer.class */
public class KafkaProducer extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
    // Underlying Kafka client; created in doStart() unless one was injected through setKafkaProducer().
    private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
    // Endpoint this producer belongs to; source of the configuration, client factory and executor.
    private final KafkaEndpoint endpoint;
    // Executor used to hand routing back to Camel once Kafka has acknowledged all records of an exchange.
    private ExecutorService workerPool;
    // True when doStart() created the worker pool itself, so doStop() has to shut it down.
    private boolean shutdownWorkerPool;
    // True when doStart() created the Kafka client itself, so doStop() has to close it.
    private volatile boolean closeKafkaProducer;

    /* loaded from: input_file:org/apache/camel/component/kafka/KafkaProducer$DelegatingCallback.class */
    /**
     * Kafka callback that forwards every completion notification to a fixed list of
     * callbacks, in the order they were supplied.
     */
    private final class DelegatingCallback implements Callback {
        private final List<Callback> callbacks;

        /**
         * @param callbackArr the callbacks to notify on completion
         */
        public DelegatingCallback(Callback... callbackArr) {
            callbacks = Arrays.asList(callbackArr);
        }

        /**
         * Passes the record metadata and the (possibly null) failure on to each delegate.
         */
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            for (Callback delegate : callbacks) {
                delegate.onCompletion(recordMetadata, exc);
            }
        }
    }

    /* loaded from: input_file:org/apache/camel/component/kafka/KafkaProducer$KafkaProducerCallBack.class */
    /**
     * Kafka completion callback bound to one body (an {@link Exchange}, a {@link Message} or any
     * other object) and, optionally, the Camel {@link AsyncCallback} to signal once every record
     * has been acknowledged.
     *
     * <p>The outstanding-send counter starts at 1. The sender calls {@link #increment()} before
     * each send and {@link #allSent()} after the last one, which releases that initial count.
     * Whichever of {@link #allSent()} or {@link #onCompletion} brings the counter to zero is the
     * one that signals the {@link AsyncCallback}.
     *
     * <p>When record metadata is enabled in the endpoint configuration, the list that collects
     * the metadata is published as the {@code KafkaConstants.KAFKA_RECORDMETA} header of the body
     * at construction time and filled as completions arrive.
     */
    private final class KafkaProducerCallBack implements Callback {
        private final Object body;
        private final AsyncCallback callback;
        // Starts at 1 so the counter cannot reach zero before allSent() has been called.
        private final AtomicInteger count = new AtomicInteger(1);
        private final List<RecordMetadata> recordMetadatas = new ArrayList<>();

        /**
         * @param obj           the exchange, message or plain object the records were built from
         * @param asyncCallback the callback to signal when all sends are done; may be null
         */
        KafkaProducerCallBack(Object obj, AsyncCallback asyncCallback) {
            this.body = obj;
            this.callback = asyncCallback;
            if (KafkaProducer.this.endpoint.getConfiguration().isRecordMetadata()) {
                if (obj instanceof Exchange) {
                    Exchange exchange = (Exchange) obj;
                    Message target = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
                    target.setHeader(KafkaConstants.KAFKA_RECORDMETA, this.recordMetadatas);
                }
                if (obj instanceof Message) {
                    ((Message) obj).setHeader(KafkaConstants.KAFKA_RECORDMETA, this.recordMetadatas);
                }
            }
        }

        // The three constructors below take the enclosing producer as an explicit, unused first
        // argument; they are kept as-is because existing call sites use these signatures.

        public KafkaProducerCallBack(KafkaProducer kafkaProducer, Exchange exchange) {
            this(exchange, null);
        }

        public KafkaProducerCallBack(KafkaProducer kafkaProducer, Message message) {
            this(message, null);
        }

        public KafkaProducerCallBack(KafkaProducer kafkaProducer, Object obj) {
            this(obj, null);
        }

        /** Registers one more outstanding send. */
        void increment() {
            this.count.incrementAndGet();
        }

        /**
         * Releases the initial count after all records have been handed to Kafka.
         *
         * @return true if no send is outstanding any more (the async callback, if any, has been
         *         signalled with {@code done(true)}); false if completions are still pending
         */
        boolean allSent() {
            if (this.count.decrementAndGet() != 0) {
                return false;
            }
            KafkaProducer.LOG.trace("All messages sent, continue routing.");
            if (this.callback != null) {
                this.callback.done(true);
            }
            return true;
        }

        /**
         * Records the outcome of one send: stores a failure on the owning exchange, collects the
         * metadata, and signals the async callback when this was the last outstanding send.
         */
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                if (this.body instanceof Exchange) {
                    ((Exchange) this.body).setException(exc);
                }
                if ((this.body instanceof Message) && ((Message) this.body).getExchange() != null) {
                    ((Message) this.body).getExchange().setException(exc);
                }
            }
            this.recordMetadatas.add(recordMetadata);
            // Always decrement; only hop to the worker pool when there is a callback to signal.
            // Per-record instances have no callback, so scheduling a task for them would be a
            // no-op (and would fail outright if no worker pool has been configured).
            if (this.count.decrementAndGet() == 0 && this.callback != null) {
                KafkaProducer.this.workerPool.submit(() -> {
                    KafkaProducer.LOG.trace("All messages sent, continue routing.");
                    this.callback.done(false);
                });
            }
        }
    }

    /**
     * Creates a producer bound to the given endpoint.
     *
     * @param kafkaEndpoint the endpoint supplying configuration and the Kafka client factory
     */
    public KafkaProducer(KafkaEndpoint kafkaEndpoint) {
        super(kafkaEndpoint);
        endpoint = kafkaEndpoint;
    }

    /**
     * Assembles the properties used to create the Kafka client.
     *
     * @return the producer properties from the endpoint configuration, with
     *         {@code bootstrap.servers} overridden when the client factory reports brokers
     */
    Properties getProps() {
        final Properties props = endpoint.getConfiguration().createProducerProperties();
        endpoint.updateClassProperties(props);

        final String brokers = endpoint.m5getComponent()
                .getKafkaClientFactory()
                .getBrokers(endpoint.getConfiguration());
        if (brokers != null) {
            props.put("bootstrap.servers", brokers);
        }
        return props;
    }

    /**
     * @return the Kafka client currently in use, or null if none has been set or created yet
     */
    public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
        return kafkaProducer;
    }

    /**
     * Supplies the Kafka client to use; when one is set before start, {@code doStart()} does not
     * create its own.
     *
     * @param kafkaProducer the Kafka client to use
     */
    public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    /**
     * @return the executor used to continue routing after Kafka completions, or null if none
     */
    public ExecutorService getWorkerPool() {
        return workerPool;
    }

    /**
     * Supplies the executor used to continue routing after Kafka completions; when one is set
     * before start, {@code doStart()} does not create its own.
     *
     * @param executorService the executor to use
     */
    public void setWorkerPool(ExecutorService executorService) {
        workerPool = executorService;
    }

    /**
     * Starts the producer: creates the Kafka client if none was injected and, for
     * non-synchronous endpoints without an injected worker pool, creates the worker pool.
     * Resources created here are flagged so that {@code doStop()} releases them.
     */
    protected void doStart() throws Exception {
        Properties props = getProps();

        if (kafkaProducer == null) {
            final Thread currentThread = Thread.currentThread();
            final ClassLoader previousLoader = currentThread.getContextClassLoader();
            try {
                // Create the client with the Kafka library's own class loader as context loader.
                currentThread.setContextClassLoader(
                        org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
                LOG.trace("Creating KafkaProducer");
                kafkaProducer = endpoint.m5getComponent().getKafkaClientFactory().getProducer(props);
                closeKafkaProducer = true;
            } finally {
                // Restore the caller's context class loader whether or not creation succeeded.
                currentThread.setContextClassLoader(previousLoader);
            }
            LOG.debug("Created KafkaProducer: {}", kafkaProducer);
        }

        if (!endpoint.getConfiguration().isSynchronous() && workerPool == null) {
            workerPool = endpoint.createProducerExecutor();
            shutdownWorkerPool = true;
        }
    }

    /**
     * Stops the producer: closes the Kafka client and gracefully shuts down the worker pool,
     * each only if it was created by {@code doStart()}.
     */
    protected void doStop() throws Exception {
        if (kafkaProducer != null && closeKafkaProducer) {
            LOG.debug("Closing KafkaProducer: {}", kafkaProducer);
            kafkaProducer.close();
            kafkaProducer = null;
        }

        if (shutdownWorkerPool && workerPool != null) {
            int timeoutMillis = endpoint.getConfiguration().getShutdownTimeout();
            LOG.debug("Shutting down Kafka producer worker threads with timeout {} millis", Integer.valueOf(timeoutMillis));
            endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool, timeoutMillis);
            workerPool = null;
        }
    }

    /**
     * Builds the Kafka records to send for an exchange.
     *
     * <p>The topic is taken from the endpoint configuration, overridden by the
     * {@code KafkaConstants.OVERRIDE_TOPIC} header (which is removed from the message), and
     * finally falls back to the remainder path of the endpoint URI. The headers to propagate are
     * computed after the override header has been removed.
     *
     * <p>If the body is an {@link Iterable} or {@link Iterator}, one record is produced lazily per
     * element. Elements that are an {@link Exchange} or {@link Message} may carry their own
     * override topic, partition key and key headers; any other element is sent as the value
     * as-is. Otherwise a single record is produced from the exchange itself.
     *
     * <p>Keys and values are passed through {@link #tryConvertToSerializedType} and the converted
     * key is what ends up in the record.
     *
     * @param exchange the exchange being sent
     * @return an iterator of (source object, record) pairs; the source object is the element or
     *         exchange the record was built from
     * @throws Exception if the endpoint URI cannot be parsed for the topic fallback
     */
    protected Iterator<KeyValueHolder<Object, ProducerRecord>> createRecorder(final Exchange exchange) throws Exception {
        final KafkaConfiguration configuration = this.endpoint.getConfiguration();

        String topic = configuration.getTopic();
        Object overrideTopic = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
        if (overrideTopic != null) {
            LOG.debug("Using override topic: {}", overrideTopic);
            topic = overrideTopic.toString();
        }
        if (topic == null) {
            topic = URISupport.extractRemainderPath(new URI(this.endpoint.getEndpointUri()), true);
        }
        final String defaultTopic = topic;

        final List<Header> propagatedHeaders = getPropagatedHeaders(exchange, configuration);
        final Object body = exchange.getIn().getBody();

        final Iterator<?> elements;
        if (body instanceof Iterable) {
            elements = ((Iterable<?>) body).iterator();
        } else if (body instanceof Iterator) {
            elements = (Iterator<?>) body;
        } else {
            elements = null;
        }

        if (elements != null) {
            return new Iterator<KeyValueHolder<Object, ProducerRecord>>() {
                @Override
                public boolean hasNext() {
                    return elements.hasNext();
                }

                @Override
                public KeyValueHolder<Object, ProducerRecord> next() {
                    final Object element = elements.next();
                    String elementTopic = defaultTopic;
                    Integer partition = null;
                    Object key = null;
                    Object value = element;

                    if ((element instanceof Exchange) || (element instanceof Message)) {
                        final Exchange elementExchange = element instanceof Exchange ? (Exchange) element : null;
                        final Message message = elementExchange != null ? elementExchange.getIn() : (Message) element;
                        // A bare message has no exchange of its own; conversions borrow the outer one.
                        final Exchange conversionExchange = elementExchange != null ? elementExchange : exchange;

                        // toString() rather than a String cast, matching the exchange-level override.
                        final Object elementOverride = message.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
                        if (elementOverride != null) {
                            elementTopic = elementOverride.toString();
                        }

                        // The configured partition key / key only applies to elements that carry
                        // the corresponding header; the configured value then takes precedence.
                        if (message.getHeader(KafkaConstants.PARTITION_KEY) != null) {
                            partition = configuration.getPartitionKey() != null
                                    ? configuration.getPartitionKey()
                                    : message.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
                        }
                        if (message.getHeader(KafkaConstants.KEY) != null) {
                            final Object rawKey = configuration.getKey() != null
                                    ? configuration.getKey()
                                    : message.getHeader(KafkaConstants.KEY);
                            key = tryConvertToSerializedType(conversionExchange, rawKey, configuration.getKeySerializer());
                        }
                        value = tryConvertToSerializedType(conversionExchange, message.getBody(), configuration.getValueSerializer());
                    }
                    return new KeyValueHolder<>(element, newRecord(elementTopic, partition, key, value, propagatedHeaders));
                }

                @Override
                public void remove() {
                    elements.remove();
                }
            };
        }

        // Single-record case: endpoint configuration takes precedence over message headers.
        final Integer partition = configuration.getPartitionKey() != null
                ? configuration.getPartitionKey()
                : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
        final Object rawKey = configuration.getKey() != null
                ? configuration.getKey()
                : exchange.getIn().getHeader(KafkaConstants.KEY);
        final Object key = rawKey != null
                ? tryConvertToSerializedType(exchange, rawKey, configuration.getKeySerializer())
                : null;
        final Object value = tryConvertToSerializedType(exchange, body, configuration.getValueSerializer());

        final KeyValueHolder<Object, ProducerRecord> holder =
                new KeyValueHolder<>(exchange, newRecord(defaultTopic, partition, key, value, propagatedHeaders));
        return Collections.singletonList(holder).iterator();
    }

    /**
     * Creates a record without a timestamp. The partition is only applied when a key is present;
     * a partition without a key is ignored, as it always has been in this producer.
     */
    private static ProducerRecord newRecord(String topic, Integer partition, Object key, Object value, List<Header> headers) {
        final Integer effectivePartition = key != null ? partition : null;
        return new ProducerRecord(topic, effectivePartition, (Long) null, key, value, headers);
    }

    /**
     * Collects the Camel message headers that are to be sent as Kafka record headers.
     *
     * <p>A header is included when the configured filter strategy lets it through and the
     * configured header serializer produces a non-null value for it.
     *
     * @param exchange           the exchange whose in-message headers are examined
     * @param kafkaConfiguration source of the filter strategy and header serializer
     * @return the record headers, in the iteration order of the message's header map
     */
    private List<Header> getPropagatedHeaders(Exchange exchange, KafkaConfiguration kafkaConfiguration) {
        HeaderFilterStrategy filterStrategy = kafkaConfiguration.getHeaderFilterStrategy();
        KafkaHeaderSerializer serializer = kafkaConfiguration.getHeaderSerializer();

        List<Header> propagated = new ArrayList<>();
        for (Map.Entry<String, Object> header : exchange.getIn().getHeaders().entrySet()) {
            if (!shouldBeFiltered(header, exchange, filterStrategy)) {
                continue;
            }
            RecordHeader recordHeader = getRecordHeader(header, serializer);
            if (recordHeader != null) {
                propagated.add(recordHeader);
            }
        }
        return propagated;
    }

    /**
     * Tells whether a header passes the filter strategy. Despite the name, {@code true} means
     * the header is kept (the strategy did not filter it out).
     */
    private boolean shouldBeFiltered(Map.Entry<String, Object> entry, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
        boolean rejected = headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange);
        return !rejected;
    }

    /**
     * Serializes one Camel header into a Kafka record header.
     *
     * @return the record header, or null when the serializer yields no bytes for the value
     */
    private RecordHeader getRecordHeader(Map.Entry<String, Object> entry, KafkaHeaderSerializer kafkaHeaderSerializer) {
        byte[] payload = kafkaHeaderSerializer.serialize(entry.getKey(), entry.getValue());
        return payload != null ? new RecordHeader(entry.getKey(), payload) : null;
    }

    /**
     * Sends the exchange synchronously: hands every record to Kafka first, then waits for each
     * acknowledgement in send order.
     *
     * <p>When record metadata is enabled, the exchange receives a
     * {@code KafkaConstants.KAFKA_RECORDMETA} header holding the metadata of all records, and
     * every element that is itself an {@link Exchange} or {@link Message} receives a
     * single-element list with its own metadata.
     *
     * @throws Exception if building the records fails or waiting on a send fails
     */
    public void process(Exchange exchange) throws Exception {
        Iterator<KeyValueHolder<Object, ProducerRecord>> records = createRecorder(exchange);
        List<KeyValueHolder<Object, Future>> pending = new ArrayList<>();
        List<Object> allMetadata = new ArrayList<>();

        if (endpoint.getConfiguration().isRecordMetadata()) {
            Message target = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
            target.setHeader(KafkaConstants.KAFKA_RECORDMETA, allMetadata);
        }

        // Phase 1: queue every record without blocking on the result.
        while (records.hasNext()) {
            KeyValueHolder<Object, ProducerRecord> entry = records.next();
            ProducerRecord record = entry.getValue();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message to topic: {}, partition: {}, key: {}", record.topic(), record.partition(), record.key());
            }
            pending.add(new KeyValueHolder<Object, Future>(entry.getKey(), kafkaProducer.send(record)));
        }

        // Phase 2: wait for each acknowledgement and publish the metadata.
        for (KeyValueHolder<Object, Future> sent : pending) {
            List<Object> metadata = Collections.singletonList(sent.getValue().get());
            allMetadata.addAll(metadata);

            Object source = sent.getKey();
            if (source instanceof Exchange && endpoint.getConfiguration().isRecordMetadata()) {
                Exchange sourceExchange = (Exchange) source;
                Message target = sourceExchange.hasOut() ? sourceExchange.getOut() : sourceExchange.getIn();
                target.setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
            }
            if (source instanceof Message && endpoint.getConfiguration().isRecordMetadata()) {
                ((Message) source).setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
            }
        }
    }

    /**
     * Sends the exchange asynchronously.
     *
     * <p>Every record is sent with a callback that notifies the exchange-level tracker and, when
     * the record has a source object, a per-record tracker for that object. If building or
     * sending the records throws, the exception is stored on the exchange and the async callback
     * is completed synchronously.
     *
     * @return true if processing completed synchronously (all sends already acknowledged, or a
     *         failure occurred); false if the async callback will be signalled later
     */
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        try {
            Iterator<KeyValueHolder<Object, ProducerRecord>> records = createRecorder(exchange);
            KafkaProducerCallBack exchangeTracker = new KafkaProducerCallBack(exchange, asyncCallback);

            while (records.hasNext()) {
                // Count the send before it is issued so a fast completion cannot reach zero early.
                exchangeTracker.increment();
                KeyValueHolder<Object, ProducerRecord> entry = records.next();
                ProducerRecord record = entry.getValue();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sending message to topic: {}, partition: {}, key: {}", record.topic(), record.partition(), record.key());
                }

                Object source = entry.getKey();
                Callback sendCallback = source == null
                        ? new DelegatingCallback(exchangeTracker)
                        : new DelegatingCallback(exchangeTracker, new KafkaProducerCallBack(this, source));
                kafkaProducer.send(record, sendCallback);
            }
            return exchangeTracker.allSent();
        } catch (Exception e) {
            exchange.setException(e);
            asyncCallback.done(true);
            return true;
        }
    }

    /**
     * Attempts to convert a key or value to the Java type expected by the named Kafka serializer.
     *
     * <p>Recognised serializers are the default serializer
     * ({@code KafkaConstants.KAFKA_DEFAULT_SERIALIZER}, converted to {@link String}) and Kafka's
     * {@code ByteArraySerializer}, {@code ByteBufferSerializer} and {@code BytesSerializer}.
     *
     * @param exchange the exchange supplying the type converter; when null no conversion is tried
     * @param obj      the object to convert
     * @param str      the fully qualified serializer class name
     * @return the converted object, or {@code obj} unchanged when the serializer is not
     *         recognised, no exchange is given, or the conversion yields nothing
     */
    protected Object tryConvertToSerializedType(Exchange exchange, Object obj, String str) {
        if (exchange == null) {
            return obj;
        }

        final Class<?> targetType;
        boolean wrapInBytes = false;
        if (KafkaConstants.KAFKA_DEFAULT_SERIALIZER.equals(str)) {
            targetType = String.class;
        } else if ("org.apache.kafka.common.serialization.ByteArraySerializer".equals(str)) {
            targetType = byte[].class;
        } else if ("org.apache.kafka.common.serialization.ByteBufferSerializer".equals(str)) {
            targetType = ByteBuffer.class;
        } else if ("org.apache.kafka.common.serialization.BytesSerializer".equals(str)) {
            // Bytes has no direct conversion here: go through byte[] and wrap the result.
            targetType = byte[].class;
            wrapInBytes = true;
        } else {
            return obj;
        }

        Object converted = exchange.getContext().getTypeConverter().tryConvertTo(targetType, exchange, obj);
        if (converted == null) {
            return obj;
        }
        return wrapInBytes ? new Bytes((byte[]) converted) : converted;
    }
}
