package kd.bos.algo.dataset.cache.fs;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kd.bos.algo.AlgoException;
import kd.bos.algo.CacheHint;
import kd.bos.algo.Row;
import kd.bos.algo.RowMeta;
import kd.bos.algo.dataset.AbstractRow;
import kd.bos.algo.dataset.cache.DataSetCacheMeta;
import kd.bos.algo.dataset.cache.DataSetCacheSpi;
import kd.bos.algo.dataset.cache.SimpleMetaImpl;
import kd.bos.algo.exception.AlgoExceedAllowMaxRowsException;
import kd.bos.algo.serde.RowSerde;
import kd.bos.algo.storage.FileSystemStorage;
import kd.bos.algo.util.Tuple2;
import kd.bos.metric.Meter;
import kd.bos.metric.MetricSystem;
import kd.bos.metric.Timer;
import kd.bos.trace.TraceSpan;
import kd.bos.trace.Tracer;

/* loaded from: input_file:kd/bos/algo/dataset/cache/fs/FSSpiImpl.class */
public class FSSpiImpl implements DataSetCacheSpi {
    private FileSystemStorage storage;
    private RowSerde rowSerde;
    private Timer timerByName;
    private Meter meterByName;
    private Meter errorByName;
    private RowMeta rowMeta;
    private CacheHint hint;
    private String cacheId;
    private String baseUrl;
    private int pageSize;
    private int rowCount;
    private int pageId;
    private ArrayList<Row> buffer = new ArrayList<>();
    private MainThread mainThread = null;
    private Timer timer = MetricSystem.timer("kd.metrics.algo.cache.saveTimer");
    private Meter meter = MetricSystem.meter("kd.metrics.algo.cache.saveMeter");
    private Meter errorMeter = MetricSystem.meter("kd.metrics.algo.cache.saveMeter.error");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/algo/dataset/cache/fs/FSSpiImpl$MainThread.class */
    public class MainThread extends Thread {
        private LinkedBlockingQueue<Page> queue = new LinkedBlockingQueue<>(4);
        private boolean started = false;
        private boolean end = false;
        private CountDownLatch latch = new CountDownLatch(1);
        private RowMeta rowMeta;
        private CacheHint hint;
        private String key1;
        private String baseUrl;
        private AlgoException error;

        public MainThread(String str, String str2, RowMeta rowMeta, CacheHint cacheHint) {
            this.key1 = str;
            this.baseUrl = str2;
            this.rowMeta = rowMeta;
            this.hint = cacheHint;
        }

        void end() {
            this.end = true;
        }

        void addPage(Page page) {
            if (this.error != null) {
                this.latch.countDown();
                throw this.error;
            }
            try {
                this.queue.put(page);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.started) {
                return;
            }
            start();
            this.started = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Page page = null;
            while (true) {
                try {
                    try {
                        page = this.queue.poll(10L, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    if (page == null) {
                        if (!this.end) {
                            continue;
                        } else if (this.error != null || this.queue.isEmpty()) {
                            break;
                        }
                    } else if (this.error != null) {
                        break;
                    } else {
                        processOne(page);
                    }
                } finally {
                    this.latch.countDown();
                }
            }
        }

        private void processOne(Page page) {
            try {
                FSSpiImpl.this.writePage(this.baseUrl, this.rowMeta, page, this.hint);
            } catch (AlgoException e) {
                this.error = e;
                this.end = true;
            } catch (Exception e2) {
                this.error = new AlgoException(e2);
                this.end = true;
            }
        }

        public void cancelOnException(AlgoException algoException) {
            this.error = algoException;
            this.end = true;
            waitDone();
            FSSpiImpl.this.delete(this.key1);
        }

        public void waitDone() {
            try {
                this.latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/algo/dataset/cache/fs/FSSpiImpl$Page.class */
    public static class Page {
        private List<Row> list;
        private int pageId;

        Page(int i, List<Row> list) {
            this.pageId = i;
            this.list = list;
        }
    }

    public FSSpiImpl(FileSystemStorage fileSystemStorage) {
        this.storage = fileSystemStorage;
        this.timerByName = MetricSystem.timer("kd.metrics.algo.cache.saveTimer." + fileSystemStorage.getName());
        this.meterByName = MetricSystem.meter("kd.metrics.algo.cache.saveMeter." + fileSystemStorage.getName());
        this.errorByName = MetricSystem.meter("kd.metrics.algo.cache.saveMeter.error." + fileSystemStorage.getName());
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void open(RowMeta rowMeta, CacheHint cacheHint) {
        this.rowMeta = rowMeta;
        this.hint = cacheHint;
        if (this.rowSerde == null) {
            this.rowSerde = RowSerde.Factory.get(rowMeta);
        }
        try {
            Tuple2<String, String> generateIdAndUrl = generateIdAndUrl(cacheHint.getTimeout());
            this.cacheId = generateIdAndUrl.t1;
            this.baseUrl = generateIdAndUrl.t2;
            this.pageSize = cacheHint.getPageSize();
        } catch (IOException e) {
            this.errorMeter.mark();
            this.errorByName.mark();
            throw new AlgoException("can't save dataset cache: " + e.getMessage(), e);
        }
    }

    private boolean isExceedAllowMaxRows() {
        if (this.rowCount <= this.hint.getAllowMaxRows()) {
            return false;
        }
        if (!this.hint.isThrowExceptionWhenExceedAllowMaxRows()) {
            return true;
        }
        AlgoExceedAllowMaxRowsException algoExceedAllowMaxRowsException = new AlgoExceedAllowMaxRowsException("DataSetCache exceed allow max rows:[rowCount:" + this.rowCount + ",allowMaxRows:" + this.hint.getAllowMaxRows() + "]");
        if (this.mainThread == null) {
            return true;
        }
        this.mainThread.cancelOnException(algoExceedAllowMaxRowsException);
        throw algoExceedAllowMaxRowsException;
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void append(Iterator<Row> it) {
        while (it.hasNext()) {
            append(it.next());
        }
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void append(Row row) {
        if (isExceedAllowMaxRows()) {
            return;
        }
        this.buffer.add(((AbstractRow) row).persist());
        this.rowCount++;
        if (this.rowCount % this.pageSize == 0) {
            int i = this.pageId;
            this.pageId = i + 1;
            Page page = new Page(i, this.buffer);
            if (this.mainThread == null) {
                this.mainThread = new MainThread(this.cacheId, this.baseUrl, this.rowMeta, this.hint);
            }
            this.mainThread.addPage(page);
            this.buffer = new ArrayList<>();
        }
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public DataSetCacheMeta finish() {
        if (this.buffer.size() > 0) {
            Page page = new Page(this.pageId, this.buffer);
            if (this.mainThread != null) {
                this.mainThread.addPage(page);
            } else {
                writePage(this.baseUrl, this.rowMeta, page, this.hint);
            }
        }
        if (this.mainThread != null) {
            this.mainThread.end();
            this.mainThread.waitDone();
        }
        this.buffer = null;
        SimpleMetaImpl simpleMetaImpl = new SimpleMetaImpl(this.cacheId, this.rowMeta, this.rowCount, this.hint.getPageSize(), this.hint.getStorageType());
        writeMeta(this.baseUrl, simpleMetaImpl, this.hint);
        return simpleMetaImpl;
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public DataSetCacheMeta save(RowMeta rowMeta, Iterator<Row> it, CacheHint cacheHint) {
        if (this.rowSerde == null) {
            this.rowSerde = RowSerde.Factory.get(rowMeta);
        }
        Timer.Context time = this.timer.time();
        Timer.Context time2 = this.timerByName.time();
        try {
            try {
                Tuple2<String, String> generateIdAndUrl = generateIdAndUrl(cacheHint.getTimeout());
                String str = generateIdAndUrl.t1;
                String str2 = generateIdAndUrl.t2;
                int writeRows = writeRows(str, str2, rowMeta, it, cacheHint);
                this.meter.mark(writeRows);
                this.meterByName.mark(writeRows);
                SimpleMetaImpl simpleMetaImpl = new SimpleMetaImpl(str, rowMeta, writeRows, cacheHint.getPageSize(), cacheHint.getStorageType());
                writeMeta(str2, simpleMetaImpl, cacheHint);
                time.stop();
                time2.stop();
                return simpleMetaImpl;
            } catch (IOException e) {
                this.errorMeter.mark();
                this.errorByName.mark();
                throw new AlgoException("can't save dataset cache: " + e.getMessage(), e);
            }
        } catch (Throwable th) {
            time.stop();
            time2.stop();
            throw th;
        }
    }

    private void writeMeta(String str, SimpleMetaImpl simpleMetaImpl, CacheHint cacheHint) {
        BufferedOutputStream bufferedOutputStream = null;
        try {
            try {
                bufferedOutputStream = new BufferedOutputStream(this.storage.create(getMetaUrl(str), cacheHint.getTimeout()));
                new ObjectOutputStream(bufferedOutputStream).writeObject(simpleMetaImpl);
                bufferedOutputStream.flush();
                if (bufferedOutputStream != null) {
                    try {
                        bufferedOutputStream.flush();
                        bufferedOutputStream.close();
                    } catch (IOException e) {
                    }
                }
            } catch (Throwable th) {
                if (bufferedOutputStream != null) {
                    try {
                        bufferedOutputStream.flush();
                        bufferedOutputStream.close();
                    } catch (IOException e2) {
                    }
                }
                throw th;
            }
        } catch (IOException e3) {
            throw new AlgoException("error when write dataset cache meta.", e3);
        }
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public DataSetCacheMeta getMeta(String str) {
        InputStream inputStream = null;
        try {
            try {
                inputStream = this.storage.open(getMetaUrl(getUrl(str)));
                DataSetCacheMeta dataSetCacheMeta = (DataSetCacheMeta) new ObjectInputStream(inputStream).readObject();
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                    }
                }
                return dataSetCacheMeta;
            } catch (Throwable th) {
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e2) {
                    }
                }
                throw th;
            }
        } catch (Exception e3) {
            throw new AlgoException("error when get dataset cache meta.", e3);
        }
    }

    private int writeRows(String str, String str2, RowMeta rowMeta, Iterator<Row> it, CacheHint cacheHint) {
        int i = 0;
        ArrayList arrayList = new ArrayList();
        int pageSize = cacheHint.getPageSize();
        int i2 = 0;
        while (it.hasNext()) {
            Row next = it.next();
            if (!isExceedAllowMaxRows()) {
                i2 = i / pageSize;
                arrayList.add(((AbstractRow) next).persist());
                i++;
                if (i % pageSize == 0) {
                    Page page = new Page(i2, arrayList);
                    if (this.mainThread == null) {
                        this.mainThread = new MainThread(str, str2, rowMeta, cacheHint);
                    }
                    this.mainThread.addPage(page);
                    arrayList = new ArrayList();
                }
            }
        }
        if (arrayList.size() > 0) {
            Page page2 = new Page(i2, arrayList);
            if (this.mainThread != null) {
                this.mainThread.addPage(page2);
            }
        }
        if (this.mainThread != null) {
            this.mainThread.end();
            this.mainThread.waitDone();
        } else if (arrayList.size() > 0) {
            writePage(str2, rowMeta, new Page(i2, arrayList), cacheHint);
        }
        return i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int writePage(String str, RowMeta rowMeta, Page page, CacheHint cacheHint) {
        return writePage(str, page.pageId, rowMeta, page.list.iterator(), cacheHint);
    }

    private int writePage(String str, int i, RowMeta rowMeta, Iterator<Row> it, CacheHint cacheHint) {
        BufferedOutputStream bufferedOutputStream = null;
        try {
            try {
                bufferedOutputStream = new BufferedOutputStream(this.storage.create(getPageUrl(str, i), cacheHint.getTimeout()));
                DataOutputStream dataOutputStream = new DataOutputStream(bufferedOutputStream);
                int i2 = 0;
                while (it.hasNext() && i2 < cacheHint.getPageSize()) {
                    i2++;
                    writeRow(rowMeta, it.next(), dataOutputStream);
                }
                this.rowSerde.flush(dataOutputStream);
                int i3 = i2;
                if (bufferedOutputStream != null) {
                    try {
                        bufferedOutputStream.flush();
                        bufferedOutputStream.close();
                    } catch (IOException e) {
                    }
                }
                return i3;
            } catch (IOException e2) {
                throw new AlgoException("can't write page: " + e2.getMessage(), e2);
            }
        } catch (Throwable th) {
            if (bufferedOutputStream != null) {
                try {
                    bufferedOutputStream.flush();
                    bufferedOutputStream.close();
                } catch (IOException e3) {
                }
            }
            throw th;
        }
    }

    private void writeRow(RowMeta rowMeta, Row row, DataOutputStream dataOutputStream) {
        this.rowSerde.write(rowMeta, row, dataOutputStream);
    }

    private Row readRow(RowMeta rowMeta, DataInputStream dataInputStream) {
        if (this.rowSerde == null) {
            this.rowSerde = RowSerde.Factory.get(rowMeta);
        }
        return this.rowSerde.read(rowMeta, dataInputStream);
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void delete(DataSetCacheMeta dataSetCacheMeta) {
        delete(dataSetCacheMeta.getId());
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void delete(String str) {
        delete(str, true);
    }

    private void delete(String str, boolean z) {
        try {
            this.storage.delete(getUrl(str));
        } catch (Exception e) {
            if (z) {
                throw AlgoException.wrap(e);
            }
        }
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public List<Row> getList(DataSetCacheMeta dataSetCacheMeta, int i, int i2) {
        TraceSpan create = Tracer.create("DataSet", "Cache.FSSpi.getList");
        Throwable th = null;
        try {
            String id = dataSetCacheMeta.getId();
            create.addTag("id", id);
            create.addTag("begin", String.valueOf(i));
            create.addTag("length", String.valueOf(i2));
            String url = getUrl(id);
            RowMeta rowMeta = dataSetCacheMeta.getRowMeta();
            int rowCount = dataSetCacheMeta.getRowCount();
            if (rowCount == 0) {
                List<Row> emptyList = Collections.emptyList();
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return emptyList;
            }
            int pageSize = dataSetCacheMeta.getPageSize();
            int i3 = i / pageSize;
            int i4 = i % pageSize;
            String pageUrl = getPageUrl(url, i3);
            InputStream inputStream = null;
            DataInputStream dataInputStream = null;
            ArrayList arrayList = new ArrayList();
            try {
                try {
                    inputStream = this.storage.open(pageUrl);
                    dataInputStream = new DataInputStream(inputStream);
                    int i5 = 0;
                    int i6 = 0;
                    while (i6 < i2) {
                        if (i + i6 >= rowCount) {
                            break;
                        }
                        Row readRow = readRow(rowMeta, dataInputStream);
                        if (i5 < i4) {
                            i5++;
                        } else {
                            arrayList.add(readRow);
                            i5++;
                            i6++;
                            if (i5 % pageSize == 0 && i + i6 < rowCount) {
                                i4 = 0;
                                inputStream.close();
                                i3++;
                                inputStream = this.storage.open(getPageUrl(url, i3));
                                dataInputStream = new DataInputStream(inputStream);
                            }
                        }
                    }
                    if (inputStream != null) {
                        try {
                            inputStream.close();
                        } catch (IOException e) {
                        }
                    }
                    if (dataInputStream != null) {
                        dataInputStream.close();
                    }
                    return arrayList;
                } catch (Exception e2) {
                    throw new AlgoException("error when get dataset cache iterator.", e2);
                }
            } catch (Throwable th3) {
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e3) {
                        throw th3;
                    }
                }
                if (dataInputStream != null) {
                    dataInputStream.close();
                }
                throw th3;
            }
        } finally {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
        }
    }

    protected String generateId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    public String getUrl(String str) {
        return "datasetcache/" + str;
    }

    public Tuple2<String, String> generateIdAndUrl(long j) throws IOException {
        String generateId;
        String url;
        do {
            generateId = generateId();
            url = getUrl(generateId);
        } while (!this.storage.createDirectory(url, j));
        this.storage.createDirectory(url, j);
        return new Tuple2<>(generateId, url);
    }

    public String getMetaUrl(String str) {
        return str + "/meta";
    }

    public String getTimeoutUrl(String str) {
        return str + "/timeout";
    }

    public String getPageUrl(String str, int i) {
        return str + "/page" + i;
    }
}
