import ReconnectingEventSource from 'reconnecting-eventsource';
import { injectable } from 'inversify';
import {
  IChatEventsService,
  ISseStreamParams,
  MessagesListener,
} from './chat-events.types';
import 'reflect-metadata';
import {
  BehaviorSubject,
  Observable,
  Subject,
  distinctUntilChanged,
  filter,
  switchMap,
} from 'rxjs';
import { logger } from '@/src/utils/common.utils';

@injectable()
export class SseChatEventsService implements IChatEventsService {
  private source: ReconnectingEventSource | null = null;
  private listeners: MessagesListener[] = [];
  private _destroy$ = new Subject<void>();
  public streamParams$: BehaviorSubject<ISseStreamParams> =
    new BehaviorSubject<ISseStreamParams>({
      chatId: undefined,
      topic: undefined,
    });

  public stream$ = this.streamParams$.asObservable().pipe(
    filter((params) => Boolean(params?.chatId)),
    // takeUntil(this._destroy$),
    distinctUntilChanged((prev, curr) => {
      return prev.chatId === curr.chatId && prev.topic === curr.topic;
    }),
    switchMap(({ chatId, topic }) => {
      return this.toSse(chatId, topic);
    }),
  );

  public subscribeToMessageEvents = (listener: MessagesListener) => {
    this.listeners.push(listener);
  };

  public unsubscribe = (listener: MessagesListener) => {
    this.listeners = this.listeners.filter((item) => item !== listener);
  };

  public close = () => {
    this.listeners = [];
    this._destroy$.next();
    this._destroy$.complete();
    this.streamParams$.next({
      chatId: undefined,
      topic: undefined,
    });
    if (this.source) {
      this.source.close();
      this.source = null;
    }
  };

  private toSse = (
    chatId?: string,
    topic?: string,
    experience?: string,
  ): Observable<ISseStreamParams> => {
    return new Observable((observer) => {
      if (typeof EventSource !== 'undefined') {
        const queryObject: Record<string, string> = {};

        if (topic) {
          queryObject['topic'] = topic;
        }

        if (experience) {
          queryObject['experience'] = experience;
        }

        const queryString = '?' + new URLSearchParams(queryObject).toString();

        this.source = new ReconnectingEventSource(
          `/api/message/${chatId}${queryString}`,
          {},
        );

        this.source.onmessage = (message) => {
          const data = JSON.parse(
            Buffer.from(message.data, 'base64').toString(),
          );
          this.listeners.forEach((listener) => {
            listener(data);
          });
        };

        this.source.onopen = () => {
          logger('[SSE]: event stream started');
          observer.next({ chatId, topic });
          observer.complete();
        };

        this.source.onerror = (e) => {
          console.error('[SSE]: event stream error ', e);
          observer.next({ chatId, topic });
          observer.complete();
        };

        window.addEventListener('beforeunload', () => {
          if (this.source) {
            this.source.close();
            observer.complete();
          }
        });
      } else {
        throw new Error('EventSource is not available in this environment');
      }
    });
  };
}
