/*
 * Decompiled with CFR 0.152.
 */
package zipkin.storage.cassandra3;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.internal.CorrectForClockSkew;
import zipkin.internal.DependencyLinker;
import zipkin.internal.GroupByTraceId;
import zipkin.internal.MergeById;
import zipkin.internal.Nullable;
import zipkin.internal.Util;
import zipkin.storage.QueryRequest;
import zipkin.storage.cassandra3.CassandraUtil;
import zipkin.storage.cassandra3.Schema;
import zipkin.storage.guava.GuavaSpanStore;

final class CassandraSpanStore
implements GuavaSpanStore {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class);
    static final ListenableFuture<List<String>> EMPTY_LIST = Futures.immediateFuture(Collections.emptyList());
    static final Function<List<Span>, List<Span>> OR_NULL = new Function<List<Span>, List<Span>>(){

        public List<Span> apply(List<Span> input) {
            return input.isEmpty() ? null : input;
        }
    };
    private final int maxTraceCols;
    private final int indexFetchMultiplier;
    private final boolean strictTraceId;
    private final Session session;
    private final PreparedStatement selectTraces;
    private final PreparedStatement selectDependencies;
    private final PreparedStatement selectServiceNames;
    private final PreparedStatement selectSpanNames;
    private final PreparedStatement selectTraceIdsByServiceSpanName;
    private final PreparedStatement selectTraceIdsByServiceSpanNameAndDuration;
    private final PreparedStatement selectTraceIdsByAnnotation;
    private final Function<ResultSet, Map<Schema.TraceIdUDT, Long>> traceIdToTimestamp;
    private final Function<List<Map<Schema.TraceIdUDT, Long>>, Map<Schema.TraceIdUDT, Long>> collapseTraceIdMaps;
    private final int traceTtl;
    private final int indexTtl;

    CassandraSpanStore(Session session, int maxTraceCols, int indexFetchMultiplier, boolean strictTraceId) {
        this.session = session;
        this.maxTraceCols = maxTraceCols;
        this.indexFetchMultiplier = indexFetchMultiplier;
        this.strictTraceId = strictTraceId;
        this.selectTraces = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"trace_id", "id", "ts", "span_name", "parent_id", "duration", "annotations", "binary_annotations"}).from("traces").where(QueryBuilder.in((String)"trace_id", (Object[])new Object[]{QueryBuilder.bindMarker((String)"trace_id")})).limit(QueryBuilder.bindMarker((String)"limit_")));
        this.selectDependencies = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"links"}).from("dependencies").where(QueryBuilder.in((String)"day", (Object[])new Object[]{QueryBuilder.bindMarker((String)"days")})));
        this.selectServiceNames = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"service_name"}).distinct().from("span_name_by_service"));
        this.selectSpanNames = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"span_name"}).from("span_name_by_service").where(QueryBuilder.eq((String)"service_name", (Object)QueryBuilder.bindMarker((String)"service_name"))).limit(QueryBuilder.bindMarker((String)"limit_")));
        this.selectTraceIdsByServiceSpanName = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("trace_by_service_span").where(QueryBuilder.eq((String)"service_name", (Object)QueryBuilder.bindMarker((String)"service_name"))).and(QueryBuilder.eq((String)"span_name", (Object)QueryBuilder.bindMarker((String)"span_name"))).and(QueryBuilder.eq((String)"bucket", (Object)QueryBuilder.bindMarker((String)"bucket"))).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")));
        this.selectTraceIdsByServiceSpanNameAndDuration = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("trace_by_service_span").where(QueryBuilder.eq((String)"service_name", (Object)QueryBuilder.bindMarker((String)"service_name"))).and(QueryBuilder.eq((String)"span_name", (Object)QueryBuilder.bindMarker((String)"span_name"))).and(QueryBuilder.eq((String)"bucket", (Object)QueryBuilder.bindMarker((String)"bucket"))).and(QueryBuilder.gte((String)"ts", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts", (Object)QueryBuilder.bindMarker((String)"end_ts"))).and(QueryBuilder.gte((String)"duration", (Object)QueryBuilder.bindMarker((String)"start_duration"))).and(QueryBuilder.lte((String)"duration", (Object)QueryBuilder.bindMarker((String)"end_duration"))).limit(QueryBuilder.bindMarker((String)"limit_")));
        this.selectTraceIdsByAnnotation = session.prepare((RegularStatement)QueryBuilder.select((String[])new String[]{"ts", "trace_id"}).from("traces").where(QueryBuilder.like((String)"all_annotations", (Object)QueryBuilder.bindMarker((String)"annotation"))).and(QueryBuilder.gte((String)"ts_uuid", (Object)QueryBuilder.bindMarker((String)"start_ts"))).and(QueryBuilder.lte((String)"ts_uuid", (Object)QueryBuilder.bindMarker((String)"end_ts"))).limit(QueryBuilder.bindMarker((String)"limit_")).allowFiltering());
        this.traceIdToTimestamp = new Function<ResultSet, Map<Schema.TraceIdUDT, Long>>(){

            public Map<Schema.TraceIdUDT, Long> apply(ResultSet input) {
                LinkedHashMap<Schema.TraceIdUDT, Long> traceIdsToTimestamps = new LinkedHashMap<Schema.TraceIdUDT, Long>();
                for (Row row : input) {
                    traceIdsToTimestamps.put((Schema.TraceIdUDT)row.get("trace_id", Schema.TraceIdUDT.class), UUIDs.unixTimestamp((UUID)row.getUUID("ts")));
                }
                return traceIdsToTimestamps;
            }
        };
        this.collapseTraceIdMaps = new Function<List<Map<Schema.TraceIdUDT, Long>>, Map<Schema.TraceIdUDT, Long>>(){

            public Map<Schema.TraceIdUDT, Long> apply(List<Map<Schema.TraceIdUDT, Long>> input) {
                LinkedHashMap<Schema.TraceIdUDT, Long> result = new LinkedHashMap<Schema.TraceIdUDT, Long>();
                for (Map<Schema.TraceIdUDT, Long> m : input) {
                    result.putAll(m);
                }
                return result;
            }
        };
        KeyspaceMetadata md = Schema.getKeyspaceMetadata(session);
        this.traceTtl = md.getTable("traces").getOptions().getDefaultTimeToLive();
        this.indexTtl = md.getTable("trace_by_service_span").getOptions().getDefaultTimeToLive();
    }

    public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request) {
        ListenableFuture traceIds;
        int traceIndexFetchSize = request.limit * this.indexFetchMultiplier;
        ListenableFuture<Map<Schema.TraceIdUDT, Long>> traceIdToTimestamp = this.getTraceIdsByServiceNames(request);
        List<String> annotationKeys = CassandraUtil.annotationKeys(request);
        if (annotationKeys.isEmpty()) {
            traceIds = Futures.transform(traceIdToTimestamp, CassandraUtil.traceIdsSortedByDescTimestamp());
        } else {
            ArrayList<ListenableFuture<Map<Schema.TraceIdUDT, Long>>> futureKeySetsToIntersect = new ArrayList<ListenableFuture<Map<Schema.TraceIdUDT, Long>>>();
            futureKeySetsToIntersect.add(traceIdToTimestamp);
            for (String annotationKey : annotationKeys) {
                futureKeySetsToIntersect.add(this.getTraceIdsByAnnotation(annotationKey, request.endTs, request.lookback, traceIndexFetchSize));
            }
            traceIds = Futures.transform((ListenableFuture)Futures.allAsList(futureKeySetsToIntersect), CassandraUtil.intersectKeySets());
        }
        return Futures.transform((ListenableFuture)traceIds, (AsyncFunction)new AsyncFunction<Collection<Schema.TraceIdUDT>, List<List<Span>>>(){

            public ListenableFuture<List<List<Span>>> apply(Collection<Schema.TraceIdUDT> traceIds) {
                ImmutableSet set = ImmutableSet.copyOf((Iterator)Iterators.limit(traceIds.iterator(), (int)request.limit));
                return Futures.transform(CassandraSpanStore.this.getSpansByTraceIds((Set<Schema.TraceIdUDT>)set, CassandraSpanStore.this.maxTraceCols), (Function)new Function<List<Span>, List<List<Span>>>(){

                    public List<List<Span>> apply(List<Span> input) {
                        return GroupByTraceId.apply(input, (boolean)CassandraSpanStore.this.strictTraceId, (boolean)true);
                    }
                });
            }

            public String toString() {
                return "getSpansByTraceIds";
            }
        });
    }

    public ListenableFuture<List<Span>> getRawTrace(long traceId) {
        return this.getRawTrace(0L, traceId);
    }

    public ListenableFuture<List<Span>> getRawTrace(long traceIdHigh, long traceIdLow) {
        Schema.TraceIdUDT traceIdUDT = new Schema.TraceIdUDT(this.strictTraceId ? 0L : traceIdHigh, traceIdLow);
        return Futures.transform(this.getSpansByTraceIds(Collections.singleton(traceIdUDT), this.maxTraceCols), OR_NULL);
    }

    public ListenableFuture<List<Span>> getTrace(long traceId) {
        return this.getTrace(0L, traceId);
    }

    public ListenableFuture<List<Span>> getTrace(long traceIdHigh, long traceIdLow) {
        return Futures.transform(this.getRawTrace(traceIdHigh, traceIdLow), (Function)AdjustTrace.INSTANCE);
    }

    public ListenableFuture<List<String>> getServiceNames() {
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectServiceNames, "select-service-names");
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, List<String>>(){

                public List<String> apply(ResultSet input) {
                    LinkedHashSet<String> serviceNames = new LinkedHashSet<String>();
                    for (Row row : input) {
                        serviceNames.add(row.getString("service_name"));
                    }
                    return Ordering.natural().sortedCopy(serviceNames);
                }
            });
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    public ListenableFuture<List<String>> getSpanNames(String serviceName) {
        if (serviceName == null || serviceName.isEmpty()) {
            return EMPTY_LIST;
        }
        serviceName = ((String)Preconditions.checkNotNull((Object)serviceName, (Object)"serviceName")).toLowerCase();
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectSpanNames, "select-span-names").setString("service_name", serviceName).setInt("limit_", 1000);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, List<String>>(){

                public List<String> apply(ResultSet input) {
                    LinkedHashSet<String> spanNames = new LinkedHashSet<String>();
                    for (Row row : input) {
                        if (row.getString("span_name").isEmpty()) continue;
                        spanNames.add(row.getString("span_name"));
                    }
                    return Ordering.natural().sortedCopy(spanNames);
                }
            });
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    public ListenableFuture<List<DependencyLink>> getDependencies(long endTs, @Nullable Long lookback) {
        List days = Util.getDays((long)endTs, (Long)lookback);
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectDependencies, "select-dependencies").setList("days", days);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)ConvertDependenciesResponse.INSTANCE);
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    ListenableFuture<List<Span>> getSpansByTraceIds(Set<Schema.TraceIdUDT> traceIds, int limit) {
        Preconditions.checkNotNull(traceIds, (Object)"traceIds");
        if (traceIds.isEmpty()) {
            return Futures.immediateFuture(Collections.emptyList());
        }
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraces, "select-traces").setSet("trace_id", traceIds).setInt("limit_", limit);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, List<Span>>(){

                public List<Span> apply(ResultSet input) {
                    ArrayList<Span> result = new ArrayList<Span>(input.getAvailableWithoutFetching());
                    for (Row row : input) {
                        Schema.TraceIdUDT traceId = (Schema.TraceIdUDT)row.get("trace_id", Schema.TraceIdUDT.class);
                        Span.Builder builder = Span.builder().traceIdHigh(traceId.getHigh().longValue()).traceId(traceId.getLow()).id(row.getLong("id")).name(row.getString("span_name")).duration(Long.valueOf(row.getLong("duration")));
                        if (!row.isNull("ts")) {
                            builder = builder.timestamp(Long.valueOf(row.getLong("ts")));
                        }
                        if (!row.isNull("duration")) {
                            builder = builder.duration(Long.valueOf(row.getLong("duration")));
                        }
                        if (!row.isNull("parent_id")) {
                            builder = builder.parentId(Long.valueOf(row.getLong("parent_id")));
                        }
                        for (Object udt : row.getList("annotations", Schema.AnnotationUDT.class)) {
                            builder = builder.addAnnotation(((Schema.AnnotationUDT)udt).toAnnotation());
                        }
                        for (Object udt : row.getList("binary_annotations", Schema.BinaryAnnotationUDT.class)) {
                            builder = builder.addBinaryAnnotation(((Schema.BinaryAnnotationUDT)udt).toBinaryAnnotation());
                        }
                        result.add(builder.build());
                    }
                    return result;
                }
            });
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    ListenableFuture<Map<Schema.TraceIdUDT, Long>> getTraceIdsByServiceNames(QueryRequest request) {
        long oldestData = this.indexTtl == 0 ? 0L : System.currentTimeMillis() - (long)(this.indexTtl * 1000);
        long startTsMillis = Math.max(request.endTs - request.lookback, oldestData);
        long endTsMillis = Math.max(request.endTs, oldestData);
        try {
            Set<String> serviceNames;
            if (null != request.serviceName) {
                serviceNames = Collections.singleton(request.serviceName);
            } else {
                serviceNames = new LinkedHashSet<String>((Collection)this.getServiceNames().get());
                if (serviceNames.isEmpty()) {
                    return Futures.immediateFuture(Collections.emptyMap());
                }
            }
            int startBucket = CassandraUtil.durationIndexBucket(startTsMillis * 1000L);
            int endBucket = CassandraUtil.durationIndexBucket(endTsMillis * 1000L);
            if (startBucket > endBucket) {
                throw new IllegalArgumentException("Start bucket (" + startBucket + ") > end bucket (" + endBucket + ")");
            }
            ContiguousSet buckets = ContiguousSet.create((Range)Range.closed((Comparable)Integer.valueOf(startBucket), (Comparable)Integer.valueOf(endBucket)), (DiscreteDomain)DiscreteDomain.integers());
            boolean withDuration = null != request.minDuration || null != request.maxDuration;
            ArrayList<ListenableFuture> futures = new ArrayList<ListenableFuture>();
            if (200 < serviceNames.size() * buckets.size()) {
                LOG.warn("read against trace_by_service_span fanning out to " + serviceNames.size() * buckets.size() + " requests");
            }
            for (String serviceName : serviceNames) {
                for (Integer bucket : buckets) {
                    BoundStatement bound = CassandraUtil.bindWithName(withDuration ? this.selectTraceIdsByServiceSpanNameAndDuration : this.selectTraceIdsByServiceSpanName, "select-trace-ids-by-service-name").setString("service_name", serviceName).setString("span_name", null != request.spanName ? request.spanName : "").setInt("bucket", bucket.intValue()).setUUID("start_ts", UUIDs.startOf((long)startTsMillis)).setUUID("end_ts", UUIDs.endOf((long)endTsMillis)).setInt("limit_", request.limit);
                    if (withDuration) {
                        bound = bound.setLong("start_duration", null != request.minDuration ? request.minDuration : 0L).setLong("end_duration", null != request.maxDuration ? request.maxDuration : Long.MAX_VALUE);
                    }
                    bound.setFetchSize(Integer.MAX_VALUE);
                    futures.add(Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), this.traceIdToTimestamp));
                }
            }
            return Futures.transform((ListenableFuture)Futures.allAsList(futures), this.collapseTraceIdMaps);
        }
        catch (InterruptedException | RuntimeException | ExecutionException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    ListenableFuture<Map<Schema.TraceIdUDT, Long>> getTraceIdsByAnnotation(String annotationKey, long endTsMillis, long lookbackMillis, int limit) {
        long oldestData = this.traceTtl == 0 ? 0L : System.currentTimeMillis() - (long)(this.traceTtl * 1000);
        long startTsMillis = Math.max(endTsMillis - lookbackMillis, oldestData);
        endTsMillis = Math.max(endTsMillis, oldestData);
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraceIdsByAnnotation, "select-trace-ids-by-annotation").setString("annotation", "%" + annotationKey + "%").setUUID("start_ts", UUIDs.startOf((long)startTsMillis)).setUUID("end_ts", UUIDs.endOf((long)endTsMillis)).setInt("limit_", limit);
            return Futures.transform((ListenableFuture)this.session.executeAsync((Statement)bound), (Function)new Function<ResultSet, Map<Schema.TraceIdUDT, Long>>(){

                public Map<Schema.TraceIdUDT, Long> apply(ResultSet input) {
                    LinkedHashMap<Schema.TraceIdUDT, Long> traceIdsToTimestamps = new LinkedHashMap<Schema.TraceIdUDT, Long>();
                    for (Row row : input) {
                        traceIdsToTimestamps.put((Schema.TraceIdUDT)row.get("trace_id", Schema.TraceIdUDT.class), row.getLong("ts"));
                    }
                    return traceIdsToTimestamps;
                }
            });
        }
        catch (RuntimeException ex) {
            return Futures.immediateFailedFuture((Throwable)ex);
        }
    }

    static enum ConvertDependenciesResponse implements Function<ResultSet, List<DependencyLink>>
    {
        INSTANCE;


        public List<DependencyLink> apply(ResultSet rs) {
            ImmutableList.Builder unmerged = ImmutableList.builder();
            for (Row row : rs) {
                ByteBuffer encodedDayOfDependencies = row.getBytes("links");
                for (DependencyLink link : Codec.THRIFT.readDependencyLinks(encodedDayOfDependencies)) {
                    unmerged.add((Object)link);
                }
            }
            return DependencyLinker.merge((Iterable)unmerged.build());
        }
    }

    static enum AdjustTrace implements Function<Collection<Span>, List<Span>>
    {
        INSTANCE;


        public List<Span> apply(Collection<Span> input) {
            List result = CorrectForClockSkew.apply((List)MergeById.apply(input));
            return result.isEmpty() ? null : result;
        }
    }
}

