/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.jraft.core;

import com.alipay.sofa.jraft.FSMCaller;
import com.alipay.sofa.jraft.ReadOnlyService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.core.NodeMetrics;
import com.alipay.sofa.jraft.entity.ReadIndexState;
import com.alipay.sofa.jraft.entity.ReadIndexStatus;
import com.alipay.sofa.jraft.error.OverloadException;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;
import com.alipay.sofa.jraft.rpc.RpcRequests;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.util.Bytes;
import com.alipay.sofa.jraft.util.DisruptorBuilder;
import com.alipay.sofa.jraft.util.DisruptorMetricSet;
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;
import com.codahale.metrics.Metric;
import com.google.protobuf.ZeroByteStringHelper;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadOnlyServiceImpl
implements ReadOnlyService,
FSMCaller.LastAppliedLogIndexListener {
    /** Disruptor whose single handler batches queued read-index events. */
    private Disruptor<ReadIndexEvent> readIndexDisruptor;
    /** Ring buffer returned by starting {@link #readIndexDisruptor}; requests are published here. */
    private RingBuffer<ReadIndexEvent> readIndexQueue;
    /** Raft options taken from the service options in {@code init}. */
    private RaftOptions raftOptions;
    /** Owning node; receives the batched ReadIndex requests. */
    private NodeImpl node;
    /** Guards {@link #pendingNotifyStatus}. */
    private final Lock lock = new ReentrantLock();
    /** Source of the last applied log index. */
    private FSMCaller fsmCaller;
    /** Null until {@code shutdown()} is called; counted down by the event handler when it sees the shutdown marker. */
    private volatile CountDownLatch shutdownLatch;
    /** Runs the periodic scan that re-checks pending statuses against the applied index. */
    private ScheduledExecutorService scheduledExecutorService;
    /** Metrics sink of the owning node. */
    private NodeMetrics nodeMetrics;
    /** First error reported through {@code setError}; later errors are ignored. */
    private volatile RaftException error;
    /**
     * Read-index statuses that are waiting for the applied index to catch up, keyed by
     * the status index. Access only while holding {@link #lock}.
     */
    private final TreeMap<Long, List<ReadIndexStatus>> pendingNotifyStatus = new TreeMap<>();
    private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyServiceImpl.class);

    /**
     * Builds one ReadIndex request from the events that were submitted with the given
     * read-only option and hands it to the node. Does nothing when no event matches.
     *
     * @param option read-only option selecting which events go into this request
     * @param events batch of queued events, possibly with mixed options
     */
    private void handleReadIndex(ReadOnlyOption option, List<ReadIndexEvent> events) {
        RpcRequests.ReadIndexRequest.Builder requestBuilder = RpcRequests.ReadIndexRequest.newBuilder()
            .setGroupId(this.node.getGroupId())
            .setServerId(this.node.getServerId().toString())
            .setReadOnlyOptions(ReadOnlyOption.convertMsgType(option));

        // One request entry and one state per matching event, in queue order.
        List<ReadIndexState> matched = new ArrayList<>();
        for (ReadIndexEvent event : events) {
            if (!option.equals(event.readOnlyOptions)) {
                continue;
            }
            requestBuilder.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
            matched.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
        }

        if (!matched.isEmpty()) {
            RpcRequests.ReadIndexRequest readIndexRequest = requestBuilder.build();
            this.node.handleReadIndexRequest(readIndexRequest, new ReadIndexResponseClosure(matched, readIndexRequest));
        }
    }

    /**
     * Dispatches a batch of events, issuing one request for the {@code ReadOnlySafe}
     * events first and then one for the {@code ReadOnlyLeaseBased} events.
     *
     * @param events batch to dispatch; an empty batch is ignored
     */
    private void executeReadIndexEvents(List<ReadIndexEvent> events) {
        if (!events.isEmpty()) {
            this.handleReadIndex(ReadOnlyOption.ReadOnlySafe, events);
            this.handleReadIndex(ReadOnlyOption.ReadOnlyLeaseBased, events);
        }
    }

    /**
     * Fails every pending read-index status with the given status and empties the
     * pending map. The callbacks run while the lock is held.
     *
     * @param st status reported to each pending closure
     */
    private void resetPendingStatusError(Status st) {
        this.lock.lock();
        try {
            Iterator<List<ReadIndexStatus>> pending = this.pendingNotifyStatus.values().iterator();
            while (pending.hasNext()) {
                pending.next().forEach(status -> this.reportError(status, st));
                // Drop the map entry only after all of its statuses were reported.
                pending.remove();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Initializes the service: stores the collaborators from the options, starts the
     * disruptor that queues read-index requests, registers this service as a
     * last-applied-index listener and schedules the periodic pending-status scan.
     *
     * @param opts service options supplying the node, FSM caller and raft options
     * @return always {@code true}
     */
    @Override
    public boolean init(ReadOnlyServiceOptions opts) {
        this.node = opts.getNode();
        this.nodeMetrics = this.node.getNodeMetrics();
        this.fsmCaller = opts.getFsmCaller();
        this.raftOptions = opts.getRaftOptions();
        // Single-threaded scheduler used only for the periodic scan scheduled at the end of this method.
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
        // Multi-producer disruptor with a blocking wait strategy; buffer size comes from the raft options.
        this.readIndexDisruptor = DisruptorBuilder.newInstance().setEventFactory(new ReadIndexEventFactory()).setRingBufferSize(this.raftOptions.getDisruptorBufferSize()).setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)).setWaitStrategy((WaitStrategy)new BlockingWaitStrategy()).setProducerType(ProducerType.MULTI).build();
        this.readIndexDisruptor.handleEventsWith(new EventHandler[]{new ReadIndexEventHandler()});
        this.readIndexDisruptor.setDefaultExceptionHandler(new LogExceptionHandler(this.getClass().getSimpleName()));
        this.readIndexQueue = this.readIndexDisruptor.start();
        // Expose queue metrics only when the node has a metric registry.
        if (this.nodeMetrics.getMetricRegistry() != null) {
            this.nodeMetrics.getMetricRegistry().register("jraft-read-only-service-disruptor", (Metric)new DisruptorMetricSet(this.readIndexQueue));
        }
        // Get notified through onApplied whenever the FSM caller reports a new applied index.
        this.fsmCaller.addLastAppliedLogIndexListener(this);
        // Additionally re-run onApplied with the current applied index at a fixed rate;
        // both the initial delay and the period are getMaxElectionDelayMs() milliseconds.
        this.scheduledExecutorService.scheduleAtFixedRate(() -> this.onApplied(this.fsmCaller.getLastAppliedIndex()), this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
        return true;
    }

    /**
     * Records the error state of the service. Only the first error is kept; calls made
     * after an error is already recorded have no effect.
     *
     * @param error error to record
     */
    @Override
    public synchronized void setError(RaftException error) {
        if (this.error != null) {
            return;
        }
        this.error = error;
    }

    /**
     * Starts shutting the service down: creates the shutdown latch, asynchronously
     * publishes a marker event carrying that latch, and shuts down the scan scheduler.
     * Repeated calls are no-ops.
     */
    @Override
    public synchronized void shutdown() {
        if (this.shutdownLatch == null) {
            this.shutdownLatch = new CountDownLatch(1);
            // The marker is published from another thread, not from the caller's thread.
            ThreadPoolsFactory.runInThread(this.node.getGroupId(), () -> this.readIndexQueue.publishEvent((marker, seq) -> {
                marker.shutdownLatch = this.shutdownLatch;
            }));
            this.scheduledExecutorService.shutdown();
        }
    }

    /**
     * Waits for shutdown to complete: awaits the shutdown latch if one exists, shuts the
     * disruptor down, fails all still-pending statuses with {@code ESTOP}, and waits up
     * to five seconds for the scan scheduler to terminate.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void join() throws InterruptedException {
        CountDownLatch latch = this.shutdownLatch;
        if (latch != null) {
            latch.await();
        }
        this.readIndexDisruptor.shutdown();
        Status nodeQuit = new Status(RaftError.ESTOP, "Node is quit.", new Object[0]);
        this.resetPendingStatusError(nodeQuit);
        this.scheduledExecutorService.awaitTermination(5L, TimeUnit.SECONDS);
    }

    /**
     * Queues a read-index request using the read-only option configured in the node's
     * raft options.
     *
     * @param reqCtx  request context handed back to the closure
     * @param closure callback notified with the outcome
     */
    @Override
    public void addRequest(byte[] reqCtx, ReadIndexClosure closure) {
        ReadOnlyOption configuredOption = this.node.getRaftOptions().getReadOnlyOptions();
        this.addRequest(configuredOption, reqCtx, closure);
    }

    /**
     * Queues a read-index request with an explicit read-only option.
     *
     * <p>In {@code Blocking} apply-task mode the event is published with
     * {@code publishEvent}; in every other mode {@code tryPublishEvent} is used and a
     * full queue is reported to the closure with {@code EBUSY}.
     *
     * @param readOnlyOptions read-only option for this request
     * @param reqCtx          request context handed back to the closure
     * @param closure         callback notified with the outcome; may be {@code null}
     * @throws IllegalStateException if the service has already been shut down (the
     *                               closure is also notified with {@code EHOSTDOWN})
     * @throws OverloadException     if the queue is full and {@code closure} is
     *                               {@code null}, so there is no callback to report to
     */
    @Override
    public void addRequest(ReadOnlyOption readOnlyOptions, byte[] reqCtx, ReadIndexClosure closure) {
        if (this.shutdownLatch != null) {
            ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure, new Status(RaftError.EHOSTDOWN, "Was stopped", new Object[0]));
            throw new IllegalStateException("Service already shutdown.");
        }
        try {
            // Typed translator: with a raw EventTranslator the lambda parameter would be Object.
            EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
                event.readOnlyOptions = readOnlyOptions;
                event.done = closure;
                event.requestContext = new Bytes(reqCtx);
                event.startTime = Utils.monotonicMs();
            };
            switch (this.node.getOptions().getApplyTaskMode()) {
                case Blocking: {
                    this.readIndexQueue.publishEvent(translator);
                    break;
                }
                default: {
                    if (!this.readIndexQueue.tryPublishEvent(translator)) {
                        String errorMsg = "Node is busy, has too many read-index requests, queue is full and bufferSize=" + this.readIndexQueue.getBufferSize();
                        ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure, new Status(RaftError.EBUSY, errorMsg, new Object[0]));
                        this.nodeMetrics.recordTimes("read-index-overload-times", 1L);
                        LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                        if (closure == null) {
                            throw new OverloadException(errorMsg);
                        }
                    }
                    break;
                }
            }
        }
        catch (OverloadException e) {
            // Without a closure this exception is the only way the caller learns about
            // the overload, so it must not be absorbed by the generic handler below.
            throw e;
        }
        catch (Exception e) {
            LOG.error("Node {} fail to add read-index request.", this.node.getNodeId(), e);
            ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure, new Status(RaftError.EPERM, "Node is down.", new Object[0]));
        }
    }

    /**
     * Notifies the pending read-index statuses whose index is less than or equal to the
     * given applied index, and fails the remaining ones if an error has been recorded.
     *
     * <p>Entries are removed from the pending map while the lock is held; the success
     * callbacks run after the lock has been released.
     *
     * @param appliedIndex last applied log index
     */
    @Override
    public void onApplied(long appliedIndex) {
        List<ReadIndexStatus> readyStatuses = null;
        this.lock.lock();
        try {
            if (this.pendingNotifyStatus.isEmpty()) {
                return;
            }
            // View of all entries with key <= appliedIndex; clearing the view removes
            // those entries from the backing map as well.
            NavigableMap<Long, List<ReadIndexStatus>> ready = this.pendingNotifyStatus.headMap(appliedIndex, true);
            readyStatuses = new ArrayList<>(ready.size() << 1);
            for (List<ReadIndexStatus> batch : ready.values()) {
                readyStatuses.addAll(batch);
            }
            ready.clear();

            // Whatever is still pending is failed when the service is in error state.
            RaftException currentError = this.error;
            if (currentError != null) {
                this.resetPendingStatusError(currentError.getStatus());
            }
        }
        finally {
            this.lock.unlock();
            // Runs in finally, after the unlock, so statuses already removed from the
            // map are still notified even if the block above throws.
            if (readyStatuses != null) {
                for (ReadIndexStatus status : readyStatuses) {
                    this.notifySuccess(status);
                }
            }
        }
    }

    /**
     * Test helper: publishes a marker event carrying a latch and blocks until the event
     * handler counts it down, which it does after executing the events it has batched
     * so far.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @OnlyForTest
    void flush() throws InterruptedException {
        CountDownLatch drained = new CountDownLatch(1);
        this.readIndexQueue.publishEvent((marker, seq) -> marker.shutdownLatch = drained);
        drained.await();
    }

    /**
     * Test helper exposing the live pending-status map (not a copy).
     *
     * @return the internal map of pending statuses keyed by index
     */
    @OnlyForTest
    TreeMap<Long, List<ReadIndexStatus>> getPendingNotifyStatus() {
        return pendingNotifyStatus;
    }

    /**
     * Test helper returning the raft options this service was initialized with.
     *
     * @return the raft options, or {@code null} before {@code init}
     */
    @OnlyForTest
    RaftOptions getRaftOptions() {
        return raftOptions;
    }

    /**
     * Runs every non-null closure of the given status with an error status and records
     * the "read-index" latency for each closure that is run.
     *
     * @param status read-index status whose states are to be failed
     * @param st     error status passed to each closure
     */
    private void reportError(ReadIndexStatus status, Status st) {
        long now = Utils.monotonicMs();
        for (ReadIndexState state : status.getStates()) {
            ReadIndexClosure callback = state.getDone();
            if (callback != null) {
                this.nodeMetrics.recordLatency("read-index", now - state.getStartTimeMs());
                callback.run(st);
            }
        }
    }

    /**
     * Completes every non-null closure of the given status successfully: records the
     * "read-index" latency, sets the state's index and request context as the result,
     * and runs the closure with an OK status.
     *
     * @param status read-index status whose states are to be completed
     */
    private void notifySuccess(ReadIndexStatus status) {
        long now = Utils.monotonicMs();
        for (ReadIndexState state : status.getStates()) {
            ReadIndexClosure callback = state.getDone();
            if (callback != null) {
                this.nodeMetrics.recordLatency("read-index", now - state.getStartTimeMs());
                callback.setResult(state.getIndex(), state.getRequestContext().get());
                callback.run(Status.OK());
            }
        }
    }

    /**
     * Response callback for the ReadIndex request issued for one batch of states.
     * Depending on the response it fails the batch, completes it immediately, or parks
     * it in the pending map until the applied index catches up.
     */
    class ReadIndexResponseClosure
    extends RpcResponseClosureAdapter<RpcRequests.ReadIndexResponse> {
        final List<ReadIndexState> states;
        final RpcRequests.ReadIndexRequest request;

        /**
         * @param states  states of the requests batched into {@code request}
         * @param request the ReadIndex request that was sent
         */
        public ReadIndexResponseClosure(List<ReadIndexState> states, RpcRequests.ReadIndexRequest request) {
            this.request = request;
            this.states = states;
        }

        /**
         * Handles the ReadIndex response.
         *
         * @param status outcome of the ReadIndex request
         */
        @Override
        public void run(Status status) {
            if (!status.isOk()) {
                this.notifyFail(status);
                return;
            }
            RpcRequests.ReadIndexResponse response = (RpcRequests.ReadIndexResponse)this.getResponse();
            if (!response.getSuccess()) {
                this.notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
                return;
            }

            ReadIndexStatus batchStatus = new ReadIndexStatus(this.states, this.request, response.getIndex());
            this.states.forEach(state -> state.setIndex(response.getIndex()));

            // Decide under the lock, but invoke callbacks only after it is released.
            boolean alreadyApplied = false;
            boolean lagTooLarge = false;
            ReadOnlyServiceImpl.this.lock.lock();
            try {
                if (batchStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
                    alreadyApplied = true;
                } else if (batchStatus.isOverMaxReadIndexLag(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex(), ReadOnlyServiceImpl.this.raftOptions.getMaxReadIndexLag())) {
                    lagTooLarge = true;
                } else {
                    // Not applied yet: park the batch until onApplied reaches its index.
                    ReadOnlyServiceImpl.this.pendingNotifyStatus.computeIfAbsent(batchStatus.getIndex(), key -> new ArrayList<>(10)).add(batchStatus);
                }
            }
            finally {
                ReadOnlyServiceImpl.this.lock.unlock();
            }

            if (alreadyApplied) {
                ReadOnlyServiceImpl.this.notifySuccess(batchStatus);
            } else if (lagTooLarge) {
                this.notifyFail(new Status(-1, "Fail to run ReadIndex task, the gap of current node's apply index between leader's commit index over maxReadIndexLag"));
            }
        }

        /**
         * Fails every state of this batch. Latency is recorded for each state, including
         * those without a closure; closures receive index {@code -1} and the request
         * context (or {@code null} when the state has none).
         *
         * @param status error status passed to each closure
         */
        private void notifyFail(Status status) {
            long now = Utils.monotonicMs();
            for (ReadIndexState state : this.states) {
                ReadOnlyServiceImpl.this.nodeMetrics.recordLatency("read-index", now - state.getStartTimeMs());
                ReadIndexClosure callback = state.getDone();
                if (callback != null) {
                    Bytes context = state.getRequestContext();
                    callback.run(status, -1L, context == null ? null : context.get());
                }
            }
        }
    }

    private class ReadIndexEventHandler
    implements EventHandler<ReadIndexEvent> {
        private final List<ReadIndexEvent> events;

        private ReadIndexEventHandler() {
            this.events = new ArrayList<ReadIndexEvent>(ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());
        }

        public void onEvent(ReadIndexEvent newEvent, long sequence, boolean endOfBatch) throws Exception {
            if (newEvent.shutdownLatch != null) {
                ReadOnlyServiceImpl.this.executeReadIndexEvents(this.events);
                this.reset();
                newEvent.shutdownLatch.countDown();
                return;
            }
            this.events.add(newEvent);
            if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
                ReadOnlyServiceImpl.this.executeReadIndexEvents(this.events);
                this.reset();
            }
        }

        private void reset() {
            for (ReadIndexEvent event : this.events) {
                event.reset();
            }
            this.events.clear();
        }
    }

    private static class ReadIndexEventFactory
    implements EventFactory<ReadIndexEvent> {
        private ReadIndexEventFactory() {
        }

        public ReadIndexEvent newInstance() {
            return new ReadIndexEvent();
        }
    }

    /**
     * Mutable ring-buffer slot. It carries either a read-index request (option, context,
     * closure, start time) or, when {@code shutdownLatch} is set, a marker event.
     */
    private static class ReadIndexEvent {
        /** Read-only option the request was submitted with. */
        ReadOnlyOption readOnlyOptions;
        /** Request context supplied by the caller. */
        Bytes requestContext;
        /** Callback of the request. */
        ReadIndexClosure done;
        /** Non-null only for marker events. */
        CountDownLatch shutdownLatch;
        /** Monotonic time in milliseconds at which the request was queued. */
        long startTime;

        private ReadIndexEvent() {
        }

        /** Returns every field to its default value. */
        private void reset() {
            this.startTime = 0L;
            this.shutdownLatch = null;
            this.done = null;
            this.requestContext = null;
            this.readOnlyOptions = null;
        }
    }
}

