package kd.isc.iscb.platform.core.dc.mq.mqs;

import com.huawei.it.eip.ump.client.consumer.Consumer;
import com.huawei.it.eip.ump.client.producer.Producer;
import com.huawei.it.eip.ump.client.producer.SendResult;
import com.huawei.it.eip.ump.common.exception.UmpException;
import com.huawei.it.eip.ump.common.message.Message;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.isc.iscb.platform.core.connector.ConnectorError;
import kd.isc.iscb.platform.core.connector.k3cloud.K3CloudConstant;
import kd.isc.iscb.platform.core.dc.mq.MessageQueueServer;
import kd.isc.iscb.platform.core.dc.mq.MessageReceiver;
import kd.isc.iscb.platform.core.dc.mq.PublishedMessage;
import kd.isc.iscb.platform.core.dc.mq.rocketmq.RocketMqServer;
import kd.isc.iscb.platform.core.vc.MappingResultImportJob;
import kd.isc.iscb.util.data.ReadLockFreeMap;
import kd.isc.iscb.util.dt.D;
import kd.isc.iscb.util.except.IscBizException;
import kd.isc.iscb.util.misc.StringUtil;

/* loaded from: input_file:kd/isc/iscb/platform/core/dc/mq/mqs/MqsServer.class */
public class MqsServer implements MessageQueueServer {
    private static final Log LOG = LogFactory.getLog(MqsServer.class);
    private final DynamicObject cfg;
    private final Map<String, Producer> producers = new ReadLockFreeMap();
    private final Map<String, Consumer> consumers = new HashMap();

    public MqsServer(DynamicObject dynamicObject) {
        this.cfg = dynamicObject;
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public DynamicObject getConfig() {
        return this.cfg;
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public long getId() {
        return D.l(this.cfg.getPkValue());
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public synchronized void attachListener(String str, MessageReceiver messageReceiver) {
        if (this.consumers.containsKey(str)) {
            throw new UnsupportedOperationException("MQS主题（" + str + "）已订阅！");
        }
        try {
            this.consumers.put(str, createConsumer(str, messageReceiver));
        } catch (UmpException e) {
            throw ConnectorError.MQS_CONNECT_ERROR.create(e, new String[]{this.cfg.getString("name"), this.cfg.getString("number"), StringUtil.getMessage(e)});
        }
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public synchronized void detachListeners() {
        Iterator<Map.Entry<String, Consumer>> it = this.consumers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Consumer> next = it.next();
            it.remove();
            Consumer value = next.getValue();
            if (value != null) {
                try {
                    value.shutdown();
                } catch (Exception e) {
                    LOG.warn("failed to close consumer. consumer = " + value, e);
                }
            }
        }
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public void publish(String str, PublishedMessage publishedMessage) {
        RocketMqServer.TopicTagData invoke = new RocketMqServer.TopicTagData(str, null).invoke();
        String topic = invoke.getTopic();
        String tag = invoke.getTag();
        Producer producer = getProducer(topic);
        Message message = new Message();
        message.setBusinessId(D.s(Long.valueOf(publishedMessage.getId())));
        message.setTags(tag);
        message.setBody(publishedMessage.getData());
        try {
            SendResult send = producer.send(message);
            if (!send.isSuccess()) {
                throw new IscBizException(send.toString());
            }
            try {
                publishedMessage.setSuccess();
            } catch (Exception e) {
                LOG.warn("onCompletion setSuccess failed. id=" + publishedMessage.getId(), e);
            }
        } catch (Exception e2) {
            try {
                publishedMessage.setFailed(e2);
            } catch (Exception e3) {
                LOG.warn("onCompletion setFailed failed. id=" + publishedMessage.getId(), e3);
            }
            throw ConnectorError.MQ_PUBLISH_MESSAGE_FAILURE.wrap(e2);
        }
    }

    private Producer getProducer(String str) {
        Producer producer = this.producers.get(str);
        return producer != null ? producer : createPublisher(str);
    }

    private synchronized Producer createPublisher(String str) {
        Producer producer = this.producers.get(str);
        if (producer == null) {
            producer = new Producer();
            producer.setUmpNamesrvUrls(D.s(this.cfg.get("bootstrap_servers")).replace(" ", MappingResultImportJob.EMPTY_STR).replace(',', ';'));
            producer.setAppId(D.s(this.cfg.get(K3CloudConstant.USER)));
            producer.setAppSecret(D.s(this.cfg.get(K3CloudConstant.PCODE)));
            producer.setInstanceName(str + System.currentTimeMillis());
            producer.setTopic(str);
            producer.setEncryptTransport(this.cfg.getBoolean("encrypt_transport"));
            try {
                producer.start();
                this.producers.put(str, producer);
            } catch (UmpException e) {
                try {
                    producer.shutdown();
                } catch (UmpException e2) {
                    LOG.warn("failed to close Producer. Producer = " + producer, e);
                }
                throw ConnectorError.MQS_CONNECT_ERROR.create(e, new String[]{this.cfg.getString("name"), this.cfg.getString("number"), e.getMessage()});
            }
        }
        return producer;
    }

    private Consumer createConsumer(String str, MessageReceiver messageReceiver) throws UmpException {
        RocketMqServer.TopicTagData invoke = new RocketMqServer.TopicTagData(str, "*").invoke();
        String topic = invoke.getTopic();
        String tag = invoke.getTag();
        Consumer consumer = new Consumer();
        consumer.setUmpNamesrvUrls(D.s(this.cfg.get("bootstrap_servers")).replace(" ", MappingResultImportJob.EMPTY_STR).replace(',', ';'));
        consumer.setAppId(D.s(this.cfg.get(K3CloudConstant.USER)));
        consumer.setAppSecret(D.s(this.cfg.get(K3CloudConstant.PCODE)));
        consumer.setInstanceName(topic + System.currentTimeMillis());
        consumer.setTopic(topic);
        consumer.setTags(tag);
        consumer.setEncryptTransport(this.cfg.getBoolean("encrypt_transport"));
        consumer.setSubGroup(str.replace('@', '-'));
        consumer.subscribe(new MqsMessageListener(messageReceiver));
        consumer.start();
        return consumer;
    }
}
