package kd.mmc.mrp.framework.mq;

import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kd.bos.cache.ThreadCache;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.dataentity.utils.StringUtils;
import kd.bos.exception.KDBizException;
import kd.bos.instance.Instance;
import kd.bos.mq.MQFactory;
import kd.bos.mq.MessageConsumer;
import kd.bos.mq.MessagePublisher;
import kd.bos.mq.config.ConsumerDef;
import kd.bos.mq.config.QueueDef;
import kd.bos.mq.support.QueueManager;
import kd.bos.mq.support.dynamic.DynamicQueueManagerFactory;
import kd.mmc.mrp.exception.MRPManuStopException;
import kd.mmc.mrp.framework.CalEnv;
import kd.mmc.mrp.framework.IMRPEnvProvider;
import kd.mmc.mrp.framework.IMRPExecuteLogRecorder;
import kd.mmc.mrp.framework.consts.MRPRuntimeConsts;
import kd.mmc.mrp.framework.mq.event.MRPEvent;
import kd.mmc.mrp.framework.runner.MRPCalcManager;
import kd.mmc.mrp.integrate.KDCloudLogRecorder;
import kd.mmc.mrp.model.MetaConsts;
import kd.mmc.mrp.model.enums.EnvCfgItem;
import kd.mmc.mrp.model.enums.MultiThreadCacheKey;
import org.apache.log4j.Logger;

/* loaded from: input_file:kd/mmc/mrp/framework/mq/MRPMQManager.class */
public class MRPMQManager implements IMRPEventManager {
    private static Logger logger = Logger.getLogger(MRPMQManager.class);
    private CalEnv ctx;
    private volatile StackTraceElement[] errStack;
    private volatile String errMsg;
    private MessagePublisher publisher;
    private Map<String, MessagePublisher> controlPublisher;
    private final Set<Thread> waiters = new HashSet(1);
    private volatile boolean failed = false;
    private Map<Long, CountDownLatch> thread2Latch = new ConcurrentHashMap();
    private Map<Long, OnResponse> resps = new ConcurrentHashMap();
    private Map<String, KDCloudLogRecorder.Wrapper> wrappers = new ConcurrentHashMap();
    private Map<String, Long> evt2Thread = new ConcurrentHashMap();
    private final Map<Long, Map<String, MRPEvent>> calEvents = new ConcurrentHashMap();
    private String id = UUID.randomUUID().toString();

    public MRPMQManager(CalEnv calEnv, OnResponse onResponse) {
        this.ctx = calEnv;
        calEnv.addService(IMRPEventManager.class, this);
        this.publisher = MQFactory.get().createSimplePublisher(MRPRuntimeConsts.MRP_MQ_REGION_NAME, MRPRuntimeConsts.MRP_MQ_QUEUE_NAME);
        this.controlPublisher = new ConcurrentHashMap(2);
    }

    private MessagePublisher getMessagePublisher() {
        return this.publisher;
    }

    private String getManuStop() {
        return ResManager.loadKDString("手工关闭了。", "MRPMQManager_1", "mmc-mrp-mservice", new Object[0]);
    }

    private MessagePublisher getControlPublisher(MRPEvent mRPEvent) {
        String controllerQueueName = getControllerQueueName(mRPEvent);
        return this.controlPublisher.computeIfAbsent(controllerQueueName, str -> {
            return MQFactory.get().createSimplePublisher(MRPRuntimeConsts.MRP_MQ_REGION_NAME, controllerQueueName);
        });
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void registControlQueue(IMRPEnvProvider iMRPEnvProvider, Class<? extends MessageConsumer> cls) {
        if (Instance.isLightWeightDeploy()) {
            return;
        }
        String mRPContextId = iMRPEnvProvider.getMRPContextId();
        ArrayList arrayList = new ArrayList();
        ConsumerDef consumerDef = new ConsumerDef();
        consumerDef.setClassName(cls.getName());
        arrayList.add(consumerDef);
        QueueDef queueDef = new QueueDef();
        queueDef.setAppid(MRPRuntimeConsts.MRP_MQ_APP_ID);
        queueDef.setName("kd.mmc.mrp.calcnode.mrp_queue." + MRPRuntimeConsts.NODE_ID + "." + mRPContextId);
        queueDef.setConsumers(arrayList);
        DynamicQueueManagerFactory.get(MRPRuntimeConsts.MRP_MQ_REGION_NAME).add(queueDef, true);
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void destroyMRPControllerMQ(String str) {
        if (Instance.isLightWeightDeploy()) {
            return;
        }
        logger.warn("mrprunner-clear-mq-queue-data, mrpctx: " + str);
        String str2 = "kd.mmc.mrp.calcnode.mrp_queue." + MRPRuntimeConsts.NODE_ID + "." + str;
        QueueManager.getRealQueueName(MRPRuntimeConsts.MRP_MQ_REGION_NAME, str2);
        logger.warn(String.format("mrprunner-clear-mq-queue-data, mrpctx: %s, deleteQueue isOk: %s", str, Boolean.valueOf(DynamicQueueManagerFactory.get(MRPRuntimeConsts.MRP_MQ_REGION_NAME).deleteQueue(str2))));
        destroyPublisher(str);
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void destroyPublisher(String str) {
        if (this.publisher != null) {
            this.publisher.close();
            this.publisher = null;
        }
        if (this.controlPublisher != null) {
            Iterator<MessagePublisher> it = this.controlPublisher.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.controlPublisher.clear();
        }
    }

    private String getControllerQueueName(MRPEvent mRPEvent) {
        return "kd.mmc.mrp.calcnode.mrp_queue." + mRPEvent.getNodeId() + "." + mRPEvent.getMrpContextId();
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void wait4Response() {
        CountDownLatch countDownLatch;
        Long valueOf = Long.valueOf(Thread.currentThread().getId());
        Map<String, MRPEvent> map = this.calEvents.get(valueOf);
        if (map == null) {
            return;
        }
        if (isFailed()) {
            throwError(new RuntimeException(this.errMsg));
        }
        synchronized (map) {
            if (map.size() > 0) {
                MRPEvent next = map.values().iterator().next();
                logger.info(String.format("PUB:MRPMQManager:nodeId:%s;wait:%d;eventName:%s;context:%s", next.getNodeId(), Integer.valueOf(map.size()), next.getClass().getSimpleName(), next.getMrpContextId()));
            }
            countDownLatch = new CountDownLatch(map.size());
            this.thread2Latch.put(valueOf, countDownLatch);
        }
        try {
            try {
                Long l = (Long) this.ctx.getCfgValue(EnvCfgItem.MQ_TIMEOUT);
                this.waiters.add(Thread.currentThread());
                if (!countDownLatch.await(l.longValue(), TimeUnit.SECONDS)) {
                    logger.warn(String.format("MRPMQManager-wait-timeout-count:%s,context:%s", Long.valueOf(countDownLatch.getCount()), this.ctx.getMRPContextId()));
                }
                destroyEvent(valueOf);
                this.resps.remove(valueOf);
            } catch (InterruptedException e) {
                throwError(e);
                destroyEvent(valueOf);
                this.resps.remove(valueOf);
            }
        } catch (Throwable th) {
            destroyEvent(valueOf);
            this.resps.remove(valueOf);
            throw th;
        }
    }

    private void destroyEvent(Long l) {
        this.calEvents.remove(l);
        this.thread2Latch.remove(l);
        this.failed = false;
        this.errMsg = null;
        this.errStack = null;
        if (this.calEvents.isEmpty()) {
            MRPCalcManager.destroyEventManager(this.id);
            this.evt2Thread.clear();
        }
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void publishCalcEvent(MRPEvent mRPEvent) {
        publishCalcEvent(mRPEvent, true);
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void publishCalcEvent(MRPEvent mRPEvent, boolean z) {
        mRPEvent.setEventManagerId(this.id);
        mRPEvent.setPublishTime(System.currentTimeMillis());
        MessagePublisher messagePublisher = getMessagePublisher();
        boolean booleanValue = ((Boolean) this.ctx.getCfgValue(EnvCfgItem.USE_DISTRIBUTION_MODE)).booleanValue();
        if (z) {
            Long valueOf = Long.valueOf(Thread.currentThread().getId());
            synchronized (this.calEvents) {
                this.calEvents.computeIfAbsent(valueOf, l -> {
                    return new ConcurrentHashMap();
                }).put(mRPEvent.getEventId(), mRPEvent);
                this.evt2Thread.put(mRPEvent.getEventId(), valueOf);
                this.wrappers.put(mRPEvent.getEventId(), IMRPExecuteLogRecorder.LOCAL.get());
            }
            MRPCalcManager.registEventManager(this.ctx.getMRPContextId(), this);
        }
        if (booleanValue) {
            if (messagePublisher != null) {
                messagePublisher.publish(mRPEvent);
            }
        } else {
            try {
                this.waiters.add(Thread.currentThread());
                Class.forName("kd.mmc.mrp.calcnode.framework.thread.MRPCalcWorker").getMethod("resolveAlone", MRPEvent.class).invoke(null, mRPEvent);
            } catch (Exception e) {
                throwError(e);
            }
        }
    }

    private void throwError(Exception exc) {
        ThreadCache.put(MetaConsts.MRPCalcDetailFields.Entry_RequireBillEXCEPTION, exc);
        if (this.errMsg != null || isFailed()) {
            if (StringUtils.equals(this.errMsg, getManuStop())) {
                throw new MRPManuStopException();
            }
            KDBizException kDBizException = new KDBizException(this.errMsg);
            if (this.errStack != null) {
                kDBizException.setStackTrace(this.errStack);
            }
            throw kDBizException;
        }
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void publishControlEvent(MRPEvent mRPEvent) {
        String controllerQueueName = getControllerQueueName(mRPEvent);
        if (QueueManager.getQueueDefWithRealQueueName(MRPRuntimeConsts.MRP_MQ_REGION_NAME, QueueManager.getRealQueueName(MRPRuntimeConsts.MRP_MQ_REGION_NAME, controllerQueueName)) == null) {
            QueueDef queueDef = new QueueDef();
            queueDef.setAppid(MRPRuntimeConsts.MRP_MQ_APP_ID);
            queueDef.setName(controllerQueueName);
            QueueManager.add(MRPRuntimeConsts.MRP_MQ_REGION_NAME, queueDef, false);
        }
        getControlPublisher(mRPEvent).publish(mRPEvent);
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void onMessage(MRPEvent mRPEvent) {
        Map<String, MRPEvent> map;
        Long l = this.evt2Thread.get(mRPEvent.getEventId());
        synchronized (this.calEvents) {
            map = this.calEvents.get(l);
        }
        synchronized (map) {
            MRPEvent remove = map.remove(mRPEvent.getEventId());
            if (remove != null) {
                logger.info(String.format("REC:MRPMQManager:nodeId:%s;eventId:%s;failed:%s;eventSize:%d;eventName:%s;context:%s", remove.getNodeId(), remove.getEventId(), String.valueOf(this.failed), Integer.valueOf(map.size()), remove.getClass().getSimpleName(), remove.getMrpContextId()));
                if (!this.failed) {
                    this.failed = MetaConsts.FALSE_STR.equalsIgnoreCase(String.valueOf(mRPEvent.getParam(MultiThreadCacheKey.KEY_SERVICE_OK)));
                    if (this.failed) {
                        this.errMsg = (String) mRPEvent.getParam(MultiThreadCacheKey.KEY_SERVICE_EXCEPTION_DETAIL);
                        String str = (String) mRPEvent.getParam(MultiThreadCacheKey.KEY_SERVICE_EXCEPTION);
                        if (str != null) {
                            this.errStack = (StackTraceElement[]) JSON.parseArray(str, StackTraceElement.class).toArray(new StackTraceElement[0]);
                        }
                        interrupt();
                    }
                    mRPEvent.setFailed(this.failed);
                    mRPEvent.setRemainEvtCnt(map.size());
                    mRPEvent.setResponseResolver(remove.getResponseResolver());
                    mRPEvent.setSubStepIdx(remove.getSubStepIdx());
                    OnResponse onResponse = getOnResponse(l);
                    onResponse.setLogWrapper(this.wrappers.remove(mRPEvent.getEventId()));
                    onResponse.response(mRPEvent);
                    CountDownLatch countDownLatch = this.thread2Latch.get(l);
                    if (countDownLatch != null && !this.failed) {
                        countDownLatch.countDown();
                    }
                }
            }
        }
    }

    private void interrupt() {
        Iterator<Thread> it = this.waiters.iterator();
        while (it.hasNext()) {
            it.next().interrupt();
        }
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public int getRemainEvtCount(String str) {
        Map<String, MRPEvent> map;
        int size;
        Long l = this.evt2Thread.get(str);
        synchronized (this.calEvents) {
            map = this.calEvents.get(l);
        }
        synchronized (map) {
            size = map.size();
        }
        return size;
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void markFailed(StackTraceElement[] stackTraceElementArr, Object obj) {
        logger.warn("mrprunner-failed-mark", new Throwable("mrprunner-failed-mark"));
        this.failed = true;
        if (stackTraceElementArr != null && stackTraceElementArr.length > 0) {
            this.errStack = stackTraceElementArr;
        }
        if (obj != null) {
            this.errMsg = obj.toString();
        }
        interrupt();
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void markManuTerminated() {
        this.errMsg = getManuStop();
        this.ctx.setShutdown();
        interrupt();
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public String getId() {
        return this.id;
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public void setOnResponse(OnResponse onResponse) {
        this.resps.put(Long.valueOf(Thread.currentThread().getId()), onResponse);
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public OnResponse getOnResponse() {
        OnResponse onResponse;
        Long valueOf = Long.valueOf(Thread.currentThread().getId());
        synchronized (this) {
            onResponse = getOnResponse(valueOf);
        }
        return onResponse;
    }

    private OnResponse getOnResponse(Long l) {
        OnResponse onResponse = this.resps.get(l);
        if (onResponse == null) {
            onResponse = new DefaultOnResponse((IMRPEnvProvider) this.ctx, this.ctx.createLogRecorder());
            this.resps.put(l, onResponse);
        }
        return onResponse;
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public StackTraceElement[] getErrStack() {
        return this.errStack;
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public String getErrMsg() {
        return this.errMsg;
    }

    @Override // kd.mmc.mrp.framework.mq.IMRPEventManager
    public boolean isFailed() {
        return this.failed;
    }
}
