package org.pentaho.di.trans.streaming.common;

import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.SubtransExecutor;
import org.pentaho.di.trans.streaming.api.StreamWindow;

/* loaded from: input_file:org/pentaho/di/trans/streaming/common/FixedTimeStreamWindow.class */
public class FixedTimeStreamWindow<I extends List> implements StreamWindow<I, Result> {
    private final RowMetaInterface rowMeta;
    private final long millis;
    private final int batchSize;
    private SubtransExecutor subtransExecutor;
    private final Consumer<Map.Entry<List<I>, Result>> postProcessor;

    public FixedTimeStreamWindow(SubtransExecutor subtransExecutor, RowMetaInterface rowMetaInterface, long j, int i) {
        this(subtransExecutor, rowMetaInterface, j, i, entry -> {
        });
    }

    public FixedTimeStreamWindow(SubtransExecutor subtransExecutor, RowMetaInterface rowMetaInterface, long j, int i, Consumer<Map.Entry<List<I>, Result>> consumer) {
        this.subtransExecutor = subtransExecutor;
        this.rowMeta = rowMetaInterface;
        this.millis = j;
        this.batchSize = i;
        this.postProcessor = consumer;
    }

    @Override // org.pentaho.di.trans.streaming.api.StreamWindow
    public Iterable<Result> buffer(Observable<I> observable) {
        return (this.millis > 0 ? this.batchSize > 0 ? observable.buffer(this.millis, TimeUnit.MILLISECONDS, this.batchSize) : observable.buffer(this.millis, TimeUnit.MILLISECONDS) : observable.buffer(this.batchSize)).observeOn(Schedulers.io()).filter(list -> {
            return !list.isEmpty();
        }).map(this::sendBufferToSubtrans).takeWhile(entry -> {
            return ((Result) entry.getValue()).getNrErrors() == 0;
        }).doOnNext(this.postProcessor).map((v0) -> {
            return v0.getValue();
        }).blockingIterable();
    }

    private Map.Entry<List<I>, Result> sendBufferToSubtrans(List<I> list) throws KettleException {
        return (Map.Entry) this.subtransExecutor.execute((List) list.stream().map(list2 -> {
            return list2.toArray(new Object[0]);
        }).map(objArr -> {
            return new RowMetaAndData(this.rowMeta, objArr);
        }).collect(Collectors.toList())).map(result -> {
            return new AbstractMap.SimpleImmutableEntry(list, result);
        }).orElse(new AbstractMap.SimpleImmutableEntry(list, new Result()));
    }
}
