const { isArray, isPositiveInteger } = require("@wagerlab/utils/data/types");
const { generateRemovedDoc } = require("@wagerlab/utils/database/helpers");
const { logError, logInfo } = require("@wagerlab/utils/logging");
const _ = require("lodash");

// Rules:
// Listeners should be used sparingly and only on server-side
// deleteValue() serverTimeValue() and incrementValue() can only be used in update calls (both single and batchWrite) and NOT in set calls (single or batchWrite)

class MongoAdapter {
  databaseInstance = null;
  collectionName = null;
  primaryKey = null;
  collectionRef = null;

  constructor(databaseInstance, collectionName, primaryKey) {
    this.databaseInstance = databaseInstance;
    this.collectionName = collectionName;
    this.primaryKey = primaryKey;
    this.collectionRef = databaseInstance.db.collection(collectionName);
  }

  async query(queryParts, sorting = [], limit = null, selectFields = []) {
    const sort = {};
    const startAfterOrFilters = [];
    const endBeforeOrFilters = [];
    let nextStartAfterFilter = {};
    let nextEndBeforeFilter = {};
    sorting.forEach(({ sortBy, order = "asc", startAfter, endBefore }) => {
      const sortByField = sortBy === this.primaryKey ? "_id" : sortBy;
      sort[sortByField] = order === "desc" ? -1 : 1;

      if (startAfter != null) {
        const startsAfterFilterValue = order === "desc" ? { $lt: startAfter } : { $gt: startAfter };
        startAfterOrFilters.push({ ...nextStartAfterFilter, [sortByField]: startsAfterFilterValue });
        nextStartAfterFilter[sortByField] = startAfter;
      }
      if (endBefore != null) {
        const endBeforeFilterValue = order === "desc" ? { $gt: endBefore } : { $lt: endBefore };
        endBeforeOrFilters.push({ ...nextEndBeforeFilter, [sortByField]: endBeforeFilterValue });
        nextEndBeforeFilter[sortByField] = endBefore;
      }
    });
    const startAfterCombinedFilter = startAfterOrFilters.length ? { $or: startAfterOrFilters } : null;
    const endBeforeCombinedFilter = endBeforeOrFilters.length ? { $or: endBeforeOrFilters } : null;
    const sortFilters = [startAfterCombinedFilter, endBeforeCombinedFilter].filter((f) => f);

    const projection = selectFields.length
      ? selectFields.reduce((proj, fieldName) => {
          proj[fieldName] = 1;
          return proj;
        }, {})
      : null;

    let filter = this.buildMongoFilter(queryParts);
    if (sortFilters.length) filter = { $and: [filter, ...sortFilters] };
    let query = this.collectionRef.find(filter).sort(sort);
    if (limit) query = query.limit(limit);
    if (projection) query = query.project(projection);

    return await query
      .toArray()
      .then((results) => this.mongoQueryResponse(results))
      .catch((error) => {
        logError(`MongoAdapter:query error: ${error}`, { queryParts, sorting, limit, selectFields, error });
        return null;
      });
  }

  queryListener(queryParts, onUpdate, onError, onlyChanges = false) {
    const filter = this.buildMongoFilter(queryParts);
    let changeStream;
    let docsMap;
    let pendingChanges = [];
    let batchTimeout = null;
    let processingBatch = false;
    const BATCH_DELAY = 0;
    let isDestroyed = false;

    const processPendingChanges = () => {
      if (processingBatch || isDestroyed || !pendingChanges.length) return;

      processingBatch = true;
      const processingChanges = [...pendingChanges];
      pendingChanges = [];
      batchTimeout = null;

      const changes = [];

      for (const changeEvent of processingChanges) {
        const { fullDocument, operationType, documentKey } = changeEvent || {};
        const docID = documentKey?._id?.toString?.();
        if (!operationType || !docID) continue;

        if (operationType === "delete") {
          if (docsMap.has(docID)) {
            docsMap.delete(docID);
            changes.push(generateRemovedDoc(docID, true));
          }
        } else if (["insert", "replace", "update"].includes(operationType)) {
          if (!fullDocument) continue;
          const newMatchesFilter = this.checkMatchesFilter(queryParts, fullDocument);
          const newDoc = this.mongoDocResponse(fullDocument);

          const prevMatchedFilter = docsMap.has(docID);
          if (!newMatchesFilter && prevMatchedFilter) {
            docsMap.delete(docID);
            changes.push(generateRemovedDoc(docID, false, newDoc));
          } else if (newMatchesFilter) {
            docsMap.set(docID, newDoc);
            changes.push(newDoc);
          }
        }
      }

      try {
        const docsToBroadcast = !changes.length ? null : onlyChanges ? changes : _.cloneDeep(Array.from(docsMap.values()));
        if (docsToBroadcast) onUpdate(docsToBroadcast, false);
      } catch (error) {
        onError(error);
        return;
      }

      processingBatch = false;
      if (pendingChanges.length > 0) scheduleChangeProcessing();
    };

    const scheduleChangeProcessing = () => {
      if (processingBatch || isDestroyed) return;
      if (batchTimeout) clearTimeout(batchTimeout);
      batchTimeout = setTimeout(processPendingChanges, BATCH_DELAY);
    };

    const onChange = (next) => {
      pendingChanges.push(next);
      scheduleChangeProcessing();
    };

    const initial = this.collectionRef
      .find(filter)
      .toArray()
      .catch((error) => {
        if (isDestroyed) return null;
        isDestroyed = true;
        onError(error);
        return null;
      })
      .then((initialData) => {
        if (isDestroyed) return null;
        if (!isArray(initialData)) {
          onError(new Error("No initial data"));
          return null;
        }
        docsMap = new Map();
        const initialDocs = [];
        for (const initialItem of initialData) {
          const initialID = initialItem?._id?.toString?.();
          const initialDoc = this.mongoDocResponse(initialItem);
          if (!initialID || !initialDoc) continue;
          initialDocs.push(initialDoc);
          docsMap.set(initialID, initialDoc);
        }

        try {
          const docsToBroadcast = onlyChanges ? initialData : _.cloneDeep(Array.from(docsMap.values()));
          onUpdate(docsToBroadcast, true);
        } catch (error) {
          onError(error);
          return null;
        }

        changeStream = this.collectionRef.watch([{ $match: { operationType: { $in: ["insert", "replace", "update", "delete"] } } }], { fullDocument: "updateLookup" });
        changeStream.on("change", onChange);
        changeStream.on("error", onError);
        return initialData;
      });

    const unsubscribe = async () => {
      isDestroyed = true;
      if (batchTimeout) clearTimeout(batchTimeout);
      batchTimeout = null;
      if (docsMap) docsMap.clear();
      if (pendingChanges) pendingChanges = [];

      docsMap = null;
      if (changeStream) await changeStream.close();
      changeStream = null;
    };

    return { initial, unsubscribe };
  }

  docListener(id, onUpdate, onError) {
    let changeStream;
    let isDestroyed = false;

    const initial = this.collectionRef
      .findOne({ _id: id })
      .catch((error) => {
        if (isDestroyed) return null;
        isDestroyed = true;
        onError(error);
        return null;
      })
      .then((initialDoc) => {
        if (isDestroyed) return null;
        const formattedInitialDoc = this.mongoDocResponse(initialDoc);
        try {
          onUpdate(formattedInitialDoc, true);
        } catch (error) {
          onError(error);
          return null;
        }

        changeStream = this.collectionRef.watch([{ $match: { $and: [{ "documentKey._id": id }, { operationType: { $in: ["insert", "replace", "update", "delete"] } }] } }], {
          fullDocument: "updateLookup",
        });

        const onValue = (changeEvent) => {
          try {
            const { fullDocument, operationType } = changeEvent || {};
            if (isDestroyed || (operationType !== "delete" && !fullDocument)) return;
            const docResponse = operationType === "delete" ? null : fullDocument;
            const updatedDoc = this.mongoDocResponse(docResponse);
            onUpdate(updatedDoc, false);
          } catch (error) {
            onError(error);
          }
        };
        changeStream.on("change", onValue);
        changeStream.on("error", onError);
        return formattedInitialDoc;
      });

    const unsubscribe = () => {
      if (changeStream) {
        changeStream.close();
        changeStream = null;
      }
    };

    return { initial, unsubscribe };
  }

  async getAll() {
    return this.collectionRef
      .find({})
      .toArray()
      .then((results) => this.mongoQueryResponse(results))
      .catch((error) => {
        logError(`MongoAdapter:getAll error`, { error });
        return null;
      });
  }

  async get(id) {
    return this.collectionRef
      .findOne({ _id: id })
      .then((document) => this.mongoDocResponse(document))
      .catch((error) => {
        logError(`MongoAdapter:get error`, { id, error });
        return null;
      });
  }

  async set(id, data, returnModified = false) {
    data._id = id;
    return this.collectionRef
      .replaceOne({ _id: id }, data, { upsert: true })
      .then((result) => {
        if (!returnModified) return true;
        return isPositiveInteger(result?.modifiedCount) || isPositiveInteger(result?.upsertedCount);
      })
      .catch((error) => {
        logError(`MongoAdapter:set error`, { id, data, error });
        return null;
      });
  }

  async delete(id, returnModified = false) {
    return this.collectionRef
      .deleteOne({ _id: id })
      .then((result) => {
        if (!returnModified) return true;
        return result?.deletedCount === 1;
      })
      .catch((error) => {
        logError(`MongoAdapter:delete error`, { id, error });
        return null;
      });
  }

  async update(id, updates, returnModified = false) {
    if ("_id" in updates && updates._id !== id) {
      logError(`MongoAdapter:update error: Invalid update call`, { id, updates });
      return null;
    }
    const mongoUpdate = this.buildMongoUpdate(updates);
    return this.collectionRef
      .updateOne({ _id: id }, mongoUpdate)
      .then((result) => {
        if (!returnModified) return true;
        return isPositiveInteger(result?.modifiedCount) || isPositiveInteger(result?.upsertedCount);
      })
      .catch((error) => {
        logError(`MongoAdapter:update error`, { id, updates, error });
        return null;
      });
  }

  async batchWrite(batchList) {
    const operations = [];

    for (const { method, id, payload } of batchList) {
      let operation;
      if (method === "set") {
        const replacement = { ...payload, _id: id };
        operation = { replaceOne: { filter: { _id: id }, replacement, upsert: true } };
      } else if (method === "delete") {
        operation = { deleteOne: { filter: { _id: id } } };
      } else if (method === "update") {
        if ("_id" in payload && payload._id !== id) {
          logError(`MongoAdapter:batchWrite error: Invalid update call`, { id, payload });
          continue;
        }
        operation = {
          updateOne: {
            filter: { _id: id },
            update: this.buildMongoUpdate(payload),
          },
        };
      }
      if (operation) operations.push(operation);
    }
    if (!operations?.length) return true;

    return this.collectionRef
      .bulkWrite(operations, { ordered: false })
      .then(() => true)
      .catch((error) => {
        logError("MongoAdapter.batchWrite error", { error });
        return false;
      });
  }

  async transaction(inputData, transactionFunction) {
    const client = this.databaseInstance.client;
    return client.withSession(async (session) =>
      session.withTransaction(async (session) => {
        try {
          const id = inputData?.id;
          const { remoteDoc, fetchError } = await this.collectionRef
            .findOne({ _id: id })
            .then((data) => ({ remoteDoc: this.mongoDocResponse(data), fetchError: false }))
            .catch((error) => {
              logError(`MongoAdapter:transaction error: Error fetching remote data`, { error, id });
              return { remoteDoc: null, fetchError: true };
            });

          if (fetchError) {
            await session.abortTransaction();
            return { success: false, aborted: false, data: null, input: inputData };
          }

          let abortCalled = false;
          const abortFunc = () => {
            abortCalled = true;
          };

          const updatedDoc = transactionFunction(remoteDoc, inputData, abortFunc);

          if (abortCalled) {
            await session.abortTransaction();
            return { success: true, aborted: true, data: remoteDoc, input: inputData };
          }

          const writePromise = updatedDoc
            ? this.collectionRef
                .replaceOne({ _id: id }, { ...updatedDoc, _id: id }, { upsert: true, session })
                .then((res) => true)
                .catch((error) => {
                  logError(`MongoAdapter:set error`, { id, updatedDoc, error });
                  return false;
                })
            : this.collectionRef
                .deleteOne({ _id: id }, { session })
                .then((res) => true)
                .catch((error) => {
                  logError(`MongoAdapter:delete error`, { id, error });
                  return false;
                });

          const writeSuccessful = await writePromise;
          if (!writeSuccessful) {
            await session.abortTransaction();
            return { success: false, aborted: false, data: remoteDoc, input: inputData };
          }

          return { success: true, aborted: false, data: updatedDoc, input: inputData };
        } catch (error) {
          logError(`MongoAdapter:transaction error`, { inputData, error });
          return { success: false, aborted: false, data: null, input: inputData };
        }
      })
    );
  }

  // Unlike the other one, this one actually uses a fully atomic transaction
  // This comes at the expense of both speed and errors
  async batchTransaction_alt(inputList, transactionFunction) {
    if (!inputList?.length) return [];
    // Note: We may be able to increase this substantially.
    // Still need to conduct more testing. Currently narrowed ideal: 2k-10k
    // 10k is actually faster than the non-transaction version but prone to errors with more updates/larger docs
    // 250-500-ish max to prevent errors, but this is MUCH slower
    const BATCH_SIZE = 300;

    const results = [];
    const session = await this.databaseInstance.client.startSession();
    try {
      for (let i = 0; i < inputList.length; i += BATCH_SIZE) {
        const batchInputs = inputList.slice(i, i + BATCH_SIZE);

        const batchResults = await session
          .withTransaction(
            async () => {
              const idsToFetch = batchInputs.map((input) => input.id);
              const remoteDataList = await this.collectionRef.find({ _id: { $in: idsToFetch } }, { session }).toArray();
              const remoteDataMap = remoteDataList.reduce((rdMap, remoteDataItem) => {
                const remoteDataID = remoteDataItem?._id?.toString?.();
                if (!remoteDataID) return rdMap;
                rdMap[remoteDataID] = this.mongoDocResponse(remoteDataItem);
                return rdMap;
              }, {});

              const writeOperations = [];
              const processedResults = [];

              for (const input of batchInputs) {
                try {
                  let abortCalled = false;
                  const remoteData = remoteDataMap[input.id];
                  const abortFunc = () => (abortCalled = true);
                  const updatedData = transactionFunction(remoteData, input, abortFunc);

                  if (abortCalled) {
                    processedResults.push({
                      success: true,
                      aborted: true,
                      data: remoteData,
                      input,
                    });
                  } else if (updatedData) {
                    const replacement = { ...updatedData, _id: input.id };
                    writeOperations.push({ replaceOne: { filter: { _id: input.id }, replacement, upsert: true } });
                    processedResults.push({
                      success: true,
                      aborted: false,
                      data: updatedData,
                      input,
                    });
                  } else {
                    writeOperations.push({ deleteOne: { filter: { _id: input.id } } });
                    processedResults.push({
                      success: true,
                      aborted: false,
                      data: null,
                      input,
                    });
                  }
                } catch (error) {
                  logError(`MongoAdapter:batchTransaction error: Error processing item ${input?.id}`, { input, error });
                  processedResults.push({
                    success: false,
                    aborted: false,
                    data: null,
                    input,
                  });
                }
              }

              if (!writeOperations?.length) return processedResults;

              return await this.collectionRef
                .bulkWrite(writeOperations, { session, ordered: false })
                .then(() => processedResults)
                .catch((error) => {
                  logError(`MongoAdapter:batchTransaction error: Batch chunk error`, { error });
                  return processedResults.map((result) => (result.aborted ? result : { success: false, aborted: false, data: null, input: result.input }));
                });
            },
            {
              maxTimeMS: 600000,
            }
          )
          .catch((error) => {
            logError(`MongoAdapter:batchTransaction error: Transaction failed for batch at index ${i}`, { error });
            return batchInputs.map((input) => ({
              success: false,
              aborted: false,
              data: null,
              input,
            }));
          });

        results.push(...batchResults);
      }

      return results;
    } finally {
      await session.endSession();
    }
  }

  async batchTransaction(inputList, transactionFunction) {
    if (!inputList?.length) {
      logError(`MongoAdapter:batchTransaction error: No input list for batch transaction`);
      return [];
    }
    // 700/5 was the best from extensive testing
    // TODO: consider making it so we know when it's a retry and in such cases trim things down
    const BATCH_SIZE = 700;
    const CONCURRENCY = 5;

    const batches = _.chunk(inputList, BATCH_SIZE);
    const concurrentBatches = _.chunk(batches, CONCURRENCY);
    const results = [];

    for (let i = 0; i < concurrentBatches.length; i++) {
      const batchesToProcess = concurrentBatches[i];
      const concurrentBatchPromises = batchesToProcess.map(async (batchInputs) => {
        let remoteDataMap = {};
        try {
          const idsToFetch = batchInputs.map((input) => input.id);
          const remoteDataResponse = await this.collectionRef.find({ _id: { $in: idsToFetch } }).toArray();
          remoteDataMap = remoteDataResponse.reduce((rdMap, remoteDataItem) => {
            const remoteDataID = remoteDataItem?._id?.toString?.();
            if (!remoteDataID) return rdMap;
            rdMap[remoteDataID] = this.mongoDocResponse(remoteDataItem);
            return rdMap;
          }, {});
        } catch (error) {
          logError(`MongoAdapter:batchTransaction error: Transaction failed for batch at index ${i}`, { error });
          return batchInputs.map((input) => ({
            success: false,
            aborted: false,
            data: null,
            input,
          }));
        }

        const writeOperations = [];
        const processedResults = [];

        for (const input of batchInputs) {
          try {
            let abortCalled = false;
            const remoteData = remoteDataMap[input.id];
            const abortFunc = () => (abortCalled = true);
            const updatedData = transactionFunction(remoteData, input, abortFunc);

            if (abortCalled) {
              processedResults.push({
                success: true,
                aborted: true,
                data: remoteData,
                input,
              });
            } else if (updatedData) {
              const replacement = { ...updatedData, _id: input.id };
              writeOperations.push({ replaceOne: { filter: { _id: input.id }, replacement, upsert: true } });
              processedResults.push({
                success: true,
                aborted: false,
                data: updatedData,
                input,
              });
            } else {
              writeOperations.push({ deleteOne: { filter: { _id: input.id } } });
              processedResults.push({
                success: true,
                aborted: false,
                data: null,
                input,
              });
            }
          } catch (error) {
            logError(`MongoAdapter:batchTransaction error: Error processing item ${input?.id}`, { input, error });
            processedResults.push({
              success: false,
              aborted: false,
              data: null,
              input,
            });
          }
        }

        if (!writeOperations?.length) return processedResults;

        return await this.collectionRef
          .bulkWrite(writeOperations, { ordered: false })
          .then(() => processedResults)
          .catch((error) => {
            logError(`MongoAdapter:batchTransaction error: Batch chunk error`, { error });
            return processedResults.map((result) => (result.aborted ? result : { success: false, aborted: false, data: null, input: result.input }));
          });
      });
      const concurrentBatchResults = await Promise.all(concurrentBatchPromises);
      const resultsToPush = concurrentBatchResults.flat();
      results.push(...resultsToPush);
    }

    return results;
  }

  deleteValue() {
    return "__WL_REMOVE__";
  }
  serverTimeValue() {
    return "__WL_SERVERTIMESTAMP__";
  }
  incrementValue(incrementBy = 1) {
    return { __WL_INCREMENT__: true, by: incrementBy };
  }

  newID() {
    const CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    const ID_LENGTH = 20;
    let id = "";
    for (let i = 0; i < ID_LENGTH; i++) {
      id += CHARS.charAt(Math.floor(Math.random() * CHARS.length));
    }
    return id;
  }

  // mongo specific methods
  getMongoCollection() {
    return this.collectionRef;
  }
  getMongoDatabase() {
    return this.databaseInstance.db;
  }
  getMongoClient() {
    return this.databaseInstance.client;
  }

  // Internal helpers

  isMongoDeleteValue = (item) => item === "__WL_REMOVE__";

  isMongoServerTimestampValue = (item) => item === "__WL_SERVERTIMESTAMP__";

  isMongoIncrementValue = (item) => item?.__WL_INCREMENT__ === true;

  mongoQueryResponse(docList) {
    if (!docList?.length) return docList;
    for (const doc of docList) {
      if (doc?._id) _.unset(doc, "_id");
    }
    return docList;
  }
  mongoDocResponse(doc) {
    if (doc?._id) _.unset(doc, "_id");
    return doc;
  }

  buildMongoUpdate(updateObj) {
    const operations = {
      $set: {},
      $unset: {},
      $inc: {},
      $currentDate: {},
    };

    if (updateObj?._id) _.unset(updateObj, "_id");

    const processLevel = (obj, parentPath = "") => {
      const entries = Object.entries(obj);

      for (const [key, value] of entries) {
        const currentPath = parentPath ? `${parentPath}.${key}` : key;
        if (value == null || this.isMongoDeleteValue(value)) operations.$unset[currentPath] = "";
        else if (this.isMongoServerTimestampValue(value)) operations.$currentDate[currentPath] = true;
        else if (this.isMongoIncrementValue(value)) operations.$inc[currentPath] = value.by || 1;
        else if (typeof value === "object" && !(value instanceof Date)) processLevel(value, currentPath);
        else operations.$set[currentPath] = value;
      }
    };

    processLevel(updateObj);

    for (const [operator, values] of Object.entries(operations)) {
      if (Object.keys(values).length === 0) {
        delete operations[operator];
      }
    }

    return operations;
  }

  buildMongoFilter(queryParts) {
    return queryParts.reduce((acc, { key, operator, value }) => {
      if (!key) return acc;
      let newCondition = null;
      if (operator === "==") newCondition = { $eq: value };
      else if (operator === "!=") newCondition = { $ne: value };
      else if (operator === ">") newCondition = { $gt: value };
      else if (operator === "<") newCondition = { $lt: value };
      else if (operator === ">=") newCondition = { $gte: value };
      else if (operator === "<=") newCondition = { $lte: value };
      else if (operator === "in") newCondition = { $in: value };

      if (!newCondition) return acc;
      acc[key] = acc[key] ? { ...acc[key], ...newCondition } : newCondition;
      return acc;
    }, {});
  }

  checkMatchesFilter(queryParts, doc) {
    if (!doc) return false;
    if (!queryParts.length) return true;
    for (const { key, operator, value } of queryParts) {
      const docValue = _.get(doc, key);
      if (operator === "==" && docValue !== value) return false;
      if (operator === "!=" && docValue === value) return false;
      if (operator === ">=" && docValue < value) return false;
      if (operator === "<=" && docValue > value) return false;
      if (operator === ">" && docValue <= value) return false;
      if (operator === "<" && docValue >= value) return false;
      if (operator === "in" && !value.includes(docValue)) return false;
    }
    return true;
  }
}
exports.MongoAdapter = MongoAdapter;
