package com.jzt.wotu.component.kafka.service;

import com.jzt.wotu.util.KeyStoreHelper;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.camel.util.ObjectHelper;
import org.apache.kafka.clients.admin.Admin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jzt/wotu/component/kafka/service/KafkaBrokerServiceImpl.class */
public class KafkaBrokerServiceImpl implements KafkaBrokerService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerServiceImpl.class);
    private final String brokers;
    private final String certificate;
    private final String transportProtocol;
    private String saslMechanism;
    private String saslLoginCallbackHandlerClass;
    private String username;
    private String password;
    private String oauthTokenEndpointURI;

    public KafkaBrokerServiceImpl(String str, String str2, String str3) {
        this.brokers = str;
        this.transportProtocol = str2;
        this.certificate = str3;
    }

    @Override // com.jzt.wotu.component.kafka.service.KafkaBrokerService
    public void ping() throws KafkaBrokerServiceException {
        try {
            listTopicsOrThrowException();
        } catch (Exception e) {
            LOG.warn("Unable to ping the broker", e);
            throw new KafkaBrokerServiceException(e);
        }
    }

    @Override // com.jzt.wotu.component.kafka.service.KafkaBrokerService
    public Set<String> listTopics() {
        try {
            return listTopicsOrThrowException();
        } catch (Exception e) {
            LOG.error("Unable to read the list of topics on the broker", e);
            return Collections.emptySet();
        }
    }

    private Set<String> listTopicsOrThrowException() throws ExecutionException, InterruptedException {
        KeyStoreHelper store = this.certificate != null ? new KeyStoreHelper(this.certificate, KafkaBrokerService.BROKER_CERTIFICATE).store() : null;
        try {
            Admin create = Admin.create(getKafkaAdminClientConfiguration(this.brokers, this.transportProtocol, store));
            try {
                Set<String> set = (Set) create.listTopics().names().get();
                if (create != null) {
                    create.close();
                }
                return set;
            } finally {
            }
        } finally {
            if (store != null && !store.clean()) {
                LOG.warn("Impossible to delete temporary keystore located at " + store.getKeyStorePath());
            }
        }
    }

    private Properties getKafkaAdminClientConfiguration(String str, String str2, KeyStoreHelper keyStoreHelper) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("connections.max.idle.ms", 10000);
        properties.put("request.timeout.ms", 5000);
        properties.put("security.protocol", str2);
        properties.put("default.api.timeout.ms", 5000);
        if (keyStoreHelper != null) {
            properties.put("ssl.endpoint.identification.algorithm", "");
            properties.put("ssl.keystore.location", keyStoreHelper.getKeyStorePath());
            properties.put("ssl.keystore.password", keyStoreHelper.getPassword());
            properties.put("ssl.key.password", keyStoreHelper.getPassword());
            properties.put("ssl.truststore.location", keyStoreHelper.getKeyStorePath());
            properties.put("ssl.truststore.password", keyStoreHelper.getPassword());
        } else if (ObjectHelper.isNotEmpty(this.username) && ObjectHelper.isNotEmpty(this.password)) {
            properties.put("sasl.mechanism", this.saslMechanism);
            if (KafkaBrokerService.OAUTHBEARER.equals(this.saslMechanism)) {
                properties.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.client.id=\"%s\" oauth.client.secret=\"%s\" oauth.token.endpoint.uri=\"%s\" ;", this.username, this.password, this.oauthTokenEndpointURI));
                if (ObjectHelper.isNotEmpty(this.saslLoginCallbackHandlerClass)) {
                    properties.put("sasl.login.callback.handler.class", this.saslLoginCallbackHandlerClass);
                }
            } else if (KafkaBrokerService.PLAIN.equals(this.saslMechanism)) {
                properties.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\" ;", this.username, this.password));
            }
        }
        return properties;
    }

    @Override // com.jzt.wotu.component.kafka.service.KafkaBrokerService
    public void setSaslMechanism(String str) {
        this.saslMechanism = str;
    }

    @Override // com.jzt.wotu.component.kafka.service.KafkaBrokerService
    public void setSaslLoginCallbackHandlerClass(String str) {
        this.saslLoginCallbackHandlerClass = str;
    }

    @Override // com.jzt.wotu.component.kafka.service.KafkaBrokerService
    public void setUsername(String str) {
        this.username = str;
    }

    @Override // com.jzt.wotu.component.kafka.service.KafkaBrokerService
    public void setPassword(String str) {
        this.password = str;
    }

    @Override // com.jzt.wotu.component.kafka.service.KafkaBrokerService
    public void setOauthTokenEndpointURI(String str) {
        this.oauthTokenEndpointURI = str;
    }
}
