package kd.bos.algox.flink.core.inout;

import kd.bos.algo.DataSet;
import kd.bos.algo.input.DbInput;
import kd.bos.algox.InputExecutor;
import kd.bos.algox.RowX;
import kd.bos.algox.flink.core.InputSemaphore;
import kd.bos.algox.flink.core.InputSemaphoreFactory;
import kd.bos.context.RequestContext;
import kd.bos.db.DB;
import kd.bos.db.DBRoute;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;

/* loaded from: input_file:kd/bos/algox/flink/core/inout/DbInputExecutor.class */
public class DbInputExecutor extends InputExecutor<DbInput> {
    private static final Log log = LogFactory.getLog(DbInputExecutor.class);
    private transient DataSet dataSet;
    private InputSemaphore semaphore;

    public DbInputExecutor(DbInput dbInput) {
        super(dbInput, dbInput.getRowMeta());
        this.semaphore = null;
    }

    public void close() {
        if (this.dataSet != null) {
            try {
                this.dataSet.close();
            } catch (Throwable th) {
                log.error("error when close DataSet", th);
            }
        }
    }

    public RowX next(RowX rowX) {
        if (!this.dataSet.hasNext()) {
            return null;
        }
        convertToRowX(rowX, this.dataSet.next());
        return rowX;
    }

    public void open() {
        try {
            this.semaphore = InputSemaphoreFactory.getSemaphonre();
            this.semaphore.acquire(1);
        } catch (InterruptedException e) {
            log.error("get semaphore error", e);
        }
        RequestContext.set(((DbInput) this.input).getRequestContext());
        this.dataSet = DB.queryDataSet(((DbInput) this.input).getAlgoKey(), DBRoute.of(((DbInput) this.input).getRouteKey()), ((DbInput) this.input).getSql(), ((DbInput) this.input).getParams());
        if (this.semaphore != null) {
            this.semaphore.release(1);
        }
    }

    public boolean hasNext() {
        return this.dataSet.hasNext();
    }
}
