package kd.bos.mq.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import kd.bos.context.RequestContextCreator;
import kd.bos.db.tx.TX;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.id.IDService;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.delay.DelayControlManager;
import kd.bos.mq.delay.RabbitMQDelayManager;
import kd.bos.mq.support.ConsumerSupport;
import kd.bos.mq.support.KdtxSupport;
import kd.bos.mq.support.Message;
import kd.bos.mq.support.MessageSerde;
import kd.bos.mq.support.PublisherSupport;
import kd.bos.trace.TraceSpan;
import kd.bos.trace.Tracer;
import kd.bos.trace.reporter.topology.TopologyTagInject;
import kd.bos.util.ExceptionUtils;

/* loaded from: input_file:kd/bos/mq/rabbit/RabbitPublisher.class */
public class RabbitPublisher extends PublisherSupport {
    private static final String RABBIT_TRACE_NAME = "RabbitPublisher";
    private Channel channel;
    private String queueName;
    private String region;
    private String stack;
    private static Log logger = LogFactory.getLog(RabbitPublisher.class);
    private static ThreadLocal<CloseablePublishers> local = new ThreadLocal<CloseablePublishers>() { // from class: kd.bos.mq.rabbit.RabbitPublisher.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public CloseablePublishers initialValue() {
            return new CloseablePublishers();
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/bos/mq/rabbit/RabbitPublisher$CloseablePublishers.class */
    public static class CloseablePublishers implements Closeable {
        private Map<RabbitPublisher, String> publishers = new ConcurrentHashMap(2);

        CloseablePublishers() {
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.publishers.size() > 0) {
                RabbitPublisher.logger.warn("MQ publisher leak! detail:" + getStacks());
            }
            this.publishers.forEach((rabbitPublisher, str) -> {
                try {
                    rabbitPublisher.close();
                } catch (Exception e) {
                    try {
                        ExceptionLogger.log("mqchannel close error", e);
                    } catch (Throwable th) {
                    }
                }
            });
            this.publishers.clear();
        }

        public void put(RabbitPublisher rabbitPublisher) {
            this.publishers.put(rabbitPublisher, "");
        }

        public int size() {
            return this.publishers.size();
        }

        public void remove(RabbitPublisher rabbitPublisher) {
            this.publishers.remove(rabbitPublisher);
        }

        public String getStacks() {
            StringBuilder sb = new StringBuilder();
            AtomicInteger atomicInteger = new AtomicInteger(0);
            this.publishers.forEach((rabbitPublisher, str) -> {
                if (atomicInteger.incrementAndGet() < 100) {
                    sb.append(rabbitPublisher.queueName).append("#").append(rabbitPublisher.region).append(" ,").append(rabbitPublisher.stack);
                }
            });
            return sb.toString();
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            AtomicInteger atomicInteger = new AtomicInteger(0);
            this.publishers.forEach((rabbitPublisher, str) -> {
                if (atomicInteger.incrementAndGet() < 100) {
                    sb.append(rabbitPublisher.queueName).append("#").append(rabbitPublisher.region).append(" ,");
                }
            });
            return sb.toString();
        }
    }

    public RabbitPublisher(Channel channel, String str, boolean z, String str2) {
        super(z);
        this.channel = channel;
        this.queueName = str;
        this.region = str2;
        this.stack = ExceptionUtils.getExceptionStackTraceMessage(new Exception());
        CloseablePublishers closeablePublishers = local.get();
        int intValue = Integer.getInteger("mq.publish.maxsizeinthread", 20).intValue();
        if (closeablePublishers.size() >= intValue) {
            logger.warn(("MQ publisher leak! More than " + intValue + " publisher in current thread. ") + "detail:" + closeablePublishers.toString() + " ,stack:" + closeablePublishers.getStacks());
        }
        local.get().put(this);
    }

    @Override // kd.bos.mq.support.PublisherSupport
    protected void _beginTrans() {
        try {
            this.channel.txSelect();
        } catch (IOException e) {
            throw new KDException(BosErrorCode.rabbitmqException, "can't begin transaction", e);
        }
    }

    @Override // kd.bos.mq.support.PublisherSupport
    protected void _commitTrans() {
        try {
            this.channel.txCommit();
        } catch (IOException e) {
            throw new KDException(BosErrorCode.rabbitmqException, "can't commit transaction", e);
        }
    }

    @Override // kd.bos.mq.support.PublisherSupport
    protected void _rollbackTrans() {
        try {
            this.channel.txRollback();
        } catch (IOException e) {
            throw new KDException(BosErrorCode.rabbitmqException, "can't rollback transaction", e);
        }
    }

    @Override // kd.bos.mq.support.PublisherSupport, kd.bos.mq.MessagePublisher
    public void publish(byte[] bArr) {
        publish0(bArr);
    }

    @Override // kd.bos.mq.MessagePublisher
    public void publish(String str) {
        publish0(str);
    }

    @Override // kd.bos.mq.MessagePublisher
    public void publish(Object obj) {
        publish0(obj);
    }

    @Override // kd.bos.mq.MessagePublisher
    public void publishInDbTranscation(Object obj) {
        publishInDbTranscation(null, obj);
    }

    @Override // kd.bos.mq.MessagePublisher
    public void publishInDbTranscation(String str, Object obj) {
        publishDelayInDbTranscation(str, obj, -1);
    }

    @Override // kd.bos.mq.MessagePublisher
    public void publishDelayInDbTranscation(Object obj, int i) {
        publishDelayInDbTranscation(null, obj, i);
    }

    private void publishDelayInDbTranscation(String str, Object obj, int i) {
        String writtenRouteKey = TX.getWrittenRouteKey();
        publish0(obj, null, true, writtenRouteKey == null ? str : writtenRouteKey, i);
    }

    @Override // kd.bos.mq.MessagePublisher
    public void publishDelay(Object obj, int i) {
        publish0(obj, null, false, null, i);
    }

    private void publish0(Object obj) {
        publish0(obj, null, false, null);
    }

    private void publish0(Object obj, String str, boolean z, String str2) {
        publish0(obj, str, z, str2, -1);
    }

    /* JADX WARN: Finally extract failed */
    private void publish0(Object obj, String str, boolean z, String str2, int i) {
        if (!PublisherSupport.isCanPublishMessage(this.queueName)) {
            throw new KDException(BosErrorCode.rabbitmqException, new Object[]{"The current node has been discarded due to grayscale completion,queue name is " + this.queueName});
        }
        try {
            TraceSpan create = Tracer.create(RABBIT_TRACE_NAME, "publish0", true);
            Throwable th = null;
            try {
                create.addTag("queueName", this.queueName);
                Connection connection = this.channel.getConnection();
                TopologyTagInject.setCompentTag(create.getInnerSpan(), connection.getAddress() + ":" + connection.getPort(), "RabbitMQ", "mq");
                this.queueName = PublisherSupport.getStandardQueue(this.queueName);
                String acrossNodePublishQueueNameGray = ConsumerSupport.getAcrossNodePublishQueueNameGray(this.queueName);
                Message message = toMessage(obj);
                message.setConsumeSynchronizeTag(str);
                if (z) {
                    if (str2 == null) {
                        throw new KDException(BosErrorCode.sQLConnection, new Object[]{"routKey can not be null"});
                    }
                    if (KdtxSupport.isDtxEnable()) {
                        startDTXAndRegistry(message, str2, this.region, acrossNodePublishQueueNameGray);
                        if (create != null) {
                            if (0 == 0) {
                                create.close();
                                return;
                            }
                            try {
                                create.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    }
                    recordMqTrans(str2, message);
                }
                DelayControlManager.installDelayInfo(message, i);
                byte[] encode = MessageSerde.get().encode(message);
                checkPayload(encode, acrossNodePublishQueueNameGray);
                if (message.getStartDeliverTime() > 0) {
                    RabbitMQDelayManager.publishDelayMessage(encode, this.channel, acrossNodePublishQueueNameGray, DelayControlManager.selectMaxMetaTime(i));
                } else {
                    realPublish(encode, acrossNodePublishQueueNameGray);
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        create.close();
                    }
                }
            } catch (Throwable th4) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th4;
            }
        } catch (Exception e) {
            logger.error("publish0 error", e);
            throw new KDException(e, BosErrorCode.rabbitmqException, new Object[]{"can't publish"});
        }
    }

    private void realPublish(byte[] bArr, String str) throws IOException {
        AMQP.BasicProperties build = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        if (Config.reusePublisherChannel()) {
            PublisherChannels.getChannel(this.region).basicPublish("", str, build, bArr);
        } else {
            this.channel.basicPublish("", str, build, bArr);
        }
    }

    @Override // kd.bos.mq.support.PublisherSupport, kd.bos.mq.MessagePublisher
    public void $$publishConfirm(byte[] bArr) {
        try {
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
            builder.deliveryMode(2);
            AMQP.BasicProperties build = builder.build();
            this.channel.confirmSelect();
            this.channel.basicPublish("", this.queueName, build, bArr);
            if (this.channel.waitForConfirms(Long.getLong("mq.publish.confirm.timeout", 60000L).longValue())) {
            } else {
                throw new KDException(BosErrorCode.rabbitmqException, new Object[]{"publish fail: wait for confirm timeout"});
            }
        } catch (Exception e) {
            logger.error("$$publishConfirm error", e);
            throw new KDException(e, BosErrorCode.rabbitmqException, new Object[]{"can't publish"});
        }
    }

    private Message toMessage(Object obj) {
        Message message = new Message();
        message.setBody(obj);
        message.setRequestContext(RequestContextCreator.createForMQ());
        message.setInnerId(IDService.get().genLongId());
        message.setMessageTime();
        return message;
    }

    @Override // kd.bos.mq.support.PublisherSupport, kd.bos.mq.MessagePublisher
    public void close() {
        try {
            local.get().remove(this);
            this.channel.close();
        } catch (Exception e) {
        }
    }

    @Override // kd.bos.mq.MessagePublisher
    public void publish(Object obj, String str) {
        throw new KDException(BosErrorCode.rabbitmqException, new Object[]{"SimplePublisher can't publish message with partitionKey directly!"});
    }

    @Override // kd.bos.mq.MessagePublisher
    public void publishInDbTranscation(String str, Object obj, String str2) {
        throw new KDException(BosErrorCode.rabbitmqException, new Object[]{"SimplePublisher can't publishInDbTranscation  with partitionKey directly!"});
    }

    @Override // kd.bos.mq.MessagePublisher
    public void publishDelay(Object obj, int i, String str) {
        throw new KDException(BosErrorCode.rabbitmqException, new Object[]{"SimplePublisher can't publishDelay message with appId directly!"});
    }
}
