import { useCallback, useEffect, useRef, useState } from 'react';
import {
  EventStreamContentType,
  fetchEventSource
} from '@microsoft/fetch-event-source';
import { useAxios } from '@rc/utils/axios';
import { useLogs } from './useLogsBase';
import { LOGS_PAGE_SIZE } from '@rc/admin/constants';

/**
 * Splits a raw log line of the form `[<timestamp> UTC] <SOURCE>: <message>`
 * into its parts.
 *
 * Lines that do not follow that layout are returned as-is in `message`,
 * without `timestamp` or `source`.
 *
 * @param {string} log Raw log line.
 * @returns {{timestamp?: string, source?: string, message: string, raw: string}}
 *   The parsed entry; `raw` always holds the untouched input.
 */
function parseLog (log) {
  const parts = log.match(/^\[(.*?) UTC\]\s([A-Z\s]+):\s(.*)$/);

  // Unrecognised layout: keep the whole line as the message.
  if (!parts) {
    return { message: log, raw: log };
  }

  const [, timestamp, source, message] = parts;
  return { timestamp, source, message, raw: log };
}

/**
 * Decodes a base64 string into text, reading the decoded bytes as UTF-8.
 *
 * `atob` alone yields one character per byte, which garbles every multi-byte
 * character. NOTE(review): assumes the server base64-encodes UTF-8 log
 * lines — confirm against the API.
 *
 * @param {string} encoded Base64-encoded payload.
 * @returns {string} The decoded text.
 */
function decodeBase64Utf8 (encoded) {
  const binary = window.atob(encoded);
  const bytes = Uint8Array.from(binary, char => char.charCodeAt(0));
  return new TextDecoder().decode(bytes);
}

/**
 * Hook that feeds `useLogs` with logs streamed over a Mercure topic.
 *
 * `subscribe` requests a topic and token from
 * `/api/environments/<originId>/logs`, then opens an event stream on the
 * Mercure entrypoint read from the `#mercure-entrypoint` DOM element.
 * `more` restarts the stream in paging mode (offset `-1`), and `stop` closes
 * the stream when it is running.
 *
 * @returns The value returned by `useLogs({ subscribe, more, stop })`.
 */
export function useMercureLogs () {
  const abortControllerRef = useRef(null);
  const consumerGroupId = useRef(null);
  // Last `setIsRunning` setter handed to `subscribe`. `unsubscribe` receives
  // no setter of its own, so it relies on this one to clear the flag.
  const setIsRunningRef = useRef(null);
  const [, getTopic] = useAxios({}, { manual: true });

  // Closes the current event stream, if any.
  const abortStream = useCallback(() => {
    abortControllerRef.current?.abort();
    abortControllerRef.current = null;
  }, []);

  const subscribe = useCallback(
    /**
     * Requests a topic for the record and starts streaming its logs.
     *
     * @param {import('./useLogsBase').SubscribeParams} param
     */
    async ({
      record,
      filters,
      offset,
      setLogs,
      setIsPaging,
      setIsLoading,
      setIsRunning
    }) => {
      setIsRunningRef.current = setIsRunning;

      // Never keep two streams alive: a stream whose controller has been
      // replaced could no longer be aborted and would keep appending logs.
      abortStream();

      // Paging only makes sense when a consumer group already exists.
      const isUserPaging = offset === -1 && Boolean(consumerGroupId.current);
      setIsPaging(isUserPaging);

      if (!isUserPaging) {
        setLogs([]);
        setIsLoading(true);
      }

      const headers = {
        'x-filters': JSON.stringify({ filters }),
        'x-offset': offset,
        'x-page-size': LOGS_PAGE_SIZE
      };

      if (isUserPaging) {
        headers['x-consumer-group-id'] = consumerGroupId.current;
      }

      try {
        const { data: topicData } = await getTopic({
          url: `/api/environments/${record.originId}/logs`,
          headers
        });

        consumerGroupId.current = topicData?.consumerGroupId;

        if (topicData?.topic) {
          const { topic, token } = topicData;
          const entrypoint =
            document.getElementById('mercure-entrypoint').innerText;
          const url = new URL(entrypoint, window.location.origin);
          url.searchParams.append('topic', topic);

          const controller = new AbortController();
          abortControllerRef.current = controller;

          // Deliberately not awaited: the promise only settles once the
          // stream ends, while `subscribe` must return as soon as it starts.
          fetchEventSource(url.toString(), {
            credentials: 'omit',
            headers: { Authorization: `Bearer ${token}` },
            openWhenHidden: true,
            signal: controller.signal,
            onopen: response => {
              if (
                response.ok &&
                response.headers.get('content-type') === EventStreamContentType
              ) {
                setIsRunning(true);
              }
            },
            onmessage: message => {
              try {
                const parsedData = JSON.parse(message.data);
                if (parsedData.data) {
                  const logString = decodeBase64Utf8(parsedData.data);
                  const log = parseLog(logString);
                  setLogs(prevLogs => [...prevLogs, log]);
                  if (offset === -1) {
                    setIsPaging(false);
                  }
                }
              } catch (error) {
                console.error('Error parsing message:', error);
              }
            },
            onerror: error => {
              console.error('Stream error:', error);
              setIsRunning(false);
            }
          }).catch(error => {
            // Keeps a failed stream from becoming an unhandled rejection.
            console.error('Stream error:', error);
            setIsRunning(false);
          });
        }
      } catch (error) {
        console.error('Subscription error:', error);
        consumerGroupId.current = null;
      }

      setIsLoading(false);
    },
    [getTopic, abortStream]
  );

  const more = useCallback(
    /**
     * Restarts the stream in paging mode to load more logs.
     *
     * @param {import('./useLogsBase').MoreParams} param
     */
    async ({
      record,
      filters,
      setLogs,
      setIsPaging,
      setIsLoading,
      setIsRunning
    }) => {
      // First, abort the current stream.
      abortStream();

      // Then, subscribe again with offset = -1 (paging mode).
      await subscribe({
        record,
        filters,
        offset: -1,
        setLogs,
        setIsPaging,
        setIsLoading,
        setIsRunning
      });
    },
    [abortStream, subscribe]
  );

  // Clears the running flag, forgets the consumer group and closes the stream.
  const unsubscribe = useCallback(() => {
    setIsRunningRef.current?.(false);
    consumerGroupId.current = null;
    abortStream();
  }, [abortStream]);

  const stop = useCallback(
    /**
     * Stops the stream, but only when it is currently running.
     *
     * @param {import('./useLogsBase').StopParams} param
     */
    ({ isRunning }) => {
      if (isRunning) {
        unsubscribe();
      }
    },
    [unsubscribe]
  );

  return useLogs({ subscribe, more, stop });
}
