/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.tunnel.impl;

import com.aliyun.odps.commons.transport.Request;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TunnelTableSchema;
import com.aliyun.odps.tunnel.hasher.OdpsHasher;
import com.aliyun.odps.tunnel.hasher.TypeHasher;
import com.aliyun.odps.tunnel.impl.Slot;
import com.aliyun.odps.tunnel.impl.UpsertRecord;
import com.aliyun.odps.tunnel.impl.UpsertSessionImpl;
import com.aliyun.odps.tunnel.io.Checksum;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.tunnel.io.ProtobufRecordPack;
import com.aliyun.odps.tunnel.streams.UpsertStream;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

public class UpsertStreamImpl
implements UpsertStream {
    // Threshold on totalBufferSize; write() flushes every bucket once it is exceeded.
    private long maxBufferSize;
    // Threshold on a single bucket pack's bytes; write() flushes oversized buckets once it is exceeded.
    private long slotBufferSize;
    // Compression option passed to session.buildRequest(...) when a pack is uploaded.
    private final CompressOption compressOption;
    // Session endpoint; its string form is stripped from request URIs in buildFullHttpRequest().
    private final URI endpoint;
    // Owning session: supplies schema, bucket/slot map, hash keys, hasher, bootstrap and request building.
    private final UpsertSessionImpl session;
    // Bucket id -> slot, refreshed from the session at the start of every flush.
    private Map<Integer, Slot> buckets;
    // Column indexes whose values are hashed to pick the target bucket.
    private List<Integer> hashKeys = new ArrayList<Integer>();
    // Record schema obtained from the session.
    private TunnelTableSchema schema;
    // Bucket id -> pending record pack; one pack per bucket is created in the constructor.
    private final Map<Integer, ProtobufRecordPack> bucketBuffer = new HashMap<Integer, ProtobufRecordPack>();
    // Running total of bytes buffered across all bucket packs.
    private long totalBufferSize = 0L;
    // Netty bootstrap (from the session) used to open one connection per uploaded pack.
    private final Bootstrap bootstrap;
    // Latch for the flush round currently in progress; recreated on every round.
    private CountDownLatch latch;
    // Lifecycle state; CLOSED and ERROR make checkStatus() throw.
    private Status status = Status.NORMAL;
    // Optional callback notified about flush results; may be null.
    private UpsertStream.Listener listener = null;

    public UpsertStreamImpl(Builder builder) throws IOException, TunnelException {
        this.compressOption = builder.getCompressOption();
        this.slotBufferSize = builder.getSlotBufferSize();
        this.maxBufferSize = builder.getMaxBufferSize();
        this.session = builder.session;
        this.endpoint = this.session.getEndpoint();
        this.buckets = this.session.getBuckets();
        this.schema = this.session.getRecordSchema();
        for (Integer slot : this.buckets.keySet()) {
            this.bucketBuffer.put(slot, new ProtobufRecordPack(this.schema, new Checksum(), 0, new CompressOption()));
        }
        this.hashKeys = this.session.getHashKeys();
        this.bootstrap = this.session.getBootstrap();
        this.listener = builder.getListener();
    }

    /**
     * Buffers {@code record} as an upsert operation.
     *
     * @param record the record to insert or update
     * @throws IOException if buffering or a triggered flush fails on I/O
     * @throws TunnelException if the stream is unusable or a triggered flush fails
     */
    @Override
    public void upsert(Record record) throws IOException, TunnelException {
        write(record, Operation.UPSERT, null);
    }

    /**
     * Buffers {@code record} as a delete operation.
     *
     * @param record the record identifying the row to delete
     * @throws IOException if buffering or a triggered flush fails on I/O
     * @throws TunnelException if the stream is unusable or a triggered flush fails
     */
    @Override
    public void delete(Record record) throws IOException, TunnelException {
        write(record, Operation.DELETE, null);
    }

    /**
     * Uploads every non-empty bucket buffer, regardless of its size.
     *
     * @throws IOException if the upload fails on I/O
     * @throws TunnelException if the stream is unusable or the upload fails
     */
    @Override
    public void flush() throws IOException, TunnelException {
        final boolean flushAll = true;
        flush(flushAll);
    }

    /**
     * Flushes all pending data and marks the stream closed.
     *
     * <p>Does nothing unless the stream is still in the normal state, so closing twice or closing
     * after an error is a no-op. The state only becomes closed when the flush completes.
     *
     * @throws IOException if the final flush fails on I/O
     * @throws TunnelException if the final flush fails
     */
    @Override
    public void close() throws IOException, TunnelException {
        if (this.status != Status.NORMAL) {
            return;
        }
        this.flush();
        this.status = Status.CLOSED;
    }

    /**
     * Routes {@code record} to its bucket buffer and flushes when a size limit is crossed.
     *
     * <p>The bucket is the combined hash of all hash-key column values modulo the bucket count.
     * After appending, an oversized bucket triggers a partial flush; otherwise exceeding the total
     * buffer limit triggers a full flush.
     *
     * @param record the record to buffer; must be an {@link UpsertRecord}
     * @param op whether the record is an upsert or a delete
     * @param validColumns names of the columns carrying values, or {@code null} for none listed
     * @throws TunnelException if the stream is closed or failed, a hash key value is null, or no
     *         buffer exists for the computed bucket
     * @throws IOException if appending or flushing fails on I/O
     */
    private void write(Record record, Operation op, List<String> validColumns) throws TunnelException, IOException {
        this.checkStatus();

        // Hash every key column with the hasher matching its declared type.
        ArrayList<Integer> keyHashes = new ArrayList<Integer>();
        for (int keyIndex : this.hashKeys) {
            Object keyValue = record.get(keyIndex);
            if (keyValue == null) {
                throw new TunnelException("Hash key " + keyIndex + " can not be null!");
            }
            String typeName = this.schema.getColumn(keyIndex).getTypeInfo().getTypeName().toLowerCase();
            OdpsHasher hasher = TypeHasher.getHasher(typeName, this.session.getHasher());
            keyHashes.add(hasher.hash(keyValue));
        }

        int bucket = TypeHasher.CombineHashVal(keyHashes) % this.buckets.size();
        if (!this.bucketBuffer.containsKey(bucket)) {
            throw new TunnelException("Tunnel internal error! Do not have bucket for hash key " + bucket);
        }
        ProtobufRecordPack pack = this.bucketBuffer.get(bucket);

        UpsertRecord upsertRecord = (UpsertRecord)record;
        // Operation marker byte: 'U' (85) for upsert, 'D' (68) for delete.
        if (op == Operation.UPSERT) {
            upsertRecord.setOperation((byte)'U');
        } else {
            upsertRecord.setOperation((byte)'D');
        }

        List<Integer> valueColumns;
        if (validColumns == null) {
            valueColumns = new ArrayList<Integer>();
        } else {
            valueColumns = validColumns.stream()
                    .map(columnName -> this.schema.getColumnIndex(columnName))
                    .collect(Collectors.toList());
        }
        upsertRecord.setValueCols(valueColumns);

        // Track how much the pack grew so the global counter stays in step.
        long bytesBefore = pack.getTotalBytes();
        pack.append((Record)upsertRecord.getRecord());
        this.totalBufferSize += pack.getTotalBytes() - bytesBefore;

        if (pack.getTotalBytes() > this.slotBufferSize) {
            this.flush(false);
        } else if (this.totalBufferSize > this.maxBufferSize) {
            this.flush(true);
        }
    }

    /**
     * Uploads buffered packs and waits for every upload of the round to finish.
     *
     * <p>The session's bucket map is re-read first; a change in its size aborts the flush. Each
     * round opens one connection per pack that is due, attaches a {@link FlushResultHandler} and
     * blocks on a latch sized to the number of bucket buffers (packs that are not due count down
     * immediately). A round is repeated while any handler failed with its retry flag set; a
     * failure without the retry flag puts the stream into the error state and is rethrown.
     *
     * @param flushAll {@code true} to upload every non-empty pack; {@code false} to upload only
     *        packs larger than the per-slot buffer size
     * @throws TunnelException if the stream is closed or failed, the slot map changed, the thread
     *         is interrupted while waiting, or an upload fails without retry
     * @throws IOException if preparing a pack or request fails on I/O
     */
    private void flush(boolean flushAll) throws TunnelException, IOException {
        List<FlushResultHandler> handlers = new ArrayList<FlushResultHandler>();
        Map<Integer, Slot> bucketMap = this.session.getBuckets();
        if (bucketMap.size() != this.buckets.size()) {
            throw new TunnelException("session slot map is changed");
        }
        this.buckets = bucketMap;

        int retry = 0;
        boolean success;
        do {
            success = true;
            handlers.clear();
            try {
                this.checkStatus();
                this.latch = new CountDownLatch(this.bucketBuffer.size());
                for (Map.Entry<Integer, ProtobufRecordPack> entry : this.bucketBuffer.entrySet()) {
                    ProtobufRecordPack pack = entry.getValue();
                    boolean due = pack.getSize() > 0L
                            && (pack.getTotalBytes() > this.slotBufferSize || flushAll);
                    if (!due) {
                        // Nothing to send for this bucket; release its latch slot right away.
                        this.latch.countDown();
                        continue;
                    }
                    int k = entry.getKey();
                    long bytes = pack.getTotalBytes();
                    pack.checkTransConsistency(false);
                    pack.complete();
                    // Completing the pack may change its size; account for the difference.
                    bytes = pack.getTotalBytes() - bytes;
                    if (!flushAll) {
                        this.totalBufferSize += bytes;
                    }
                    Request request = this.session.buildRequest("PUT", k, this.buckets.get(k), pack.getTotalBytes(), pack.getSize(), this.compressOption);
                    String host = request.getURI().getHost();
                    int port = request.getURI().getPort();
                    if (port == -1) {
                        // No explicit port in the URI: fall back to the scheme default.
                        port = request.getURI().getScheme().equalsIgnoreCase("https") ? 443 : 80;
                    }
                    FlushResultHandler handler = new FlushResultHandler(pack, this.latch, this.listener, retry);
                    Channel channel = this.bootstrap.connect(host, port).sync().channel();
                    channel.pipeline().addLast(new ChannelHandler[]{handler});
                    handlers.add(handler);
                    channel.writeAndFlush((Object)this.buildFullHttpRequest(request, pack.getProtobufStream()));
                }
                this.latch.await();
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new TunnelException("flush interrupted", e);
            }
            for (FlushResultHandler handler : handlers) {
                if (handler.getException() != null) {
                    success = false;
                    if (handler.isNeedRetry()) {
                        continue;
                    }
                    this.status = Status.ERROR;
                    throw handler.getException();
                }
                if (!flushAll) {
                    this.totalBufferSize -= handler.getFlushResult().flushSize;
                }
            }
            ++retry;
        } while (!success);

        if (flushAll) {
            this.totalBufferSize = 0L;
        }
    }

    /**
     * Rejects further work once the stream has been closed or has failed.
     *
     * @throws TunnelException if the stream state is closed or error
     */
    private void checkStatus() throws TunnelException {
        String problem = null;
        if (this.status == Status.CLOSED) {
            problem = "Stream is closed!";
        } else if (this.status == Status.ERROR) {
            problem = "Stream has error!";
        }
        if (problem != null) {
            throw new TunnelException(problem);
        }
    }

    /**
     * Converts a tunnel {@link Request} plus its serialized payload into a Netty HTTP PUT request.
     *
     * <p>The request URI is the tunnel URI with the endpoint's string form removed. All headers of
     * the tunnel request are copied, then the {@code Host} header is set to the URI's host.
     *
     * @param request the tunnel request supplying URI and headers
     * @param content the serialized pack; its bytes become the request body
     * @return the full HTTP request ready to be written to a channel
     */
    private HttpRequest buildFullHttpRequest(Request request, ByteArrayOutputStream content) {
        URI target = request.getURI();
        String relativeUri = target.toString().replace(this.endpoint.toString(), "");
        byte[] payload = content.toByteArray();
        DefaultFullHttpRequest httpRequest = new DefaultFullHttpRequest(
                HttpVersion.HTTP_1_1, HttpMethod.PUT, relativeUri, Unpooled.wrappedBuffer(payload));
        request.getHeaders().forEach(
                (name, value) -> UpsertStreamImpl.lambda$buildFullHttpRequest$0(httpRequest, name, value));
        httpRequest.headers().set((CharSequence)HttpHeaderNames.HOST, (Object)target.getHost());
        return httpRequest;
    }

    /**
     * Sets a single header on {@code req}, replacing any existing value for {@code key}.
     * Decompiler-materialized body of the header-copy lambda in buildFullHttpRequest.
     */
    private static /* synthetic */ void lambda$buildFullHttpRequest$0(HttpRequest req, String key, String value) {
        req.headers().set(key, value);
    }

    private class FlushResultHandler
    extends ChannelInboundHandlerAdapter {
        private UpsertStream.FlushResult flushResult = new UpsertStream.FlushResult();
        private ProtobufRecordPack pack;
        private TunnelException exception = null;
        CountDownLatch latch;
        long start;
        UpsertStream.Listener listener;
        int retry;
        boolean needRetry = false;

        public UpsertStream.FlushResult getFlushResult() {
            return this.flushResult;
        }

        public TunnelException getException() {
            return this.exception;
        }

        public boolean isNeedRetry() {
            return this.needRetry;
        }

        FlushResultHandler(ProtobufRecordPack pack, CountDownLatch latch, UpsertStream.Listener listener, int retry) {
            this.flushResult.recordCount = pack.getSize();
            this.pack = pack;
            this.flushResult.flushSize = pack.getTotalBytes();
            this.latch = latch;
            this.start = System.currentTimeMillis();
            this.listener = listener;
            this.retry = retry;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                FullHttpResponse response = (FullHttpResponse)msg;
                this.flushResult.traceId = response.headers().get("x-odps-request-id");
                if (response.status() == HttpResponseStatus.OK) {
                    this.pack.reset();
                    if (this.listener != null) {
                        try {
                            this.listener.onFlush(this.flushResult);
                        }
                        catch (Exception exception) {}
                    }
                } else {
                    this.exception = new TunnelException(this.flushResult.traceId, (InputStream)new ByteBufInputStream(response.content()), response.status().code());
                    if (this.listener != null) {
                        try {
                            this.listener.onFlushFail(this.exception.getMessage(), this.retry);
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                    }
                }
            }
            catch (Exception e) {
                this.exception = new TunnelException(e.getMessage(), e);
                try {
                    this.needRetry = this.listener.onFlushFail(e.getMessage(), this.retry);
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            finally {
                this.latch.countDown();
                ctx.close();
                this.flushResult.flushTime = System.currentTimeMillis() - this.start;
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            this.exception = new TunnelException(cause.getMessage(), cause);
            this.latch.countDown();
            ctx.close();
            this.flushResult.flushTime = System.currentTimeMillis() - this.start;
        }
    }

    /**
     * Fluent builder for {@link UpsertStreamImpl}.
     *
     * <p>Defaults: 64 MiB total buffer, 1 MiB per-slot buffer, a default {@link CompressOption}
     * and no listener. The session has no default and must be supplied before {@link #build()}.
     */
    public static class Builder
    implements UpsertStream.Builder {
        private UpsertSessionImpl session;
        private long maxBufferSize = 64L * 1024L * 1024L;
        private long slotBufferSize = 1024L * 1024L;
        private CompressOption compressOption = new CompressOption();
        private UpsertStream.Listener listener = null;

        /** @return the session the stream will be bound to */
        public UpsertSessionImpl getSession() {
            return session;
        }

        /**
         * @param session the session the stream will be bound to
         * @return this builder
         */
        public Builder setSession(UpsertSessionImpl session) {
            this.session = session;
            return this;
        }

        /** @return the total buffered-bytes limit */
        @Override
        public long getMaxBufferSize() {
            return maxBufferSize;
        }

        /**
         * @param maxBufferSize the total buffered-bytes limit
         * @return this builder
         */
        @Override
        public Builder setMaxBufferSize(long maxBufferSize) {
            this.maxBufferSize = maxBufferSize;
            return this;
        }

        /** @return the per-slot buffered-bytes limit */
        @Override
        public long getSlotBufferSize() {
            return slotBufferSize;
        }

        /**
         * @param slotBufferSize the per-slot buffered-bytes limit
         * @return this builder
         */
        @Override
        public Builder setSlotBufferSize(long slotBufferSize) {
            this.slotBufferSize = slotBufferSize;
            return this;
        }

        /** @return the compression option used for uploads */
        @Override
        public CompressOption getCompressOption() {
            return compressOption;
        }

        /**
         * @param compressOption the compression option used for uploads
         * @return this builder
         */
        @Override
        public Builder setCompressOption(CompressOption compressOption) {
            this.compressOption = compressOption;
            return this;
        }

        /** @return the flush listener, or {@code null} if none is set */
        @Override
        public UpsertStream.Listener getListener() {
            return listener;
        }

        /**
         * @param listener the flush listener; may be {@code null}
         * @return this builder
         */
        @Override
        public Builder setListener(UpsertStream.Listener listener) {
            this.listener = listener;
            return this;
        }

        /**
         * Creates the stream from the current builder state.
         *
         * @return a new stream bound to the configured session
         * @throws IOException if the stream cannot allocate its buffers
         * @throws TunnelException if the stream cannot be initialized from the session
         */
        @Override
        public UpsertStream build() throws IOException, TunnelException {
            UpsertStreamImpl stream = new UpsertStreamImpl(this);
            return stream;
        }
    }

    private static enum Status {
        NORMAL,
        ERROR,
        CLOSED;

    }

    private static enum Operation {
        UPSERT,
        DELETE;

    }
}

