package kd.isc.iscb.platform.core.dc.mq.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.rabbitmq.RabbitmqFactory;
import kd.isc.iscb.platform.core.connector.ConnectorError;
import kd.isc.iscb.platform.core.dc.mq.MessageQueueServer;
import kd.isc.iscb.platform.core.dc.mq.MessageReceiver;
import kd.isc.iscb.platform.core.dc.mq.PublishedMessage;
import kd.isc.iscb.platform.core.vc.MappingResultImportJob;
import kd.isc.iscb.util.data.ReadLockFreeMap;
import kd.isc.iscb.util.dt.D;
import kd.isc.iscb.util.err.CommonError;
import kd.isc.iscb.util.except.IscBizException;
import kd.isc.iscb.util.misc.Pair;
import kd.isc.iscb.util.misc.StringUtil;

/* loaded from: input_file:kd/isc/iscb/platform/core/dc/mq/rabbit/RabbitMQ.class */
public class RabbitMQ implements MessageQueueServer {
    private static Log logger = LogFactory.getLog(MessageConsumer.class);
    public static final String SPECIAL_TAG = "$PLATFORM";
    private Map<String, Publisher> publishers = new ReadLockFreeMap();
    private Map<String, Channel> consumers = new HashMap();
    private ConnectionFactory factory;
    private Connection cn;
    private DynamicObject cfg;
    private static final String EXCHANGE_NAME = "exchangeName";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/isc/iscb/platform/core/dc/mq/rabbit/RabbitMQ$ChannelFactory.class */
    public class ChannelFactory {
        private ChannelFactory() {
        }

        Channel newPubChannel(String str, MessageConfirmListener messageConfirmListener) throws IOException, TimeoutException {
            return RabbitMQ.this.innerCreatePublisherChannel(str, messageConfirmListener);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/isc/iscb/platform/core/dc/mq/rabbit/RabbitMQ$Publisher.class */
    public static class Publisher {
        private static final int MAX_COUNT = 5;
        private LinkedList<Pair<Channel, MessageConfirmListener>> items;
        private int count;
        private ChannelFactory factory;
        private String topic;

        private Publisher(String str, ChannelFactory channelFactory) {
            this.items = new LinkedList<>();
            this.count = 0;
            this.topic = str;
            this.factory = channelFactory;
        }

        private synchronized Pair<Channel, MessageConfirmListener> take() {
            while (this.items.isEmpty() && this.count >= 5) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    throw CommonError.THREAD_INTERRUPTED.wrap(e);
                }
            }
            Pair<Channel, MessageConfirmListener> createChannel = this.items.isEmpty() ? createChannel() : this.items.removeFirst();
            if (!((Channel) createChannel.getKey()).isOpen()) {
                createChannel = createChannel();
            }
            this.count++;
            return createChannel;
        }

        private synchronized void dispose(Pair<Channel, MessageConfirmListener> pair) {
            this.count--;
            this.items.addLast(pair);
            notifyAll();
        }

        private Pair<Channel, MessageConfirmListener> createChannel() {
            try {
                MessageConfirmListener messageConfirmListener = new MessageConfirmListener();
                return new Pair<>(this.factory.newPubChannel(this.topic, messageConfirmListener), messageConfirmListener);
            } catch (Exception e) {
                throw ConnectorError.MQ_PUBLISH_MESSAGE_FAILURE.wrap(e);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void send(PublishedMessage publishedMessage) {
            Pair<Channel, MessageConfirmListener> take = take();
            try {
                publish(take, publishedMessage);
            } finally {
                dispose(take);
            }
        }

        private void publish(Pair<Channel, MessageConfirmListener> pair, PublishedMessage publishedMessage) {
            Channel channel = (Channel) pair.getKey();
            MessageConfirmListener messageConfirmListener = (MessageConfirmListener) pair.getValue();
            long nextPublishSeqNo = channel.getNextPublishSeqNo();
            messageConfirmListener.monitor(nextPublishSeqNo, publishedMessage);
            try {
                channel.basicPublish(MappingResultImportJob.EMPTY_STR, this.topic, (AMQP.BasicProperties) null, publishedMessage.getData());
                messageConfirmListener.commit(nextPublishSeqNo);
            } catch (Throwable th) {
                messageConfirmListener.rollback(nextPublishSeqNo);
                throw ConnectorError.MQ_PUBLISH_MESSAGE_FAILURE.wrap(th);
            }
        }
    }

    public RabbitMQ(ConnectionFactory connectionFactory, DynamicObject dynamicObject) {
        this.factory = connectionFactory;
        if (SPECIAL_TAG.equals(connectionFactory.getVirtualHost())) {
            this.cn = RabbitmqFactory.getConnection("mq.server");
        } else {
            try {
                this.cn = connectionFactory.newConnection();
            } catch (IOException | TimeoutException e) {
                throw ConnectorError.RABBIT_CONNECT_ERROR.create(e, new String[]{dynamicObject.getString("name"), dynamicObject.getString("number"), StringUtil.getMessage(e)});
            }
        }
        this.cfg = dynamicObject;
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public DynamicObject getConfig() {
        return this.cfg;
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public long getId() {
        return D.l(this.cfg.getPkValue());
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public synchronized void attachListener(String str, MessageReceiver messageReceiver) {
        if (this.consumers.containsKey(str)) {
            throw ConnectorError.TOPIC_SUBSCRIBED.create(new String[]{str});
        }
        try {
            this.consumers.put(str, innerCreateConsumerChannel(str, messageReceiver));
        } catch (IOException | TimeoutException e) {
            throw handleError(e);
        }
    }

    private IscBizException handleError(Throwable th) {
        return ConnectorError.RABBIT_CONNECT_ERROR.create(th, new String[]{this.cfg.getString("name"), this.cfg.getString("number"), th.getMessage()});
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public synchronized void detachListeners() {
        Iterator<Map.Entry<String, Channel>> it = this.consumers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Channel> next = it.next();
            it.remove();
            Channel value = next.getValue();
            try {
                value.close();
            } catch (Throwable th) {
                logger.warn("failed to close channel. channel = " + value, th);
            }
        }
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public void publish(String str, PublishedMessage publishedMessage) {
        getPubChannel(str).send(publishedMessage);
    }

    private Publisher getPubChannel(String str) {
        Publisher publisher = this.publishers.get(str);
        return publisher != null ? publisher : createPubChannel(str);
    }

    private synchronized Publisher createPubChannel(String str) {
        Publisher publisher = this.publishers.get(str);
        if (publisher == null) {
            publisher = new Publisher(str, new ChannelFactory());
            this.publishers.put(str, publisher);
        }
        return publisher;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Channel innerCreatePublisherChannel(String str, MessageConfirmListener messageConfirmListener) throws IOException, TimeoutException {
        Channel createChannel = preparePubConnection().createChannel();
        initPublisherChannel(createChannel, str, messageConfirmListener);
        return createChannel;
    }

    private Channel innerCreateConsumerChannel(String str, MessageReceiver messageReceiver) throws IOException, TimeoutException {
        Channel createChannel = prepareSubConnection().createChannel();
        initConsumerChannel(createChannel, str, messageReceiver);
        return createChannel;
    }

    private Connection preparePubConnection() throws IOException, TimeoutException {
        if (this.cn == null || !this.cn.isOpen()) {
            this.cn = this.factory.newConnection();
        }
        return this.cn;
    }

    private Connection prepareSubConnection() throws IOException, TimeoutException {
        if (this.cn == null || !this.cn.isOpen()) {
            this.cn = this.factory.newConnection();
        }
        return this.cn;
    }

    private void initPublisherChannel(Channel channel, String str, MessageConfirmListener messageConfirmListener) throws IOException, TimeoutException {
        try {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            channel.queueDeclare(str, true, false, false, (Map) null);
            channel.queueBind(str, EXCHANGE_NAME, "routingKey");
            channel.confirmSelect();
            channel.addConfirmListener(messageConfirmListener);
        } catch (Throwable th) {
            channel.close();
            throw handleError(th);
        }
    }

    private void initConsumerChannel(Channel channel, String str, MessageReceiver messageReceiver) throws IOException, TimeoutException {
        try {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            channel.queueDeclare(str, true, false, false, (Map) null);
            channel.queueBind(str, EXCHANGE_NAME, "routingKey");
            channel.basicConsume(str, false, new MessageConsumer(channel, messageReceiver));
        } catch (Throwable th) {
            channel.close();
            throw handleError(th);
        }
    }
}
