/*
 * Decompiled with CFR 0.152.
 */
package tachyon.worker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import org.apache.log4j.Logger;
import tachyon.Constants;
import tachyon.Version;
import tachyon.conf.WorkerConf;
import tachyon.org.apache.thrift.TException;
import tachyon.org.apache.thrift.server.THsHaServer;
import tachyon.org.apache.thrift.server.TServer;
import tachyon.org.apache.thrift.transport.TNonblockingServerSocket;
import tachyon.org.apache.thrift.transport.TTransportException;
import tachyon.thrift.BlockInfoException;
import tachyon.thrift.Command;
import tachyon.thrift.WorkerService;
import tachyon.util.CommonUtils;
import tachyon.worker.DataServer;
import tachyon.worker.WorkerServiceHandler;
import tachyon.worker.WorkerStorage;

/**
 * Entry point of a Tachyon worker.
 *
 * <p>A worker owns three moving parts, all created in the constructor and launched by
 * {@link #start()}:
 * <ul>
 *   <li>a Thrift {@link THsHaServer} that exposes the {@link WorkerServiceHandler},</li>
 *   <li>a {@link DataServer} running on its own thread,</li>
 *   <li>a heartbeat thread (this object's {@link #run()}) that periodically calls
 *       {@link WorkerStorage#heartbeat()} and executes the command it returns.</li>
 * </ul>
 *
 * <p>Instances are obtained through the {@code createWorker} factories.
 */
public class Worker implements Runnable {
    private static final Logger LOG = Logger.getLogger((String) Constants.LOGGER_TYPE);

    /** Pause after a failed heartbeat RPC before the next attempt. */
    private static final long HEARTBEAT_RETRY_SLEEP_MS = 1000L;
    /** Polling period used by {@link #stop()} while waiting for components to wind down. */
    private static final long STOP_POLL_INTERVAL_MS = 100L;

    private final InetSocketAddress MasterAddress;
    private final InetSocketAddress WorkerAddress;
    private TServer mServer;
    private TNonblockingServerSocket mServerTNonblockingServerSocket;
    private WorkerStorage mWorkerStorage;
    private WorkerServiceHandler mWorkerServiceHandler;
    private DataServer mDataServer;
    private Thread mDataServerThread;
    private Thread mHeartbeatThread;
    /** Set by {@link #stop()}; read by the heartbeat loop, hence volatile. */
    private volatile boolean mStop = false;

    /**
     * Builds all worker components without starting any thread or serving any request.
     *
     * @param masterAddress address handed to the {@link WorkerStorage}
     * @param workerAddress address the Thrift server socket is bound to; its host name is also
     *        used for the data server
     * @param dataPort port of the data server
     * @param selectorThreads currently unused; kept for interface compatibility
     * @param acceptQueueSizePerThreads currently unused; kept for interface compatibility
     * @param workerThreads number of worker threads of the Thrift server
     * @param dataFolder folder handed to the {@link WorkerStorage}
     * @param memoryCapacityBytes capacity handed to the {@link WorkerStorage}
     */
    private Worker(InetSocketAddress masterAddress, InetSocketAddress workerAddress, int dataPort,
            int selectorThreads, int acceptQueueSizePerThreads, int workerThreads,
            String dataFolder, long memoryCapacityBytes) {
        this.MasterAddress = masterAddress;
        this.WorkerAddress = workerAddress;

        this.mWorkerStorage = new WorkerStorage(this.MasterAddress, this.WorkerAddress,
                dataFolder, memoryCapacityBytes);
        this.mWorkerServiceHandler = new WorkerServiceHandler(this.mWorkerStorage);

        InetSocketAddress dataAddress = new InetSocketAddress(workerAddress.getHostName(), dataPort);
        this.mDataServer = new DataServer(dataAddress, this.mWorkerStorage);
        this.mDataServerThread = new Thread(this.mDataServer);

        // The thread is only created here; it is started in start().
        this.mHeartbeatThread = new Thread(this);

        try {
            LOG.info("The worker server tries to start @ " + workerAddress);
            WorkerService.Processor<WorkerServiceHandler> processor =
                    new WorkerService.Processor<WorkerServiceHandler>(this.mWorkerServiceHandler);
            this.mServerTNonblockingServerSocket = new TNonblockingServerSocket(workerAddress);

            THsHaServer.Args serverArgs = new THsHaServer.Args(this.mServerTNonblockingServerSocket);
            serverArgs.processor(processor);
            serverArgs.workerThreads(workerThreads);
            this.mServer = new THsHaServer(serverArgs);
        } catch (TTransportException e) {
            LOG.error(e.getMessage(), e);
            CommonUtils.runtimeException(e);
        }
    }

    /**
     * Heartbeat loop, executed on the heartbeat thread until {@link #stop()} is called.
     *
     * <p>Each iteration waits out the remainder of the configured heartbeat interval, sends one
     * heartbeat, executes the returned command (if any) and finally lets the storage check its
     * status. If heartbeats keep failing with a {@link TException} for at least
     * {@code HEARTBEAT_TIMEOUT_MS}, the JVM is terminated.
     */
    @Override
    public void run() {
        long lastHeartbeatMs = System.currentTimeMillis();
        while (!this.mStop) {
            long intervalMs = WorkerConf.get().TO_MASTER_HEARTBEAT_INTERVAL_MS;
            long diff = System.currentTimeMillis() - lastHeartbeatMs;
            if (diff < intervalMs) {
                LOG.debug("Heartbeat process takes " + diff + " ms.");
                CommonUtils.sleepMs(LOG, intervalMs - diff);
            } else {
                LOG.error("Heartbeat process takes " + diff + " ms.");
            }

            // Scoped to one iteration so that a failed heartbeat can never cause the command
            // received in an earlier iteration to be executed a second time.
            Command cmd = null;
            try {
                cmd = this.mWorkerStorage.heartbeat();
                lastHeartbeatMs = System.currentTimeMillis();
            } catch (BlockInfoException e) {
                LOG.error(e.getMessage(), e);
            } catch (TException e) {
                LOG.error(e.getMessage(), e);
                this.mWorkerStorage.resetMasterClient();
                CommonUtils.sleepMs(LOG, HEARTBEAT_RETRY_SLEEP_MS);
                long sinceLastSuccessMs = System.currentTimeMillis() - lastHeartbeatMs;
                if (sinceLastSuccessMs >= WorkerConf.get().HEARTBEAT_TIMEOUT_MS) {
                    LOG.error("No successful heartbeat for " + sinceLastSuccessMs
                            + " ms; shutting down the worker.");
                    System.exit(-1);
                }
            }

            if (cmd != null) {
                this.executeCommand(cmd);
            }
            this.mWorkerStorage.checkStatus();
        }
    }

    /** Executes a single non-null command returned by a heartbeat. */
    private void executeCommand(Command cmd) {
        switch (cmd.mCommandType) {
            case Unknown:
                LOG.error("Unknown command: " + cmd);
                break;
            case Nothing:
                LOG.debug("Nothing command: " + cmd);
                break;
            case Register:
                LOG.info("Register command: " + cmd);
                this.mWorkerStorage.register();
                break;
            case Free:
                this.mWorkerStorage.freeBlocks(cmd.mData);
                LOG.info("Free command: " + cmd);
                break;
            case Delete:
                // Only logged; no action is taken for Delete here.
                LOG.info("Delete command: " + cmd);
                break;
            default:
                CommonUtils.runtimeException("Un-recognized command from master " + cmd.toString());
        }
    }

    /**
     * Creates a worker from already resolved socket addresses.
     *
     * @param masterAddress address of the master
     * @param workerAddress address the worker's Thrift server binds to
     * @param dataPort port of the data server
     * @param selectorThreads currently unused by the worker
     * @param acceptQueueSizePerThreads currently unused by the worker
     * @param workerThreads number of worker threads of the Thrift server
     * @param localFolder folder handed to the worker storage
     * @param spaceLimitBytes capacity handed to the worker storage
     * @return the new, not yet started worker
     */
    public static synchronized Worker createWorker(InetSocketAddress masterAddress,
            InetSocketAddress workerAddress, int dataPort, int selectorThreads,
            int acceptQueueSizePerThreads, int workerThreads, String localFolder,
            long spaceLimitBytes) {
        return new Worker(masterAddress, workerAddress, dataPort, selectorThreads,
                acceptQueueSizePerThreads, workerThreads, localFolder, spaceLimitBytes);
    }

    /**
     * Creates a worker from {@code host:port} strings.
     *
     * @param masterAddress master location in {@code host:port} form
     * @param workerAddress worker location in {@code host:port} form
     * @param dataPort port of the data server
     * @param selectorThreads currently unused by the worker
     * @param acceptQueueSizePerThreads currently unused by the worker
     * @param workerThreads number of worker threads of the Thrift server
     * @param localFolder folder handed to the worker storage
     * @param spaceLimitBytes capacity handed to the worker storage
     * @return the new, not yet started worker
     * @throws IllegalArgumentException if an address has no port part or the port is not a number
     */
    public static synchronized Worker createWorker(String masterAddress, String workerAddress,
            int dataPort, int selectorThreads, int acceptQueueSizePerThreads, int workerThreads,
            String localFolder, long spaceLimitBytes) {
        InetSocketAddress master = parseAddress(masterAddress);
        InetSocketAddress worker = parseAddress(workerAddress);
        return new Worker(master, worker, dataPort, selectorThreads, acceptQueueSizePerThreads,
                workerThreads, localFolder, spaceLimitBytes);
    }

    /** Parses a {@code host:port} string; anything after a second colon is ignored. */
    private static InetSocketAddress parseAddress(String hostAndPort) {
        String[] parts = hostAndPort.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Expected an address of the form host:port but got: " + hostAndPort);
        }
        // Integer.parseInt throws NumberFormatException, itself an IllegalArgumentException.
        return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
    }

    /**
     * Starts the data server thread and the heartbeat thread, then serves Thrift requests on the
     * calling thread. This call returns only after the Thrift server stops serving.
     */
    public void start() {
        this.mDataServerThread.start();
        this.mHeartbeatThread.start();
        LOG.info("The worker server started @ " + this.WorkerAddress);
        this.mServer.serve();
        LOG.info("The worker server ends @ " + this.WorkerAddress);
    }

    /**
     * Stops the worker and waits until the data server is closed, the Thrift server no longer
     * serves and the heartbeat thread has finished.
     *
     * @throws IOException declared for the shutdown calls made on the worker's components
     * @throws InterruptedException if interrupted while joining the heartbeat thread
     */
    public void stop() throws IOException, InterruptedException {
        this.mStop = true;
        this.mWorkerStorage.stop();
        this.mDataServer.close();
        this.mServer.stop();
        this.mServerTNonblockingServerSocket.close();
        while (!this.mDataServer.isClosed() || this.mServer.isServing()
                || this.mHeartbeatThread.isAlive()) {
            // Re-issue the stop request on every poll until everything has wound down.
            this.mServer.stop();
            this.mServerTNonblockingServerSocket.close();
            // Pass the class logger like every other sleepMs call site instead of null.
            CommonUtils.sleepMs(LOG, STOP_POLL_INTERVAL_MS);
        }
        this.mHeartbeatThread.join();
    }

    /**
     * Determines the master location to use.
     *
     * <p>Without a second command line argument the configured {@code MASTER_HOSTNAME:MASTER_PORT}
     * is used. Otherwise the second argument wins; the configured port is appended when the
     * argument contains no colon, and a warning is logged when the result differs from the
     * configured location.
     *
     * @param args the command line arguments passed to {@link #main(String[])}
     * @return the master location in {@code host:port} form
     */
    private static String getMasterLocation(String[] args) {
        WorkerConf wConf = WorkerConf.get();
        String confFileMasterLoc = wConf.MASTER_HOSTNAME + ":" + wConf.MASTER_PORT;
        if (args.length < 2) {
            return confFileMasterLoc;
        }

        String masterLocation = args[1];
        if (!masterLocation.contains(":")) {
            masterLocation = masterLocation + ":" + wConf.MASTER_PORT;
        }
        if (!masterLocation.equals(confFileMasterLoc)) {
            LOG.warn("Master Address in configuration file(" + confFileMasterLoc
                    + ") is different from the command line one(" + masterLocation + ").");
        }
        return masterLocation;
    }

    /**
     * Command line entry point: {@code <WorkerHost> [<MasterHost:Port>]}.
     *
     * <p>Logs the usage and exits with status -1 unless one or two arguments are given. All
     * remaining settings are read from {@link WorkerConf}.
     *
     * @param args the worker host, optionally followed by the master location
     * @throws UnknownHostException declared by the original signature
     */
    public static void main(String[] args) throws UnknownHostException {
        if (args.length < 1 || args.length > 2) {
            LOG.info("Usage: java -cp target/tachyon-" + Version.VERSION
                    + "-jar-with-dependencies.jar tachyon.Worker <WorkerHost> [<MasterHost:Port>]");
            System.exit(-1);
        }
        WorkerConf wConf = WorkerConf.get();
        String masterLocation = Worker.getMasterLocation(args);
        String workerLocation = args[0] + ":" + wConf.PORT;
        Worker worker = Worker.createWorker(masterLocation, workerLocation, wConf.DATA_PORT,
                wConf.SELECTOR_THREADS, wConf.QUEUE_SIZE_PER_SELECTOR, wConf.SERVER_THREADS,
                wConf.DATA_FOLDER, wConf.MEMORY_SIZE);
        worker.start();
    }

    /**
     * @return the handler served by this worker's Thrift server
     */
    WorkerServiceHandler getWorkerServiceHandler() {
        return this.mWorkerServiceHandler;
    }
}

