package kd.bos.kafka.sender;

import java.util.Properties;
import kd.bos.kafka.IKafkaSender;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

/* loaded from: input_file:kd/bos/kafka/sender/AbstractKafkaSender.class */
/**
 * {@link IKafkaSender} implementation that publishes every payload to one fixed Kafka topic.
 *
 * <p>The producer is created eagerly in the constructor and lives until {@link #close()} is
 * called. Records are built from the topic and the value only (no key is supplied), and the
 * producer is configured with {@code acks=0}. Send failures are logged at WARN level and never
 * propagated to the caller.
 *
 * <p>NOTE(review): the same serializer class is configured for both the key and the value even
 * though the producer is typed {@code Producer<String, T>} — confirm this is intentional.
 *
 * @param <T> type of the record value handed to {@link #send(Object)}
 */
public class AbstractKafkaSender<T> implements IKafkaSender<T> {
    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    private static final String KEY_SERIALIZER_CONFIG = "key.serializer";
    private static final String VALUE_SERIALIZER_CONFIG = "value.serializer";
    private static final String ACKS_CONFIG = "acks";
    private static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
    private static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
    private static final String SASL_MECHANISM = "sasl.mechanism";
    private static final String SECURITY_PROTOCOL = "security.protocol";
    private static final String DEFAULT_SECURITY_PROTOCOL = "SASL_PLAINTEXT";
    private static final String DEFAULT_SASL_MECHANISM = "PLAIN";
    private static final String DEFAULT_ACKS = "0";
    /** Value written to {@code max.request.size}: 10 MiB (10485760 bytes). */
    private static final int MAX_REQUEST_SIZE_BYTES = 10 * 1024 * 1024;
    private static final Logger logger = Logger.getLogger(AbstractKafkaSender.class);

    /**
     * Shared completion callback that logs failures reported after {@code Producer.send} has
     * already returned; without it those failures would be dropped silently.
     */
    private static final Callback LOG_FAILURE_CALLBACK = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                logger.warn("AbstractKafkaSender async send error", exception);
            }
        }
    };

    private final String topic;
    private final Properties props;
    private final Producer<String, T> producer;

    /**
     * Builds the producer configuration and creates the underlying {@link KafkaProducer}.
     *
     * @param serializerClass  serializer class name, used for both {@code key.serializer} and
     *                         {@code value.serializer}
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param topic            topic every record is sent to
     * @param username         SASL/PLAIN user name; SASL is configured only when both
     *                         {@code username} and {@code password} are non-null
     * @param password         SASL/PLAIN password; see {@code username}
     */
    public AbstractKafkaSender(String serializerClass, String bootstrapServers, String topic,
            String username, String password) {
        this.topic = topic;
        this.props = getProducerProperties(serializerClass, bootstrapServers, username, password);
        this.producer = new KafkaProducer<String, T>(this.props);
    }

    /**
     * Builds the {@code sasl.jaas.config} value for the PLAIN login module.
     *
     * <p>Backslashes and double quotes in the credentials are escaped so they cannot terminate
     * the quoted value early or be read as an escape sequence by the JAAS parser.
     *
     * @param username SASL user name
     * @param password SASL password
     * @return the JAAS configuration line, terminated with {@code ;}
     */
    private static String getKafkaAuthConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + escapeJaasValue(username) + "\" password=\"" + escapeJaasValue(password) + "\";";
    }

    /** Escapes {@code \} and {@code "} for use inside a double-quoted JAAS option value. */
    private static String escapeJaasValue(String value) {
        // Backslashes first, otherwise the backslashes added for quotes would be doubled.
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /**
     * Sends {@code t} to the configured topic on a best-effort basis.
     *
     * <p>Exceptions thrown by the producer call itself, and failures reported later through the
     * completion callback, are both logged at WARN level; nothing is rethrown.
     *
     * @param t record value to publish
     */
    @Override // kd.bos.kafka.IKafkaSender
    public void send(T t) {
        try {
            this.producer.send(new ProducerRecord<String, T>(this.topic, t), LOG_FAILURE_CALLBACK);
        } catch (Exception e) {
            logger.warn("AbstractKafkaSender send error", e);
        }
    }

    /** Closes the underlying producer. */
    @Override // kd.bos.kafka.IKafkaSender
    public void close() {
        this.producer.close();
    }

    /**
     * Assembles the producer configuration.
     *
     * @param serializerClass  serializer class name for both key and value
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param username         SASL user name, or {@code null}
     * @param password         SASL password, or {@code null}
     * @return a new {@link Properties} holding the producer settings; the SASL entries are
     *         present only when both credentials are non-null
     */
    private Properties getProducerProperties(String serializerClass, String bootstrapServers,
            String username, String password) {
        Properties properties = new Properties();
        properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(KEY_SERIALIZER_CONFIG, serializerClass);
        properties.setProperty(VALUE_SERIALIZER_CONFIG, serializerClass);
        properties.setProperty(ACKS_CONFIG, DEFAULT_ACKS);
        properties.setProperty(MAX_REQUEST_SIZE_CONFIG, String.valueOf(MAX_REQUEST_SIZE_BYTES));
        if (username != null && password != null) {
            properties.setProperty(SASL_JAAS_CONFIG, getKafkaAuthConfig(username, password));
            properties.setProperty(SASL_MECHANISM, DEFAULT_SASL_MECHANISM);
            properties.setProperty(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL);
        }
        return properties;
    }
}
