/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.journal;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.Callable;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.journal.JournalMessageStore;
import org.apache.activemq.store.journal.JournalTopicMessageStore;
import org.apache.activemq.store.journal.JournalTransactionStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class JournalPersistenceAdapter
implements PersistenceAdapter,
JournalEventListener,
UsageListener {
    private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class);
    private final Journal journal;
    private final PersistenceAdapter longTermPersistence;
    private final WireFormat wireFormat = new OpenWireFormat();
    private final ConcurrentHashMap queues = new ConcurrentHashMap();
    private final ConcurrentHashMap topics = new ConcurrentHashMap();
    private UsageManager usageManager;
    private long checkpointInterval = 300000L;
    private long lastCheckpointRequest = System.currentTimeMillis();
    private long lastCleanup = System.currentTimeMillis();
    private int maxCheckpointWorkers = 10;
    private int maxCheckpointMessageAddSize = 0x100000;
    private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
    private ThreadPoolExecutor checkpointExecutor;
    private TaskRunner checkpointTask;
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    private boolean fullCheckPoint;
    private AtomicBoolean started = new AtomicBoolean(false);
    private final Runnable periodicCheckpointTask = this.createPeriodicCheckpointTask();

    final Runnable createPeriodicCheckpointTask() {
        return new Runnable(){

            public void run() {
                if (System.currentTimeMillis() > JournalPersistenceAdapter.this.lastCheckpointRequest + JournalPersistenceAdapter.this.checkpointInterval) {
                    JournalPersistenceAdapter.this.checkpoint(false, true);
                }
            }
        };
    }

    public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
        this.journal = journal;
        journal.setJournalEventListener(this);
        this.checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){

            public boolean iterate() {
                return JournalPersistenceAdapter.this.doCheckpoint();
            }
        }, "ActiveMQ Journal Checkpoint Worker");
        this.longTermPersistence = longTermPersistence;
    }

    public void setUsageManager(UsageManager usageManager) {
        this.usageManager = usageManager;
        this.longTermPersistence.setUsageManager(usageManager);
    }

    public Set getDestinations() {
        HashSet destinations = new HashSet(this.longTermPersistence.getDestinations());
        destinations.addAll(this.queues.keySet());
        destinations.addAll(this.topics.keySet());
        return destinations;
    }

    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
        if (destination.isQueue()) {
            return this.createQueueMessageStore((ActiveMQQueue)destination);
        }
        return this.createTopicMessageStore((ActiveMQTopic)destination);
    }

    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        JournalMessageStore store = (JournalMessageStore)this.queues.get(destination);
        if (store == null) {
            MessageStore checkpointStore = this.longTermPersistence.createQueueMessageStore(destination);
            store = new JournalMessageStore(this, checkpointStore, destination);
            this.queues.put(destination, store);
        }
        return store;
    }

    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
        JournalTopicMessageStore store = (JournalTopicMessageStore)this.topics.get(destinationName);
        if (store == null) {
            TopicMessageStore checkpointStore = this.longTermPersistence.createTopicMessageStore(destinationName);
            store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
            this.topics.put(destinationName, store);
        }
        return store;
    }

    public TransactionStore createTransactionStore() throws IOException {
        return this.transactionStore;
    }

    public long getLastMessageBrokerSequenceId() throws IOException {
        return this.longTermPersistence.getLastMessageBrokerSequenceId();
    }

    public void beginTransaction(ConnectionContext context) throws IOException {
        this.longTermPersistence.beginTransaction(context);
    }

    public void commitTransaction(ConnectionContext context) throws IOException {
        this.longTermPersistence.commitTransaction(context);
    }

    public void rollbackTransaction(ConnectionContext context) throws IOException {
        this.longTermPersistence.rollbackTransaction(context);
    }

    public synchronized void start() throws Exception {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        this.longTermPersistence.setUseExternalMessageReferences(false);
        this.checkpointExecutor = new ThreadPoolExecutor(this.maxCheckpointWorkers, this.maxCheckpointWorkers, 30L, TimeUnit.SECONDS, (BlockingQueue)new LinkedBlockingQueue(), new ThreadFactory(){

            public Thread newThread(Runnable runable) {
                Thread t = new Thread(runable, "Journal checkpoint worker");
                t.setPriority(7);
                return t;
            }
        });
        this.checkpointExecutor.allowCoreThreadTimeOut(true);
        this.usageManager.addUsageListener(this);
        if (this.longTermPersistence instanceof JDBCPersistenceAdapter) {
            ((JDBCPersistenceAdapter)this.longTermPersistence).setCleanupPeriod(0);
        }
        this.longTermPersistence.start();
        this.createTransactionStore();
        this.recover();
        Scheduler.executePeriodically(this.periodicCheckpointTask, this.checkpointInterval / 10L);
    }

    public void stop() throws Exception {
        this.usageManager.removeUsageListener(this);
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        Scheduler.cancel(this.periodicCheckpointTask);
        this.checkpoint(true, true);
        this.checkpointTask.shutdown();
        this.checkpointExecutor.shutdown();
        this.queues.clear();
        this.topics.clear();
        IOException firstException = null;
        try {
            this.journal.close();
        }
        catch (Exception e) {
            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
        }
        this.longTermPersistence.stop();
        if (firstException != null) {
            throw firstException;
        }
    }

    public PersistenceAdapter getLongTermPersistence() {
        return this.longTermPersistence;
    }

    public WireFormat getWireFormat() {
        return this.wireFormat;
    }

    public void overflowNotification(RecordLocation safeLocation) {
        this.checkpoint(false, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void checkpoint(boolean sync, boolean fullCheckpoint) {
        try {
            if (this.journal == null) {
                throw new IllegalStateException("Journal is closed.");
            }
            long now = System.currentTimeMillis();
            CountDownLatch latch = null;
            JournalPersistenceAdapter journalPersistenceAdapter = this;
            synchronized (journalPersistenceAdapter) {
                latch = this.nextCheckpointCountDownLatch;
                this.lastCheckpointRequest = now;
                if (fullCheckpoint) {
                    this.fullCheckPoint = true;
                }
            }
            this.checkpointTask.wakeup();
            if (sync) {
                log.debug("Waking for checkpoint to complete.");
                latch.await();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Request to start checkpoint failed: " + e, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean doCheckpoint() {
        boolean fullCheckpoint;
        CountDownLatch latch = null;
        JournalPersistenceAdapter journalPersistenceAdapter = this;
        synchronized (journalPersistenceAdapter) {
            latch = this.nextCheckpointCountDownLatch;
            this.nextCheckpointCountDownLatch = new CountDownLatch(1);
            fullCheckpoint = this.fullCheckPoint;
            this.fullCheckPoint = false;
        }
        try {
            FutureTask task;
            JournalMessageStore ms;
            Iterator iterator;
            log.debug("Checkpoint started.");
            RecordLocation newMark = null;
            ArrayList<FutureTask> futureTasks = new ArrayList<FutureTask>(this.queues.size() + this.topics.size());
            if (fullCheckpoint) {
                iterator = this.queues.values().iterator();
                while (iterator.hasNext()) {
                    try {
                        ms = (JournalMessageStore)iterator.next();
                        task = new FutureTask(new Callable(){

                            public Object call() throws Exception {
                                return ms.checkpoint();
                            }
                        });
                        futureTasks.add(task);
                        this.checkpointExecutor.execute(task);
                    }
                    catch (Exception e) {
                        log.error("Failed to checkpoint a message store: " + e, e);
                    }
                }
            }
            iterator = this.topics.values().iterator();
            while (iterator.hasNext()) {
                try {
                    ms = (JournalTopicMessageStore)iterator.next();
                    task = new FutureTask(new Callable((JournalTopicMessageStore)ms){
                        private final /* synthetic */ JournalTopicMessageStore val$ms;
                        {
                            this.val$ms = journalTopicMessageStore;
                        }

                        public Object call() throws Exception {
                            return this.val$ms.checkpoint();
                        }
                    });
                    futureTasks.add(task);
                    this.checkpointExecutor.execute(task);
                }
                catch (Exception e) {
                    log.error("Failed to checkpoint a message store: " + e, e);
                }
            }
            try {
                Iterator iter = futureTasks.iterator();
                while (iter.hasNext()) {
                    FutureTask ft = (FutureTask)iter.next();
                    RecordLocation mark = (RecordLocation)ft.get();
                    if (!fullCheckpoint || mark == null || newMark != null && newMark.compareTo(mark) >= 0) continue;
                    newMark = mark;
                }
            }
            catch (Throwable e) {
                log.error("Failed to checkpoint a message store: " + e, e);
            }
            if (fullCheckpoint) {
                long now;
                try {
                    if (newMark != null) {
                        log.debug("Marking journal at: " + newMark);
                        this.journal.setMark(newMark, true);
                    }
                }
                catch (Exception e) {
                    log.error("Failed to mark the Journal: " + e, e);
                }
                if (this.longTermPersistence instanceof JDBCPersistenceAdapter && (now = System.currentTimeMillis()) > this.lastCleanup + this.checkpointInterval) {
                    this.lastCleanup = now;
                    ((JDBCPersistenceAdapter)this.longTermPersistence).cleanup();
                }
            }
            log.debug("Checkpoint done.");
        }
        finally {
            latch.countDown();
        }
        journalPersistenceAdapter = this;
        synchronized (journalPersistenceAdapter) {
            return this.fullCheckPoint;
        }
    }

    public DataStructure readCommand(RecordLocation location) throws IOException {
        try {
            Packet packet = this.journal.read(location);
            return (DataStructure)this.wireFormat.unmarshal(this.toByteSequence(packet));
        }
        catch (InvalidRecordLocationException e) {
            throw this.createReadException(location, e);
        }
        catch (IOException e) {
            throw this.createReadException(location, e);
        }
    }

    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
        RecordLocation pos = null;
        int transactionCounter = 0;
        log.info("Journal Recovery Started from: " + this.journal);
        ConnectionContext context = new ConnectionContext();
        block13: while ((pos = this.journal.getNextRecordLocation(pos)) != null) {
            JournalMessageStore store;
            Packet data = this.journal.read(pos);
            DataStructure c = (DataStructure)this.wireFormat.unmarshal(this.toByteSequence(data));
            if (c instanceof Message) {
                Message message = (Message)c;
                store = (JournalMessageStore)this.createMessageStore(message.getDestination());
                if (message.isInTransaction()) {
                    this.transactionStore.addMessage(store, message, pos);
                    continue;
                }
                store.replayAddMessage(context, message);
                ++transactionCounter;
                continue;
            }
            switch (c.getDataStructureType()) {
                case 52: {
                    DataStructure command = (JournalQueueAck)c;
                    store = (JournalMessageStore)this.createMessageStore(((JournalQueueAck)command).getDestination());
                    if (((JournalQueueAck)command).getMessageAck().isInTransaction()) {
                        this.transactionStore.removeMessage(store, ((JournalQueueAck)command).getMessageAck(), pos);
                        break;
                    }
                    store.replayRemoveMessage(context, ((JournalQueueAck)command).getMessageAck());
                    ++transactionCounter;
                    break;
                }
                case 50: {
                    DataStructure command = (JournalTopicAck)c;
                    store = (JournalTopicMessageStore)this.createMessageStore(((JournalTopicAck)command).getDestination());
                    if (((JournalTopicAck)command).getTransactionId() != null) {
                        this.transactionStore.acknowledge((JournalTopicMessageStore)store, (JournalTopicAck)command, pos);
                        break;
                    }
                    ((JournalTopicMessageStore)store).replayAcknowledge(context, ((JournalTopicAck)command).getClientId(), ((JournalTopicAck)command).getSubscritionName(), ((JournalTopicAck)command).getMessageId());
                    ++transactionCounter;
                    break;
                }
                case 54: {
                    DataStructure command = (JournalTransaction)c;
                    try {
                        switch (((JournalTransaction)command).getType()) {
                            case 1: {
                                this.transactionStore.replayPrepare(((JournalTransaction)command).getTransactionId());
                                break;
                            }
                            case 2: 
                            case 4: {
                                JournalTransactionStore.Tx tx = this.transactionStore.replayCommit(((JournalTransaction)command).getTransactionId(), ((JournalTransaction)command).getWasPrepared());
                                if (tx == null) break;
                                tx.getOperations();
                                Iterator iter = tx.getOperations().iterator();
                                while (iter.hasNext()) {
                                    JournalTransactionStore.TxOperation op = (JournalTransactionStore.TxOperation)iter.next();
                                    if (op.operationType == 0) {
                                        op.store.replayAddMessage(context, (Message)op.data);
                                    }
                                    if (op.operationType == 1) {
                                        op.store.replayRemoveMessage(context, (MessageAck)op.data);
                                    }
                                    if (op.operationType != 3) continue;
                                    JournalTopicAck ack = (JournalTopicAck)op.data;
                                    ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
                                }
                                ++transactionCounter;
                                break;
                            }
                            case 3: 
                            case 5: {
                                this.transactionStore.replayRollback(((JournalTransaction)command).getTransactionId());
                            }
                        }
                        continue block13;
                    }
                    catch (IOException e) {
                        log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
                        break;
                    }
                }
                case 53: {
                    JournalTrace trace = (JournalTrace)c;
                    log.debug("TRACE Entry: " + trace.getMessage());
                    break;
                }
                default: {
                    log.error("Unknown type of record in transaction log which will be discarded: " + c);
                }
            }
        }
        RecordLocation location = this.writeTraceMessage("RECOVERED", true);
        this.journal.setMark(location, true);
        log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
    }

    private IOException createReadException(RecordLocation location, Exception e) {
        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
    }

    protected IOException createWriteException(DataStructure packet, Exception e) {
        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
    }

    protected IOException createWriteException(String command, Exception e) {
        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
    }

    protected IOException createRecoveryFailedException(Exception e) {
        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
    }

    public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
        if (this.started.get()) {
            return this.journal.write(this.toPacket(this.wireFormat.marshal(command)), sync);
        }
        throw new IOException("closed");
    }

    private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
        JournalTrace trace = new JournalTrace();
        trace.setMessage(message);
        return this.writeCommand(trace, sync);
    }

    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
        newPercentUsage = newPercentUsage / 10 * 10;
        oldPercentUsage = oldPercentUsage / 10 * 10;
        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
            boolean sync = newPercentUsage >= 90;
            this.checkpoint(sync, true);
        }
    }

    public JournalTransactionStore getTransactionStore() {
        return this.transactionStore;
    }

    public void deleteAllMessages() throws IOException {
        try {
            JournalTrace trace = new JournalTrace();
            trace.setMessage("DELETED");
            RecordLocation location = this.journal.write(this.toPacket(this.wireFormat.marshal(trace)), false);
            this.journal.setMark(location, true);
            log.info("Journal deleted: ");
        }
        catch (IOException e) {
            throw e;
        }
        catch (Throwable e) {
            throw IOExceptionSupport.create(e);
        }
        this.longTermPersistence.setUseExternalMessageReferences(false);
        this.longTermPersistence.deleteAllMessages();
    }

    public UsageManager getUsageManager() {
        return this.usageManager;
    }

    public int getMaxCheckpointMessageAddSize() {
        return this.maxCheckpointMessageAddSize;
    }

    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
    }

    public int getMaxCheckpointWorkers() {
        return this.maxCheckpointWorkers;
    }

    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
        this.maxCheckpointWorkers = maxCheckpointWorkers;
    }

    public boolean isUseExternalMessageReferences() {
        return false;
    }

    public void setUseExternalMessageReferences(boolean enable) {
        if (enable) {
            throw new IllegalArgumentException("The journal does not support message references.");
        }
    }

    public Packet toPacket(ByteSequence sequence) {
        return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
    }

    public ByteSequence toByteSequence(Packet packet) {
        org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
        return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
    }
}

