package kd.fi.pa.stream.pipe;

import java.io.Closeable;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.fi.pa.common.listener.IExceptionListener;
import kd.fi.pa.engine.task.IDataWorkTaskManager;
import kd.fi.pa.stream.datablock.IAsyncStreamDataBlock;
import kd.fi.pa.stream.datablock.SimpleAsyncStreamDataBlock;

/* loaded from: input_file:kd/fi/pa/stream/pipe/AsyncStreamPipe.class */
public class AsyncStreamPipe<E> {
    private static final Log logger = LogFactory.getLog(AsyncStreamPipe.class);
    public static final int PIPE_NOT_RUNNING = 0;
    public static final int PIPE_STARTING = 1;
    public static final int PIPE_RUNNING = 2;
    public static final int PIPE_STOPPING = 3;
    protected ConcurrentLinkedQueue<IAsyncStreamDataBlock<E>> dataQueue;
    protected Consumer<IAsyncStreamDataBlock<E>> dataConsumer;
    protected AtomicInteger pipeStatus;
    protected AtomicInteger activeConsumerCnt;
    protected IExceptionListener exceptionListener;
    protected int consumerThreadCnt;
    protected List<Future> consumerThreadRefs;

    public AsyncStreamPipe(int i) {
        this.dataQueue = new ConcurrentLinkedQueue<>();
        this.pipeStatus = new AtomicInteger(0);
        this.activeConsumerCnt = new AtomicInteger(0);
        this.consumerThreadCnt = i;
        this.consumerThreadRefs = new LinkedList();
    }

    public AsyncStreamPipe() {
        this(1);
    }

    public AsyncStreamPipe(int i, Consumer<IAsyncStreamDataBlock<E>> consumer) {
        this(i);
        attach(consumer);
    }

    public final void attach(Consumer<IAsyncStreamDataBlock<E>> consumer) {
        if (isRunning()) {
            try {
                close();
            } catch (InterruptedException e) {
            }
        }
        updatePipeStatus(1);
        this.dataConsumer = consumer;
        for (int i = 0; i < this.consumerThreadCnt; i++) {
            this.consumerThreadRefs.add(IDataWorkTaskManager.getInstance().submit(() -> {
                return Integer.valueOf(doDataConsume(consumer));
            }, true));
        }
        updatePipeStatus(2);
    }

    protected void updatePipeStatus(int i) {
        this.pipeStatus.set(i);
    }

    public boolean isStopped() {
        int i = this.pipeStatus.get();
        return i == 0 || i == 3;
    }

    public boolean isRunning() {
        int i = this.pipeStatus.get();
        return i == 2 || i == 1;
    }

    protected int doDataConsume(Consumer<IAsyncStreamDataBlock<E>> consumer) {
        IAsyncStreamDataBlock<E> poll;
        this.activeConsumerCnt.incrementAndGet();
        synchronized (this.activeConsumerCnt) {
            this.activeConsumerCnt.notifyAll();
        }
        while (isRunning()) {
            try {
                poll = this.dataQueue.poll();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                if (this.exceptionListener != null) {
                    this.exceptionListener.onError(e);
                }
            }
            if (poll != null) {
                if (this.dataQueue.isEmpty()) {
                    synchronized (this.dataQueue) {
                        this.dataQueue.notifyAll();
                    }
                }
                consumer.accept(poll);
            } else {
                synchronized (this.dataQueue) {
                    this.dataQueue.wait(5000L);
                }
            }
        }
        int decrementAndGet = this.activeConsumerCnt.decrementAndGet();
        synchronized (this.activeConsumerCnt) {
            this.activeConsumerCnt.notifyAll();
        }
        return decrementAndGet;
    }

    protected static void closeClosable(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
            }
        }
    }

    public void close(boolean z) throws InterruptedException {
        if (isRunning()) {
            if (z) {
                this.dataQueue.clear();
            } else {
                while (!this.dataQueue.isEmpty()) {
                    synchronized (this.dataQueue) {
                        this.dataQueue.wait(5000L);
                    }
                }
            }
            updatePipeStatus(3);
            Iterator<Future> it = this.consumerThreadRefs.iterator();
            while (it.hasNext()) {
                try {
                    it.next().get();
                } catch (InterruptedException | ExecutionException e) {
                }
            }
            updatePipeStatus(0);
        }
    }

    public void close() throws InterruptedException {
        close(false);
    }

    public void putToQueue(Collection<E> collection) throws InterruptedException {
        Iterator<E> it = collection.iterator();
        while (it.hasNext()) {
            putToQueue((AsyncStreamPipe<E>) it.next());
        }
    }

    public void putToQueue(E e) throws InterruptedException {
        putToQueue((IAsyncStreamDataBlock) new SimpleAsyncStreamDataBlock(e));
    }

    public void putToQueue(E e, int i, int i2, boolean z) throws InterruptedException {
        putToQueue((IAsyncStreamDataBlock) new SimpleAsyncStreamDataBlock(e, i, i2, z));
    }

    public void putToQueue(IAsyncStreamDataBlock<E> iAsyncStreamDataBlock) throws InterruptedException {
        this.dataQueue.offer(iAsyncStreamDataBlock);
        synchronized (this.dataQueue) {
            this.dataQueue.notifyAll();
        }
    }

    public IAsyncStreamDataBlock<E> peekFromQueue() {
        return this.dataQueue.peek();
    }

    public int queueSize() {
        return this.dataQueue.size();
    }

    public boolean isEmpty() {
        return this.dataQueue.isEmpty();
    }

    public IExceptionListener getExceptionListener() {
        return this.exceptionListener;
    }

    public void setExceptionListener(IExceptionListener iExceptionListener) {
        this.exceptionListener = iExceptionListener;
    }

    public Consumer<IAsyncStreamDataBlock<E>> getDataConsumer() {
        return this.dataConsumer;
    }

    public int getConsumerThreadCnt() {
        return this.consumerThreadCnt;
    }

    public void setConsumerThreadCnt(int i) {
        this.consumerThreadCnt = i;
    }

    public int getActiveConsumerCnt() {
        return this.activeConsumerCnt.get();
    }

    public void setDataConsumer(Consumer<IAsyncStreamDataBlock<E>> consumer) {
        this.dataConsumer = consumer;
    }

    public int getPipeStatus() {
        return this.pipeStatus.get();
    }
}
