import { TRPCRequestMessage, TRPCResponseMessage } from '@trpc/server/rpc'
import { AnyRouter, inferRouterError } from '@trpc/server'
import { observable, Observer, UnsubscribeFn } from '@trpc/server/observable'
import { Operation, TRPCClientError, TRPCClientRuntime, TRPCLink } from '@trpc/client'
import { BasedClient } from '@based/client'
import { log } from '@shared/logger'


export type OperationScope = { orgAliasId?: string; projectAliasId?: string, userId: string }
export type OperationWithScope = Operation & {
  context: {
    scope: OperationScope
  }
}

type BasedCallbackResult<
  TRouter extends AnyRouter,
  TOutput
> = TRPCResponseMessage<TOutput, inferRouterError<TRouter>>

type TBasedCallbackObserver<TRouter extends AnyRouter, TOutput> = Observer<
  BasedCallbackResult<TRouter, TOutput>,
  TRPCClientError<TRouter>
>

export interface BasedClientOptions {
  based: BasedClient
  functionName: string,
}

export function createBasedTrpcClient(opts: BasedClientOptions) {
  const { based, functionName } = opts
  type TCallbacks = TBasedCallbackObserver<AnyRouter, unknown>

  function request(op: OperationWithScope, callbacks: TCallbacks): UnsubscribeFn {
    const { type, input, path, id, context } = op
    console.log('createBasedTrpcClient request', op)
    const scope = context.scope
    console.log("go with scope", scope)
    const envelope: TRPCRequestMessage & {
      scope: { orgAliasId?: string; projectAliasId?: string, userId?: string }
    } = {
      id,
      method: type,
      params: {
        input,
        path,
      },
      scope,
    }



    let unsubscribeRef = null;
    let isBasedErrorCb = false

    if (type === 'subscription') {
      unsubscribeRef = based.query(functionName, envelope, { persistent: false }).subscribe(
        (data) => {
          if (!data) {
            // this happens when there is a bug in the diffing algo
            return
          }

          if (data.result) {
            callbacks.next(data)
          } else if (data.error) {
            callbacks.error(TRPCClientError.from(data))
          } else {
            //console.log('Unknown/unhandled based trpc response: ', data)
            log.error(data, 'Unknown/unhandled based trpc response ')
          }
        },
        (err) => {
          callbacks.error(err as any)
          isBasedErrorCb = true
        }
      )
    }
    else {
      based.query(functionName, envelope, { persistent: false }).get().then(
        (data) => {
          if (data.result) {
            callbacks.next(data)
          } else if (data.error) {
            callbacks.error(TRPCClientError.from(data))
          } else {
            console.log('Unknown/unhandled based trpc response: ', data)
            log.error(data, 'Unknown/unhandled based trpc response ')
          }
        },
        (err) => {
          callbacks.error(err as any)
          isBasedErrorCb = true
        }
      )
    }


    return () => {
      if (type === 'subscription') {

        !isBasedErrorCb && unsubscribeRef && unsubscribeRef()
      }
    }
  }
  return {
    request,
  }
}

export type TRPCBasedClient = ReturnType<typeof createBasedTrpcClient>

export interface BasedLinkOptions {
  client: TRPCBasedClient
}

class TRPCBasedClosedError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'TRPCBasedClosedError'
    Object.setPrototypeOf(this, TRPCBasedClosedError.prototype)
  }
}

class TRPCSubscriptionEndedError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'TRPCSubscriptionEndedError'
    Object.setPrototypeOf(this, TRPCSubscriptionEndedError.prototype)
  }
}

export function basedLink<TRouter extends AnyRouter, OperationScope extends { scope: any }>(
  opts: BasedLinkOptions
): TRPCLink<TRouter> {
  return (runtime: TRPCClientRuntime & OperationScope) => {
    const { client } = opts
    return ({ op }) => {
      return observable((observer) => {
        let isDone = false;

        let unsubInitialized = false;  // Flag to check if unsub has been initialized

        /*
         * safeUnsub is designed to safely handle the unsubscribe action.
         * If unsub hasn't been initialized when it's called, it will retry 
         * up to 3 times with a delay of 20ms between each attempt. 
         * This accounts for potential timing issues where unsub might 
         * be invoked slightly before it's properly initialized.
         */
        const safeUnsub = (retry = 0) => {
          if (unsubInitialized) {
            actualUnsub();
          } else if (retry < 3) {
            setTimeout(() => safeUnsub(retry + 1), 20);  // Retry in 20ms
          } else {
            //console.error('Failed to unsubscribe from request.');
            log.error('Failed to unsubscribe from request.');
          }
        };

        let unsub = safeUnsub;

        const actualUnsub = client.request({
          ...op,
          scope: runtime.scope
        }, {
          error(err) {
            isDone = true;
            observer.error(err as TRPCClientError<any>);
            unsub();
          },
          complete() {
            if (!isDone) {
              isDone = true;
              observer.error(
                TRPCClientError.from(
                  new TRPCSubscriptionEndedError('Operation ended prematurely')
                )
              );
            } else {
              observer.complete();
            }
          },
          next(message) {
            observer.next({
              // @ts-ignore
              result: message.result,
            });

            if (op.type !== 'subscription') {
              // if it isn't a subscription we don't care about next response
              isDone = true;
              unsub();
              observer.complete();
            }
          },
        });

        unsubInitialized = true;  // Mark unsub as initialized

        return () => {
          isDone = true;
          unsub();
        }
      });
    }
  }
}
