package kd.bos.schedule.message.zk;

import com.alibaba.fastjson.JSON;
import java.util.Iterator;
import java.util.TreeMap;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.serialization.SerializationUtils;
import kd.bos.dataentity.utils.StringUtils;
import kd.bos.dlock.DLock;
import kd.bos.exception.ErrorCode;
import kd.bos.exception.KDException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.MessageSender;
import kd.bos.schedule.api.MessageType;
import kd.bos.schedule.api.ObjectFactory;
import kd.bos.schedule.api.TaskResult;
import kd.bos.schedule.message.MessageCreator;
import kd.bos.schedule.zk.ActiveKeyValueStore;
import kd.bos.schedule.zk.ZkConfig;
import org.apache.zookeeper.ZooKeeper;

/* loaded from: input_file:kd/bos/schedule/message/zk/ZkMessageSender.class */
public class ZkMessageSender implements MessageSender {
    private static final String BROADCAST_FAIL = "SECHDULE_BROADCAST_FAIL";
    protected ZooKeeper zkClient = null;
    protected ActiveKeyValueStore zkStore = null;
    protected ObjectFactory objectFactory = null;
    private static Log log = LogFactory.getLog("kd.bos.schedule.message.zk.ZkMessageSender");
    private static String SEPARATOR = "/";

    public ObjectFactory getObjectFactory() {
        return this.objectFactory;
    }

    public void setObjectFactory(ObjectFactory objectFactory) {
        this.objectFactory = objectFactory;
    }

    public void setZooKeeper() {
        this.zkStore = ActiveKeyValueStore.create();
    }

    public ActiveKeyValueStore getZkStore() {
        return this.zkStore;
    }

    public void send(MessageInfo messageInfo) throws KDException {
        if (log.isInfoEnabled()) {
            log.info("send message:" + messageInfo.toString());
        }
        if (messageInfo.getMessageType() != MessageType.BIZJOB && messageInfo.getMessageType() != MessageType.WorkFlowJOB) {
            if (messageInfo.getMessageType() == MessageType.BIZ_TASK_FEEDBACK || messageInfo.getMessageType() == MessageType.WorkFlow_TASK_FEEDBACK) {
                String taskStatusPath = ZkConfig.getTaskStatusPath(messageInfo.getTaskId());
                udpateStorage(messageInfo);
                this.zkStore.write(taskStatusPath, JSON.toJSONString(messageInfo));
                return;
            }
            return;
        }
        String taskStatusPath2 = ZkConfig.getTaskStatusPath(messageInfo.getTaskId());
        MessageInfo createStatusMessage = MessageCreator.createStatusMessage(messageInfo.getTaskId(), "SCHEDULED", (String) null);
        createStatusMessage.setTarget(messageInfo.getTarget());
        this.zkStore.write(taskStatusPath2, JSON.toJSONString(createStatusMessage));
        this.zkStore.write(ZkConfig.getJobPath(messageInfo), JSON.toJSONString(messageInfo));
    }

    private void udpateStorage(MessageInfo messageInfo) {
        TaskResult fectchTaskResult = messageInfo.fectchTaskResult();
        if (TaskResult.ResultTypeEnum.STATUS == fectchTaskResult.getResultType()) {
            getObjectFactory().getTaskDao().updateStatus(messageInfo.getTaskId(), fectchTaskResult.getStatus());
        } else if (TaskResult.ResultTypeEnum.PROGRESS == fectchTaskResult.getResultType()) {
            getObjectFactory().getTaskDao().updateProgress(messageInfo.getTaskId(), fectchTaskResult.getProgress());
        } else if (TaskResult.ResultTypeEnum.CUSTOMDATA == fectchTaskResult.getResultType()) {
            getObjectFactory().getTaskDao().updateCustomData(messageInfo.getTaskId(), fectchTaskResult.getCustomData());
        }
    }

    public void broadcastMsg(MessageInfo messageInfo) throws KDException {
        String appId = messageInfo.fetchJobInfo().getAppId();
        if (StringUtils.isBlank(appId)) {
            throw new KDException(new ErrorCode(BROADCAST_FAIL, "broadcast not configured appId"), new Object[0]);
        }
        String accountId = RequestContext.get().getAccountId();
        ActiveKeyValueStore create = ActiveKeyValueStore.create();
        create.initAppIdExeServerMap(appId);
        broadCast(messageInfo, create.getExeServerMap(), ZkConfig.getJobRootPath() + SEPARATOR + accountId + SEPARATOR + appId);
    }

    private void broadCast(MessageInfo messageInfo, TreeMap<String, Integer> treeMap, String str) {
        DLock create = DLock.create(str);
        Throwable th = null;
        try {
            if (create.tryLock(0L)) {
                String jsonString = SerializationUtils.toJsonString(messageInfo);
                log.info("后台事务广播消息- " + str + "：" + jsonString);
                Iterator<String> it = treeMap.keySet().iterator();
                while (it.hasNext()) {
                    this.zkStore.publishBroadcastMsg(str + SEPARATOR + it.next(), jsonString);
                }
            } else {
                waitTime(1000L);
                broadCast(messageInfo, treeMap, str);
            }
            if (create != null) {
                if (0 == 0) {
                    create.close();
                    return;
                }
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    private void waitTime(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            log.error(e.getMessage());
        }
    }
}
