package kd.bos.schedule.message.zk;

import com.alibaba.fastjson.JSON;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kd.bos.exception.KDException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.schedule.api.MessageHandler;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.ObjectFactory;
import kd.bos.schedule.api.Subscriber;
import kd.bos.schedule.zk.ActiveKeyValueStore;
import kd.bos.schedule.zk.WatchDataForever;
import kd.bos.schedule.zk.ZkConfig;
import org.apache.zookeeper.CreateMode;

/* loaded from: input_file:kd/bos/schedule/message/zk/ZkSubscriber.class */
public class ZkSubscriber implements Subscriber {
    private static Log log = LogFactory.getLog("kd.bos.schedule.message.zk.ZkSubscriber");
    private ObjectFactory objectFactory = null;
    protected Map<String, WatchDataForever> watcherMap = new HashMap();
    protected ActiveKeyValueStore zkStore = null;

    public void setZooKeeper() {
        this.zkStore = ActiveKeyValueStore.create();
    }

    public void subscribe(String str, MessageHandler messageHandler) throws KDException {
        messageHandler.setObjectFactory(getObjectFactory());
        String taskStatusPath = ZkConfig.getTaskStatusPath(str);
        this.zkStore.ensureExisted(taskStatusPath, "{}");
        WatchDataForever watchDataForever = new WatchDataForever(taskStatusPath, messageHandler);
        this.watcherMap.put(str, watchDataForever);
        this.zkStore.write(taskStatusPath + "/watcher", String.valueOf(System.currentTimeMillis()), CreateMode.EPHEMERAL_SEQUENTIAL);
        watchDataForever.start();
    }

    public void unSubscribe(String str) throws KDException {
        if (this.watcherMap.containsKey(str)) {
            this.watcherMap.get(str).stop();
            String taskStatusPath = ZkConfig.getTaskStatusPath(str);
            List<String> children = this.zkStore.getChildren(taskStatusPath);
            if (!children.isEmpty()) {
                this.zkStore.delete(taskStatusPath + "/" + children.get(0));
            }
            List<String> children2 = this.zkStore.getChildren(taskStatusPath);
            if (children2 == null || children2.isEmpty()) {
                String read = this.zkStore.read(taskStatusPath, null);
                this.zkStore.delete(taskStatusPath);
                log.info("成功删除任务状态节点" + taskStatusPath);
                String jobPath = ZkConfig.getJobPath((MessageInfo) JSON.parseObject(read, MessageInfo.class));
                this.zkStore.delete(jobPath);
                log.info("成功删除Job节点" + jobPath);
            }
            this.watcherMap.remove(str);
        }
    }

    public ObjectFactory getObjectFactory() {
        return this.objectFactory;
    }

    public void setObjectFactory(ObjectFactory objectFactory) {
        this.objectFactory = objectFactory;
    }
}
