import { Inject, Injectable } from '@angular/core';
import {
  BehaviorSubject,
  Observable,
  ReplaySubject,
  Subject,
  from,
  interval,
  of,
  timer,
} from 'rxjs';
import {
  distinctUntilChanged,
  filter,
  finalize,
  map,
  retry,
  skip,
  switchAll,
  take,
  takeUntil,
  tap,
} from 'rxjs/operators';
import { WebSocketSubject, webSocket } from 'rxjs/webSocket';

import { AmplifyProvider, CoreAuthAmplifyProvider } from '../auth/providers';
import { ConfigService } from './config.service';

export const RECONNECT_INTERVAL = 5000;

export type WsTopicData = { command: 'notify'; topic: string; data: any };

// https://medium.com/@sportypriyank/websocket-service-for-enterprise-applications-using-rxjs-b6cebd52d1ae

@Injectable({
  providedIn: 'root',
})
export class WebsocketService {
  private baseUrl = '';

  private observablesTopics: Record<
    string,
    ReplaySubject<Observable<WsTopicData>>
  > = {};
  private status$: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(
    false
  );

  stopPing$: Subject<boolean> = new Subject<boolean>();

  // trust us
  private ws!: WebSocketSubject<WsTopicData>;

  constructor(
    @Inject(CoreAuthAmplifyProvider.providerToken)
    private readonly coreAuthAmplify: AmplifyProvider,
    private configService: ConfigService
  ) {
    this.baseUrl = this.configService.getConfig().charlyWsApi.baseUrl;
  }

  public connect() {
    this.create();
    this.connectionStatus$
      .pipe(
        skip(1),
        filter((status) => !status),
        tap(() => this.create())
      )
      .subscribe();

    return this.connectionStatus$;
  }

  private create() {
    if (this.ws) {
      this.close();
    }

    const openObserver = new Subject<Event>();
    openObserver.subscribe(() => {
      // check if we have any topics subscribed, if yes then call multiplex for
      // each topic and switch to new observable given by multiplex function.
      for (const key in this.observablesTopics) {
        this.websocketSubscribe(key, true);
      }

      // Start the ping
      this.ping();

      // Only update the status at the end.
      this.status$.next(true);
    });

    const closeObserver = new Subject<CloseEvent>();
    closeObserver.pipe(map(() => false)).subscribe(this.status$);

    return from(this.coreAuthAmplify.currentSessionIdToken()).subscribe(
      (token) => {
        // RxJS does not support adding the token as a HTTP Header,
        // so we have to add it as a query parameter.
        // The API Gateway is configured to accept the token as a query parameter.

        const url = `${this.baseUrl}?Authorization=${token}`;
        console.log('WS connecting to', this.baseUrl);
        this.ws = webSocket<any>({
          url,
          openObserver,
          closeObserver,
        });

        this.ws
          .pipe(
            retry({
              delay: (errs) => {
                console.log('retrying', errs);
                this.status$.next(false);
                console.log(
                  `Websocket connection down, will attempt reconnection in ${RECONNECT_INTERVAL}ms`,
                  new Date()
                );
                return timer(RECONNECT_INTERVAL);
              },
            })
          )
          .subscribe();
      }
    );
  }

  public get connectionStatus$(): Observable<boolean> {
    return this.status$.pipe(distinctUntilChanged());
  }

  close() {
    if (this.ws) {
      this.stopPing$.next(true);
      this.ws.unsubscribe();
    }
  }

  websocketSubscribe(topic: string, reconnect = false) {
    console.log('websocketSubscribe', topic, reconnect);
    if (this.observablesTopics[topic] && !reconnect) {
      // check if we have replay subject for the topic
      return this.observablesTopics[topic].pipe(
        switchAll<Observable<WsTopicData>>()
      );
    } else {
      //if it is reconnection then call multiplex and switch to new observable
      const messagesSubject$ =
        this.observablesTopics[topic] || new ReplaySubject<Observable<any>>(1);

      // if connected
      if (this.status$.value) {
        messagesSubject$.next(
          this.ws
            .multiplex(
              () => ({ action: 'subscribe', topic }),
              () => ({ action: 'unsubscribe', topic }),
              (message: WsTopicData) => {
                return message.topic === topic && message.command === 'notify';
              }
            )
            .pipe(
              finalize(() => {
                if (!this.observablesTopics[topic].observed) {
                  // Remove the ReplaySubject if all components are unsubscribed.
                  this.observablesTopics[topic].complete();
                  delete this.observablesTopics[topic];
                }
              })
            )
        );
      } else {
        messagesSubject$.next(of()); //dummy observable
      }
      this.observablesTopics[topic] = messagesSubject$;
      return this.observablesTopics[topic].pipe(
        switchAll<Observable<WsTopicData>>()
      );
    }
  }

  message(message: any) {
    this.connectionStatus$
      .pipe(
        filter((status) => status),
        tap(() => this.ws?.next(message)),
        take(1)
      )
      .subscribe();
  }

  ping() {
    const pingInterval =
      this.configService.getConfig().charlyWsApi.pingInterval;
    if (this.configService.getConfig().charlyWsApi.pingInterval) {
      interval(pingInterval)
        .pipe(takeUntil(this.stopPing$))
        .subscribe(() => {
          this.message({ action: 'ping' });
        });
    }
  }
}
