package kd.bos.schedule.server;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import kd.bos.context.OperationContext;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.utils.StringUtils;
import kd.bos.db.DB;
import kd.bos.db.DBRoute;
import kd.bos.dc.api.model.Account;
import kd.bos.encrypt.Encrypters;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.instance.AppGroup;
import kd.bos.instance.Instance;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.support.ConfigForInitConsumer;
import kd.bos.schedule.api.ObjectFactory;
import kd.bos.schedule.api.ScheduleDeployMode;
import kd.bos.schedule.api.ScheduleManager;
import kd.bos.schedule.message.AbstractService;
import kd.bos.schedule.message.mq.MQHelper;
import kd.bos.schedule.message.mq.MQObjectFactory;
import kd.bos.schedule.server.leader.LeaderElectionSupport;
import kd.bos.schedule.server.messagecustomer.AsynJobMessageDispatcher;
import kd.bos.schedule.server.messagecustomer.TaskStatusMessageCustomer;
import kd.bos.schedule.server.schedulecreator.SchVisitorStatus;
import kd.bos.schedule.utils.RequestContextUtils;
import kd.bos.schedule.utils.ScheduleAccountUtils;
import kd.bos.schedule.utils.ScheduleGrayGroup;
import kd.bos.schedule.zk.ZkConfig;
import kd.bos.tenant.listener.TenantListener;
import kd.bos.tenant.listener.TenantListenerInfo;
import kd.bos.tenant.listener.TenantListenerManager;
import kd.bos.thread.ThreadLifeCycleManager;
import kd.bos.threads.ThreadPools;
import kd.bos.util.ConfigurationChangeListener;
import kd.bos.util.ConfigurationUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryForever;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

/* loaded from: input_file:kd/bos/schedule/server/ScheduleService.class */
public class ScheduleService extends AbstractService {
    private AtomicBoolean isLeaderFlag = new AtomicBoolean(false);
    private AtomicBoolean isElectStarted = new AtomicBoolean(false);
    ExecutorResourceManager executorResourceMrg = null;
    Trigger trigger = null;
    LeaderSelector selector = null;
    CuratorFramework client = null;
    private static Log log = LogFactory.getLog("kd.bos.schedule.executor.ScheduleService");
    private static int recoveryDelayTime_ms = Integer.getInteger("Schedule.trigger.recover.delayms", 60000).intValue();
    private static volatile ScheduleService instance = null;
    private static volatile Status curStatus = Status.NotInitialed;

    /* loaded from: input_file:kd/bos/schedule/server/ScheduleService$Status.class */
    public enum Status {
        NotInitialed,
        Running,
        READY,
        InService
    }

    public ScheduleService() {
        if (instance == null) {
            synchronized (this) {
                if (instance == null) {
                    instance = this;
                    init();
                }
            }
        }
    }

    private void init() {
        ThreadPools.executeOnce("ScheduleService init", new Runnable() { // from class: kd.bos.schedule.server.ScheduleService.1
            @Override // java.lang.Runnable
            public void run() {
                for (Account account : ScheduleAccountUtils.getAllAccountsOfCurrentEnv(false)) {
                    try {
                        RequestContextUtils.createRequestContext(account.getTenantId(), account.getAccountId(), "0");
                        ScheduleService.log.info("ScheduleService init");
                        if (!ScheduleService.isExistField("T_SCH_TASK", "FINSTANCEID")) {
                            DB.execute(DBRoute.base, "IF NOT EXISTS (SELECT 1 FROM KSQL_USERCOLUMNS WHERE KSQL_COL_TABNAME = 't_sch_task' AND KSQL_COL_NAME ='finstanceid') \nALTER TABLE t_sch_task ADD finstanceid VARCHAR(75)");
                        }
                        if (!ScheduleService.isExistField("T_SCH_TASK", "FTRACEID")) {
                            DB.execute(DBRoute.base, "IF NOT EXISTS (SELECT 1 FROM KSQL_USERCOLUMNS WHERE KSQL_COL_TABNAME = 't_sch_task' AND KSQL_COL_NAME ='ftraceid') \nALTER TABLE t_sch_task ADD ftraceid VARCHAR(75)");
                        }
                    } catch (Exception e) {
                        ScheduleService.log.error("SecheduleService init error", e);
                    }
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isExistField(String str, String str2) {
        List columnNames = DB.getColumnNames(DBRoute.basedata, str);
        if (columnNames == null || columnNames.isEmpty()) {
            return false;
        }
        Iterator it = columnNames.iterator();
        while (it.hasNext()) {
            if (((String) it.next()).equalsIgnoreCase(str2)) {
                return true;
            }
        }
        return false;
    }

    public static ScheduleService getInstance() {
        if (instance == null) {
            log.error("调度服务未启动，该服务器节点bos.lifecycle.servicelist未注册ScheduleService，或lifecyclemanager启动失败。");
            return null;
        }
        if (Status.NotInitialed == curStatus) {
            RequestContext requestContext = RequestContext.get();
            instance.start();
            RequestContext.set(requestContext);
        }
        return instance;
    }

    public boolean isLeader() {
        return this.isLeaderFlag.get();
    }

    public String getName() {
        return "Schedule Service";
    }

    public ScheduleManager getScheduleManager() {
        if (this.trigger == null) {
            return null;
        }
        return this.trigger.getScheduleManager();
    }

    public Status getCurStatus() {
        return curStatus;
    }

    public void start() {
        synchronized (this) {
            if (Status.NotInitialed != curStatus) {
                return;
            }
            curStatus = Status.Running;
            this.isStart = true;
            super.start();
            OperationContext operationContext = new OperationContext();
            operationContext.setAppId("bos");
            OperationContext.set(operationContext);
            if (enableMasterElection()) {
                beginLeaderElection();
            }
            startDaemonThread();
        }
    }

    private boolean enableMasterElection() {
        if (ZkConfig.getDeployMode() == ScheduleDeployMode.DISABLE || ZkConfig.getDeployMode() == ScheduleDeployMode.EXECUTE_NODE) {
            log.info("该节点不启动scheduleservice服务,节点模式为:" + ZkConfig.getDeployMode());
            return false;
        }
        if (Boolean.getBoolean("Schedule.disableToWork")) {
            log.info("ScheduleService初始化完成，该节点不进行调度选举");
            return false;
        }
        String[] appIds = Instance.getAppIds();
        if (!Boolean.valueOf(Boolean.parseBoolean(System.getProperty("Schedule.executor.qingnode.defaultstart", "false"))).booleanValue() && appIds != null && appIds.length == 1 && "qing".equalsIgnoreCase(appIds[0])) {
            log.info("轻分析节点不进行调度选举");
            return false;
        }
        if (!ConfigForInitConsumer.isConsumerEnable()) {
            log.info("未开启mq服务,该节点不进行调度选举");
            return false;
        }
        if (!StringUtils.isNotBlank(AppGroup.getCurAppGroup()) || !Instance.isAppSplit() || appIds == null || appIds.length <= 0) {
            return true;
        }
        for (String str : appIds) {
            if (AppGroup.isCurrentGrayNode(str)) {
                log.info("当前为灰度节点，不启用schedule-masterservice");
                return false;
            }
        }
        return true;
    }

    @Deprecated
    public LeaderElectionSupport getLeaderElectionSupport() {
        return null;
    }

    public Trigger getTrigger() {
        return this.trigger;
    }

    public ExecutorResourceManager getExecutorResourceManager() {
        return this.executorResourceMrg;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean pauseTriggerService() {
        return Boolean.getBoolean("Schedule.trigger.paused");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void keepAlive() {
        if (this.trigger == null || this.trigger.getStatus() == SchVisitorStatus.Paused) {
            return;
        }
        this.trigger.reSetElectedAccount(getElectedAccount());
    }

    private void addAccountChangedListener() {
        TenantListenerManager.addTenantListener(new TenantListener() { // from class: kd.bos.schedule.server.ScheduleService.2
            public void onTenantAccountsAdded(TenantListenerInfo tenantListenerInfo) {
                try {
                    if ("ZK".equalsIgnoreCase(System.getProperty("mc.type")) && ScheduleService.this.isLeaderFlag.get() && ScheduleService.this.trigger.isRunning()) {
                        ThreadLifeCycleManager.start();
                        for (Account account : tenantListenerInfo.getAccountList()) {
                            ScheduleService.log.info("增加租户：" + account.getAccountId());
                            if (ScheduleService.this.tryLock(account)) {
                                try {
                                    Thread.sleep(ScheduleService.recoveryDelayTime_ms);
                                } catch (InterruptedException e) {
                                }
                                RequestContextUtils.createRequestContext(account.getTenantId(), account.getAccountId(), "0");
                                if (DBElectionHelper.tryLock()) {
                                    ScheduleService.this.trigger.addAccount(account);
                                }
                            }
                        }
                    }
                } catch (Exception e2) {
                    ScheduleService.log.error("后台事务新增租户异常：" + e2.getMessage());
                }
            }

            public void onTenantAccountsRemoved(TenantListenerInfo tenantListenerInfo) {
                if (ScheduleService.this.isLeaderFlag.get() && ScheduleService.this.trigger.isRunning()) {
                    try {
                        ThreadLifeCycleManager.start();
                        for (Account account : tenantListenerInfo.getAccountList()) {
                            ScheduleService.log.info("移除租户：" + account.getAccountId());
                            RequestContextUtils.createRequestContext(account.getTenantId(), account.getAccountId(), "0");
                            ScheduleService.this.trigger.deleteAccount(account);
                            DBElectionHelper.releaseLock();
                        }
                    } catch (Exception e) {
                        ScheduleService.log.error("后台事务移除租户异常：" + e.getMessage());
                    }
                }
            }
        });
    }

    private void beginLeaderElection() {
        LeaderSelectorListenerAdapter leaderSelectorListenerAdapter = new LeaderSelectorListenerAdapter() { // from class: kd.bos.schedule.server.ScheduleService.3
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                if (curatorFramework.getConnectionStateErrorPolicy().isErrorState(connectionState)) {
                    if (ScheduleService.this.isLeaderFlag.get()) {
                        ThreadLifeCycleManager.start();
                        ScheduleService.this.stopLeader();
                    }
                    throw new CancelLeadershipException();
                }
            }

            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                ThreadLifeCycleManager.start();
                OperationContext operationContext = new OperationContext();
                operationContext.setAppId("bos");
                OperationContext.set(operationContext);
                try {
                    ScheduleService.this.beginService();
                    ScheduleService.this.zkStore.write(ZkConfig.getMasterServerPath(), ZkConfig.getHostIpAddress());
                    while (ScheduleService.this.isLeaderFlag.get()) {
                        Thread.sleep(ZkConfig.getSelfCheck() * 1000);
                        if (ScheduleService.this.pauseTriggerService() && ScheduleService.this.trigger != null) {
                            ScheduleService.this.trigger.setStatus(SchVisitorStatus.Paused);
                        } else if (ScheduleService.this.trigger != null) {
                            ScheduleService.this.trigger.setStatus(SchVisitorStatus.Running);
                        }
                        ScheduleService.this.keepAlive();
                        if (!ScheduleService.this.trigger.isRunning()) {
                            ScheduleService.this.stopLeader();
                        }
                        if (!ZkConfig.getHostIpAddress().equals(ScheduleService.this.zkStore.read(ZkConfig.getMasterServerPath(), (Watcher) null)) || Instance.isPausedServiceByMonitor()) {
                            ScheduleService.this.stopLeader();
                        }
                    }
                    ScheduleService.this.stopLeader();
                } catch (Exception e) {
                    ScheduleService.log.error("调度服务启动异常:" + e.getMessage());
                    Thread.sleep(ZkConfig.getSelfCheck() * 500);
                }
            }
        };
        if (Boolean.getBoolean("Schedule.zk.client.alone")) {
            this.client = createNewZkClient(System.getProperty("Schedule.zk.server"));
        } else {
            this.client = this.zkStore.getCuratorFramework();
        }
        if (this.selector != null) {
            try {
                this.selector.close();
            } catch (Exception e) {
                log.error("close schedule selector error", e);
            }
        }
        this.selector = new LeaderSelector(this.client, ZkConfig.getMasterServerPath(), leaderSelectorListenerAdapter);
        this.selector.autoRequeue();
        this.selector.start();
        this.isElectStarted.set(true);
    }

    private void startDaemonThread() {
        Thread thread = new Thread(ThreadLifeCycleManager.wrapRunnable(() -> {
            while (this.isStart) {
                LockSupport.parkNanos(ZkConfig.getSelfCheck() * 1000000000);
                if (!this.isElectStarted.get() && enableMasterElection()) {
                    beginLeaderElection();
                }
            }
        }));
        thread.setDaemon(true);
        thread.setName("KD_ScheduleServiceDaemo");
        thread.setUncaughtExceptionHandler((thread2, th) -> {
            log.error("scheduleservice.daemon thread error ", th);
        });
        thread.start();
    }

    private CuratorFramework createNewZkClient(String str) {
        String str2 = str;
        String str3 = null;
        String str4 = null;
        String str5 = "digest";
        if (str.indexOf("?") > 0) {
            String[] split = str.split("\\?");
            str2 = split[0];
            int indexOf = str2.indexOf("/");
            if (indexOf > 0) {
                str2 = str2.substring(0, indexOf);
            }
            String str6 = split[1];
            HashMap hashMap = new HashMap(10);
            for (String str7 : str6.contains("&") ? str6.split("&") : new String[]{str6}) {
                if (str7.contains("=")) {
                    String[] split2 = str7.split("=");
                    if (split2.length == 1) {
                        hashMap.put(split2[0], "");
                    } else {
                        hashMap.put(split2[0], split2[1]);
                    }
                }
            }
            str4 = (String) hashMap.get("password");
            if (str4 != null) {
                str4 = Encrypters.decode(str4);
            }
            str3 = (String) hashMap.get("user");
            if (hashMap.get("scheme") != null) {
                str5 = (String) hashMap.get("scheme");
            }
        }
        try {
            CuratorFrameworkFactory.Builder retryPolicy = CuratorFrameworkFactory.builder().connectString(str2).sessionTimeoutMs(Integer.getInteger("curator-default-session-timeout", 60000).intValue()).connectionTimeoutMs(Integer.getInteger("curator-default-connection-timeout", 15000).intValue()).retryPolicy(new RetryForever(Integer.getInteger("zookeeper.client.retry.intervalMs", 1000).intValue()));
            if (str3 != null && str4 != null) {
                final String str8 = str3 + ":" + str4;
                ACLProvider aCLProvider = new ACLProvider() { // from class: kd.bos.schedule.server.ScheduleService.4
                    private List<ACL> acl;

                    public List<ACL> getDefaultAcl() {
                        if (this.acl == null) {
                            ArrayList arrayList = ZooDefs.Ids.CREATOR_ALL_ACL;
                            arrayList.clear();
                            arrayList.add(new ACL(31, new Id("auth", str8)));
                            this.acl = arrayList;
                        }
                        return this.acl;
                    }

                    public List<ACL> getAclForPath(String str9) {
                        return this.acl;
                    }
                };
                retryPolicy.authorization(str5, str8.getBytes());
                retryPolicy.aclProvider(aCLProvider);
            }
            CuratorFramework build = retryPolicy.build();
            build.start();
            return build;
        } catch (Exception e) {
            log.error(e);
            throw new KDException(e, BosErrorCode.configZookeepConfig, new Object[]{e.getMessage()});
        }
    }

    private void clearDirtyData() {
        String runServerName = AbstractService.getRunServerName();
        Iterator it = this.zkStore.getChildren(ZkConfig.getMasterServerPath()).iterator();
        while (it.hasNext()) {
            String str = ZkConfig.getMasterServerPath() + "/" + ((String) it.next());
            if (runServerName.equals(this.zkStore.read(str, (Watcher) null))) {
                log.info("主节点被清除：" + str);
                this.zkStore.delete(str);
            }
        }
    }

    private LeaderElectionSupport createLeaderElectionSupport() {
        LeaderElectionSupport leaderElectionSupport = new LeaderElectionSupport();
        leaderElectionSupport.setRootNodeName(ZkConfig.getMasterServerPath());
        leaderElectionSupport.setHostName(getRunServerName());
        leaderElectionSupport.addListener(eventType -> {
            if (eventType == LeaderElectionSupport.EventType.ELECTED_COMPLETE) {
                beginService();
            } else if (eventType == LeaderElectionSupport.EventType.READY_COMPLETE) {
                curStatus = Status.READY;
            }
        });
        return leaderElectionSupport;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean tryLock(Account account) {
        RequestContextUtils.createRequestContext(account.getTenantId(), account.getAccountId(), "0");
        return DBElectionHelper.tryLock();
    }

    private List<Account> getElectedAccount() {
        ArrayList arrayList = new ArrayList(16);
        for (Account account : ScheduleAccountUtils.getAllAccountsOfCurrentEnv(false)) {
            RequestContextUtils.createRequestContext(account.getTenantId(), account.getAccountId(), "0");
            try {
                if (ZkConfig.getRunMode() == AbstractService.RunMode.Dev || tryLock(account)) {
                    arrayList.add(account);
                } else {
                    log.info("账套选举失败,已存在锁，当前accountId=" + account.getAccountId() + ",accountName=" + account.getAccountName());
                }
            } catch (Exception e) {
                log.info("账套选举异常,accountId=" + account.getAccountId() + ",accountName=" + account.getAccountName() + ",errMsg=" + e.getMessage());
            }
        }
        return arrayList;
    }

    public void beginService() {
        List<Account> electedAccount = getElectedAccount();
        try {
            MQHelper.startJobDispatchComsumer(AsynJobMessageDispatcher.class.getName());
            this.zkStore.ensureExisted(ZkConfig.getJobMessagePath(), String.valueOf(System.currentTimeMillis()));
            this.zkStore.ensureExisted(ZkConfig.getScheduleChangedMessagePath(), String.valueOf(System.currentTimeMillis()));
            this.zkStore.ensureExisted(ZkConfig.getJobChangedMessagePath(), String.valueOf(System.currentTimeMillis()));
            this.executorResourceMrg = new ExecutorResourceManager();
            this.executorResourceMrg.setObjectFactory(getObjectFactory());
            this.executorResourceMrg.setZooKeeper();
            ExecutorResourceManager.setInstance(this.executorResourceMrg);
            this.trigger = new Trigger(electedAccount);
            MQHelper.startTaskStatusConsumer(TaskStatusMessageCustomer.class.getName());
            this.executorResourceMrg.start();
            this.trigger.start();
            addAccountChangedListener();
            addTriggerServiceListener();
            ScheduleGrayGroup.startConfigListener();
            this.isLeaderFlag.set(true);
            curStatus = Status.InService;
            log.info("开始提供服务:" + getRunServerName());
        } catch (Exception e) {
            log.error("schedule服务启动失败,切换节点", e);
            try {
                stopLeader();
            } catch (Exception e2) {
                log.error(e2);
            }
        }
    }

    private void addTriggerServiceListener() {
        ConfigurationUtil.observeChange("Schedule.trigger.paused", new ConfigurationChangeListener() { // from class: kd.bos.schedule.server.ScheduleService.5
            public void onChange(Object obj, Object obj2) {
                if (!"Schedule.trigger.paused".equals(obj) || ScheduleService.this.trigger == null) {
                    return;
                }
                ThreadLifeCycleManager.start();
                if (!Boolean.parseBoolean(String.valueOf(obj2))) {
                    ScheduleService.this.trigger.setStatus(SchVisitorStatus.Running);
                    ScheduleService.this.keepAlive();
                    try {
                        Thread.sleep(ScheduleService.recoveryDelayTime_ms);
                    } catch (InterruptedException e) {
                    }
                    ScheduleService.this.trigger.resumeService();
                    return;
                }
                ScheduleService.this.trigger.setStatus(SchVisitorStatus.Paused);
                for (Account account : ScheduleService.this.trigger.getElectedAccountsOfCluster().values()) {
                    RequestContextUtils.createRequestContext(account.getTenantId(), account.getAccountId(), "0");
                    DBElectionHelper.releaseLock();
                }
            }
        });
    }

    public void stop() {
        log.info("Schedule Server begins to stop!");
        if (this.isLeaderFlag.get()) {
            stopLeader();
        }
        if (this.isStart && this.selector != null) {
            this.selector.close();
            this.isElectStarted.set(false);
        }
        this.isStart = false;
        super.stop();
        log.info("Schedule Server is stopped!");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void stopLeader() {
        this.isLeaderFlag.set(false);
        if (this.trigger != null) {
            this.executorResourceMrg.stop();
            this.trigger.stop();
        }
        ScheduleGrayGroup.stopConfigListener();
    }

    protected ObjectFactory createObjectFactory(String str) {
        if (str.equalsIgnoreCase("ZooKeeper") || str.equalsIgnoreCase("MQ")) {
            return new MQObjectFactory();
        }
        return null;
    }
}
