import { Observable, Observer, ReplaySubject, Subscription } from 'rxjs';
import { StorageService } from './storage.service';
import { CRUD_BASE } from './endpoints';
import { EventSourceMessage, EventStreamContentType, fetchEventSource } from '@microsoft/fetch-event-source';
import { BaseService } from './baseservice';
import { v4 as uuidv4 } from 'uuid';
import { inject, Injectable } from '@angular/core';
import { NetworkDebugService } from './debug.service';

interface PsiUnsubscribeNotifiable {
  notifyUnsubscribed(): void;
}

class RetriableError extends Error { }
class FatalError extends Error { }

export class PsiReplaySubject<T> extends ReplaySubject<T> implements PsiUnsubscribeNotifiable {

  private subscriptionsCount = 0;
  private seenFirstSubscribe = false;

  constructor(replays: number,
              private onClose: () => void) {
    super(replays);

    // la funzione onClose serve a comunicare al server che un determinato eventsId non ci
    // serve più e le relative risorse possono quindi essere liberate.
    // Viene chiamata quando tutte le subscribe a questo Observable sono state chiuse con
    // relativa unsubscribe, quindi il tutto accade automaticamente quando il component
    // che ha fatto la subscribe nella ngOnInit, fa la relativa unsubscribe nella ngOnDestroy.
  }

  override subscribe(observerOrNext?: Partial<Observer<T>> | ((value: T) => void)): Subscription {
    this.subscriptionsCount++;
    this.seenFirstSubscribe = true;
    const subs = super.subscribe(observerOrNext);
    new PsiSubscription(this, subs);
    return subs;
  }

  notifyUnsubscribed(): void {
    this.subscriptionsCount--;
    if (this.subscriptionsCount === 0 && this.seenFirstSubscribe) {
      this.complete();
      this.onClose();
    }
  }
}

class PsiSubscription {
  private originalUnsubscribe;

  constructor(private wrapper: PsiUnsubscribeNotifiable,
              wrapped: Subscription) {
    this.originalUnsubscribe = wrapped.unsubscribe.bind(wrapped);
    wrapped.unsubscribe = this.unsubscribe.bind(this);
  }

  unsubscribe(): void {
    this.originalUnsubscribe();
    this.wrapper.notifyUnsubscribed();
  }
}

interface SubscribedEndpoint {
  subscriptionId: string,
  endpoint: string
}

export interface UrlAndSubject<T> {
  url: string,
  sseUrl: string,
  subject: PsiReplaySubject<T>,
  componentId: string
}

interface ObservablesMap {
  [eventsId: string]: UrlAndSubject<any>;
}
interface WatchdogCallback {
  delayFactor: number;
  fn: () => any;
}

interface Watchdogs {
  [wdid: string]: WatchdogCallback;
}

interface WatchdogTimers {
  [wdid: string]: any; // risultato di setTimeout
}

@Injectable({
  providedIn: 'root'
})
export class SSE {

  private baseSvc = inject(BaseService);
  public dbgSvc = inject(NetworkDebugService);

  private sessionIdKey = "sseSessionId";
  private visibilityChangeHandler = undefined;

  public connection: Promise<void> = undefined;
  public abortController: AbortController = undefined;
  public observables: ObservablesMap = {};
  private watchdogTimers: WatchdogTimers = {};

  constructor() {
    this.visibilityChangeHandler = async () => {
      if (document.visibilityState == 'hidden') {
        console.log("La webapp è stata messa in background");
        await this.unsubscribeAll();
        this.closeConnection();
        this.disableWatchdogs();
      } else { // 'visible'
        console.log("La webapp è stata portata in foreground");
        await this.unsubscribeAll();
        await this.checkConnection();
        await this.resubscribeAll();
      }
    };
  }

  private setSessionId(sid: string) {
    StorageService.sSet(this.sessionIdKey, sid);
  }

  public getSessionId(): string | undefined {
    return StorageService.sGet(this.sessionIdKey);
  }

  public async forEachSubscription(callback: (url: string, eventsId: string, subj: PsiReplaySubject<any>, componentId: string) => any) {
    console.log("Endpoints SSE attualmente registrati:");
    let ol = 0;
    if (this.observables)
      for (const evid in this.observables) {
        if (Object.hasOwn(this.observables, evid)) {
          ol++;
          console.log(ol + ") " + evid + " => " + this.observables[evid].componentId + " su " + this.observables[evid].url);
          await callback(this.observables[evid].url, evid, this.observables[evid].subject, this.observables[evid].componentId);
        }
      }
    if (ol === 0) {
      console.log("nessuno.");
    }
  }

  public forgetSubscription(eventsId: string) {
    if (this.observables)
      if (Object.hasOwn(this.observables, eventsId)) {
        delete this.observables[eventsId];
      }
  }

  public forgetAllSubscriptions() {
    this.forEachSubscription((_u: string, eventsId: string, _sub: PsiReplaySubject<any>) => {
      this.forgetSubscription(eventsId);
    });
  }


  private watchdogReset(timeout: number, fn: () => any, watchdogIndex: string) {
    if (Object.hasOwn(this.watchdogTimers, watchdogIndex)) {
      clearTimeout(this.watchdogTimers[watchdogIndex]);
    }

    this.watchdogTimers[watchdogIndex] = setTimeout(fn, timeout);
  }

  public disableWatchdogs() {
    // disabilito tutti i watchdog
    const nullWatchdogs: Watchdogs = {};
    const existingWDKeys = Object.keys(this.watchdogTimers);
    for (const k in existingWDKeys) {
      nullWatchdogs[k] = {
        delayFactor: 10,
        fn: () => {}
      };
    }
    this.allWatchdogsReset(nullWatchdogs, 86400 * 1000);
  }

  public async allWatchdogsReset(wdcbs: Watchdogs, baseKeepaliveTimeout: number) {
    const keys = Object.keys(wdcbs);
    for (const k of keys) {
      await this.watchdogReset(wdcbs[k].delayFactor * baseKeepaliveTimeout, wdcbs[k].fn, k);
    }
  }

  /**
   * Chiama il server per attivare un sessionId, da usare per tutte le successive chiamate SSE,
   * e per stabilire l'unica connessione SSE persistente per l'intera webapp.
   *
   * Se un id sessione è già presente nella sessione, allora usa quello.
   */
  async openConnection(): Promise<void> {
    const sThis = this;
    return new Promise<void>((resolve, reject) => {
      console.log("Stabilisco la connessione SSE");
      const existingSessionid = sThis.getSessionId();
      const sessionId = existingSessionid ?? uuidv4();
      if (sessionId === existingSessionid) {
        console.log("Recupero da sessionStorage il precedente ID per la connessione SSE: " + sessionId);
      } else {
        console.log("Creo un nuovo ID per la connessione SSE: " + sessionId);
      }
      const url = CRUD_BASE + '/sse/' + sessionId + "?keepalive=5000";
      sThis.abortController = new AbortController();

      const { signal } = sThis.abortController;
      sThis.connection = fetchEventSource(url, {
        headers: sThis.baseSvc.addBearerToken().headers,
        async onopen(response: Response) {
          if (response.ok && response.headers.get('content-type').startsWith(EventStreamContentType)) {
            sThis.resubscribeAll(); // nel caso in cui ci fossero delle subscribe rimaste appese su una vecchioa connessione SSE
            return; // everything's good
          } else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
              // client-side errors are usually non-retriable:
              throw new FatalError();
          } else {
              throw new RetriableError();
          }
        },
        // eslint-disable-next-line prefer-arrow/prefer-arrow-functions
        onmessage(ev: EventSourceMessage) {
          try {
            if (ev.data) {
              const data = JSON.parse(ev.data);
              const eventId = data.event_id ?? undefined;
              if (eventId === 'sessionId') {
                console.log("Salvo il nuovo ID connessione SSE in sessionStorage: " + data.event_payload);
                sThis.setSessionId(data.event_payload);
                resolve();
                sThis.onVisibilityChange();
                console.log("La connessione SSE è ora stabilita");
              } else if (eventId === 'keepalive') {
                sThis.onKeepalive(parseInt(data.event_payload));
              } else {
                const serverSessionId = data.session_id ?? undefined;
                if (serverSessionId !== sThis.getSessionId()) {
                  reject("Il server ha inviato i dati per un vecchio ID di sessione: " + serverSessionId + ", mentre quello attuale è " + sessionId);
                } else {
                  console.log('Ricevuti dati SSE per ID sessione ' + sThis.getSessionId());
                  if (sThis.observables[eventId]) {
                    // console.log('URL richiesta: ' + SSE.observables[eventId].url);
                    if (Array.isArray(data.event_payload)) {
                      const payload = data.event_payload;
                      /*
                      for (const p_el of payload) {
                        const strpel = JSON.stringify(p_el);
                        console.log('Payload ricevuto: ' + strpel.substring(0, Math.min(20, strpel.length)));
                      }*/
                      sThis.observables[eventId].subject.next(payload);
                    } else {
                      console.error("Dati SSE non validi, deve essere un array: " + JSON.stringify(data.event_payload));
                    }
                  }
                }
              }
            }
          } catch (err: any) {
            console.error(err);
          }
        },
        onclose() {
          // if the server closes the connection unexpectedly, retry:
          throw new RetriableError();
        },
        onerror(err) {
          if (err instanceof FatalError) {
            throw err; // rethrow to stop the operation
          } else {
            // connessione persa, tento il ripristino
            console.log("Connessione SSE persa, tento il ripristino.");
            sThis.unsubscribeAll().then(() => {
              sThis.closeConnection();
              sThis.checkConnection().then((_v: boolean) => {
                console.log("Tentativo eseguito, provo a ripristinare le sottoscrizioni.");
                sThis.resubscribeAll().then(() => {
                  console.log("Tentativo di ripristino sottoscrizioni eseguito, non ci resta che sperare.");
                });
              });
            });
          }
        },
        signal
      });
    });
  }

  public closeConnection() {
    console.log("Chiudo la connessione SSE");
    this.abortController?.abort();
    this.connection = undefined;
    this.setSessionId(undefined);
  }

  async checkConnection(): Promise<any> {
    const sessionId = this.getSessionId();
    if (typeof sessionId === 'undefined' || typeof this.connection === 'undefined') {
      console.log("Manca l'ID della connessione SSE in sessionStorage, se non ho ancora ricreato la connessione lo faccio ora");
      this.closeConnection();
      return this.openConnection();
    }
  }

  onVisibilityChange() {
    /*
    document.removeEventListener('visibilitychange', this.visibilityChangeHandler);
    document.addEventListener('visibilitychange', this.visibilityChangeHandler);
    */
  }

  public async unsubscribeAll() {
    await this.forEachSubscription(async (_url: string, eventsId: string, _subj: Observable<any>) => {
      await this.unsubscribe(eventsId);
    });
  }

  async resubscribeAll() {
    console.log("Ripristino tutte le sottoscrizioni SSE");
    await this.forEachSubscription(this.subscribe.bind(this));
    console.log("Tutte le sottoscrizioni SSE sono state ripristinate");
  }

  subscribe<T=any>(url: string, eventsId: string, subject: PsiReplaySubject<T[]>, componentId: string): Promise<boolean> {
    return new Promise((resolve, _reject) => {
      const sessionId = this.getSessionId();
      if (typeof sessionId === 'undefined') {
        console.log("ID sessione della connessione SSE temporaneamente non disponibile");
        return new Promise<void>((res, _rej) => {
          setTimeout(async () => {
            await this.subscribe(url, eventsId, subject, componentId);
            res();
          }, 500);
        });
      } else {
        const subscribe = 'subscribe=' + sessionId + encodeURIComponent(',') + eventsId;
        const sseUrl = url + (url.indexOf('?') >= 0 ? '&' : '?') + subscribe;
        const uas: UrlAndSubject<T[]> = {
          url,
          sseUrl,
          subject,
          componentId
        };
        if (Object.hasOwn(this.observables, eventsId)) { // esiste già una subscription con questo id
          if (this.observables[eventsId].sseUrl == uas.sseUrl) { // ed è verso lo stesso URL (cioè l'ID della connessione SSE non è cambiato)
            console.log("Ignoro la richiesta di subscription già attiva verso " + uas.sseUrl + " per conto di " + uas.componentId);
            resolve(false);
          }
        }
        this.observables[eventsId] = uas;
        const httpsub = this.baseSvc.hget(uas.sseUrl)
          .subscribe(r => {
            console.log("sottoscrizione SSE: " + uas.sseUrl);
            console.log("per conto di: " + uas.componentId);
            httpsub.unsubscribe();
            resolve(true);
          });
      }
    });
  }

  public unsubscribe(eventsId: string) {
    const sessionId = this.getSessionId();
    if (typeof sessionId !== 'undefined') {
      console.log("Comunico al server di interrompere l'invio di dati sulla subscription " + eventsId);
      const delurl = CRUD_BASE + '/sse/' + sessionId + '/' + eventsId;
      const httpsub = this.baseSvc.hdelete(delurl).subscribe(d => {
        console.log("Il server ha confermato l'interruzione per " + eventsId);
        httpsub.unsubscribe();
      });
    }
  }

  async onKeepalive(baseKeepaliveTimeout: number) {
    // il server mi ha mandato il keepalive, quindi smetto di attenderlo
    // e faccio ripartire il timeout per attendere il prossimo (entro il
    // timeout ricevuto qui come parametro, che mi è stato inviato dal server)
    this.allWatchdogsReset({
      "CONNESSIONEKO": {
        delayFactor: 3,
        // se arrivo qui significa che il server non mi ha mandato il keepalive entro
        // 3 volte l'intervallo che aveva promesso, quindi ripristino la connessione SSE
        fn: async () => {
          this.dbgSvc.segnalaConnessioneKo();
          await this.unsubscribeAll();
          this.closeConnection();
          await this.checkConnection();
          await this.resubscribeAll();
        }
      }
    }, baseKeepaliveTimeout);
    // essendo arrivato il keepalive, elimino le eventuali segnalazioni di
    // problemi precedenti: questo farà chiudere eventuali popup sui monitor/totem
    this.dbgSvc.segnalaConnessioneOk();

  }
}

