/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.mongodb.core;

import com.mongodb.ClientSessionOptions;
import com.mongodb.CursorType;
import com.mongodb.MongoException;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.FindOneAndDeleteOptions;
import com.mongodb.client.model.FindOneAndReplaceOptions;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.ValidationOptions;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.AggregatePublisher;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.DistinctPublisher;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.MapReducePublisher;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.convert.EntityReader;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.Metric;
import org.springframework.data.mapping.PersistentEntity;
import org.springframework.data.mapping.PersistentProperty;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mapping.context.MappingContextEvent;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.ReactiveMongoDatabaseUtils;
import org.springframework.data.mongodb.SessionSynchronization;
import org.springframework.data.mongodb.core.AggregationUtil;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.CollectionOptions;
import org.springframework.data.mongodb.core.CountQuery;
import org.springframework.data.mongodb.core.DefaultReactiveIndexOperations;
import org.springframework.data.mongodb.core.DefaultWriteConcernResolver;
import org.springframework.data.mongodb.core.EntityOperations;
import org.springframework.data.mongodb.core.FindAndModifyOptions;
import org.springframework.data.mongodb.core.FindAndReplaceOptions;
import org.springframework.data.mongodb.core.FindPublisherPreparer;
import org.springframework.data.mongodb.core.MappedDocument;
import org.springframework.data.mongodb.core.MongoAction;
import org.springframework.data.mongodb.core.MongoActionOperation;
import org.springframework.data.mongodb.core.PropertyOperations;
import org.springframework.data.mongodb.core.QueryOperations;
import org.springframework.data.mongodb.core.ReactiveAggregationOperation;
import org.springframework.data.mongodb.core.ReactiveAggregationOperationSupport;
import org.springframework.data.mongodb.core.ReactiveChangeStreamOperation;
import org.springframework.data.mongodb.core.ReactiveChangeStreamOperationSupport;
import org.springframework.data.mongodb.core.ReactiveCollectionCallback;
import org.springframework.data.mongodb.core.ReactiveDatabaseCallback;
import org.springframework.data.mongodb.core.ReactiveFindOperation;
import org.springframework.data.mongodb.core.ReactiveFindOperationSupport;
import org.springframework.data.mongodb.core.ReactiveInsertOperation;
import org.springframework.data.mongodb.core.ReactiveInsertOperationSupport;
import org.springframework.data.mongodb.core.ReactiveMapReduceOperation;
import org.springframework.data.mongodb.core.ReactiveMapReduceOperationSupport;
import org.springframework.data.mongodb.core.ReactiveMongoContext;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.ReactiveRemoveOperation;
import org.springframework.data.mongodb.core.ReactiveRemoveOperationSupport;
import org.springframework.data.mongodb.core.ReactiveSessionCallback;
import org.springframework.data.mongodb.core.ReactiveSessionScoped;
import org.springframework.data.mongodb.core.ReactiveUpdateOperation;
import org.springframework.data.mongodb.core.ReactiveUpdateOperationSupport;
import org.springframework.data.mongodb.core.SimpleReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.WriteConcernResolver;
import org.springframework.data.mongodb.core.WriteResultChecking;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.DbRefResolver;
import org.springframework.data.mongodb.core.convert.JsonSchemaMapper;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.MongoCustomConversions;
import org.springframework.data.mongodb.core.convert.MongoJsonSchemaMapper;
import org.springframework.data.mongodb.core.convert.MongoWriter;
import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.index.MongoMappingEventPublisher;
import org.springframework.data.mongodb.core.index.ReactiveIndexOperations;
import org.springframework.data.mongodb.core.index.ReactiveMongoPersistentEntityIndexCreator;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes;
import org.springframework.data.mongodb.core.mapping.event.AfterConvertEvent;
import org.springframework.data.mongodb.core.mapping.event.AfterDeleteEvent;
import org.springframework.data.mongodb.core.mapping.event.AfterLoadEvent;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeDeleteEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterConvertCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback;
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.SerializationUtils;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.mongodb.core.validation.Validator;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
import org.springframework.data.util.Optionals;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.NumberUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ResourceUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class ReactiveMongoTemplate
implements ReactiveMongoOperations,
ApplicationContextAware {
    public static final DbRefResolver NO_OP_REF_RESOLVER = NoOpDbRefResolver.INSTANCE;
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMongoTemplate.class);
    private static final WriteResultChecking DEFAULT_WRITE_RESULT_CHECKING = WriteResultChecking.NONE;
    private static final Collection<Class<?>> ITERABLE_CLASSES;
    private final MongoConverter mongoConverter;
    private final MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext;
    private final ReactiveMongoDatabaseFactory mongoDatabaseFactory;
    private final PersistenceExceptionTranslator exceptionTranslator;
    private final QueryMapper queryMapper;
    private final UpdateMapper updateMapper;
    private final JsonSchemaMapper schemaMapper;
    private final SpelAwareProxyProjectionFactory projectionFactory;
    private final ApplicationListener<MappingContextEvent<?, ?>> indexCreatorListener;
    private final EntityOperations operations;
    private final PropertyOperations propertyOperations;
    private final QueryOperations queryOperations;
    @Nullable
    private WriteConcern writeConcern;
    private WriteConcernResolver writeConcernResolver = DefaultWriteConcernResolver.INSTANCE;
    private WriteResultChecking writeResultChecking = WriteResultChecking.NONE;
    @Nullable
    private ReadPreference readPreference;
    @Nullable
    private ApplicationEventPublisher eventPublisher;
    @Nullable
    private ReactiveEntityCallbacks entityCallbacks;
    @Nullable
    private ReactiveMongoPersistentEntityIndexCreator indexCreator;
    private SessionSynchronization sessionSynchronization = SessionSynchronization.ON_ACTUAL_TRANSACTION;

    public ReactiveMongoTemplate(MongoClient mongoClient, String databaseName) {
        this((ReactiveMongoDatabaseFactory)new SimpleReactiveMongoDatabaseFactory(mongoClient, databaseName), (MongoConverter)null);
    }

    public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory) {
        this(mongoDatabaseFactory, (MongoConverter)null);
    }

    public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory, @Nullable MongoConverter mongoConverter) {
        this(mongoDatabaseFactory, mongoConverter, ReactiveMongoTemplate::handleSubscriptionException);
    }

    public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory, @Nullable MongoConverter mongoConverter, Consumer<Throwable> subscriptionExceptionHandler) {
        MongoMappingContext mongoMappingContext;
        Assert.notNull((Object)mongoDatabaseFactory, (String)"ReactiveMongoDatabaseFactory must not be null!");
        this.mongoDatabaseFactory = mongoDatabaseFactory;
        this.exceptionTranslator = mongoDatabaseFactory.getExceptionTranslator();
        this.mongoConverter = mongoConverter == null ? this.getDefaultMongoConverter() : mongoConverter;
        this.queryMapper = new QueryMapper(this.mongoConverter);
        this.updateMapper = new UpdateMapper(this.mongoConverter);
        this.schemaMapper = new MongoJsonSchemaMapper(this.mongoConverter);
        this.projectionFactory = new SpelAwareProxyProjectionFactory();
        this.indexCreatorListener = new IndexCreatorEventListener(subscriptionExceptionHandler);
        this.mappingContext = this.mongoConverter.getMappingContext();
        this.operations = new EntityOperations(this.mappingContext);
        this.propertyOperations = new PropertyOperations(this.mappingContext);
        this.queryOperations = new QueryOperations(this.queryMapper, this.updateMapper, this.operations, this.propertyOperations, mongoDatabaseFactory);
        if (this.mappingContext instanceof MongoMappingContext && (mongoMappingContext = (MongoMappingContext)this.mappingContext).isAutoIndexCreation()) {
            this.indexCreator = new ReactiveMongoPersistentEntityIndexCreator(mongoMappingContext, this::indexOps);
            this.eventPublisher = new MongoMappingEventPublisher(this.indexCreatorListener);
            mongoMappingContext.setApplicationEventPublisher(this.eventPublisher);
            this.mappingContext.getPersistentEntities().forEach(entity -> this.onCheckForIndexes((MongoPersistentEntity<?>)entity, subscriptionExceptionHandler));
        }
    }

    private ReactiveMongoTemplate(ReactiveMongoDatabaseFactory dbFactory, ReactiveMongoTemplate that) {
        this.mongoDatabaseFactory = dbFactory;
        this.exceptionTranslator = that.exceptionTranslator;
        this.mongoConverter = that.mongoConverter;
        this.queryMapper = that.queryMapper;
        this.updateMapper = that.updateMapper;
        this.schemaMapper = that.schemaMapper;
        this.projectionFactory = that.projectionFactory;
        this.indexCreator = that.indexCreator;
        this.indexCreatorListener = that.indexCreatorListener;
        this.mappingContext = that.mappingContext;
        this.operations = that.operations;
        this.propertyOperations = that.propertyOperations;
        this.sessionSynchronization = that.sessionSynchronization;
        this.queryOperations = that.queryOperations;
    }

    private void onCheckForIndexes(MongoPersistentEntity<?> entity, Consumer<Throwable> subscriptionExceptionHandler) {
        if (this.indexCreator != null) {
            this.indexCreator.checkForIndexes(entity).subscribe(v -> {}, subscriptionExceptionHandler);
        }
    }

    private static void handleSubscriptionException(Throwable t) {
        LOGGER.error("Unexpected exception during asynchronous execution", t);
    }

    public void setWriteResultChecking(@Nullable WriteResultChecking resultChecking) {
        this.writeResultChecking = resultChecking == null ? DEFAULT_WRITE_RESULT_CHECKING : resultChecking;
    }

    public void setWriteConcern(@Nullable WriteConcern writeConcern) {
        this.writeConcern = writeConcern;
    }

    public void setWriteConcernResolver(@Nullable WriteConcernResolver writeConcernResolver) {
        this.writeConcernResolver = writeConcernResolver;
    }

    public void setReadPreference(ReadPreference readPreference) {
        this.readPreference = readPreference;
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.prepareIndexCreator(applicationContext);
        this.eventPublisher = applicationContext;
        if (this.entityCallbacks == null) {
            this.setEntityCallbacks(ReactiveEntityCallbacks.create((BeanFactory)applicationContext));
        }
        if (this.mappingContext instanceof ApplicationEventPublisherAware) {
            ((ApplicationEventPublisherAware)this.mappingContext).setApplicationEventPublisher(this.eventPublisher);
        }
        this.projectionFactory.setBeanFactory((BeanFactory)applicationContext);
        this.projectionFactory.setBeanClassLoader(applicationContext.getClassLoader());
    }

    public void setEntityCallbacks(ReactiveEntityCallbacks entityCallbacks) {
        Assert.notNull((Object)entityCallbacks, (String)"EntityCallbacks must not be null!");
        this.entityCallbacks = entityCallbacks;
    }

    private void prepareIndexCreator(ApplicationContext context) {
        String[] indexCreators;
        for (String creator : indexCreators = context.getBeanNamesForType(ReactiveMongoPersistentEntityIndexCreator.class)) {
            ReactiveMongoPersistentEntityIndexCreator creatorBean = (ReactiveMongoPersistentEntityIndexCreator)context.getBean(creator, ReactiveMongoPersistentEntityIndexCreator.class);
            if (!creatorBean.isIndexCreatorFor(this.mappingContext)) continue;
            return;
        }
        if (context instanceof ConfigurableApplicationContext) {
            ((ConfigurableApplicationContext)context).addApplicationListener(this.indexCreatorListener);
        }
    }

    @Override
    public MongoConverter getConverter() {
        return this.mongoConverter;
    }

    @Override
    public ReactiveIndexOperations indexOps(String collectionName) {
        return new DefaultReactiveIndexOperations(this, collectionName, this.queryMapper);
    }

    @Override
    public ReactiveIndexOperations indexOps(Class<?> entityClass) {
        return new DefaultReactiveIndexOperations((ReactiveMongoOperations)this, this.getCollectionName(entityClass), this.queryMapper, entityClass);
    }

    @Override
    public String getCollectionName(Class<?> entityClass) {
        return this.operations.determineCollectionName(entityClass);
    }

    @Override
    public Mono<Document> executeCommand(String jsonCommand) {
        Assert.notNull((Object)jsonCommand, (String)"Command must not be empty!");
        return this.executeCommand(Document.parse((String)jsonCommand));
    }

    @Override
    public Mono<Document> executeCommand(Document command) {
        return this.executeCommand(command, null);
    }

    @Override
    public Mono<Document> executeCommand(Document command, @Nullable ReadPreference readPreference) {
        Assert.notNull((Object)command, (String)"Command must not be null!");
        return this.createFlux(db -> readPreference != null ? db.runCommand((Bson)command, readPreference, Document.class) : db.runCommand((Bson)command, Document.class)).next();
    }

    @Override
    public <T> Flux<T> execute(Class<?> entityClass, ReactiveCollectionCallback<T> action) {
        return this.createFlux(this.getCollectionName(entityClass), action);
    }

    @Override
    public <T> Flux<T> execute(ReactiveDatabaseCallback<T> action) {
        return this.createFlux(action);
    }

    @Override
    public <T> Flux<T> execute(String collectionName, ReactiveCollectionCallback<T> callback) {
        Assert.notNull(callback, (String)"ReactiveCollectionCallback must not be null!");
        return this.createFlux(collectionName, callback);
    }

    @Override
    public ReactiveSessionScoped withSession(Publisher<ClientSession> sessionProvider) {
        final Mono cachedSession = Mono.from(sessionProvider).cache();
        return new ReactiveSessionScoped(){

            @Override
            public <T> Flux<T> execute(ReactiveSessionCallback<T> action, Consumer<ClientSession> doFinally) {
                return cachedSession.flatMapMany(session -> ReactiveMongoTemplate.this.withSession(action, session).doFinally(signalType -> doFinally.accept((ClientSession)session)));
            }
        };
    }

    public void setSessionSynchronization(SessionSynchronization sessionSynchronization) {
        this.sessionSynchronization = sessionSynchronization;
    }

    @Override
    public ReactiveSessionScoped inTransaction() {
        return this.inTransaction((Publisher<ClientSession>)this.mongoDatabaseFactory.getSession(ClientSessionOptions.builder().causallyConsistent(true).build()));
    }

    @Override
    public ReactiveSessionScoped inTransaction(Publisher<ClientSession> sessionProvider) {
        final Mono cachedSession = Mono.from(sessionProvider).cache();
        return new ReactiveSessionScoped(){

            @Override
            public <T> Flux<T> execute(ReactiveSessionCallback<T> action, Consumer<ClientSession> doFinally) {
                return cachedSession.flatMapMany(session -> {
                    if (!session.hasActiveTransaction()) {
                        session.startTransaction();
                    }
                    return Flux.usingWhen((Publisher)Mono.just((Object)session), s -> ReactiveMongoTemplate.this.withSession(action, s), ClientSession::commitTransaction, (sess, err) -> sess.abortTransaction(), ClientSession::commitTransaction).doFinally(signalType -> doFinally.accept((ClientSession)session));
                });
            }
        };
    }

    private <T> Flux<T> withSession(ReactiveSessionCallback<T> action, ClientSession session) {
        ReactiveSessionBoundMongoTemplate operations = new ReactiveSessionBoundMongoTemplate(session, this);
        return Flux.from(action.doInSession(operations)).subscriberContext(ctx -> ReactiveMongoContext.setSession(ctx, (Publisher<ClientSession>)Mono.just((Object)session)));
    }

    @Override
    public ReactiveMongoOperations withSession(ClientSession session) {
        return new ReactiveSessionBoundMongoTemplate(session, this);
    }

    @Override
    public ReactiveSessionScoped withSession(ClientSessionOptions sessionOptions) {
        return this.withSession((Publisher<ClientSession>)this.mongoDatabaseFactory.getSession(sessionOptions));
    }

    public <T> Flux<T> createFlux(ReactiveDatabaseCallback<T> callback) {
        Assert.notNull(callback, (String)"ReactiveDatabaseCallback must not be null!");
        return Mono.defer(this::doGetDatabase).flatMapMany(database -> callback.doInDB(this.prepareDatabase((MongoDatabase)database))).onErrorMap(this.translateException());
    }

    public <T> Mono<T> createMono(ReactiveDatabaseCallback<T> callback) {
        Assert.notNull(callback, (String)"ReactiveDatabaseCallback must not be null!");
        return Mono.defer(this::doGetDatabase).flatMap(database -> Mono.from(callback.doInDB(this.prepareDatabase((MongoDatabase)database)))).onErrorMap(this.translateException());
    }

    public <T> Flux<T> createFlux(String collectionName, ReactiveCollectionCallback<T> callback) {
        Assert.hasText((String)collectionName, (String)"Collection name must not be null or empty!");
        Assert.notNull(callback, (String)"ReactiveDatabaseCallback must not be null!");
        Mono collectionPublisher = this.doGetDatabase().map(database -> this.getAndPrepareCollection((MongoDatabase)database, collectionName));
        return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(this.translateException());
    }

    public <T> Mono<T> createMono(String collectionName, ReactiveCollectionCallback<T> callback) {
        Assert.hasText((String)collectionName, (String)"Collection name must not be null or empty!");
        Assert.notNull(callback, (String)"ReactiveCollectionCallback must not be null!");
        Mono collectionPublisher = this.doGetDatabase().map(database -> this.getAndPrepareCollection((MongoDatabase)database, collectionName));
        return collectionPublisher.flatMap(collection -> Mono.from(callback.doInCollection((MongoCollection<Document>)collection))).onErrorMap(this.translateException());
    }

    @Override
    public <T> Mono<MongoCollection<Document>> createCollection(Class<T> entityClass) {
        return this.createCollection(entityClass, CollectionOptions.empty());
    }

    @Override
    public <T> Mono<MongoCollection<Document>> createCollection(Class<T> entityClass, @Nullable CollectionOptions collectionOptions) {
        Assert.notNull(entityClass, (String)"EntityClass must not be null!");
        CollectionOptions options = collectionOptions != null ? collectionOptions : CollectionOptions.empty();
        options = Optionals.firstNonEmpty((Supplier[])new Supplier[]{() -> Optional.ofNullable(collectionOptions).flatMap(CollectionOptions::getCollation), () -> this.operations.forType(entityClass).getCollation()}).map(options::collation).orElse(options);
        return this.doCreateCollection(this.getCollectionName(entityClass), this.convertToCreateCollectionOptions(options, entityClass));
    }

    @Override
    public Mono<MongoCollection<Document>> createCollection(String collectionName) {
        return this.doCreateCollection(collectionName, new CreateCollectionOptions());
    }

    @Override
    public Mono<MongoCollection<Document>> createCollection(String collectionName, @Nullable CollectionOptions collectionOptions) {
        return this.doCreateCollection(collectionName, this.convertToCreateCollectionOptions(collectionOptions));
    }

    @Override
    public Mono<MongoCollection<Document>> getCollection(String collectionName) {
        Assert.notNull((Object)collectionName, (String)"Collection name must not be null!");
        return this.createMono(db -> Mono.just((Object)db.getCollection(collectionName)));
    }

    @Override
    public <T> Mono<Boolean> collectionExists(Class<T> entityClass) {
        return this.collectionExists(this.getCollectionName(entityClass));
    }

    @Override
    public Mono<Boolean> collectionExists(String collectionName) {
        return this.createMono(db -> Flux.from((Publisher)db.listCollectionNames()).filter(s -> s.equals(collectionName)).map(s -> true).single((Object)false));
    }

    @Override
    public <T> Mono<Void> dropCollection(Class<T> entityClass) {
        return this.dropCollection(this.getCollectionName(entityClass));
    }

    @Override
    public Mono<Void> dropCollection(String collectionName) {
        return this.createMono(collectionName, MongoCollection::drop).doOnSuccess(success -> {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Dropped collection [" + collectionName + "]");
            }
        }).then();
    }

    @Override
    public Flux<String> getCollectionNames() {
        return this.createFlux(MongoDatabase::listCollectionNames);
    }

    public Mono<MongoDatabase> getMongoDatabase() {
        return this.mongoDatabaseFactory.getMongoDatabase();
    }

    protected Mono<MongoDatabase> doGetDatabase() {
        return ReactiveMongoDatabaseUtils.getDatabase(this.mongoDatabaseFactory, this.sessionSynchronization);
    }

    @Override
    public <T> Mono<T> findOne(Query query, Class<T> entityClass) {
        return this.findOne(query, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Mono<T> findOne(Query query, Class<T> entityClass, String collectionName) {
        if (ObjectUtils.isEmpty((Object)query.getSortObject())) {
            return this.doFindOne(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
        }
        query.limit(1);
        return this.find(query, entityClass, collectionName).next();
    }

    @Override
    public Mono<Boolean> exists(Query query, Class<?> entityClass) {
        return this.exists(query, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public Mono<Boolean> exists(Query query, String collectionName) {
        return this.exists(query, null, collectionName);
    }

    @Override
    public Mono<Boolean> exists(Query query, @Nullable Class<?> entityClass, String collectionName) {
        if (query == null) {
            throw new InvalidDataAccessApiUsageException("Query passed in to exist can't be null");
        }
        return this.createFlux(collectionName, collection -> {
            QueryOperations.QueryContext queryContext = this.queryOperations.createQueryContext(query);
            Document filter = queryContext.getMappedQuery(entityClass, this::getPersistentEntity);
            FindPublisher findPublisher = collection.find((Bson)filter, Document.class).projection((Bson)new Document("_id", (Object)1));
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("exists: {} in collection: {}", (Object)SerializationUtils.serializeToJsonSafely(filter), (Object)collectionName);
            }
            queryContext.applyCollation(entityClass, arg_0 -> ((FindPublisher)findPublisher).collation(arg_0));
            return findPublisher.limit(1);
        }).hasElements();
    }

    @Override
    public <T> Flux<T> find(Query query, Class<T> entityClass) {
        return this.find(query, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Flux<T> find(@Nullable Query query, Class<T> entityClass, String collectionName) {
        if (query == null) {
            return this.findAll(entityClass, collectionName);
        }
        return this.doFind(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
    }

    @Override
    public <T> Mono<T> findById(Object id, Class<T> entityClass) {
        return this.findById(id, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Mono<T> findById(Object id, Class<T> entityClass, String collectionName) {
        String idKey = this.operations.getIdPropertyName(entityClass);
        return this.doFindOne(collectionName, new Document(idKey, id), null, entityClass, (Collation)null);
    }

    @Override
    public <T> Flux<T> findDistinct(Query query, String field, Class<?> entityClass, Class<T> resultClass) {
        return this.findDistinct(query, field, this.getCollectionName(entityClass), entityClass, resultClass);
    }

    @Override
    public <T> Flux<T> findDistinct(Query query, String field, String collectionName, Class<?> entityClass, Class<T> resultClass) {
        Assert.notNull((Object)query, (String)"Query must not be null!");
        Assert.notNull((Object)field, (String)"Field must not be null!");
        Assert.notNull((Object)collectionName, (String)"CollectionName must not be null!");
        Assert.notNull(entityClass, (String)"EntityClass must not be null!");
        Assert.notNull(resultClass, (String)"ResultClass must not be null!");
        MongoPersistentEntity<?> entity = this.getPersistentEntity(entityClass);
        QueryOperations.DistinctQueryContext distinctQueryContext = this.queryOperations.distinctQueryContext(query, field);
        Document mappedQuery = distinctQueryContext.getMappedQuery(entity);
        String mappedFieldName = distinctQueryContext.getMappedFieldName(entity);
        Class mongoDriverCompatibleType = distinctQueryContext.getDriverCompatibleClass(resultClass);
        Flux result = this.execute(collectionName, (MongoCollection<Document> collection) -> {
            QueryFindPublisherPreparer preparer;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Executing findDistinct using query {} for field: {} in collection: {}", new Object[]{SerializationUtils.serializeToJsonSafely(mappedQuery), field, collectionName});
            }
            if ((preparer = new QueryFindPublisherPreparer(query, entityClass)).hasReadPreference()) {
                collection = collection.withReadPreference(preparer.getReadPreference());
            }
            DistinctPublisher publisher = collection.distinct(mappedFieldName, (Bson)mappedQuery, mongoDriverCompatibleType);
            distinctQueryContext.applyCollation(entityClass, arg_0 -> ((DistinctPublisher)publisher).collation(arg_0));
            return publisher;
        });
        if (resultClass == Object.class || mongoDriverCompatibleType != resultClass) {
            Class<?> targetType = distinctQueryContext.getMostSpecificConversionTargetType(resultClass, entityClass);
            MongoConverter converter = this.getConverter();
            result = result.map(it -> converter.mapValueToTargetType(it, targetType, NO_OP_REF_RESOLVER));
        }
        return result;
    }

    @Override
    public <O> Flux<O> aggregate(TypedAggregation<?> aggregation, String inputCollectionName, Class<O> outputType) {
        Assert.notNull(aggregation, (String)"Aggregation pipeline must not be null!");
        TypeBasedAggregationOperationContext context = new TypeBasedAggregationOperationContext(aggregation.getInputType(), this.mappingContext, this.queryMapper);
        return this.aggregate(aggregation, inputCollectionName, outputType, context);
    }

    @Override
    public <O> Flux<O> aggregate(TypedAggregation<?> aggregation, Class<O> outputType) {
        return this.aggregate(aggregation, this.getCollectionName(aggregation.getInputType()), outputType);
    }

    @Override
    public <O> Flux<O> aggregate(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {
        return this.aggregate(aggregation, this.getCollectionName(inputType), outputType, new TypeBasedAggregationOperationContext(inputType, this.mappingContext, this.queryMapper));
    }

    @Override
    public <O> Flux<O> aggregate(Aggregation aggregation, String collectionName, Class<O> outputType) {
        return this.aggregate(aggregation, collectionName, outputType, null);
    }

    protected <O> Flux<O> aggregate(Aggregation aggregation, String collectionName, Class<O> outputType, @Nullable AggregationOperationContext context) {
        Assert.notNull((Object)aggregation, (String)"Aggregation pipeline must not be null!");
        Assert.hasText((String)collectionName, (String)"Collection name must not be null or empty!");
        Assert.notNull(outputType, (String)"Output type must not be null!");
        AggregationUtil aggregationUtil = new AggregationUtil(this.queryMapper, this.mappingContext);
        AggregationOperationContext rootContext = aggregationUtil.prepareAggregationContext(aggregation, context);
        AggregationOptions options = aggregation.getOptions();
        List<Document> pipeline = aggregationUtil.createPipeline(aggregation, rootContext);
        Assert.isTrue((!options.isExplain() ? 1 : 0) != 0, (String)"Cannot use explain option with streaming!");
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Streaming aggregation: {} in collection {}", (Object)SerializationUtils.serializeToJsonSafely(pipeline), (Object)collectionName);
        }
        ReadDocumentCallback<Object> readCallback = new ReadDocumentCallback<Object>(this.mongoConverter, outputType, collectionName);
        return this.execute(collectionName, (MongoCollection<Document> collection) -> this.aggregateAndMap((MongoCollection<Document>)collection, pipeline, options, readCallback, aggregation instanceof TypedAggregation ? ((TypedAggregation)aggregation).getInputType() : null));
    }

    private <O> Flux<O> aggregateAndMap(MongoCollection<Document> collection, List<Document> pipeline, AggregationOptions options, ReadDocumentCallback<O> readCallback, @Nullable Class<?> inputType) {
        AggregatePublisher cursor = collection.aggregate(pipeline, Document.class).allowDiskUse(Boolean.valueOf(options.isAllowDiskUse()));
        if (options.getCursorBatchSize() != null) {
            cursor = cursor.batchSize(options.getCursorBatchSize().intValue());
        }
        options.getComment().ifPresent(arg_0 -> ((AggregatePublisher)cursor).comment(arg_0));
        Supplier[] supplierArray = new Supplier[2];
        supplierArray[0] = options::getCollation;
        supplierArray[1] = () -> this.operations.forType(inputType).getCollation();
        Optionals.firstNonEmpty((Supplier[])supplierArray).map(Collation::toMongoCollation).ifPresent(arg_0 -> ((AggregatePublisher)cursor).collation(arg_0));
        if (options.hasExecutionTimeLimit()) {
            cursor = cursor.maxTime(options.getMaxTime().toMillis(), TimeUnit.MILLISECONDS);
        }
        return Flux.from((Publisher)cursor).concatMap(readCallback::doWith);
    }

    @Override
    public <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<T> entityClass) {
        return this.geoNear(near, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<T> entityClass, String collectionName) {
        return this.geoNear(near, entityClass, collectionName, entityClass);
    }

    protected <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<?> entityClass, String collectionName, Class<T> returnType) {
        if (near == null) {
            throw new InvalidDataAccessApiUsageException("NearQuery must not be null!");
        }
        if (entityClass == null) {
            throw new InvalidDataAccessApiUsageException("Entity class must not be null!");
        }
        String collection = StringUtils.hasText((String)collectionName) ? collectionName : this.getCollectionName(entityClass);
        String distanceField = this.operations.nearQueryDistanceFieldName(entityClass);
        GeoNearResultDocumentCallback callback = new GeoNearResultDocumentCallback(distanceField, new ProjectingReadCallback(this.mongoConverter, entityClass, returnType, collection), near.getMetric());
        Aggregation $geoNear = TypedAggregation.newAggregation(entityClass, Aggregation.geoNear(near, distanceField)).withOptions(AggregationOptions.builder().collation(near.getCollation()).build());
        return this.aggregate($geoNear, collection, Document.class).concatMap(callback::doWith);
    }

    @Override
    public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, Class<T> entityClass) {
        return this.findAndModify(query, update, new FindAndModifyOptions(), entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, Class<T> entityClass, String collectionName) {
        return this.findAndModify(query, update, new FindAndModifyOptions(), entityClass, collectionName);
    }

    @Override
    public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options, Class<T> entityClass) {
        return this.findAndModify(query, update, options, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Mono<T> findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options, Class<T> entityClass, String collectionName) {
        Assert.notNull((Object)options, (String)"Options must not be null! ");
        Assert.notNull(entityClass, (String)"Entity class must not be null!");
        FindAndModifyOptions optionsToUse = FindAndModifyOptions.of(options);
        Optionals.ifAllPresent(query.getCollation(), optionsToUse.getCollation(), (l, r) -> {
            throw new IllegalArgumentException("Both Query and FindAndModifyOptions define a collation. Please provide the collation only via one of the two.");
        });
        if (!optionsToUse.getCollation().isPresent()) {
            this.operations.forType(entityClass).getCollation(query).ifPresent(optionsToUse::collation);
        }
        return this.doFindAndModify(collectionName, query.getQueryObject(), query.getFieldsObject(), this.getMappedSortObject(query, entityClass), entityClass, update, optionsToUse);
    }

    @Override
    public <S, T> Mono<T> findAndReplace(Query query, S replacement, FindAndReplaceOptions options, Class<S> entityType, String collectionName, Class<T> resultType) {
        Assert.notNull((Object)query, (String)"Query must not be null!");
        Assert.notNull(replacement, (String)"Replacement must not be null!");
        Assert.notNull((Object)options, (String)"Options must not be null! Use FindAndReplaceOptions#empty() instead.");
        Assert.notNull(entityType, (String)"Entity class must not be null!");
        Assert.notNull((Object)collectionName, (String)"CollectionName must not be null!");
        Assert.notNull(resultType, (String)"ResultType must not be null! Use Object.class instead.");
        Assert.isTrue((query.getLimit() <= 1 ? 1 : 0) != 0, (String)"Query must not define a limit other than 1 ore none!");
        Assert.isTrue((query.getSkip() <= 0L ? 1 : 0) != 0, (String)"Query must not define skip.");
        MongoPersistentEntity entity = (MongoPersistentEntity)this.mappingContext.getPersistentEntity(entityType);
        QueryOperations.QueryContext queryContext = this.queryOperations.createQueryContext(query);
        Document mappedQuery = queryContext.getMappedQuery(entity);
        Document mappedFields = queryContext.getMappedFields(entity, resultType, (ProjectionFactory)this.projectionFactory);
        Document mappedSort = queryContext.getMappedSort(entity);
        return Mono.defer(() -> {
            PersistableEntityModel<Object> pem = PersistableEntityModel.of(replacement, collectionName);
            this.maybeEmitEvent(new BeforeConvertEvent<Object>(pem.getSource(), pem.getCollection()));
            return this.maybeCallBeforeConvert(pem.getSource(), pem.getCollection()).map(pem::mutate).flatMap(it -> {
                PersistableEntityModel mapped = it.addTargetDocument(this.operations.forEntity(it.getSource()).toMappedDocument(this.mongoConverter).getDocument());
                this.maybeEmitEvent(new BeforeSaveEvent(mapped.getSource(), mapped.getTarget(), mapped.getCollection()));
                return this.maybeCallBeforeSave(it.getSource(), mapped.getTarget(), mapped.getCollection()).map(potentiallyModified -> PersistableEntityModel.of(potentiallyModified, mapped.getTarget(), mapped.getCollection()));
            }).flatMap(it -> {
                Mono afterFindAndReplace = this.doFindAndReplace(it.getCollection(), mappedQuery, mappedFields, mappedSort, queryContext.getCollation(entityType).orElse(null), entityType, it.getTarget(), options, resultType);
                return afterFindAndReplace.flatMap(saved -> {
                    this.maybeEmitEvent(new AfterSaveEvent<Object>(saved, it.getTarget(), it.getCollection()));
                    return this.maybeCallAfterSave(saved, it.getTarget(), it.getCollection());
                });
            });
        });
    }

    @Override
    public <T> Mono<T> findAndRemove(Query query, Class<T> entityClass) {
        return this.findAndRemove(query, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Mono<T> findAndRemove(Query query, Class<T> entityClass, String collectionName) {
        this.operations.forType(entityClass).getCollation(query);
        return this.doFindAndRemove(collectionName, query.getQueryObject(), query.getFieldsObject(), this.getMappedSortObject(query, entityClass), this.operations.forType(entityClass).getCollation(query).orElse(null), entityClass);
    }

    @Override
    public Mono<Long> count(Query query, Class<?> entityClass) {
        Assert.notNull(entityClass, (String)"Entity class must not be null!");
        return this.count(query, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public Mono<Long> count(Query query, String collectionName) {
        return this.count(query, null, collectionName);
    }

    @Override
    public Mono<Long> count(Query query, @Nullable Class<?> entityClass, String collectionName) {
        Assert.notNull((Object)query, (String)"Query must not be null!");
        Assert.hasText((String)collectionName, (String)"Collection name must not be null or empty!");
        return this.createMono(collectionName, collection -> {
            QueryOperations.CountContext countContext = this.queryOperations.countQueryContext(query);
            CountOptions options = countContext.getCountOptions(entityClass);
            Document filter = countContext.getMappedQuery(entityClass, arg_0 -> this.mappingContext.getPersistentEntity(arg_0));
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Executing count: {} in collection: {}", (Object)SerializationUtils.serializeToJsonSafely(filter), (Object)collectionName);
            }
            return this.doCount(collectionName, filter, options);
        });
    }

    protected Mono<Long> doCount(String collectionName, Document filter, CountOptions options) {
        return this.createMono(collectionName, collection -> collection.countDocuments((Bson)CountQuery.of(filter).toQueryDocument(), options));
    }

    @Override
    public <T> Mono<T> insert(Mono<? extends T> objectToSave) {
        Assert.notNull(objectToSave, (String)"Mono to insert must not be null!");
        return objectToSave.flatMap(this::insert);
    }

    @Override
    public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave, Class<?> entityClass) {
        return this.insertAll(batchToSave, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave, String collectionName) {
        Assert.notNull(batchToSave, (String)"Batch to insert must not be null!");
        return Flux.from(batchToSave).flatMap(collection -> this.insert((Collection)collection, collectionName));
    }

    @Override
    public <T> Mono<T> insert(T objectToSave) {
        Assert.notNull(objectToSave, (String)"Object to insert must not be null!");
        this.ensureNotIterable(objectToSave);
        return this.insert(objectToSave, this.getCollectionName(ClassUtils.getUserClass(objectToSave)));
    }

    @Override
    public <T> Mono<T> insert(T objectToSave, String collectionName) {
        Assert.notNull(objectToSave, (String)"Object to insert must not be null!");
        this.ensureNotIterable(objectToSave);
        return this.doInsert(collectionName, objectToSave, this.mongoConverter);
    }

    protected <T> Mono<T> doInsert(String collectionName, T objectToSave, MongoWriter<Object> writer) {
        return Mono.just(PersistableEntityModel.of(objectToSave, collectionName)).doOnNext(it -> this.maybeEmitEvent(new BeforeConvertEvent(it.getSource(), it.getCollection()))).flatMap(it -> this.maybeCallBeforeConvert(it.getSource(), it.getCollection()).map(it::mutate)).map(it -> {
            EntityOperations.AdaptibleEntity entity = this.operations.forEntity(it.getSource(), this.mongoConverter.getConversionService());
            entity.assertUpdateableIdIfNotSet();
            PersistableEntityModel model = PersistableEntityModel.of(entity.initializeVersionProperty(), entity.toMappedDocument(writer).getDocument(), it.getCollection());
            this.maybeEmitEvent(new BeforeSaveEvent(model.getSource(), model.getTarget(), model.getCollection()));
            return model;
        }).flatMap(it -> this.maybeCallBeforeSave(it.getSource(), it.getTarget(), it.getCollection()).map(it::mutate)).flatMap(it -> this.insertDocument(it.getCollection(), it.getTarget(), it.getSource().getClass()).flatMap(id -> {
            Object saved = this.operations.forEntity(it.getSource(), this.mongoConverter.getConversionService()).populateIdIfNecessary(id);
            this.maybeEmitEvent(new AfterSaveEvent(saved, it.getTarget(), collectionName));
            return this.maybeCallAfterSave(saved, it.getTarget(), collectionName);
        }));
    }

    @Override
    public <T> Flux<T> insert(Collection<? extends T> batchToSave, Class<?> entityClass) {
        return this.doInsertBatch(this.getCollectionName(entityClass), batchToSave, this.mongoConverter);
    }

    @Override
    public <T> Flux<T> insert(Collection<? extends T> batchToSave, String collectionName) {
        return this.doInsertBatch(collectionName, batchToSave, this.mongoConverter);
    }

    @Override
    public <T> Flux<T> insertAll(Collection<? extends T> objectsToSave) {
        return this.doInsertAll(objectsToSave, this.mongoConverter);
    }

    @Override
    public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> objectsToSave) {
        return Flux.from(objectsToSave).flatMap(this::insertAll);
    }

    protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {
        HashMap elementsByCollection = new HashMap();
        listToSave.forEach(element -> {
            String collection = this.getCollectionName(element.getClass());
            List collectionElements = elementsByCollection.computeIfAbsent(collection, k -> new ArrayList());
            collectionElements.add(element);
        });
        return Flux.fromIterable(elementsByCollection.keySet()).flatMap(collectionName -> this.doInsertBatch((String)collectionName, (Collection)elementsByCollection.get(collectionName), writer));
    }

    protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave, MongoWriter<Object> writer) {
        Assert.notNull(writer, (String)"MongoWriter must not be null!");
        Mono prepareDocuments = Flux.fromIterable(batchToSave).flatMap(uninitialized -> {
            BeforeConvertEvent<Object> event = new BeforeConvertEvent<Object>(uninitialized, collectionName);
            Object toConvert = this.maybeEmitEvent(event).getSource();
            return this.maybeCallBeforeConvert(toConvert, collectionName).flatMap(it -> {
                EntityOperations.AdaptibleEntity<Object> entity = this.operations.forEntity(it, this.mongoConverter.getConversionService());
                entity.assertUpdateableIdIfNotSet();
                Object initialized = entity.initializeVersionProperty();
                Document dbDoc = entity.toMappedDocument(writer).getDocument();
                this.maybeEmitEvent(new BeforeSaveEvent<Object>(initialized, dbDoc, collectionName));
                return this.maybeCallBeforeSave(initialized, dbDoc, collectionName).thenReturn((Object)Tuples.of(entity, (Object)dbDoc));
            });
        }).collectList();
        Flux insertDocuments = prepareDocuments.flatMapMany(tuples -> {
            List<Document> documents = tuples.stream().map(Tuple2::getT2).collect(Collectors.toList());
            return this.insertDocumentList(collectionName, documents).thenMany((Publisher)Flux.fromIterable((Iterable)tuples));
        });
        return insertDocuments.flatMap(tuple -> {
            Document document = (Document)tuple.getT2();
            Object id = MappedDocument.of(document).getId();
            Object saved = ((EntityOperations.AdaptibleEntity)tuple.getT1()).populateIdIfNecessary(id);
            this.maybeEmitEvent(new AfterSaveEvent(saved, document, collectionName));
            return this.maybeCallAfterSave(saved, document, collectionName);
        });
    }

    @Override
    public <T> Mono<T> save(Mono<? extends T> objectToSave) {
        Assert.notNull(objectToSave, (String)"Mono to save must not be null!");
        return objectToSave.flatMap(this::save);
    }

    @Override
    public <T> Mono<T> save(Mono<? extends T> objectToSave, String collectionName) {
        Assert.notNull(objectToSave, (String)"Mono to save must not be null!");
        return objectToSave.flatMap(o -> this.save(o, collectionName));
    }

    @Override
    public <T> Mono<T> save(T objectToSave) {
        Assert.notNull(objectToSave, (String)"Object to save must not be null!");
        return this.save(objectToSave, this.getCollectionName(ClassUtils.getUserClass(objectToSave)));
    }

    @Override
    public <T> Mono<T> save(T objectToSave, String collectionName) {
        Assert.notNull(objectToSave, (String)"Object to save must not be null!");
        Assert.hasText((String)collectionName, (String)"Collection name must not be null or empty!");
        EntityOperations.AdaptibleEntity<T> source = this.operations.forEntity(objectToSave, this.mongoConverter.getConversionService());
        return source.isVersionedEntity() ? this.doSaveVersioned(source, collectionName) : this.doSave(collectionName, objectToSave, this.mongoConverter);
    }

    private <T> Mono<T> doSaveVersioned(EntityOperations.AdaptibleEntity<T> source, String collectionName) {
        if (source.isNew()) {
            return this.doInsert(collectionName, source.getBean(), this.mongoConverter);
        }
        return this.createMono(collectionName, collection -> {
            Query query = source.getQueryForVersion();
            Object toSave = source.incrementVersion();
            source.assertUpdateableIdIfNotSet();
            BeforeConvertEvent event = new BeforeConvertEvent(toSave, collectionName);
            Object afterEvent = this.maybeEmitEvent(event).getSource();
            return this.maybeCallBeforeConvert(afterEvent, collectionName).flatMap(toConvert -> {
                MappedDocument mapped = this.operations.forEntity(toConvert).toMappedDocument(this.mongoConverter);
                Document document = mapped.getDocument();
                this.maybeEmitEvent(new BeforeSaveEvent<Object>(toConvert, document, collectionName));
                return this.maybeCallBeforeSave(toConvert, document, collectionName).flatMap(it -> this.doUpdate(collectionName, query, mapped.updateWithoutId(), it.getClass(), false, false).flatMap(result -> {
                    this.maybeEmitEvent(new AfterSaveEvent<Object>(it, document, collectionName));
                    return this.maybeCallAfterSave(it, document, collectionName);
                }));
            });
        });
    }

    protected <T> Mono<T> doSave(String collectionName, T objectToSave, MongoWriter<Object> writer) {
        this.assertUpdateableIdIfNotSet(objectToSave);
        return this.createMono(collectionName, collection -> {
            Object toSave = this.maybeEmitEvent(new BeforeConvertEvent<Object>(objectToSave, collectionName)).getSource();
            return this.maybeCallBeforeConvert(toSave, collectionName).flatMap(toConvert -> {
                EntityOperations.AdaptibleEntity<Object> entity = this.operations.forEntity(toConvert, this.mongoConverter.getConversionService());
                Document dbDoc = entity.toMappedDocument(writer).getDocument();
                this.maybeEmitEvent(new BeforeSaveEvent<Object>(toConvert, dbDoc, collectionName));
                return this.maybeCallBeforeSave(toConvert, dbDoc, collectionName).flatMap(it -> this.saveDocument(collectionName, dbDoc, it.getClass()).flatMap(id -> {
                    Object saved = entity.populateIdIfNecessary(id);
                    this.maybeEmitEvent(new AfterSaveEvent(saved, dbDoc, collectionName));
                    return this.maybeCallAfterSave(saved, dbDoc, collectionName);
                }));
            });
        });
    }

    protected Mono<Object> insertDocument(String collectionName, Document dbDoc, Class<?> entityClass) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Inserting Document containing fields: " + dbDoc.keySet() + " in collection: " + collectionName);
        }
        Document document = new Document((Map)dbDoc);
        Flux execute = this.execute(collectionName, (MongoCollection<Document> collection) -> {
            MongoAction mongoAction = new MongoAction(this.writeConcern, MongoActionOperation.INSERT, collectionName, entityClass, dbDoc, null);
            WriteConcern writeConcernToUse = this.prepareWriteConcern(mongoAction);
            MongoCollection<Document> collectionToUse = this.prepareCollection((MongoCollection<Document>)collection, writeConcernToUse);
            return collectionToUse.insertOne((Object)document);
        });
        return Flux.from(execute).last().map(success -> MappedDocument.of(document).getId());
    }

    protected Flux<ObjectId> insertDocumentList(String collectionName, List<Document> dbDocList) {
        if (dbDocList.isEmpty()) {
            return Flux.empty();
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Inserting list of Documents containing " + dbDocList.size() + " items");
        }
        ArrayList documents = new ArrayList();
        return this.execute(collectionName, (MongoCollection<Document> collection) -> {
            MongoAction mongoAction = new MongoAction(this.writeConcern, MongoActionOperation.INSERT_LIST, collectionName, null, null, null);
            WriteConcern writeConcernToUse = this.prepareWriteConcern(mongoAction);
            MongoCollection<Document> collectionToUse = this.prepareCollection((MongoCollection<Document>)collection, writeConcernToUse);
            documents.addAll(ReactiveMongoTemplate.toDocuments(dbDocList));
            return collectionToUse.insertMany(documents);
        }).flatMap(s -> Flux.fromStream(documents.stream().map(MappedDocument::of).filter(it -> it.isIdPresent(ObjectId.class)).map(it -> it.getId(ObjectId.class))));
    }

    private MongoCollection<Document> prepareCollection(MongoCollection<Document> collection, @Nullable WriteConcern writeConcernToUse) {
        MongoCollection collectionToUse = collection;
        if (writeConcernToUse != null) {
            collectionToUse = collectionToUse.withWriteConcern(writeConcernToUse);
        }
        return collectionToUse;
    }

    protected Mono<Object> saveDocument(String collectionName, Document document, Class<?> entityClass) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Saving Document containing fields: " + document.keySet());
        }
        return this.createMono(collectionName, collection -> {
            Publisher publisher;
            MongoCollection collectionToUse;
            MongoAction mongoAction = new MongoAction(this.writeConcern, MongoActionOperation.SAVE, collectionName, entityClass, document, null);
            WriteConcern writeConcernToUse = this.prepareWriteConcern(mongoAction);
            MappedDocument mapped = MappedDocument.of(document);
            MongoCollection mongoCollection = collectionToUse = writeConcernToUse == null ? collection : collection.withWriteConcern(writeConcernToUse);
            if (!mapped.hasId()) {
                publisher = collectionToUse.insertOne((Object)document);
            } else {
                MongoPersistentEntity entity = (MongoPersistentEntity)this.mappingContext.getPersistentEntity(entityClass);
                QueryOperations.UpdateContext updateContext = this.queryOperations.replaceSingleContext(mapped, true);
                Document filter = updateContext.getMappedQuery(entity);
                Document replacement = updateContext.getMappedUpdate(entity);
                Mono deferredFilter = updateContext.requiresShardKey(filter, entity) ? (entity.getShardKey().isImmutable() ? Mono.just((Object)updateContext.applyShardKey(entity, filter, null)) : Mono.from((Publisher)collection.find((Bson)filter, Document.class).projection((Bson)updateContext.getMappedShardKey(entity)).first()).defaultIfEmpty((Object)replacement).map(it -> updateContext.applyShardKey(entity, filter, (Document)it))) : Mono.just((Object)filter);
                publisher = deferredFilter.flatMapMany(it -> collectionToUse.replaceOne((Bson)it, (Object)replacement, updateContext.getReplaceOptions(entityClass)));
            }
            return Mono.from((Publisher)publisher).map(o -> mapped.getId());
        });
    }

    @Override
    public Mono<UpdateResult> upsert(Query query, UpdateDefinition update, Class<?> entityClass) {
        return this.doUpdate(this.getCollectionName(entityClass), query, update, entityClass, true, false);
    }

    @Override
    public Mono<UpdateResult> upsert(Query query, UpdateDefinition update, String collectionName) {
        return this.doUpdate(collectionName, query, update, null, true, false);
    }

    @Override
    public Mono<UpdateResult> upsert(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {
        return this.doUpdate(collectionName, query, update, entityClass, true, false);
    }

    @Override
    public Mono<UpdateResult> updateFirst(Query query, UpdateDefinition update, Class<?> entityClass) {
        return this.doUpdate(this.getCollectionName(entityClass), query, update, entityClass, false, false);
    }

    @Override
    public Mono<UpdateResult> updateFirst(Query query, UpdateDefinition update, String collectionName) {
        return this.doUpdate(collectionName, query, update, null, false, false);
    }

    @Override
    public Mono<UpdateResult> updateFirst(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {
        return this.doUpdate(collectionName, query, update, entityClass, false, false);
    }

    @Override
    public Mono<UpdateResult> updateMulti(Query query, UpdateDefinition update, Class<?> entityClass) {
        return this.doUpdate(this.getCollectionName(entityClass), query, update, entityClass, false, true);
    }

    @Override
    public Mono<UpdateResult> updateMulti(Query query, UpdateDefinition update, String collectionName) {
        return this.doUpdate(collectionName, query, update, null, false, true);
    }

    @Override
    public Mono<UpdateResult> updateMulti(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {
        return this.doUpdate(collectionName, query, update, entityClass, false, true);
    }

    protected Mono<UpdateResult> doUpdate(String collectionName, Query query, @Nullable UpdateDefinition update, @Nullable Class<?> entityClass, boolean upsert, boolean multi) {
        Flux result;
        if (query.isSorted() && LOGGER.isWarnEnabled()) {
            LOGGER.warn("{} does not support sort ('{}'). Please use findAndModify() instead.", (Object)(upsert ? "Upsert" : "UpdateFirst"), (Object)SerializationUtils.serializeToJsonSafely(query.getSortObject()));
        }
        MongoPersistentEntity<?> entity = entityClass == null ? null : this.getPersistentEntity(entityClass);
        QueryOperations.UpdateContext updateContext = multi ? this.queryOperations.updateContext(update, query, upsert) : this.queryOperations.updateSingleContext(update, query, upsert);
        updateContext.increaseVersionForUpdateIfNecessary(entity);
        Document queryObj = updateContext.getMappedQuery(entity);
        UpdateOptions updateOptions = updateContext.getUpdateOptions(entityClass);
        if (updateContext.isAggregationUpdate()) {
            List<Document> pipeline = updateContext.getUpdatePipeline(entityClass);
            MongoAction mongoAction = new MongoAction(this.writeConcern, MongoActionOperation.UPDATE, collectionName, entityClass, update.getUpdateObject(), queryObj);
            WriteConcern writeConcernToUse = this.prepareWriteConcern(mongoAction);
            result = this.execute(collectionName, (MongoCollection<Document> collection) -> {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Calling update using query: %s and update: %s in collection: %s", SerializationUtils.serializeToJsonSafely(queryObj), SerializationUtils.serializeToJsonSafely(pipeline), collectionName));
                }
                collection = writeConcernToUse != null ? collection.withWriteConcern(writeConcernToUse) : collection;
                return multi ? collection.updateMany((Bson)queryObj, pipeline, updateOptions) : collection.updateOne((Bson)queryObj, pipeline, updateOptions);
            });
        } else {
            Document updateObj = updateContext.getMappedUpdate(entity);
            MongoAction mongoAction = new MongoAction(this.writeConcern, MongoActionOperation.UPDATE, collectionName, entityClass, updateObj, queryObj);
            WriteConcern writeConcernToUse = this.prepareWriteConcern(mongoAction);
            result = this.execute(collectionName, (MongoCollection<Document> collection) -> {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Calling update using query: %s and update: %s in collection: %s", SerializationUtils.serializeToJsonSafely(queryObj), SerializationUtils.serializeToJsonSafely(updateObj), collectionName));
                }
                MongoCollection<Document> collectionToUse = this.prepareCollection((MongoCollection<Document>)collection, writeConcernToUse);
                if (!UpdateMapper.isUpdateObject(updateObj)) {
                    Document filter = new Document((Map)queryObj);
                    Mono deferredFilter = updateContext.requiresShardKey(filter, entity) ? (entity.getShardKey().isImmutable() ? Mono.just((Object)updateContext.applyShardKey(entity, filter, null)) : Mono.from((Publisher)collection.find((Bson)filter, Document.class).projection((Bson)updateContext.getMappedShardKey(entity)).first()).defaultIfEmpty((Object)updateObj).map(it -> updateContext.applyShardKey(entity, filter, (Document)it))) : Mono.just((Object)filter);
                    ReplaceOptions replaceOptions = updateContext.getReplaceOptions(entityClass);
                    return deferredFilter.flatMap(it -> Mono.from((Publisher)collectionToUse.replaceOne((Bson)it, (Object)updateObj, replaceOptions)));
                }
                return multi ? collectionToUse.updateMany((Bson)queryObj, (Bson)updateObj, updateOptions) : collectionToUse.updateOne((Bson)queryObj, (Bson)updateObj, updateOptions);
            });
        }
        result = result.doOnNext(updateResult -> {
            if (entity != null && entity.hasVersionProperty() && !multi && updateResult.wasAcknowledged() && updateResult.getMatchedCount() == 0L) {
                Document updateObj = updateContext.getMappedUpdate(entity);
                if (this.containsVersionProperty(queryObj, entity)) {
                    throw new OptimisticLockingFailureException("Optimistic lock exception on saving entity: " + updateObj.toString() + " to collection " + collectionName);
                }
            }
        });
        return result.next();
    }

    private boolean containsVersionProperty(Document document, @Nullable MongoPersistentEntity<?> persistentEntity) {
        if (persistentEntity == null || !persistentEntity.hasVersionProperty()) {
            return false;
        }
        return document.containsKey((Object)((MongoPersistentProperty)persistentEntity.getRequiredVersionProperty()).getFieldName());
    }

    @Override
    public Mono<DeleteResult> remove(Mono<? extends Object> objectToRemove) {
        return objectToRemove.flatMap(this::remove);
    }

    @Override
    public Mono<DeleteResult> remove(Mono<? extends Object> objectToRemove, String collectionName) {
        return objectToRemove.flatMap(it -> this.remove(it, collectionName));
    }

    @Override
    public Mono<DeleteResult> remove(Object object) {
        Assert.notNull((Object)object, (String)"Object must not be null!");
        return this.remove(this.operations.forEntity(object).getRemoveByQuery(), object.getClass());
    }

    @Override
    public Mono<DeleteResult> remove(Object object, String collectionName) {
        Assert.notNull((Object)object, (String)"Object must not be null!");
        Assert.hasText((String)collectionName, (String)"Collection name must not be null or empty!");
        return this.doRemove(collectionName, this.operations.forEntity(object).getRemoveByQuery(), object.getClass());
    }

    private void assertUpdateableIdIfNotSet(Object value) {
        MongoPersistentEntity entity = (MongoPersistentEntity)this.mappingContext.getPersistentEntity(value.getClass());
        if (entity != null && entity.hasIdProperty()) {
            MongoPersistentProperty property = (MongoPersistentProperty)entity.getRequiredIdProperty();
            Object propertyValue = entity.getPropertyAccessor(value).getProperty((PersistentProperty)property);
            if (propertyValue != null) {
                return;
            }
            if (!MongoSimpleTypes.AUTOGENERATED_ID_TYPES.contains(property.getType())) {
                throw new InvalidDataAccessApiUsageException(String.format("Cannot autogenerate id of type %s for entity of type %s!", property.getType().getName(), value.getClass().getName()));
            }
        }
    }

    @Override
    public Mono<DeleteResult> remove(Query query, String collectionName) {
        return this.remove(query, null, collectionName);
    }

    @Override
    public Mono<DeleteResult> remove(Query query, Class<?> entityClass) {
        return this.remove(query, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public Mono<DeleteResult> remove(Query query, @Nullable Class<?> entityClass, String collectionName) {
        return this.doRemove(collectionName, query, entityClass);
    }

    protected <T> Mono<DeleteResult> doRemove(String collectionName, Query query, @Nullable Class<T> entityClass) {
        if (query == null) {
            throw new InvalidDataAccessApiUsageException("Query passed in to remove can't be null!");
        }
        Assert.hasText((String)collectionName, (String)"Collection name must not be null or empty!");
        MongoPersistentEntity<?> entity = this.getPersistentEntity(entityClass);
        QueryOperations.DeleteContext deleteContext = this.queryOperations.deleteQueryContext(query);
        Document queryObject = deleteContext.getMappedQuery(entity);
        DeleteOptions deleteOptions = deleteContext.getDeleteOptions(entityClass);
        Document removeQuery = deleteContext.getMappedQuery(entity);
        MongoAction mongoAction = new MongoAction(this.writeConcern, MongoActionOperation.REMOVE, collectionName, entityClass, null, removeQuery);
        WriteConcern writeConcernToUse = this.prepareWriteConcern(mongoAction);
        return this.execute(collectionName, (MongoCollection<Document> collection) -> {
            this.maybeEmitEvent(new BeforeDeleteEvent(removeQuery, entityClass, collectionName));
            MongoCollection<Document> collectionToUse = this.prepareCollection((MongoCollection<Document>)collection, writeConcernToUse);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Remove using query: {} in collection: {}.", new Object[]{SerializationUtils.serializeToJsonSafely(removeQuery), collectionName});
            }
            if (query.getLimit() > 0 || query.getSkip() > 0L) {
                FindPublisher cursor = new QueryFindPublisherPreparer(query, entityClass).prepare((FindPublisher<Document>)collection.find((Bson)removeQuery)).projection((Bson)MappedDocument.getIdOnlyProjection());
                return Flux.from((Publisher)cursor).map(MappedDocument::of).map(MappedDocument::getId).collectList().flatMapMany(val -> collectionToUse.deleteMany((Bson)MappedDocument.getIdIn(val), deleteOptions));
            }
            return collectionToUse.deleteMany((Bson)removeQuery, deleteOptions);
        }).doOnNext(it -> this.maybeEmitEvent(new AfterDeleteEvent(queryObject, entityClass, collectionName))).next();
    }

    @Override
    public <T> Flux<T> findAll(Class<T> entityClass) {
        return this.findAll(entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Flux<T> findAll(Class<T> entityClass, String collectionName) {
        return this.executeFindMultiInternal(new FindCallback(null), FindPublisherPreparer.NO_OP_PREPARER, new ReadDocumentCallback<Object>(this.mongoConverter, entityClass, collectionName), collectionName);
    }

    @Override
    public <T> Flux<T> findAllAndRemove(Query query, String collectionName) {
        return this.findAllAndRemove(query, Object.class, collectionName);
    }

    @Override
    public <T> Flux<T> findAllAndRemove(Query query, Class<T> entityClass) {
        return this.findAllAndRemove(query, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Flux<T> findAllAndRemove(Query query, Class<T> entityClass, String collectionName) {
        return this.doFindAndDelete(collectionName, query, entityClass);
    }

    @Override
    public <T> Flux<T> tail(Query query, Class<T> entityClass) {
        return this.tail(query, entityClass, this.getCollectionName(entityClass));
    }

    @Override
    public <T> Flux<T> tail(@Nullable Query query, Class<T> entityClass, String collectionName) {
        if (query == null) {
            LOGGER.debug(String.format("Tail for class: %s in collection: %s", entityClass, collectionName));
            return this.executeFindMultiInternal(collection -> new FindCallback(null).doInCollection((MongoCollection<Document>)collection).cursorType(CursorType.TailableAwait), FindPublisherPreparer.NO_OP_PREPARER, new ReadDocumentCallback<Object>(this.mongoConverter, entityClass, collectionName), collectionName);
        }
        return this.doFind(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass, new TailingQueryFindPublisherPreparer(query, entityClass));
    }

    @Override
    public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @Nullable String collectionName, ChangeStreamOptions options, Class<T> targetType) {
        List<Document> filter = this.prepareFilter(options);
        FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP;
        return ReactiveMongoDatabaseUtils.getDatabase(database, this.mongoDatabaseFactory).map(db -> {
            ChangeStreamPublisher publisher = StringUtils.hasText((String)collectionName) ? (filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class) : db.getCollection(collectionName).watch(filter, Document.class)) : (filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class));
            publisher = options.getResumeToken().map(BsonValue::asDocument).map(arg_0 -> ((ChangeStreamPublisher)publisher).resumeAfter(arg_0)).orElse(publisher);
            publisher = options.getCollation().map(Collation::toMongoCollation).map(arg_0 -> ((ChangeStreamPublisher)publisher).collation(arg_0)).orElse(publisher);
            publisher = options.getResumeBsonTimestamp().map(arg_0 -> ((ChangeStreamPublisher)publisher).startAtOperationTime(arg_0)).orElse(publisher);
            return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
        }).flatMapMany(publisher -> Flux.from((Publisher)publisher).map(document -> new ChangeStreamEvent((ChangeStreamDocument<Document>)document, targetType, this.getConverter())));
    }

    List<Document> prepareFilter(ChangeStreamOptions options) {
        List<Document> filter = options.getFilter().orElse(Collections.emptyList());
        if (filter instanceof Aggregation) {
            Aggregation agg = (Aggregation)((Object)filter);
            AggregationOperationContext context = agg instanceof TypedAggregation ? new TypeBasedAggregationOperationContext(((TypedAggregation)agg).getInputType(), this.getConverter().getMappingContext(), this.queryMapper) : Aggregation.DEFAULT_CONTEXT;
            return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns")));
        }
        if (filter instanceof List) {
            return filter;
        }
        throw new IllegalArgumentException("ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
    }

    @Override
    public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, Class<T> resultType, String mapFunction, String reduceFunction, MapReduceOptions options) {
        return this.mapReduce(filterQuery, domainType, this.getCollectionName(domainType), resultType, mapFunction, reduceFunction, options);
    }

    @Override
    public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inputCollectionName, Class<T> resultType, String mapFunction, String reduceFunction, MapReduceOptions options) {
        Assert.notNull((Object)filterQuery, (String)"Filter query must not be null!");
        Assert.notNull(domainType, (String)"Domain type must not be null!");
        Assert.hasText((String)inputCollectionName, (String)"Input collection name must not be null or empty!");
        Assert.notNull(resultType, (String)"Result type must not be null!");
        Assert.notNull((Object)mapFunction, (String)"Map function must not be null!");
        Assert.notNull((Object)reduceFunction, (String)"Reduce function must not be null!");
        Assert.notNull((Object)options, (String)"MapReduceOptions must not be null!");
        ReactiveMongoTemplate.assertLocalFunctionNames(mapFunction, reduceFunction);
        return this.createFlux(inputCollectionName, collection -> {
            Document mappedQuery = this.queryMapper.getMappedObject((Bson)filterQuery.getQueryObject(), (MongoPersistentEntity)this.mappingContext.getPersistentEntity(domainType));
            MapReducePublisher publisher = collection.mapReduce(mapFunction, reduceFunction, Document.class);
            publisher.filter((Bson)mappedQuery);
            Document mappedSort = this.getMappedSortObject(filterQuery, domainType);
            if (mappedSort != null && !mappedSort.isEmpty()) {
                publisher.sort((Bson)mappedSort);
            }
            if (filterQuery.getMeta().getMaxTimeMsec() != null) {
                publisher.maxTime(filterQuery.getMeta().getMaxTimeMsec().longValue(), TimeUnit.MILLISECONDS);
            }
            if (filterQuery.getLimit() > 0 || options.getLimit() != null) {
                if (filterQuery.getLimit() > 0 && options.getLimit() != null) {
                    throw new IllegalArgumentException("Both Query and MapReduceOptions define a limit. Please provide the limit only via one of the two.");
                }
                if (filterQuery.getLimit() > 0) {
                    publisher.limit(filterQuery.getLimit());
                }
                if (options.getLimit() != null) {
                    publisher.limit(options.getLimit().intValue());
                }
            }
            Optional<Collation> collation = filterQuery.getCollation();
            Optionals.ifAllPresent(filterQuery.getCollation(), options.getCollation(), (l, r) -> {
                throw new IllegalArgumentException("Both Query and MapReduceOptions define a collation. Please provide the collation only via one of the two.");
            });
            if (options.getCollation().isPresent()) {
                collation = options.getCollation();
            }
            if (!CollectionUtils.isEmpty(options.getScopeVariables())) {
                publisher = publisher.scope((Bson)new Document(options.getScopeVariables()));
            }
            if (options.getLimit() != null && options.getLimit() > 0) {
                publisher = publisher.limit(options.getLimit().intValue());
            }
            if (options.getFinalizeFunction().filter(StringUtils::hasText).isPresent()) {
                publisher = publisher.finalizeFunction(options.getFinalizeFunction().get());
            }
            if (options.getJavaScriptMode() != null) {
                publisher = publisher.jsMode(options.getJavaScriptMode().booleanValue());
            }
            if (options.getOutputSharded().isPresent()) {
                publisher = publisher.sharded(options.getOutputSharded().get().booleanValue());
            }
            if (StringUtils.hasText((String)options.getOutputCollection()) && !options.usesInlineOutput()) {
                publisher = publisher.collectionName(options.getOutputCollection()).action(options.getMapReduceAction());
                if (options.getOutputDatabase().isPresent()) {
                    publisher = publisher.databaseName(options.getOutputDatabase().get());
                }
            }
            publisher = collation.map(Collation::toMongoCollation).map(arg_0 -> ((MapReducePublisher)publisher).collation(arg_0)).orElse(publisher);
            return Flux.from((Publisher)publisher).concatMap(new ReadDocumentCallback<Object>(this.mongoConverter, resultType, inputCollectionName)::doWith);
        });
    }

    private static void assertLocalFunctionNames(String ... functions) {
        for (String function : functions) {
            if (!ResourceUtils.isUrl((String)function)) continue;
            throw new IllegalArgumentException(String.format("Blocking accessing to resource %s is not allowed using reactive infrastructure. You may load the resource at startup and cache its value.", function));
        }
    }

    @Override
    public <T> ReactiveFindOperation.ReactiveFind<T> query(Class<T> domainType) {
        return new ReactiveFindOperationSupport(this).query(domainType);
    }

    @Override
    public <T> ReactiveUpdateOperation.ReactiveUpdate<T> update(Class<T> domainType) {
        return new ReactiveUpdateOperationSupport(this).update(domainType);
    }

    @Override
    public <T> ReactiveRemoveOperation.ReactiveRemove<T> remove(Class<T> domainType) {
        return new ReactiveRemoveOperationSupport(this).remove(domainType);
    }

    @Override
    public <T> ReactiveInsertOperation.ReactiveInsert<T> insert(Class<T> domainType) {
        return new ReactiveInsertOperationSupport(this).insert(domainType);
    }

    @Override
    public <T> ReactiveAggregationOperation.ReactiveAggregation<T> aggregateAndReturn(Class<T> domainType) {
        return new ReactiveAggregationOperationSupport(this).aggregateAndReturn(domainType);
    }

    public <T> ReactiveMapReduceOperation.ReactiveMapReduce<T> mapReduce(Class<T> domainType) {
        return new ReactiveMapReduceOperationSupport(this).mapReduce((Class)domainType);
    }

    @Override
    public <T> ReactiveChangeStreamOperation.ReactiveChangeStream<T> changeStream(Class<T> domainType) {
        return new ReactiveChangeStreamOperationSupport(this).changeStream(domainType);
    }

    protected <T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<T> entityClass) {
        Flux<T> flux = this.find(query, entityClass, collectionName);
        return Flux.from(flux).collectList().filter(it -> !it.isEmpty()).flatMapMany(list -> Flux.from(this.remove(this.operations.getByIdInQuery((Collection<?>)list), entityClass, collectionName)).flatMap(deleteResult -> Flux.fromIterable((Iterable)list)));
    }

    protected Mono<MongoCollection<Document>> doCreateCollection(String collectionName, CreateCollectionOptions collectionOptions) {
        return this.createMono(db -> db.createCollection(collectionName, collectionOptions)).doOnSuccess(it -> {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Created collection [{}]", (Object)collectionName);
            }
        }).then(this.getCollection(collectionName));
    }

    protected <T> Mono<T> doFindOne(String collectionName, Document query, @Nullable Document fields, Class<T> entityClass, @Nullable Collation collation) {
        return this.doFindOne(collectionName, query, fields, entityClass, (FindPublisher<Document> findPublisher) -> collation != null ? findPublisher.collation(collation.toMongoCollation()) : findPublisher);
    }

    protected <T> Mono<T> doFindOne(String collectionName, Document query, @Nullable Document fields, Class<T> entityClass, FindPublisherPreparer preparer) {
        MongoPersistentEntity entity = (MongoPersistentEntity)this.mappingContext.getPersistentEntity(entityClass);
        QueryOperations.QueryContext queryContext = this.queryOperations.createQueryContext(new BasicQuery(query, fields != null ? fields : new Document()));
        Document mappedFields = queryContext.getMappedFields(entity, entityClass, (ProjectionFactory)this.projectionFactory);
        Document mappedQuery = queryContext.getMappedQuery(entity);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("findOne using query: %s fields: %s for class: %s in collection: %s", SerializationUtils.serializeToJsonSafely(query), mappedFields, entityClass, collectionName));
        }
        return this.executeFindOneInternal(new FindOneCallback(mappedQuery, mappedFields, preparer), new ReadDocumentCallback<Object>(this.mongoConverter, entityClass, collectionName), collectionName);
    }

    protected <T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass) {
        return this.doFind(collectionName, query, fields, entityClass, null, new ReadDocumentCallback<Object>(this.mongoConverter, entityClass, collectionName));
    }

    protected <T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass, FindPublisherPreparer preparer) {
        return this.doFind(collectionName, query, fields, entityClass, preparer, new ReadDocumentCallback<Object>(this.mongoConverter, entityClass, collectionName));
    }

    protected <S, T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<S> entityClass, @Nullable FindPublisherPreparer preparer, DocumentCallback<T> objectCallback) {
        MongoPersistentEntity entity = (MongoPersistentEntity)this.mappingContext.getPersistentEntity(entityClass);
        QueryOperations.QueryContext queryContext = this.queryOperations.createQueryContext(new BasicQuery(query, fields));
        Document mappedFields = queryContext.getMappedFields(entity, entityClass, (ProjectionFactory)this.projectionFactory);
        Document mappedQuery = queryContext.getMappedQuery(entity);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("find using query: %s fields: %s for class: %s in collection: %s", SerializationUtils.serializeToJsonSafely(mappedQuery), mappedFields, entityClass, collectionName));
        }
        return this.executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields), preparer, objectCallback, collectionName);
    }

    <S, T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<S> sourceClass, Class<T> targetClass, FindPublisherPreparer preparer) {
        MongoPersistentEntity entity = (MongoPersistentEntity)this.mappingContext.getPersistentEntity(sourceClass);
        QueryOperations.QueryContext queryContext = this.queryOperations.createQueryContext(new BasicQuery(query, fields));
        Document mappedFields = queryContext.getMappedFields(entity, targetClass, (ProjectionFactory)this.projectionFactory);
        Document mappedQuery = queryContext.getMappedQuery(entity);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("find using query: {} fields: {} for class: {} in collection: {}", new Object[]{SerializationUtils.serializeToJsonSafely(mappedQuery), mappedFields, sourceClass, collectionName});
        }
        return this.executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields), preparer, new ProjectingReadCallback<S, T>(this.mongoConverter, sourceClass, targetClass, collectionName), collectionName);
    }

    private Document getMappedFieldsObject(Document fields, @Nullable MongoPersistentEntity<?> entity, Class<?> targetType) {
        if (entity == null) {
            return fields;
        }
        Document projectedFields = this.propertyOperations.computeFieldsForProjection((ProjectionFactory)this.projectionFactory, fields, entity.getType(), targetType);
        if (ObjectUtils.nullSafeEquals((Object)fields, (Object)projectedFields)) {
            return this.queryMapper.getMappedFields(projectedFields, entity);
        }
        return this.queryMapper.getMappedFields(projectedFields, (MongoPersistentEntity)this.mappingContext.getRequiredPersistentEntity(targetType));
    }

    protected CreateCollectionOptions convertToCreateCollectionOptions(@Nullable CollectionOptions collectionOptions) {
        return this.convertToCreateCollectionOptions(collectionOptions, Object.class);
    }

    protected CreateCollectionOptions convertToCreateCollectionOptions(@Nullable CollectionOptions collectionOptions, Class<?> entityType) {
        CreateCollectionOptions result = new CreateCollectionOptions();
        if (collectionOptions == null) {
            return result;
        }
        collectionOptions.getCapped().ifPresent(arg_0 -> ((CreateCollectionOptions)result).capped(arg_0));
        collectionOptions.getSize().ifPresent(arg_0 -> ((CreateCollectionOptions)result).sizeInBytes(arg_0));
        collectionOptions.getMaxDocuments().ifPresent(arg_0 -> ((CreateCollectionOptions)result).maxDocuments(arg_0));
        collectionOptions.getCollation().map(Collation::toMongoCollation).ifPresent(arg_0 -> ((CreateCollectionOptions)result).collation(arg_0));
        collectionOptions.getValidationOptions().ifPresent(it -> {
            ValidationOptions validationOptions = new ValidationOptions();
            it.getValidationAction().ifPresent(arg_0 -> ((ValidationOptions)validationOptions).validationAction(arg_0));
            it.getValidationLevel().ifPresent(arg_0 -> ((ValidationOptions)validationOptions).validationLevel(arg_0));
            it.getValidator().ifPresent(val -> validationOptions.validator((Bson)this.getMappedValidator((Validator)val, entityType)));
            result.validationOptions(validationOptions);
        });
        return result;
    }

    private Document getMappedValidator(Validator validator, Class<?> domainType) {
        Document validationRules = validator.toDocument();
        if (validationRules.containsKey((Object)"$jsonSchema")) {
            return this.schemaMapper.mapSchema(validationRules, domainType);
        }
        return this.queryMapper.getMappedObject((Bson)validationRules, (MongoPersistentEntity)this.mappingContext.getPersistentEntity(domainType));
    }

    protected <T> Mono<T> doFindAndRemove(String collectionName, Document query, Document fields, Document sort, @Nullable Collation collation, Class<T> entityClass) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("findAndRemove using query: %s fields: %s sort: %s for class: %s in collection: %s", SerializationUtils.serializeToJsonSafely(query), fields, sort, entityClass, collectionName));
        }
        MongoPersistentEntity entity = (MongoPersistentEntity)this.mappingContext.getPersistentEntity(entityClass);
        return this.executeFindOneInternal(new FindAndRemoveCallback(this.queryMapper.getMappedObject((Bson)query, entity), fields, sort, collation), new ReadDocumentCallback<Object>(this.mongoConverter, entityClass, collectionName), collectionName);
    }

    protected <T> Mono<T> doFindAndModify(String collectionName, Document query, Document fields, Document sort, Class<T> entityClass, UpdateDefinition update, FindAndModifyOptions options) {
        MongoPersistentEntity entity = (MongoPersistentEntity)this.mappingContext.getPersistentEntity(entityClass);
        QueryOperations.UpdateContext updateContext = this.queryOperations.updateSingleContext(update, query, false);
        updateContext.increaseVersionForUpdateIfNecessary(entity);
        return Mono.defer(() -> {
            Document mappedUpdate;
            Document mappedQuery = updateContext.getMappedQuery(entity);
            Document document = mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(entityClass) : updateContext.getMappedUpdate(entity);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("findAndModify using query: %s fields: %s sort: %s for class: %s and update: %s in collection: %s", SerializationUtils.serializeToJsonSafely(mappedQuery), fields, sort, entityClass, SerializationUtils.serializeToJsonSafely(mappedUpdate), collectionName));
            }
            return this.executeFindOneInternal(new FindAndModifyCallback(mappedQuery, fields, sort, mappedUpdate, update.getArrayFilters().stream().map(UpdateDefinition.ArrayFilter::asDocument).collect(Collectors.toList()), options), new ReadDocumentCallback<Object>(this.mongoConverter, entityClass, collectionName), collectionName);
        });
    }

    protected <T> Mono<T> doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields, Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement, FindAndReplaceOptions options, Class<T> resultType) {
        return Mono.defer(() -> {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("findAndReplace using query: {} fields: {} sort: {} for class: {} and replacement: {} in collection: {}", new Object[]{SerializationUtils.serializeToJsonSafely(mappedQuery), mappedFields, mappedSort, entityType, SerializationUtils.serializeToJsonSafely(replacement), collectionName});
            }
            return this.executeFindOneInternal(new FindAndReplaceCallback(mappedQuery, mappedFields, mappedSort, replacement, collation, options), new ProjectingReadCallback(this.mongoConverter, entityType, resultType, collectionName), collectionName);
        });
    }

    protected <E extends MongoMappingEvent<T>, T> E maybeEmitEvent(E event) {
        if (this.eventPublisher != null) {
            this.eventPublisher.publishEvent(event);
        }
        return event;
    }

    protected <T> Mono<T> maybeCallBeforeConvert(T object, String collection) {
        if (this.entityCallbacks != null) {
            return this.entityCallbacks.callback(ReactiveBeforeConvertCallback.class, object, new Object[]{collection});
        }
        return Mono.just(object);
    }

    protected <T> Mono<T> maybeCallBeforeSave(T object, Document document, String collection) {
        if (this.entityCallbacks != null) {
            return this.entityCallbacks.callback(ReactiveBeforeSaveCallback.class, object, new Object[]{document, collection});
        }
        return Mono.just(object);
    }

    protected <T> Mono<T> maybeCallAfterSave(T object, Document document, String collection) {
        if (this.entityCallbacks != null) {
            return this.entityCallbacks.callback(ReactiveAfterSaveCallback.class, object, new Object[]{document, collection});
        }
        return Mono.just(object);
    }

    protected <T> Mono<T> maybeCallAfterConvert(T object, Document document, String collection) {
        if (this.entityCallbacks != null) {
            return this.entityCallbacks.callback(ReactiveAfterConvertCallback.class, object, new Object[]{document, collection});
        }
        return Mono.just(object);
    }

    private MongoCollection<Document> getAndPrepareCollection(MongoDatabase db, String collectionName) {
        try {
            MongoCollection collection = db.getCollection(collectionName, Document.class);
            return this.prepareCollection((MongoCollection<Document>)collection);
        }
        catch (RuntimeException e) {
            throw ReactiveMongoTemplate.potentiallyConvertRuntimeException(e, this.exceptionTranslator);
        }
    }

    protected void ensureNotIterable(Object o) {
        boolean isIterable;
        boolean bl = isIterable = o.getClass().isArray() || ITERABLE_CLASSES.stream().anyMatch(iterableClass -> iterableClass.isAssignableFrom(o.getClass()) || o.getClass().getName().equals(iterableClass.getName()));
        if (isIterable) {
            throw new IllegalArgumentException("Cannot use a collection here.");
        }
    }

    protected MongoCollection<Document> prepareCollection(MongoCollection<Document> collection) {
        return this.readPreference != null ? collection.withReadPreference(this.readPreference) : collection;
    }

    protected MongoDatabase prepareDatabase(MongoDatabase database) {
        return database;
    }

    @Nullable
    protected WriteConcern prepareWriteConcern(MongoAction mongoAction) {
        WriteConcern wc = this.writeConcernResolver.resolve(mongoAction);
        return this.potentiallyForceAcknowledgedWrite(wc);
    }

    @Nullable
    private WriteConcern potentiallyForceAcknowledgedWrite(@Nullable WriteConcern wc) {
        if (ObjectUtils.nullSafeEquals((Object)((Object)WriteResultChecking.EXCEPTION), (Object)((Object)this.writeResultChecking)) && (wc == null || wc.getWObject() == null || wc.getWObject() instanceof Number && ((Number)wc.getWObject()).intValue() < 1)) {
            return WriteConcern.ACKNOWLEDGED;
        }
        return wc;
    }

    private <T> Mono<T> executeFindOneInternal(ReactiveCollectionCallback<Document> collectionCallback, DocumentCallback<T> objectCallback, String collectionName) {
        return this.createMono(collectionName, collection -> Mono.from(collectionCallback.doInCollection((MongoCollection<Document>)collection)).flatMap(objectCallback::doWith));
    }

    private <T> Flux<T> executeFindMultiInternal(ReactiveCollectionQueryCallback<Document> collectionCallback, @Nullable FindPublisherPreparer preparer, DocumentCallback<T> objectCallback, String collectionName) {
        return this.createFlux(collectionName, collection -> Flux.from(preparer.initiateFind((MongoCollection<Document>)collection, collectionCallback::doInCollection)).concatMap(objectCallback::doWith));
    }

    private Function<Throwable, Throwable> translateException() {
        return throwable -> {
            if (throwable instanceof RuntimeException) {
                return ReactiveMongoTemplate.potentiallyConvertRuntimeException((RuntimeException)throwable, this.exceptionTranslator);
            }
            return throwable;
        };
    }

    private static RuntimeException potentiallyConvertRuntimeException(RuntimeException ex, PersistenceExceptionTranslator exceptionTranslator) {
        DataAccessException resolved = exceptionTranslator.translateExceptionIfPossible(ex);
        return resolved == null ? ex : resolved;
    }

    @Nullable
    private MongoPersistentEntity<?> getPersistentEntity(@Nullable Class<?> type) {
        return type == null ? null : (MongoPersistentEntity)this.mappingContext.getPersistentEntity(type);
    }

    private MappingMongoConverter getDefaultMongoConverter() {
        MongoCustomConversions conversions = new MongoCustomConversions(Collections.emptyList());
        MongoMappingContext context = new MongoMappingContext();
        context.setSimpleTypeHolder(conversions.getSimpleTypeHolder());
        context.afterPropertiesSet();
        MappingMongoConverter converter = new MappingMongoConverter(NO_OP_REF_RESOLVER, (MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty>)context);
        converter.setCustomConversions(conversions);
        converter.setCodecRegistryProvider(this.mongoDatabaseFactory);
        converter.afterPropertiesSet();
        return converter;
    }

    private Document getMappedSortObject(Query query, Class<?> type) {
        if (query == null) {
            return null;
        }
        return this.queryMapper.getMappedSort(query.getSortObject(), (MongoPersistentEntity)this.mappingContext.getPersistentEntity(type));
    }

    private static FindOneAndDeleteOptions convertToFindOneAndDeleteOptions(Document fields, Document sort) {
        FindOneAndDeleteOptions result = new FindOneAndDeleteOptions();
        result = result.projection((Bson)fields).sort((Bson)sort);
        return result;
    }

    private static List<? extends Document> toDocuments(Collection<? extends Document> documents) {
        return new ArrayList<Document>(documents);
    }

    static {
        HashSet<Class> iterableClasses = new HashSet<Class>();
        iterableClasses.add(List.class);
        iterableClasses.add(Collection.class);
        iterableClasses.add(Iterator.class);
        iterableClasses.add(Publisher.class);
        ITERABLE_CLASSES = Collections.unmodifiableCollection(iterableClasses);
    }

    private static class PersistableEntityModel<T> {
        private final T source;
        @Nullable
        private final Document target;
        private final String collection;

        private PersistableEntityModel(T source, @Nullable Document target, String collection) {
            this.source = source;
            this.target = target;
            this.collection = collection;
        }

        static <T> PersistableEntityModel<T> of(T source, String collection) {
            return new PersistableEntityModel<T>(source, null, collection);
        }

        static <T> PersistableEntityModel<T> of(T source, Document target, String collection) {
            return new PersistableEntityModel<T>(source, target, collection);
        }

        PersistableEntityModel<T> mutate(T source) {
            return new PersistableEntityModel<T>(source, this.target, this.collection);
        }

        PersistableEntityModel<T> addTargetDocument(Document target) {
            return new PersistableEntityModel<T>(this.source, target, this.collection);
        }

        T getSource() {
            return this.source;
        }

        @Nullable
        Document getTarget() {
            return this.target;
        }

        String getCollection() {
            return this.collection;
        }
    }

    class IndexCreatorEventListener
    implements ApplicationListener<MappingContextEvent<?, ?>> {
        final Consumer<Throwable> subscriptionExceptionHandler;

        public void onApplicationEvent(MappingContextEvent<?, ?> event) {
            if (!event.wasEmittedBy(ReactiveMongoTemplate.this.mappingContext)) {
                return;
            }
            PersistentEntity entity = event.getPersistentEntity();
            if (entity instanceof MongoPersistentEntity) {
                ReactiveMongoTemplate.this.onCheckForIndexes((MongoPersistentEntity)entity, this.subscriptionExceptionHandler);
            }
        }

        public IndexCreatorEventListener(Consumer<Throwable> subscriptionExceptionHandler) {
            this.subscriptionExceptionHandler = subscriptionExceptionHandler;
        }
    }

    static class ReactiveSessionBoundMongoTemplate
    extends ReactiveMongoTemplate {
        private final ReactiveMongoTemplate delegate;
        private final ClientSession session;

        ReactiveSessionBoundMongoTemplate(ClientSession session, ReactiveMongoTemplate that) {
            super(that.mongoDatabaseFactory.withSession(session), that);
            this.delegate = that;
            this.session = session;
        }

        @Override
        public Mono<MongoCollection<Document>> getCollection(String collectionName) {
            return this.delegate.getCollection(collectionName);
        }

        @Override
        public Mono<MongoDatabase> getMongoDatabase() {
            return this.delegate.getMongoDatabase();
        }
    }

    class TailingQueryFindPublisherPreparer
    extends QueryFindPublisherPreparer {
        TailingQueryFindPublisherPreparer(Query query, Class<?> type) {
            super(query, type);
        }

        @Override
        public FindPublisher<Document> prepare(FindPublisher<Document> findPublisher) {
            return super.prepare((FindPublisher<Document>)findPublisher.cursorType(CursorType.TailableAwait));
        }
    }

    class QueryFindPublisherPreparer
    implements FindPublisherPreparer {
        private final Query query;
        @Nullable
        private final Class<?> type;

        QueryFindPublisherPreparer(@Nullable Query query, Class<?> type) {
            this.query = query;
            this.type = type;
        }

        @Override
        public FindPublisher<Document> prepare(FindPublisher<Document> findPublisher) {
            FindPublisher findPublisherToUse = ReactiveMongoTemplate.this.operations.forType(this.type).getCollation(this.query).map(Collation::toMongoCollation).map(arg_0 -> findPublisher.collation(arg_0)).orElse(findPublisher);
            Meta meta = this.query.getMeta();
            if (this.query.getSkip() <= 0L && this.query.getLimit() <= 0 && ObjectUtils.isEmpty((Object)this.query.getSortObject()) && !StringUtils.hasText((String)this.query.getHint()) && !meta.hasValues()) {
                return findPublisherToUse;
            }
            try {
                if (this.query.getSkip() > 0L) {
                    findPublisherToUse = findPublisherToUse.skip((int)this.query.getSkip());
                }
                if (this.query.getLimit() > 0) {
                    findPublisherToUse = findPublisherToUse.limit(this.query.getLimit());
                }
                if (!ObjectUtils.isEmpty((Object)this.query.getSortObject())) {
                    Document sort = this.type != null ? ReactiveMongoTemplate.this.getMappedSortObject(this.query, this.type) : this.query.getSortObject();
                    findPublisherToUse = findPublisherToUse.sort((Bson)sort);
                }
                if (StringUtils.hasText((String)this.query.getHint())) {
                    String hint = this.query.getHint();
                    findPublisherToUse = BsonUtils.isJsonDocument(hint) ? findPublisherToUse.hint((Bson)BsonUtils.parse(hint, ReactiveMongoTemplate.this.mongoDatabaseFactory)) : findPublisherToUse.hintString(hint);
                }
                if (meta.hasValues()) {
                    if (StringUtils.hasText((String)meta.getComment())) {
                        findPublisherToUse = findPublisherToUse.comment(meta.getComment());
                    }
                    if (meta.getMaxTimeMsec() != null) {
                        findPublisherToUse = findPublisherToUse.maxTime(meta.getMaxTimeMsec().longValue(), TimeUnit.MILLISECONDS);
                    }
                    if (meta.getCursorBatchSize() != null) {
                        findPublisherToUse = findPublisherToUse.batchSize(meta.getCursorBatchSize().intValue());
                    }
                }
            }
            catch (RuntimeException e) {
                throw ReactiveMongoTemplate.potentiallyConvertRuntimeException(e, ReactiveMongoTemplate.this.exceptionTranslator);
            }
            return findPublisherToUse;
        }

        @Override
        public ReadPreference getReadPreference() {
            return this.query.getMeta().getFlags().contains((Object)Meta.CursorOption.SLAVE_OK) ? ReadPreference.primaryPreferred() : null;
        }
    }

    static class GeoNearResultDocumentCallback<T>
    implements DocumentCallback<GeoResult<T>> {
        private final String distanceField;
        private final DocumentCallback<T> delegate;
        private final Metric metric;

        GeoNearResultDocumentCallback(String distanceField, DocumentCallback<T> delegate, Metric metric) {
            Assert.notNull(delegate, (String)"DocumentCallback must not be null!");
            this.distanceField = distanceField;
            this.delegate = delegate;
            this.metric = metric;
        }

        @Override
        public Mono<GeoResult<T>> doWith(Document object) {
            double distance = this.getDistance(object);
            return this.delegate.doWith(object).map(doWith -> new GeoResult(doWith, new Distance(distance, this.metric)));
        }

        double getDistance(Document object) {
            if (object.containsKey((Object)this.distanceField)) {
                return (Double)NumberUtils.convertNumberToTargetClass((Number)((Number)object.get((Object)this.distanceField, Number.class)), Double.class);
            }
            return Double.NaN;
        }
    }

    private class ProjectingReadCallback<S, T>
    implements DocumentCallback<T> {
        @NonNull
        private final EntityReader<Object, Bson> reader;
        @NonNull
        private final Class<S> entityType;
        @NonNull
        private final Class<T> targetType;
        @NonNull
        private final String collectionName;

        @Override
        public Mono<T> doWith(Document document) {
            Object result;
            Class<S> typeToRead = this.targetType.isInterface() || this.targetType.isAssignableFrom(this.entityType) ? this.entityType : this.targetType;
            ReactiveMongoTemplate.this.maybeEmitEvent(new AfterLoadEvent<S>(document, typeToRead, this.collectionName));
            Object source = this.reader.read(typeToRead, (Object)document);
            Object castEntity = result = this.targetType.isInterface() ? ReactiveMongoTemplate.this.projectionFactory.createProjection(this.targetType, source) : source;
            if (castEntity != null) {
                ReactiveMongoTemplate.this.maybeEmitEvent(new AfterConvertEvent<Object>(document, castEntity, this.collectionName));
                return ReactiveMongoTemplate.this.maybeCallAfterConvert(castEntity, document, this.collectionName);
            }
            return Mono.empty();
        }

        public ProjectingReadCallback(@NonNull EntityReader<Object, Bson> reader, @NonNull Class<S> entityType, @NonNull Class<T> targetType, String collectionName) {
            if (reader == null) {
                throw new IllegalArgumentException("reader is marked non-null but is null");
            }
            if (entityType == null) {
                throw new IllegalArgumentException("entityType is marked non-null but is null");
            }
            if (targetType == null) {
                throw new IllegalArgumentException("targetType is marked non-null but is null");
            }
            if (collectionName == null) {
                throw new IllegalArgumentException("collectionName is marked non-null but is null");
            }
            this.reader = reader;
            this.entityType = entityType;
            this.targetType = targetType;
            this.collectionName = collectionName;
        }
    }

    class ReadDocumentCallback<T>
    implements DocumentCallback<T> {
        private final EntityReader<? super T, Bson> reader;
        private final Class<T> type;
        private final String collectionName;

        ReadDocumentCallback(EntityReader<? super T, Bson> reader, Class<T> type, String collectionName) {
            Assert.notNull(reader, (String)"EntityReader must not be null!");
            Assert.notNull(type, (String)"Entity type must not be null!");
            this.reader = reader;
            this.type = type;
            this.collectionName = collectionName;
        }

        @Override
        public Mono<T> doWith(Document document) {
            ReactiveMongoTemplate.this.maybeEmitEvent(new AfterLoadEvent<T>(document, this.type, this.collectionName));
            Object source = this.reader.read(this.type, (Object)document);
            if (source != null) {
                ReactiveMongoTemplate.this.maybeEmitEvent(new AfterConvertEvent<Object>(document, source, this.collectionName));
                return ReactiveMongoTemplate.this.maybeCallAfterConvert(source, document, this.collectionName);
            }
            return Mono.empty();
        }
    }

    static interface ReactiveCollectionQueryCallback<T>
    extends ReactiveCollectionCallback<T> {
        @Override
        public FindPublisher<T> doInCollection(MongoCollection<Document> var1) throws MongoException, DataAccessException;
    }

    static interface MongoDatabaseCallback<T> {
        public T doInDatabase(MongoDatabase var1);
    }

    static interface DocumentCallback<T> {
        public Mono<T> doWith(Document var1);
    }

    private static class FindAndReplaceCallback
    implements ReactiveCollectionCallback<Document> {
        private final Document query;
        private final Document fields;
        private final Document sort;
        private final Document update;
        @Nullable
        private final com.mongodb.client.model.Collation collation;
        private final FindAndReplaceOptions options;

        @Override
        public Publisher<Document> doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {
            FindOneAndReplaceOptions findOneAndReplaceOptions = this.convertToFindOneAndReplaceOptions(this.options, this.fields, this.sort);
            return collection.findOneAndReplace((Bson)this.query, (Object)this.update, findOneAndReplaceOptions);
        }

        private FindOneAndReplaceOptions convertToFindOneAndReplaceOptions(FindAndReplaceOptions options, Document fields, Document sort) {
            FindOneAndReplaceOptions result = new FindOneAndReplaceOptions().collation(this.collation);
            result = result.projection((Bson)fields).sort((Bson)sort).upsert(options.isUpsert());
            result = options.isReturnNew() ? result.returnDocument(ReturnDocument.AFTER) : result.returnDocument(ReturnDocument.BEFORE);
            return result;
        }

        FindAndReplaceCallback(Document query, Document fields, Document sort, Document update, @Nullable com.mongodb.client.model.Collation collation, FindAndReplaceOptions options) {
            this.query = query;
            this.fields = fields;
            this.sort = sort;
            this.update = update;
            this.collation = collation;
            this.options = options;
        }
    }

    private static class FindAndModifyCallback
    implements ReactiveCollectionCallback<Document> {
        private final Document query;
        private final Document fields;
        private final Document sort;
        private final Object update;
        private final List<Document> arrayFilters;
        private final FindAndModifyOptions options;

        @Override
        public Publisher<Document> doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {
            if (this.options.isRemove()) {
                FindOneAndDeleteOptions findOneAndDeleteOptions = ReactiveMongoTemplate.convertToFindOneAndDeleteOptions(this.fields, this.sort);
                findOneAndDeleteOptions = this.options.getCollation().map(Collation::toMongoCollation).map(arg_0 -> ((FindOneAndDeleteOptions)findOneAndDeleteOptions).collation(arg_0)).orElse(findOneAndDeleteOptions);
                return collection.findOneAndDelete((Bson)this.query, findOneAndDeleteOptions);
            }
            FindOneAndUpdateOptions findOneAndUpdateOptions = FindAndModifyCallback.convertToFindOneAndUpdateOptions(this.options, this.fields, this.sort, this.arrayFilters);
            if (this.update instanceof Document) {
                return collection.findOneAndUpdate((Bson)this.query, (Bson)((Document)this.update), findOneAndUpdateOptions);
            }
            if (this.update instanceof List) {
                return collection.findOneAndUpdate((Bson)this.query, (List)this.update, findOneAndUpdateOptions);
            }
            return Flux.error((Throwable)new IllegalArgumentException(String.format("Using %s is not supported in findOneAndUpdate", this.update)));
        }

        private static FindOneAndUpdateOptions convertToFindOneAndUpdateOptions(FindAndModifyOptions options, Document fields, Document sort, List<Document> arrayFilters) {
            FindOneAndUpdateOptions result = new FindOneAndUpdateOptions();
            result = result.projection((Bson)fields).sort((Bson)sort).upsert(options.isUpsert());
            result = options.isReturnNew() ? result.returnDocument(ReturnDocument.AFTER) : result.returnDocument(ReturnDocument.BEFORE);
            result = options.getCollation().map(Collation::toMongoCollation).map(arg_0 -> ((FindOneAndUpdateOptions)result).collation(arg_0)).orElse(result);
            if (!CollectionUtils.isEmpty(arrayFilters)) {
                result.arrayFilters(arrayFilters);
            }
            return result;
        }

        public FindAndModifyCallback(Document query, Document fields, Document sort, Object update, List<Document> arrayFilters, FindAndModifyOptions options) {
            this.query = query;
            this.fields = fields;
            this.sort = sort;
            this.update = update;
            this.arrayFilters = arrayFilters;
            this.options = options;
        }
    }

    private static class FindAndRemoveCallback
    implements ReactiveCollectionCallback<Document> {
        private final Document query;
        private final Document fields;
        private final Document sort;
        private final Optional<Collation> collation;

        FindAndRemoveCallback(Document query, Document fields, Document sort, @Nullable Collation collation) {
            this.query = query;
            this.fields = fields;
            this.sort = sort;
            this.collation = Optional.ofNullable(collation);
        }

        @Override
        public Publisher<Document> doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {
            FindOneAndDeleteOptions findOneAndDeleteOptions = ReactiveMongoTemplate.convertToFindOneAndDeleteOptions(this.fields, this.sort);
            this.collation.map(Collation::toMongoCollation).ifPresent(arg_0 -> ((FindOneAndDeleteOptions)findOneAndDeleteOptions).collation(arg_0));
            return collection.findOneAndDelete((Bson)this.query, findOneAndDeleteOptions);
        }
    }

    private static class FindCallback
    implements ReactiveCollectionQueryCallback<Document> {
        @Nullable
        private final Document query;
        @Nullable
        private final Document fields;

        FindCallback(@Nullable Document query) {
            this(query, null);
        }

        @Override
        public FindPublisher<Document> doInCollection(MongoCollection<Document> collection) {
            FindPublisher findPublisher = ObjectUtils.isEmpty((Object)this.query) ? collection.find(Document.class) : collection.find((Bson)this.query, Document.class);
            if (ObjectUtils.isEmpty((Object)this.fields)) {
                return findPublisher;
            }
            return findPublisher.projection((Bson)this.fields);
        }

        public FindCallback(@Nullable Document query, @Nullable Document fields) {
            this.query = query;
            this.fields = fields;
        }
    }

    private static class FindOneCallback
    implements ReactiveCollectionCallback<Document> {
        private final Document query;
        private final Optional<Document> fields;
        private final FindPublisherPreparer preparer;

        FindOneCallback(Document query, @Nullable Document fields, FindPublisherPreparer preparer) {
            this.query = query;
            this.fields = Optional.ofNullable(fields);
            this.preparer = preparer;
        }

        @Override
        public Publisher<Document> doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("findOne using query: {} fields: {} in db.collection: {}", new Object[]{SerializationUtils.serializeToJsonSafely(this.query), SerializationUtils.serializeToJsonSafely(this.fields.orElseGet(Document::new)), collection.getNamespace().getFullName()});
            }
            FindPublisher publisher = this.preparer.initiateFind(collection, col -> col.find((Bson)this.query, Document.class));
            if (this.fields.isPresent()) {
                publisher = publisher.projection((Bson)this.fields.get());
            }
            return publisher.limit(1).first();
        }
    }
}

