package kd.bos.dts.service;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kd.bos.context.RequestContext;
import kd.bos.dc.api.model.Account;
import kd.bos.dts.Constant;
import kd.bos.dts.DtsMsgSender;
import kd.bos.dts.DtsUtils;
import kd.bos.dts.check.address.DestinationCheckerFactory;
import kd.bos.dts.check.consistence.Checker;
import kd.bos.dts.define.DestinationRuleConfig;
import kd.bos.dts.exception.ExceptionLogger;
import kd.bos.dts.init.DtsSnapConfigDao;
import kd.bos.dts.init.MonitorRecorder;
import kd.bos.dts.log.DateUtil;
import kd.bos.dts.log.DtsStatusReporterFactory;
import kd.bos.dts.log.StoreageHelper;
import kd.bos.dts.oplog.Oplog;
import kd.bos.dts.oplog.Status;
import kd.bos.dts.retry.RetryService;
import kd.bos.dts.retry.RetryStroage;
import kd.bos.dts.syncconfig.SyncConfigCompareInfo;
import kd.bos.instance.Instance;
import kd.bos.orm.datasync.ConfigureItemStatus;
import kd.bos.orm.datasync.DestinationTransRule;
import kd.bos.orm.datasync.DestinationType;
import kd.bos.orm.datasync.DtsAccountPower;
import kd.bos.orm.datasync.agent.UpgraderStatus;
import kd.bos.thread.ThreadLifeCycleManager;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

/* loaded from: input_file:kd/bos/dts/service/DtsConfigMonitor.class */
public class DtsConfigMonitor {
    private static AtomicBoolean hasInited = new AtomicBoolean(false);
    public static final String DTS_CONFIG_CHANGE_KEY = "dts_config_change_key";
    public static final String DTS_CONSISTENCE_CHECK_BEGIN = "dts.consistence.check.begin";
    public static final String DTS_CONSISTENCE_CHECK_INTERVAL = "dts.consistence.check.interval";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/dts/service/DtsConfigMonitor$DtsConfigMonitorHolder.class */
    public static class DtsConfigMonitorHolder {
        private static DtsConfigMonitor instance = new DtsConfigMonitor();

        private DtsConfigMonitorHolder() {
        }
    }

    public static DtsConfigMonitor get() {
        return DtsConfigMonitorHolder.instance;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void init() {
        if (hasInited.compareAndSet(false, true)) {
            ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(3);
            DtsConfigMonitor dtsConfigMonitor = get();
            newScheduledThreadPool.scheduleAtFixedRate(ThreadLifeCycleManager.wrapRunnable(() -> {
                if (DtsUtils.dtsEnable() && DtsService.isMaster()) {
                    dtsConfigMonitor.runCompareTask();
                }
            }), 1L, 600L, TimeUnit.SECONDS);
            newScheduledThreadPool.scheduleAtFixedRate(ThreadLifeCycleManager.wrapRunnable(() -> {
                if (DtsUtils.dtsEnable() && DtsService.isMaster()) {
                    dtsConfigMonitor.runCheckDestinationTask();
                }
            }), 10L, 30L, TimeUnit.SECONDS);
            newScheduledThreadPool.scheduleAtFixedRate(ThreadLifeCycleManager.wrapRunnable(() -> {
                if (DtsUtils.dtsEnable() && DtsService.isMaster()) {
                    dtsConfigMonitor.runCheckRetryServiceTask();
                }
            }), 10L, 60L, TimeUnit.SECONDS);
            ScheduledExecutorService newScheduledThreadPool2 = Executors.newScheduledThreadPool(1, new ThreadFactory() { // from class: kd.bos.dts.service.DtsConfigMonitor.1
                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable, "CheckAndModifyTask-");
                    thread.setDaemon(true);
                    return thread;
                }
            });
            int intValue = Integer.getInteger(DTS_CONSISTENCE_CHECK_BEGIN, 1).intValue();
            if (intValue < 0) {
                intValue = Integer.MAX_VALUE;
            }
            newScheduledThreadPool2.scheduleAtFixedRate(ThreadLifeCycleManager.wrapRunnable(() -> {
                if (DtsUtils.dtsEnable() && DtsService.isMaster()) {
                    dtsConfigMonitor.runCheckAndModifyTask();
                }
            }), intValue, Integer.getInteger(DTS_CONSISTENCE_CHECK_INTERVAL, 4).intValue(), TimeUnit.HOURS);
            ScheduledExecutorService newScheduledThreadPool3 = Executors.newScheduledThreadPool(1);
            long timeMillis = DateUtil.getTimeMillis("03:00:00") - System.currentTimeMillis();
            newScheduledThreadPool3.scheduleAtFixedRate(ThreadLifeCycleManager.wrapRunnable(() -> {
                if (DtsUtils.dtsEnable() && DtsService.isMaster()) {
                    dtsConfigMonitor.runDeleteDtsMonitorLogTask();
                }
            }), timeMillis > 0 ? timeMillis : 86400000 + timeMillis, 86400000L, TimeUnit.MILLISECONDS);
        }
    }

    private void runCheckAndModifyTask() {
        List<Account> allAccountsOfCurrentEnv = DtsUtils.getAllAccountsOfCurrentEnv();
        long currentTimeMillis = System.currentTimeMillis();
        allAccountsOfCurrentEnv.forEach(account -> {
            ThreadLifeCycleManager.start();
            RequestContext create = RequestContext.create();
            create.setAccountId(account.getAccountId());
            create.setTenantId(account.getTenantId());
            try {
                if (UpgraderStatus.status.isUpgradering()) {
                    return;
                }
                if (DtsAccountPower.isAccountDtsEnable()) {
                    long currentTimeMillis2 = System.currentTimeMillis();
                    DtsSnapConfigDao.instance.getConfigItems().forEach((str, syncConfigInfo) -> {
                        if (ConfigureItemStatus.working != syncConfigInfo.getStatus()) {
                            return;
                        }
                        DestinationTransRule destinationTransRule = new DestinationTransRule();
                        destinationTransRule.setBusinessType(syncConfigInfo.getBusinessType());
                        destinationTransRule.setRegion(syncConfigInfo.getRegion());
                        destinationTransRule.setType(DestinationType.getType(syncConfigInfo.getDestinationtype()));
                        destinationTransRule.setMappingrule(syncConfigInfo.getMappingrule());
                        DestinationRuleConfig destinationRuleConfig = DestinationRuleConfig.get(syncConfigInfo.getEntitynumber(), destinationTransRule);
                        try {
                            Map<String, Object> checkAndRepair = Checker.checkAndRepair(destinationRuleConfig);
                            StringBuilder sb = new StringBuilder();
                            sb.append(destinationRuleConfig.getShowName()).append("[").append(checkAndRepair).append("],");
                            ExceptionLogger.log(DtsConfigMonitor.class, String.format("%s DTS CheckAndModifyTask result: %s ", account.getAccountId(), sb.toString()));
                        } catch (Exception e) {
                            ExceptionLogger.log(DtsConfigMonitor.class, String.format("%s run DTS CheckAndModifyTask error ", destinationRuleConfig.getShowName()), e);
                        }
                    });
                    ExceptionLogger.log(DtsConfigMonitor.class, String.format("%s DTS CheckAndModifyTask spends %s ms", account.getAccountId(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis2)));
                }
            } catch (Exception e) {
                ExceptionLogger.log(DtsConfigMonitor.class, "run DTS CheckAndModifyTask error ", e);
            } finally {
                ThreadLifeCycleManager.end();
            }
        });
        ExceptionLogger.log(DtsConfigMonitor.class, String.format("Cluster %s`s DTS CheckAndModifyTask spends %s ms", Instance.getClusterName(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)));
    }

    public void runCompareTask() {
        DtsUtils.getAllAccountsOfCurrentEnv().forEach(account -> {
            ThreadLifeCycleManager.start();
            RequestContext create = RequestContext.create();
            create.setAccountId(account.getAccountId());
            create.setTenantId(account.getTenantId());
            if (UpgraderStatus.status.isUpgradering()) {
                return;
            }
            try {
                queryAndSend();
            } catch (Exception e) {
                ExceptionLogger.log(DtsConfigMonitor.class, "DtsConfigMonitor error ", e);
            } finally {
                ThreadLifeCycleManager.end();
            }
        });
    }

    public void runCheckRetryServiceTask() {
        DtsUtils.getAllAccountsOfCurrentEnv().forEach(account -> {
            ThreadLifeCycleManager.start();
            RequestContext create = RequestContext.create();
            create.setAccountId(account.getAccountId());
            create.setTenantId(account.getTenantId());
            if (UpgraderStatus.status.isUpgradering()) {
                return;
            }
            try {
                if (DtsAccountPower.isAccountDtsEnable()) {
                    RetryService.checkAndModify();
                }
            } catch (Exception e) {
                ExceptionLogger.log(DtsConfigMonitor.class, "DtsConfigMonitor runCheckRetryServiceTask error ", e);
            } finally {
                ThreadLifeCycleManager.end();
            }
        });
    }

    public void runCheckDestinationTask() {
        DtsUtils.getAllAccountsOfCurrentEnv().forEach(account -> {
            ThreadLifeCycleManager.start();
            RequestContext create = RequestContext.create();
            create.setAccountId(account.getAccountId());
            create.setTenantId(account.getTenantId());
            if (UpgraderStatus.status.isUpgradering()) {
                return;
            }
            try {
                checkDestination();
            } catch (Exception e) {
                ExceptionLogger.log(DtsConfigMonitor.class, "DtsConfigMonitor checkDestination error ", e);
            } finally {
                ThreadLifeCycleManager.end();
            }
        });
    }

    private void checkDestination() {
        if (DtsAccountPower.isAccountDtsEnable()) {
            DtsSnapConfigDao.instance.getDestinationInfoForWork().forEach(addressStatus -> {
                String destinationtype = addressStatus.getDestinationtype();
                String region = addressStatus.getRegion();
                String status = addressStatus.getStatus();
                if (ConfigureItemStatus.working.getName().equals(status)) {
                    if (DestinationCheckerFactory.getChecker(destinationtype, region).isAvailable()) {
                        return;
                    }
                    DtsSnapConfigDao.instance.suspend(destinationtype, region);
                } else if (ConfigureItemStatus.suspend.getName().equals(status) && DestinationCheckerFactory.getChecker(destinationtype, region).isAvailable()) {
                    DtsSnapConfigDao.instance.resume(destinationtype, region, RetryStroage.get().queryRetryList());
                }
            });
        }
    }

    public void queryAndSend() {
        queryAndSend(new MonitorRecorder() { // from class: kd.bos.dts.service.DtsConfigMonitor.2
            @Override // kd.bos.dts.init.MonitorRecorder
            public void recordFind(List<SyncConfigCompareInfo> list) {
            }

            @Override // kd.bos.dts.init.MonitorRecorder
            public void recordSend(SyncConfigCompareInfo syncConfigCompareInfo) {
            }

            @Override // kd.bos.dts.init.MonitorRecorder
            public void recordNotAvailable(SyncConfigCompareInfo syncConfigCompareInfo) {
            }
        });
    }

    public void queryAndSend(MonitorRecorder monitorRecorder) {
        if (DtsAccountPower.isAccountDtsEnable()) {
            List<SyncConfigCompareInfo> dtsConfigChanged = DtsSnapConfigDao.instance.getDtsConfigChanged();
            dtsConfigChanged.forEach(syncConfigCompareInfo -> {
                if (DestinationCheckerFactory.getChecker(syncConfigCompareInfo.getDestinationtype(), syncConfigCompareInfo.getRegion()).isAvailable()) {
                    sendConfig(syncConfigCompareInfo);
                    monitorRecorder.recordSend(syncConfigCompareInfo);
                    return;
                }
                monitorRecorder.recordNotAvailable(syncConfigCompareInfo);
                String str = "DTS destination address not avaiable for  " + syncConfigCompareInfo;
                ExceptionLogger.log(DtsConfigMonitor.class, str);
                RuntimeException runtimeException = new RuntimeException(str);
                Oplog.get().error(syncConfigCompareInfo, ExceptionLogger.getStack(runtimeException));
                DtsStatusReporterFactory.get().confInitReportError(runtimeException, syncConfigCompareInfo.getDestinationEntityConfig());
            });
            if (dtsConfigChanged.size() == 0) {
                monitorRecorder.recordFind(dtsConfigChanged);
            }
        }
    }

    private void sendConfig(SyncConfigCompareInfo syncConfigCompareInfo) {
        HashMap hashMap = new HashMap();
        hashMap.put(DTS_CONFIG_CHANGE_KEY, syncConfigCompareInfo);
        DtsMsgSender.send(Constant.DTS_REGION, Constant.DTS_CONFIG_QUEUE, hashMap);
        Oplog.get().recordTask(syncConfigCompareInfo, Status.submit_sync_task);
    }

    private void runDeleteDtsMonitorLogTask() {
        DtsUtils.getAllAccountsOfCurrentEnv().forEach(account -> {
            ThreadLifeCycleManager.start();
            RequestContext create = RequestContext.create();
            create.setAccountId(account.getAccountId());
            create.setTenantId(account.getTenantId());
            try {
                if (DtsAccountPower.isAccountDtsEnable()) {
                    if (!Boolean.getBoolean("dts.log.keep")) {
                        try {
                            DtsSnapConfigDao.instance.deleteDataSyncLog();
                        } catch (Exception e) {
                            ExceptionLogger.error(DtsConfigMonitor.class, "dts delDtsMonitorLog error", e);
                        }
                    }
                    Date addDay = DateUtil.addDay(DateUtil.getNowDayBegin(), -Integer.parseInt(System.getProperty("dts.monitor.log.day", "7")));
                    DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(new String[]{StoreageHelper.indexName});
                    deleteByQueryRequest.setQuery(QueryBuilders.rangeQuery("dts_timestap").lte(Long.valueOf(addDay.getTime())));
                    try {
                        ExceptionLogger.log(DtsConfigMonitor.class, "dts delete monitor log time:{}, num={}:", new Object[]{new Date(), Long.valueOf(StoreageHelper.getRestClient().deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT).getStatus().getDeleted())});
                    } catch (IOException e2) {
                        ExceptionLogger.error(DtsConfigMonitor.class, "dts delDtsMonitorLog error", e2);
                    }
                }
            } catch (Exception e3) {
                ExceptionLogger.log(DtsConfigMonitor.class, "DtsConfigMonitor runDeleteDtsMonitorLogTask error ", e3);
            } finally {
                ThreadLifeCycleManager.end();
            }
        });
    }
}
