import { ProgressCounter } from './ProgressCounter.js';
import {
  BucketChecksum,
  BucketRequest,
  Checkpoint,
  CrudRequest,
  CrudResponse,
  isStreamingKeepalive,
  isStreamingSyncCheckpoint,
  isStreamingSyncCheckpointComplete,
  isStreamingSyncCheckpointDiff,
  isStreamingSyncData,
  StreamingSyncLine,
  StreamingSyncRequest,
  SyncRequest,
  SyncResponse
} from './types.js';
import { BucketStorageAdapter } from './SyncAdapter2.js';
import { Remote } from './Remote.js';
import { ndjsonStream } from './ndjson-stream';

/**
 * Client-side sync driver.
 *
 * Uploads locally queued CRUD batches through `remote`, and consumes the
 * server's line-delimited sync stream, forwarding checkpoints and bucket data
 * to the `adapter`.
 */
export class StreamingSyncImplementation {
  constructor(private adapter: BucketStorageAdapter, private remote: Remote) {}

  /** Delegates to the adapter's `hasCompletedSync()`. */
  async hasCompletedSync() {
    return this.adapter.hasCompletedSync();
  }

  /**
   * Uploads queued CRUD batches one at a time until `uploadCrud()` reports that
   * nothing is left. The whole loop runs while holding the `'crud'` Web Lock.
   *
   * Note: this only uploads; it does not download anything.
   */
  async fullSync() {
    await navigator.locks.request('crud', async () => {
      while (await this.uploadCrud()) {
        // Keep draining until the adapter has no further batch.
      }
    });
  }

  /**
   * Uploads the next CRUD batch, if there is one.
   *
   * The batch is posted to `crud.json`, then marked complete with the
   * checkpoint taken from the response.
   *
   * @returns `false` when the adapter has no batch to upload, `true` after a
   *   batch was uploaded and completed.
   */
  async uploadCrud() {
    const batch = await this.adapter.getCrudBatch();
    if (batch == null) {
      return false;
    }
    const request = { data: batch.crud } as CrudRequest;
    const res: CrudResponse = (await this.remote.post('crud.json', request)).data;

    await batch.complete({ last_op_id: res.checkpoint });
    return true;
  }

  /**
   * Runs one streaming sync session while holding the `'sync'` Web Lock.
   *
   * The current bucket states are sent as the starting point, then each line of
   * the stream is dispatched by kind: checkpoint, checkpoint diff, checkpoint
   * complete, data, or keepalive.
   *
   * @param signal Passed to both the lock request and the streaming request.
   * @param progress Optional callback invoked after each processed line.
   * @returns `{ retry: true }` whenever the session ends without throwing:
   *   checksum validation failed, the keepalive reported an expiring token, or
   *   the stream closed.
   * @throws Error if a checkpoint diff or checkpoint-complete line arrives
   *   before any checkpoint was received.
   */
  async streamingSync(signal: AbortSignal, progress?: () => void): Promise<{ retry?: boolean }> {
    return await navigator.locks.request('sync', { signal: signal }, async () => {
      this.adapter.startSession();

      // Keyed by bucket name, so each bucket is requested at most once.
      const initialBuckets = new Map<string, string>();
      for (const entry of await this.adapter.getBucketStates()) {
        initialBuckets.set(entry.bucket, entry.op_id);
      }
      const req: BucketRequest[] = Array.from(initialBuckets, ([name, after]) => ({ name, after }));

      let lastSavePromise: Promise<void> | null = null;
      let targetCheckpoint: Checkpoint | null = null;

      // Buckets known locally; used to work out which ones a full checkpoint drops.
      let bucketSet = new Set<string>(initialBuckets.keys());

      for await (const line of this.streamingSyncRequest(
        {
          buckets: req,
          include_checksum: true
        },
        signal
      )) {
        console.debug('stream', line);
        if (isStreamingSyncCheckpoint(line)) {
          targetCheckpoint = line.checkpoint;
          // Anything we hold that the new checkpoint does not list gets removed.
          const bucketsToDelete = new Set<string>(bucketSet);
          const newBuckets = new Set<string>();
          for (const checksum of line.checkpoint.buckets) {
            newBuckets.add(checksum.bucket);
            bucketsToDelete.delete(checksum.bucket);
          }
          if (bucketsToDelete.size > 0) {
            console.debug('Remove buckets', [...bucketsToDelete]);
          }
          bucketSet = newBuckets;
          await this.adapter.removeBuckets([...bucketsToDelete]);
          await this.adapter.setTargetCheckpoint(targetCheckpoint);
        } else if (isStreamingSyncCheckpointComplete(line)) {
          if (targetCheckpoint == null) {
            throw new Error('Checkpoint complete without previous checkpoint');
          }
          await this.adapter.waitUntilPersisted();
          const result = await this.adapter.syncLocalDatabase(targetCheckpoint);
          if (!result.checkpointValid) {
            // This means checksums failed. Start again with a new checkpoint.
            // TODO: better back-off
            await new Promise((resolve) => setTimeout(resolve, 50));
            return { retry: true };
          }
          // If `result.ready` is false the checksums were valid but more data is
          // needed for a consistent checkpoint: nothing to do, keep reading.
        } else if (isStreamingSyncCheckpointDiff(line)) {
          // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint
          if (targetCheckpoint == null) {
            throw new Error('Checkpoint diff without previous checkpoint');
          }
          const diff = line.checkpoint_diff;
          // Start from the current target, apply updates, then drop removals.
          const newBuckets = new Map<string, BucketChecksum>();
          for (const checksum of targetCheckpoint.buckets) {
            newBuckets.set(checksum.bucket, checksum);
          }
          for (const checksum of diff.updated_buckets) {
            newBuckets.set(checksum.bucket, checksum);
          }
          for (const bucket of diff.removed_buckets) {
            newBuckets.delete(bucket);
          }

          const newCheckpoint: Checkpoint = {
            last_op_id: diff.last_op_id,
            buckets: [...newBuckets.values()]
          };
          targetCheckpoint = newCheckpoint;

          bucketSet = new Set<string>(newBuckets.keys());

          const bucketsToDelete = diff.removed_buckets;
          if (bucketsToDelete.length > 0) {
            console.debug('Remove buckets', bucketsToDelete);
          }
          await this.adapter.removeBuckets(bucketsToDelete);
          await this.adapter.setTargetCheckpoint(targetCheckpoint);
        } else if (isStreamingSyncData(line)) {
          const { data } = line;
          // We don't want to build up too much of a queue if the network is faster than what we can save
          await lastSavePromise;
          lastSavePromise = this.adapter.saveSyncData({ buckets: [data] });
          // waitUntilPersisted() covers this:
          //   await lastSavePromise;
        } else if (isStreamingKeepalive(line)) {
          const remainingSeconds = line.token_expires_in;
          if (remainingSeconds === 0) {
            // Connection would be closed automatically right after this
            console.debug('Token expiring; reconnect');
            return { retry: true };
          }
        }
        progress?.();
      }
      console.debug('stream done');
      // Connection closed. Likely due to auth issue.
      return { retry: true };
    });
  }

  /**
   * Posts `req` to `/sync/stream` and yields each value read from the
   * `ndjsonStream` wrapper around the response body.
   *
   * If the consumer stops before the stream reports `done` (by `break`,
   * `return` or a thrown error), the reader is cancelled so the stream is not
   * left open; the reader lock is released in every case.
   *
   * @param req Request payload for the streaming endpoint.
   * @param signal Forwarded to `remote.postStreaming`.
   */
  async *streamingSyncRequest(req: StreamingSyncRequest, signal: AbortSignal): AsyncGenerator<StreamingSyncLine> {
    const body = await this.remote.postStreaming('/sync/stream', req, signal);
    const reader = ndjsonStream(body).getReader();
    let exhausted = false;

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          exhausted = true;
          return;
        }
        yield value;
      }
    } finally {
      if (!exhausted) {
        try {
          await reader.cancel();
        } catch {
          // cancel() rejects when the stream has already errored (for example
          // after an abort); there is nothing further to clean up in that case.
        }
      }
      reader.releaseLock();
    }
  }
}
