import { ReplaySubject, Subscription } from 'rxjs';

import { get, keyBy, set } from 'lodash';
import { SyncEvent, SyncEventStorage, SyncStorage } from './sync.model';
import { SyncService } from './sync.service';
import { mergeCopyArrays } from '@weavix/utils/src/merge-copy-arrays';
import { mongoDbMax } from '@weavix/utils/src/mongodb-max';
import { MetricAggregator } from './metric-aggregator';
import { TelemetryEvent } from '@weavix/models/src/telemetry';

/**
 * One data point describing a simulcast event that arrived twice: the first
 * copy (the "winner") was applied, the second copy (the "loser") was dropped
 * as a duplicate.
 */
export interface DeduplicationTelemetry {
    /** Sync topic the duplicated event belongs to. */
    topic: string;

    /** Milliseconds between caching the winning copy and seeing the duplicate. */
    diffMs: number;

    /** Name (carrier or SSID) of the network that delivered the winning copy. */
    winnerNetwork: string;

    /** Name (carrier or SSID) of the network that delivered the duplicate. */
    loserNetwork: string;

    /** Kind of network that delivered the winning copy. */
    winnerNetworkType: 'mobile' | 'wifi';

    /** Kind of network that delivered the duplicate. */
    loserNetworkType: 'mobile' | 'wifi';
}

/**
 * Keeps one partition of a sync topic up to date in local storage.
 *
 * On construction the partition subscribes to live sync events for `topic`,
 * fetches the events recorded since the last stored sync date from
 * `/sync/<topic>`, and applies the fetched events followed by any live events
 * that arrived in the meantime. Later live events are applied as they arrive.
 *
 * - `loaded$` emits once the initial load has completed (or the partition is closed).
 * - `reset$` emits after a `reset` sync event closed the partition.
 *
 * When `simulcast` is enabled the same event may be delivered over more than
 * one network. The first copy is applied and remembered in a short-lived cache;
 * later copies are dropped as duplicates and reported as deduplication
 * telemetry. Cached events whose duplicate never shows up are reported as
 * missing on the other network type.
 */
export class SyncPartition<T extends { id?: string }> {
    /** How often telemetry is reported and the simulcast cache is pruned. */
    private static readonly TELEMETRY_INTERVAL_MS = 60_000 * 10;
    /** How long a simulcast event waits for its duplicate before being counted as missing. */
    private static readonly SIMULCAST_CACHE_TTL_MS = 120_000;

    loaded$ = new ReplaySubject<boolean>(1);
    reset$ = new ReplaySubject<boolean>(1);

    private subscription?: Subscription;
    private closed = false;
    /** Events waiting to be applied, in arrival order. */
    private updating: SyncEvent<T>[] = [];
    /** True while a `queueUpdate` call is draining `updating`. */
    private applying = false;
    /** First-seen simulcast events, keyed by sync id, awaiting their duplicate. */
    private simulcastedEventCache :{[key: string]: {date:string, networkId:number, networkType:string, id:string, networkName?:string}} = {};
    private metricAggregator = new MetricAggregator<DeduplicationTelemetry>();
    private missingSyncEventAggregator = new MetricAggregator<{missingNetwork: 'mobile' | 'wifi'}>();
    /** Handle of the telemetry timer; only set when `simulcast` is enabled. */
    private telemetryInterval?: ReturnType<typeof setInterval>;

    /**
     * @param syncService Transport used to subscribe, fetch, and send telemetry.
     * @param storage Record storage the sync events are applied to.
     * @param topic Sync topic to follow.
     * @param partition Storage partition the records belong to.
     * @param lazy Forwarded to the sync service's `subscribe` and `get` calls.
     * @param simulcast Forwarded to `subscribe`; also enables deduplication telemetry.
     * @param reinitialize When true, the stored sync date is ignored and the fetch is sent without a `date` parameter.
     * @param eventStorage Optional store of applied events, used for the resume date and duplicate detection.
     */
    constructor(
        private syncService: SyncService,
        private storage: SyncStorage<T>,
        public topic: string,
        public partition: string,
        private lazy: boolean,
        private simulcast = false,
        private reinitialize = false,
        private eventStorage?: SyncEventStorage,
    ) {
        this.subscribeAndGet().catch(e => console.error(e));
        if (this.simulcast) {
            this.telemetryInterval = setInterval(
                () => this.reportTelemetry(),
                SyncPartition.TELEMETRY_INTERVAL_MS,
            );
        }
    }

    /**
     * Stops the partition: unsubscribes from live events, stops the telemetry
     * timer and emits `loaded$` so waiters are released. Calling it again is a
     * no-op.
     *
     * @param reset When true, the stored events for this partition are cleared as well.
     */
    async close(reset: boolean) {
        if (this.closed) return;
        this.closed = true;

        console.log(`Clearing sync partition ${this.partition}`);
        this.unsubscribe();
        this.loaded$.next(true);

        if (reset) await this.eventStorage?.clear(this.partition);
    }

    /**
     * Subscribes to live events and performs the initial load.
     *
     * Live events that arrive while the initial fetch is in flight are buffered
     * and applied after the fetched events. If the live subscription errors,
     * the whole procedure is started again.
     */
    async subscribeAndGet() {
        const date = this.reinitialize ? null : await this.eventStorage?.getDate(this.partition);
        // Buffer for live events received before the initial fetch has been applied;
        // set to null once drained so later events are applied directly.
        let queue: SyncEvent<T>[] | null = [];

        console.log(`Initializing sync partition ${this.topic} from date ${date}`);
        await this.storage.dump(this.partition);

        const subject = await this.syncService.subscribe<SyncEvent<T>[]>(this.topic, () => this.closed, this.lazy, this.simulcast);
        this.subscription = subject.subscribe(
            async val => {
                // Tag every event with the network it was delivered on (used for simulcast telemetry).
                const newPayload = val.payload.map(p => ({ ...p, networkId: val.networkId, networkType: val.networkType}));
                if (queue) queue.push(...newPayload);
                else await this.queueUpdate(newPayload, false);
            },
            () => this.subscribeAndGet().catch(e => console.error(e)),
        );
        // close() may have run while we were awaiting above; it could not have
        // seen this subscription, so tear it down here instead of leaking it.
        if (this.closed) this.unsubscribe();

        const data = await this.syncService.get<SyncEvent<T>[]>(
            `/sync/${this.topic}`,
            date ? { date } : {},
            () => this.closed,
            this.lazy
        );
        this.syncService.reportMetric(data.length, JSON.stringify(data).length);

        await this.queueUpdate(data, true);
        const remaining = queue;
        queue = null;
        await this.queueUpdate(remaining, true);

        await this.storage.addPartition(this.partition);
        this.loaded$.next(true);
    }

    /** Flushes the underlying record storage. */
    async flush() {
        await this.storage.flush();
    }

    /** Releases the live subscription and the telemetry timer, whichever exist. */
    private unsubscribe() {
        if (this.subscription) {
            this.subscription.unsubscribe();
            this.subscription = undefined;
        }
        // Timer handles have no clearInterval() method; the global function must
        // be used. Cleared independently of the subscription so that closing
        // before the subscription exists does not leak the timer.
        if (this.telemetryInterval !== undefined) {
            clearInterval(this.telemetryInterval);
            this.telemetryInterval = undefined;
        }
    }

    /**
     * Sends accumulated deduplication / missing-event telemetry (if any) and
     * prunes the simulcast cache. Runs on the telemetry timer.
     */
    private reportTelemetry() {
        console.log(`[Sync Partition] checking for deduplication telemetry...`);
        if (this.metricAggregator.dataPointCount > 0) {
            this.syncService.sendTelemetry(TelemetryEvent.SyncDeduplicate, {
                date: new Date(),
                eventCount: this.metricAggregator.dataPointCount,
                minDiffMs: this.metricAggregator.minOf('diffMs'),
                maxDiffMs: this.metricAggregator.maxOf('diffMs'),
                avgDiffMs: this.metricAggregator.avgOf('diffMs'),
                wifiWins: this.metricAggregator.countOf('winnerNetworkType', 'wifi'),
                mobileWins: this.metricAggregator.countOf('winnerNetworkType', 'mobile'),
                winnerNetworks: this.metricAggregator.countOf('winnerNetwork'),
                loserNetworks: this.metricAggregator.countOf('loserNetwork'),
                topic: this.topic,
            });
            this.metricAggregator.clear();
        }

        if (this.missingSyncEventAggregator.dataPointCount > 0) {
            this.syncService.sendTelemetry(TelemetryEvent.SyncEventMissing, {
                date: new Date(),
                eventCount: this.missingSyncEventAggregator.dataPointCount,
                topic: this.topic,
                missingWifi: this.missingSyncEventAggregator.countOf('missingNetwork', 'wifi'),
                missingMobile: this.missingSyncEventAggregator.countOf('missingNetwork', 'mobile'),
            });
            this.missingSyncEventAggregator.clear();
        }

        this.pruneSimulcastCache();
    }

    /**
     * Drops cached simulcast events older than the TTL. Each dropped entry is
     * recorded as an event that never arrived on the other network type; those
     * data points are sent on the next telemetry pass.
     */
    private pruneSimulcastCache() {
        const now = Date.now();
        const retained = Object.values(this.simulcastedEventCache).filter(entry => {
            const expired = now - new Date(entry.date).getTime() > SyncPartition.SIMULCAST_CACHE_TTL_MS;
            if (expired) {
                this.missingSyncEventAggregator.accumulate({
                    missingNetwork: entry.networkType === 'wifi' ? 'mobile' : 'wifi'
                });
            }
            return !expired;
        });

        this.simulcastedEventCache = keyBy(retained, 'id');
        console.log('[Sync Partition] simulcast event cache pruned.  current size:', retained.length);
    }

    /**
     * Appends events to the pending queue and drains it, applying one event at
     * a time. If another call is already draining the queue, the events are
     * only appended and this call returns immediately.
     *
     * @param publish Events to apply, in order.
     * @param init True when the events belong to the initial load (affects logging and telemetry).
     */
    protected async queueUpdate(publish: SyncEvent<T>[], init: boolean) {
        if (init && publish.length) console.log(`Applying ${publish.length} events to ${this.topic}`);
        this.updating.push(...publish);
        if (this.applying) return;
        this.applying = true;
        const start = Date.now();
        try {
            while (this.updating.length && !this.closed) {
                const event = this.updating.shift() as SyncEvent<T>;
                const result = this.applyUpdate(event, init);
                // Doing large amounts of promises is slow in react native... has to do with timeouts and stuff
                // So we only await when we need to (most stuff in lazy storage needs loaded once then is fine)
                if ((result as Promise<any>)?.then) await result;
                this.eventStorage?.add(this.partition, event.date, event.syncId);
            }
            if (init && publish.length) console.log(`Done applying updates ${this.topic} in ${Date.now() - start} ms sync time ${publish[publish.length - 1].date}`);
        } finally {
            // Always release the flag: if an update throws and it stays set,
            // every later call returns early and no event is ever applied again.
            this.applying = false;
        }
    }

    /**
     * Applies a single sync event to storage.
     *
     * Events already recorded in `eventStorage` are skipped. Depending on the
     * storage implementation the result may be a promise; callers should await
     * it only when it is thenable.
     *
     * @param publish The event to apply.
     * @param init True when the event belongs to the initial load.
     */
    protected applyUpdate(publish: SyncEvent<T>, init: boolean) {
        if (publish.syncId && this.eventStorage?.isDuplicate(publish.date, publish.syncId)) {
            if (!init) {
                console.log(`Sync ${publish.syncId} ignored due to duplication for ${this.topic}`);
                this.recordDuplicate(publish);
            }
            return;
        }
        // Only track events for deduplication when more than one network is
        // currently reported by the sync service.
        if (this.simulcast && !init && Object.keys(this.syncService.getCurrentNetworks()).length > 1) {
            this.rememberSimulcastEvent(publish);
        }
        switch (publish.type) {
            case 'reset':
                console.log(`Sync reset event for ${this.topic}`);
                return this.reset();
            case 'clear':
                console.log(`Sync clear event for ${this.topic} of size ${publish.payload?.length}`);
                return this.eventStorage?.clear(this.partition, publish.payload ?? []);
            case 'insert':
                if (!init) console.log(`Sync insert event for ${this.partition} id ${publish.payload.id}`);
                return this.storage.add(publish.payload as T, this.partition);
            case 'delete':
            case 'update':
            case 'pull':
            case 'push': {
                if (!init) console.log(`Sync ${publish.type} event for ${this.partition} from ${this.topic} id ${publish.id}`);

                // Evaluated lazily by storage; lists the top-level fields the event touches.
                const getKeys = () => Object.keys((publish as any).payload ?? {});
                switch (publish.type) {
                    case 'delete':
                        return this.storage.remove(publish.id, this.partition);
                    case 'update': {
                        if (publish.inc || publish.set || publish.max) {
                            // Operator-style update: increments, then max, then explicit sets.
                            const incSet = (record: any) => {
                                if (publish.inc) {
                                    Object.keys(publish.inc).forEach(field => {
                                        const current = get(record, field) ?? 0;
                                        set(record, field, current + publish.inc[field]);
                                    });
                                }
                                if (publish.max) {
                                    Object.keys(publish.max).forEach(field => {
                                        const current = get(record, field) ?? 0;
                                        set(record, field, mongoDbMax(current, publish.max[field]));
                                    });
                                }
                                if (publish.set) {
                                    Object.keys(publish.set).forEach(field => {
                                        set(record, field, publish.set[field]);
                                    });
                                }
                                return record;
                            };
                            return this.storage.update(publish.id, incSet, getKeys, this.partition);
                        }
                        return this.storage.update(publish.id, record => mergeCopyArrays(record, publish.payload), getKeys, this.partition);
                    }
                    case 'pull': {
                        // Removes matching elements from each array field named in the payload.
                        const pull = (record: any) => {
                            Object.keys(publish.payload).forEach(field => {
                                const array = get(record, field);
                                if (!array) return;
                                const filter = publish.payload[field as keyof T];
                                for (let i = 0; i < array.length; i++) {
                                    const element = array[i];
                                    // Object filters match when every filter key equals the element's
                                    // value; anything else (including null) is compared by identity.
                                    const matches = typeof filter === 'object' && filter !== null
                                        ? Object.keys(filter).every(key => element?.[key] === (filter as any)[key])
                                        : filter === element;
                                    // Step back after removal so the element shifted into slot i is checked too.
                                    if (matches) array.splice(i--, 1);
                                }
                            });
                            return record;
                        };
                        return this.storage.update(publish.id, pull, getKeys, this.partition);
                    }
                    case 'push': {
                        // Appends the payload value to each named array field, creating the array if absent.
                        const push = (record: any) => {
                            Object.keys(publish.payload).forEach(field => {
                                const array = get(record, field);
                                if (!array) set(record as T, field, [publish.payload[field as keyof T]]);
                                else array.push(publish.payload[field as keyof T]);
                            });
                            return record;
                        };
                        return this.storage.update(publish.id, push, getKeys, this.partition);
                    }
                }
            }
            default:
                console.warn(`${(publish as any).type} sync event not recognized`);
                return;
        }
    }

    /**
     * Records deduplication telemetry for a duplicate event whose first copy is
     * still in the simulcast cache, then removes that cache entry.
     */
    private recordDuplicate(publish: SyncEvent<T>) {
        const winner = this.simulcastedEventCache[publish.syncId];
        const network = this.syncService.getCurrentNetworks()[publish.networkId];
        if (!winner) return;

        this.metricAggregator.accumulate({
            topic: this.topic,
            diffMs: Date.now() - new Date(winner.date).getTime(),
            winnerNetwork: winner.networkName,
            loserNetwork: network?.carrier ?? network?.ssid,
            winnerNetworkType: winner.networkType as 'mobile' | 'wifi',
            loserNetworkType: publish.networkType as 'mobile' | 'wifi',
        });
        delete this.simulcastedEventCache[publish.syncId];
    }

    /** Remembers the first copy of a simulcast event so a later duplicate can be measured against it. */
    private rememberSimulcastEvent(publish: SyncEvent<T>) {
        const network = this.syncService.getCurrentNetworks()[publish.networkId];
        this.simulcastedEventCache[publish.syncId] = {
            date: new Date().toISOString(),
            networkId: publish.networkId,
            networkType: publish.networkType,
            id: publish.syncId,
            networkName: network?.carrier ?? network?.ssid
        };
        console.log(`Sync ${publish.syncId} added to cache for ${this.topic}`);
    }

    /** Closes the partition without clearing stored events and notifies `reset$` subscribers. */
    private async reset() {
        await this.close(false);
        this.reset$.next(true);
    }
}
