// noinspection SqlResolve

import type { Sqlite } from '../sqlite/SqliteEngineV1.js';
import { Mutex } from '../sqlite/Mutex.js';
import { BucketChecksum, BucketState, Checkpoint, OplogEntry, SyncDataBatch } from './SyncAdapter2.js';
import { CrudBatchEvent } from './SyncAdapter.js';
import { v4 } from 'uuid';
import { createViewStatement, createViewTriggerStatements, Schema } from './schemaLogic.js';
import { CrudEntry, OpId } from './types.js';

const INVALID_SQLITE_CHARS = /"'%,\.#\s/;
const COMPACT_OPERATION_INTERVAL = 1_000;

export class SqliteBucketStorage {
  private mutex = new Mutex();
  public tableNames = new Set<string>();
  private pendingBucketDeletes: boolean = true;
  private _hasCompletedSync = false;

  /**
   * Count up, and do a compact on startup.
   */
  private compactCounter = COMPACT_OPERATION_INTERVAL;

  constructor(private db: Sqlite.Database) {}

  async init() {
    this._hasCompletedSync = false;
    const existingTableRows = await this.db.all(
      `SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'objects__*'`
    );
    for (let row of existingTableRows) {
      this.tableNames.add(row.name);
    }
  }

  /**
   * Reset any caches.
   */
  startSession(): void {
    this.checksumCache = null;
  }

  canQueryType(type: string) {
    return this.tableNames.has(this.getTableName(type));
  }

  /**
   * Get a table name for a specific type. The table may or may not exist.
   *
   * The table name must always be enclosed in "quotes" when using inside a SQL query.
   *
   * @param type
   */
  getTableName(type: string) {
    // Test for invalid characters rather than escaping.
    if (INVALID_SQLITE_CHARS.test(type)) {
      throw new Error(`Invalid characters in type name: ${JSON.stringify(type)}`);
    }
    return `objects__${type}`;
  }

  async getBucketStates(): Promise<BucketState[]> {
    const rows = await this.db.all(
      'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM buckets WHERE pending_delete = 0'
    );
    return rows;
  }

  async saveSyncData(batch: SyncDataBatch) {
    await this.mutex.sharedLock(async () => {
      const saveStart = Date.now();
      let count = 0;

      // Splitting the save into smaller batches avoids having the database locked for the entire time.
      // This allows other operations to be interleaved, e.g. getting CRUD data.

      const BATCH_SIZE = 3000;
      let batchRemaining = BATCH_SIZE;
      let ops: Sqlite.Query[] = [];
      for (let b of batch.buckets) {
        let { bucket, data } = b;
        count += data.length;
        let start = 0;
        while (start < data.length) {
          const subset = data.slice(start, start + batchRemaining);
          batchRemaining -= subset.length;
          start += subset.length;
          const isFinal = !b.has_more && start >= data.length;
          ops = ops.concat(this.updateBucketOps(bucket, subset, isFinal));

          if (batchRemaining == 0) {
            await this.db.runBatch(ops);
            ops = [];
            batchRemaining = BATCH_SIZE;
          }
        }
      }
      if (ops.length > 0) {
        await this.db.runBatch(ops);
      }
      const duration = Date.now() - saveStart;
      this.compactCounter += count;
    });
  }

  async removeBuckets(buckets: string[]): Promise<void> {
    await this.mutex.sharedLock(async () => {
      for (let bucket of buckets) {
        await this.deleteBucket(bucket);
      }
    });
  }

  /**
   * Set a target checkpoint.
   */
  async setTargetCheckpoint(checkpoint: Checkpoint) {
    // No-op for now
  }

  async waitUntilPersisted() {
    await this.mutex.exclusiveLock(async () => {});
  }

  /**
   * Mark a bucket for deletion.
   *
   * @param bucket
   */
  private async deleteBucket(bucket: string) {
    // Delete a bucket, but allow it to be re-created.
    // To achieve this, we rename the bucket to a new temp name, and change all ops to remove.
    // By itself, this new bucket would ensure that the previous objects are deleted if they contain no more references.
    // If the old bucket is re-added, this new bucket would have no effect.
    const newName = `$delete_${bucket}_${v4()}`;
    await this.db.runBatch([
      // Change PUT -> REMOVE
      [`UPDATE oplog SET op='REMOVE', data=NULL WHERE op='PUT' AND superseded=0 AND bucket=?`, bucket],
      // Rename bucket
      [`UPDATE oplog SET bucket=? WHERE bucket=?`, newName, bucket],
      [`DELETE FROM buckets WHERE name = ?`, bucket],
      [
        `INSERT INTO buckets(name, pending_delete, last_op) SELECT ?, 1, IFNULL(MAX(op_id), 0) FROM oplog WHERE bucket = ?`,
        newName,
        newName
      ]
    ]);
    this.pendingBucketDeletes = true;
  }

  private updateBucketOps(bucket: string, data: OplogEntry[], finalBucketUpdate: boolean) {
    if (data.length == 0) {
      return [];
    }

    let ops: Sqlite.Query[] = [];
    let clearOps: Sqlite.Query[] = [];
    let allEntries: { type: string; id: string }[] = [];

    let inserts: any[] = [];
    let lastInsert = new Map<string, any>();

    let first_op: string | null = null;
    let last_op: string | null = null;
    let target_op: bigint | null = null;

    for (let op of data) {
      last_op = op.op_id;
      if (first_op == null) {
        first_op = op.op_id;
      }

      const insert = {
        op_id: op.op_id,
        op: op.op,
        bucket: bucket,
        object_type: op.object_type,
        object_id: op.object_id,
        data: op.data,
        checksum: op.checksum,
        superseded: 0
      };

      if (op.op == 'MOVE') {
        insert.superseded = 1;
      }

      if (op.op == 'PUT' || op.op == 'REMOVE' || op.op == 'MOVE') {
        inserts.push(insert);
      }

      if (op.op == 'PUT' || op.op == 'REMOVE') {
        const key = op.object_type + '/' + op.object_id;
        const prev = lastInsert.get(key);
        if (prev) {
          prev.superseded = 1;
        }
        lastInsert.set(key, insert);
        allEntries.push({ type: op.object_type, id: op.object_id });
      } else if (op.op == 'MOVE') {
        const target = op.data?.target as string | undefined;
        if (target) {
          const l = BigInt(target);
          if (target_op == null || l < target_op) {
            target_op = l;
          }
        }
      } else if (op.op == 'CLEAR') {
        // Any remaining PUT operations should get an implicit REMOVE.
        clearOps.push([
          `UPDATE oplog SET op='REMOVE', data=NULL, hash=0 WHERE (op='PUT' OR op='REMOVE') AND bucket=? AND op_id <= ?`,
          bucket,
          op.op_id
        ]);
        // And we need to re-apply all of those.
        // We also replace the checksum with the checksum of the CLEAR op.
        clearOps.push([`UPDATE buckets SET last_applied_op = 0, add_checksum = ? WHERE name = ?`, op.checksum, bucket]);
      }
    }

    // Mark old ops as superseded

    ops.push([
      `
        UPDATE oplog
        SET superseded = 1,
            op         = 'MOVE',
            data       = NULL
        WHERE oplog.superseded = 0
            AND unlikely(oplog.bucket = ?)
            AND (oplog.object_type, oplog.object_id) IN (
                SELECT json_extract(json_each.value, '$.type'), json_extract(json_each.value, '$.id')
                    FROM json_each(?)
                )
      `,
      bucket,
      JSON.stringify(allEntries)
    ]);

    ops.push([
      `INSERT INTO oplog(op_id, op, bucket, object_type, object_id, data, hash, superseded)
             SELECT json_extract(json_each.value, '$.op_id'),
                    json_extract(json_each.value, '$.op'),
                    json_extract(json_each.value, '$.bucket'),
                    json_extract(json_each.value, '$.object_type'),
                    json_extract(json_each.value, '$.object_id'),
                    json_extract(json_each.value, '$.data'),
                    json_extract(json_each.value, '$.checksum'),
                    json_extract(json_each.value, '$.superseded')
             FROM json_each(?)
  `,
      JSON.stringify(inserts)
    ]);

    ops.push([`INSERT OR IGNORE INTO buckets(name) VALUES(?)`, bucket]);

    if (last_op) {
      ops.push([`UPDATE buckets SET last_op = ? WHERE name = ?`, last_op, bucket]);
    }
    if (target_op) {
      ops.push([
        `UPDATE buckets SET target_op = MAX(?, buckets.target_op) WHERE name = ?`,
        target_op.toString(10),
        bucket
      ]);
    }

    ops.push(...clearOps);

    // Compact superseded ops immediately, but only _after_ clearing
    if (first_op && last_op) {
      ops.push([
        `UPDATE buckets
         SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0)
                                            FROM oplog
                                            WHERE superseded = 1
                                              AND oplog.bucket = ?
                                              AND oplog.op_id >= ?
                                              AND oplog.op_id <= ?)
         WHERE buckets.name = ?`,
        bucket,
        first_op,
        last_op,
        bucket
      ]);
      ops.push([
        `DELETE
         FROM oplog
         WHERE superseded = 1
           AND bucket = ?
           AND op_id >= ?
           AND op_id <= ?`,
        bucket,
        first_op,
        last_op
      ]);
    }

    return ops;
  }

  async hasCompletedSync() {
    if (this._hasCompletedSync) {
      return true;
    }
    const r = await this.db.get(`SELECT name, last_applied_op FROM buckets WHERE last_applied_op > 0 LIMIT 1`);
    const completed = r != null;
    if (completed) {
      this._hasCompletedSync = true;
    }
    return completed;
  }

  async syncLocalDatabase(
    checkpoint: Checkpoint
  ): Promise<{ ready: boolean; checkpointValid: boolean; failures?: any[] }> {
    console.debug('validate checksums', checkpoint);
    const r = await this.validateChecksums(checkpoint);
    console.debug('validated checksums', checkpoint);
    if (!r.valid) {
      console.error('Checksums failed for', r.failures);
      for (let b of r.failures) {
        await this.deleteBucket(b.bucket);
      }
      return { ready: false, checkpointValid: false, failures: r.failures };
    }
    const buckets = checkpoint.buckets.map((b) => b.bucket).concat('$local');
    await this.db.run(
      `UPDATE buckets SET last_op = max(last_op, ?) WHERE name IN (SELECT json_each.value FROM json_each(?))`,
      checkpoint.last_op_id,
      JSON.stringify(buckets)
    );

    const { valid } = await this.updateObjectsFromBuckets(checkpoint);
    if (!valid) {
      console.debug('Not at a consistent checkpoint - cannot update local db');
      return { ready: false, checkpointValid: true };
    }
    // await this.autoCompact();
    await this.forceCompact();

    return {
      ready: true,
      checkpointValid: true
    };
  }

  private async canUpdateLocal(tx: Sqlite.WriteContext) {
    const invalid_buckets = await tx.all(
      `SELECT name, target_op, last_op, last_applied_op FROM buckets WHERE target_op > last_op AND (name = '$local' OR pending_delete = 0)`
    );
    if (invalid_buckets.length > 0) {
      console.debug('Do not have complete data for', invalid_buckets);
      return false;
    }
    // This is specifically relevant for when data is added to crud before another batch is completed.
    const row = await tx.get('SELECT 1 FROM crud LIMIT 1');
    if (row != null) {
      console.warn('Crud is pending, cannot sync');
      return false;
    }
    return true;
  }

  /**
   * Atomically update the local state to the current checkpoint.
   *
   * This includes creating new tables, dropping old tables, and copying data over from the oplog.
   */
  private async updateObjectsFromBuckets(checkpoint: Checkpoint) {
    return await this.db.writeLock(async (tx) => {
      const start = Date.now();
      if (!(await this.canUpdateLocal(tx))) {
        return { valid: false };
      }
      const { changes: before } = await tx.get('SELECT total_changes() as changes');

      // Updated objects
      // TODO: Reduce memory usage
      // Some options here:
      // 1. Create a VIEW objects_updates, which contains triggers to update individual tables.
      //    This works well for individual tables, but difficult to have a catch all for untyped data,
      //    and performance degrades when we have hundreds of object types.
      // 2. Similar, but use a TEMP TABLE instead. We can then query those from JS, and populate the tables from JS.
      // 3. Skip the temp table, and query the data directly. Sorting and limiting becomes tricky.
      // 3a. LIMIT on the first oplog step. This prevents us from using JOIN after this.
      // 3b. LIMIT after the second oplog query

      // QUERY PLAN
      // |--SCAN buckets
      // |--SEARCH b USING INDEX sqlite_autoindex_oplog_1 (bucket=? AND op_id>?)
      // |--SEARCH r USING INDEX oplog_by_object (object_type=? AND object_id=?)
      // `--USE TEMP B-TREE FOR GROUP BY
      // language=SQLite
      await tx.run(
        `INSERT OR REPLACE INTO objects_updates(type, id, data, buckets, op_id)
        -- 3. Group the objects from different buckets together into a single one (ops).
         SELECT r.object_type as type,
                r.object_id as id,
                r.data as data,
                json_group_array(r.bucket) FILTER (WHERE r.op='PUT') as buckets,
                /* max() affects which row is used for 'data' */
                max(r.op_id) FILTER (WHERE r.op='PUT') as op_id
         -- 1. Filter oplog by the ops added but not applied yet (oplog b).
         FROM buckets
                CROSS JOIN oplog b ON b.bucket = buckets.name
              AND (b.op_id > buckets.last_applied_op)
                -- 2. Find *all* current ops over different buckets for those objects (oplog r).
                INNER JOIN oplog r
                           ON r.object_type = b.object_type
                             AND r.object_id = b.object_id
         WHERE r.superseded = 0
           AND b.superseded = 0
         -- Group for (3)
         GROUP BY r.object_type, r.object_id
        `
      );

      while (true) {
        const rows = await tx.all(
          'SELECT type, id, data, buckets, CAST(op_id AS TEXT) as op_id FROM objects_updates ORDER BY op_id LIMIT 1000'
        );
        if (rows.length == 0) {
          break;
        }

        const ops = this.saveOps(rows);

        let max_id = rows[rows.length - 1].op_id;
        ops.push([`DELETE FROM objects_updates WHERE op_id <= ?`, max_id]);
        await tx.runBatch(ops);
      }

      await tx.run(`UPDATE buckets
                SET last_applied_op = last_op
                WHERE last_applied_op != last_op`);

      const { changes: after } = await tx.get('SELECT total_changes() as changes');
      console.debug(`Synced local database. ${after - before} records updated in ${Date.now() - start}ms.`);

      return {
        valid: true
      };
    });
  }

  saveOps(rows: { type: string; id: string; data: string; buckets: string; op_id: string }[]): Sqlite.Query[] {
    let byType = new Map<string, any[]>();
    for (let row of rows) {
      if (!byType.has(row.type)) {
        byType.set(row.type, []);
      }
      byType.get(row.type).push(row);
    }

    let ops: Sqlite.Query[] = [];
    for (let type of byType.keys()) {
      const typeRows = byType.get(type);
      const table = this.getTableName(type);

      // Note that "PUT" and "DELETE" are split, and not applied in row order.
      // So we only do either PUT or DELETE for each individual object, not both.
      const removeIds = new Set<string>();
      let puts: any[] = [];
      for (let row of typeRows) {
        if (row.buckets == '[]') {
          removeIds.add(row.id);
        } else {
          puts.push(row);
          removeIds.delete(row.id);
        }
      }
      puts = puts.filter((update) => !removeIds.has(update.id));

      if (this.tableNames.has(table)) {
        ops.push([
          `REPLACE INTO "${table}"(id, data)
               SELECT json_extract(json_each.value, '$.id'),
                      json_extract(json_each.value, '$.data')
               FROM json_each(?)`,
          JSON.stringify(puts)
        ]);

        ops.push([
          `DELETE
               FROM "${table}"
               WHERE id IN (SELECT json_each.value FROM json_each(?))`,
          JSON.stringify([...removeIds])
        ]);
      } else {
        ops.push([
          `REPLACE INTO objects_untyped(type, id, data)
               SELECT ?,
                      json_extract(json_each.value, '$.id'),
                      json_extract(json_each.value, '$.data')
               FROM json_each(?)`,
          type,
          JSON.stringify(puts)
        ]);

        ops.push([
          `DELETE FROM objects_untyped
               WHERE type = ?
               AND id IN (SELECT json_each.value FROM json_each(?))`,
          type,
          JSON.stringify([...removeIds])
        ]);
      }
    }
    return ops;
  }

  async updateSchema(schema: Schema) {
    await this.db.writeLock(async (tx) => {
      const types = schema.models.map((model) => model.name);
      const createTablesOps = this.createTablesAndTriggersOps(types);
      if (createTablesOps.updated) {
        this.tableNames = createTablesOps.tableNames;
        let ops = [...createTablesOps.ops];
        for (let model of schema.models) {
          ops.push(createViewStatement(model));
          ops.push(...createViewTriggerStatements(model));
        }
        await tx.runBatch(ops);
      } else {
        let ops = [];
        for (let model of schema.models) {
          ops.push(createViewStatement(model));
          ops.push(...createViewTriggerStatements(model));
        }
        await tx.runBatch(ops);
      }
    });
  }

  /**
   * Create and populate tables.
   * @param types
   */
  private createTablesAndTriggersOps(types: string[]) {
    const remainingTables = new Set(this.tableNames);
    const updatedTableList = new Set<string>();
    const addedTypes: string[] = [];

    let createOps: any[] = [];
    for (let type of types) {
      const tableName = this.getTableName(type);
      updatedTableList.add(tableName);
      const exists = remainingTables.has(tableName);
      remainingTables.delete(tableName);
      if (exists) {
        continue;
      }
      addedTypes.push(type);

      createOps.push(`CREATE TABLE "${tableName}"
                      (
                        id   TEXT,
                        data TEXT,
                        PRIMARY KEY (id)
                      )`);
      createOps.push([
        `INSERT INTO "${tableName}"(id, data)
         SELECT id, data
         FROM objects_untyped
         WHERE type = ?`,
        type
      ]);
      createOps.push([
        `DELETE
                       FROM objects_untyped
                       WHERE type = ?`,
        type
      ]);
    }

    for (let tableName of remainingTables) {
      const typeMatch = /^objects__(.+)$/.exec(tableName);
      if (typeMatch == null) {
        continue;
      }
      const type = typeMatch[1];
      createOps.push(`INSERT INTO objects_untyped(type, id, data) SELECT ?, id, data FROM "${tableName}"`, type);
      createOps.push(`DROP TABLE "${tableName}"`);
    }

    return {
      updated: addedTypes.length > 0,
      ops: createOps,
      addedTypes: addedTypes,
      tableNames: updatedTableList
    };
  }

  private checksumCache: {
    checksums: Map<string, { checksum: BucketChecksum; last_op_id: OpId }>;
    lastOpId: OpId;
  } | null = null;

  async validateChecksums(checkpoint: Checkpoint) {
    // There may be cases where the oplog is empty for a bucket, but there is a bucket with add_checksum value.
    // To correctly cater for that, it is important that the conditions are in the correct places in this query
    // for the LEFT OUTER JOIN to function, instead of returning no results for that case.
    const rows = await this.db.all(
      `WITH
     bucket_list(bucket, lower_op_id) AS (
         SELECT
                json_extract(json_each.value, '$.bucket') as bucket,
                json_extract(json_each.value, '$.last_op_id') as lower_op_id
         FROM json_each(?)
         )
      SELECT
         buckets.name as bucket,
         buckets.add_checksum as add_checksum,
         IFNULL(SUM(oplog.hash), 0) as oplog_checksum,
         COUNT(oplog.op_id) as count,
         CAST(MAX(oplog.op_id) as TEXT) as last_op_id,
         CAST(buckets.last_applied_op as TEXT) as last_applied_op
       FROM bucket_list
         LEFT OUTER JOIN buckets ON
             buckets.name = bucket_list.bucket
         LEFT OUTER JOIN oplog ON
             bucket_list.bucket = oplog.bucket AND
             oplog.op_id <= ? AND oplog.op_id > bucket_list.lower_op_id
       GROUP BY bucket_list.bucket`,
      JSON.stringify(
        checkpoint.buckets.map((checksum) => {
          return {
            bucket: checksum.bucket,
            last_op_id: this.checksumCache?.checksums.get(checksum.bucket)?.last_op_id ?? '0'
          };
        })
      ),
      checkpoint.last_op_id
    );
    // TODO: Check if last_applied_op > lastOpId. If it is, we cannot safely use diffs, since compaction could have
    // affected it.

    const byBucket = new Map<string, { checksum: BucketChecksum; last_op_id: OpId }>();
    if (this.checksumCache) {
      const checksums = this.checksumCache.checksums;
      for (let row of rows) {
        if (BigInt(row.last_applied_op) > BigInt(this.checksumCache.lastOpId)) {
          throw new Error(`assertion failed: ${row.last_applied_op} > ${this.checksumCache.lastOpId}`);
        }
        let checksum: number;
        let last_op_id: OpId | null = row.last_op_id;
        if (checksums.has(row.bucket)) {
          // All rows may have been filtered out, in which case we use the previous one
          last_op_id ??= checksums.get(row.bucket).last_op_id;
          checksum = (checksums.get(row.bucket).checksum.checksum + row.oplog_checksum) & 0xffffffff;
        } else {
          checksum = (row.add_checksum + row.oplog_checksum) & 0xffffffff;
        }
        byBucket.set(row.bucket, {
          checksum: { bucket: row.bucket, checksum: checksum, count: row.count },
          last_op_id: last_op_id
        });
      }
    } else {
      for (let row of rows) {
        const checksum = (row.add_checksum + row.oplog_checksum) & 0xffffffff;
        byBucket.set(row.bucket, {
          checksum: { bucket: row.bucket, checksum: checksum, count: row.count },
          last_op_id: row.last_op_id
        });
      }
    }
    let failedChecksums: { bucket: string }[] = [];
    for (let checksum of checkpoint.buckets) {
      const local = byBucket.get(checksum.bucket)?.checksum ?? { bucket: checksum.bucket, count: 0, checksum: 0 };
      // Note: Count is informational only.
      if (local.checksum != checksum.checksum) {
        console.error(`Checksum failed for ${checksum.bucket}: local ${local.checksum} != remote ${checksum.checksum}`);
        console.log('cache', this.checksumCache);
        console.log('rows', rows);
        failedChecksums.push({
          bucket: checksum.bucket
        });
      }
    }
    if (failedChecksums.length == 0) {
      // FIXME: Checksum cache is broken add_checksum is modified.
      // this.checksumCache = {
      //   lastOpId: checkpoint.last_op_id,
      //   checksums: byBucket
      // };
      return { valid: true };
    } else {
      this.checksumCache = null;
      return { valid: false, failures: failedChecksums };
    }
  }

  private async deletePendingBuckets(tx: Sqlite.WriteContext) {
    if (this.pendingBucketDeletes !== false) {
      // Executed once after start-up, and again when there are pending deletes.
      await tx.runBatch([
        'DELETE FROM oplog WHERE bucket IN (SELECT name FROM buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)',
        'DELETE FROM buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op'
      ]);
      this.pendingBucketDeletes = false;
    }
  }

  private async clearRemoveOps(tx: Sqlite.WriteContext) {
    // TODO: Use persisted counters
    if (this.compactCounter < COMPACT_OPERATION_INTERVAL) {
      return;
    }

    const rows = await tx.all(
      'SELECT name, cast(last_applied_op as TEXT) as last_applied_op, cast(last_op as TEXT) as last_op FROM buckets WHERE pending_delete = 0'
    );
    for (let row of rows) {
      await tx.writeLock(async (tx) => {
        // Note: The row values here may be different from when queried. That should not be an issue.
        let ops: Sqlite.Query[] = [];
        ops.push([
          `UPDATE buckets
           SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0)
                                              FROM oplog
                                              WHERE (superseded = 1 OR op != 'PUT')
                                                AND oplog.bucket = ?
                                                AND oplog.op_id <= ?)
           WHERE buckets.name = ?`,
          row.name,
          row.last_applied_op,
          row.name
        ]);
        ops.push([
          `DELETE
           FROM oplog
           WHERE (superseded = 1 OR op != 'PUT')
             AND bucket = ?
             AND op_id <= ?`,
          // Must use the same values as above
          row.name,
          row.last_applied_op
        ]);
        await tx.runBatch(ops);
      });
    }
    this.compactCounter = 0;
  }

  async autoCompact() {
    const start = Date.now();
    // Note: Operations are only removed when they have already been applied.
    // This is catered for in the individual queries, by checking for last_applied_op.

    const { changes: before } = await this.db.get('SELECT total_changes() as changes');
    // 1. Delete buckets
    await this.deletePendingBuckets(this.db);

    // 2. Clear REMOVE operations, only keeping PUT ones
    await this.clearRemoveOps(this.db);

    const { changes: after } = await this.db.get('SELECT total_changes() as changes');
    const duration = Date.now() - start;
  }

  /**
   * Force a compact, for tests.
   *
   * @deprecated For tests only.
   */
  async forceCompact() {
    this.compactCounter = COMPACT_OPERATION_INTERVAL;
    this.pendingBucketDeletes = true;

    await this.autoCompact();
  }

  /**
   * Get a batch of objects to send to the server.
   * When the objects are successfully sent to the server, call .complete()
   */
  async getCrudBatch(limit?: number): Promise<CrudBatchEvent | null> {
    limit = limit ?? 100;

    // TODO: Return immediately if we know there is no data, so that we don't wait for a database lock.
    const row = await this.db.get('SELECT 1 FROM crud LIMIT 1');
    if (row == null) {
      return null;
    }

    const rows = await this.db.all('SELECT * FROM crud ORDER BY id ASC LIMIT ?', limit);
    let all: CrudEntry[] = [];
    for (let row of rows) {
      const data = JSON.parse(row.data);
      const id = row.id;
      all.push({ ...data, op_id: id });
    }
    if (all.length === 0) {
      return null;
    }
    const last = all[all.length - 1];
    return {
      crud: all,
      complete: async (options?: { last_op_id: string | null }) => {
        await this.db.runBatch([
          [`DELETE FROM crud WHERE id <= ?`, (last as any).op_id],
          [`UPDATE buckets SET target_op = ? WHERE name='$local'`, options?.last_op_id ?? '0']
        ]);
      },
      remaining: 1 // FIXME
    };
  }

  async objectOplogStats() {
    return await this.db.all(
      `SELECT object_type as type, object_id as id, count(*) as count FROM oplog GROUP BY object_type, object_id ORDER BY object_type, object_id`
    );
  }
}
