import * as zmq from 'zeromq';

import { makeNamespacedLog } from '@sb/log';

import type { MessageFns } from './auto_generated/messenger/messenger';
import {
  CommunicationType,
  Envelope,
  ErrorResponse,
} from './auto_generated/messenger/messenger';

// Module-level logger created for the 'messenger-request-handler' namespace;
// used below for both debug-mode info entries and error entries.
const log = makeNamespacedLog('messenger-request-handler');

/**
 * Describes how a single message type is served: the function to run plus
 * the codecs for its request and response payloads.
 */
export interface HandlerConfig<RequestType = any, ResponseType = any> {
  /**
   * Invoked with the decoded request payload. The value it resolves to is
   * encoded with `responseType` and sent back as the reply.
   */
  handler: (payload: RequestType) => Promise<ResponseType>;
  /** Codec whose `decode` turns the raw envelope payload into a `RequestType`. */
  requestType: MessageFns<RequestType>;
  /** Codec whose `encode` serializes the handler's result for the reply. */
  responseType: MessageFns<ResponseType>;
}

/**
 * RequestHandler - Handles incoming requests and sends responses.
 * This acts as the "server" side of the node.
 *
 * It binds a ZeroMQ ROUTER socket, reads `[routing identity, encoded Envelope]`
 * frame pairs, dispatches each envelope to the handler registered for its
 * message type and sends the encoded reply (or an error envelope) back to the
 * peer that sent the request.
 *
 * Messages are processed one at a time: the next message is not received until
 * the current handler has finished and its reply has been sent.
 *
 * `stop()` closes the socket and permanently marks the instance as shutting
 * down, so a stopped instance cannot be started again. A failed `start()` also
 * closes the socket.
 */
export class RequestHandler {
  // Server socket
  private readonly socket: zmq.Router;

  // Node information
  private readonly serviceKey: string;

  private readonly host: string;

  // Bound port; stays 0 until start() has been called
  private port: number = 0;

  // Message handlers, keyed by message type
  private readonly requestHandlers: Map<string, HandlerConfig> = new Map();

  // State
  private running: boolean = false;

  // Set by stop() and never cleared
  private shuttingDown: boolean = false;

  private debug: boolean = false;

  /**
   * Create a new RequestHandler. The socket is created here but not bound;
   * the port is supplied to `start()`.
   * @param serviceKey - Service key for this handler (used in log messages)
   * @param host - Host to bind to
   */
  public constructor(serviceKey: string, host: string) {
    this.serviceKey = serviceKey;
    this.host = host;
    this.socket = new zmq.Router();
  }

  /**
   * Get the port this handler is bound to.
   * @returns The port passed to `start()`, or - when 0 was passed - the port
   * that was assigned during bind (0 if it could not be determined)
   */
  public getPort(): number {
    return this.port;
  }

  /**
   * Get the handled request types
   * @returns An array of request types
   */
  public getRequestTypes(): string[] {
    return Array.from(this.requestHandlers.keys());
  }

  /**
   * Register a message handler. Registering the same message type again
   * replaces the previous handler.
   * @param messageType - Type of message to handle
   * @param handler - Handler function together with its request/response codecs
   */
  public onRequest<RequestType = any, ResponseType = any>(
    messageType: string,
    handler: HandlerConfig<RequestType, ResponseType>,
  ): void {
    this.requestHandlers.set(messageType, handler);

    if (this.debug) {
      log.info('handler.registered', `Registered handler for ${messageType}`);
    }
  }

  /**
   * Unregister a message handler
   * @param messageType - Type of message handler to remove
   * @throws Error if no handler is registered for `messageType`
   */
  public offRequest(messageType: string): void {
    if (!this.requestHandlers.delete(messageType)) {
      throw new Error(`Handler for ${messageType} not found`);
    }

    if (this.debug) {
      log.info(
        'handler.unregistered',
        `Unregistered handler for ${messageType}`,
      );
    }
  }

  /**
   * Start the request handler: bind the socket and begin receiving messages.
   * Does nothing if the handler is already running or has been stopped.
   * @param port - Port to bind to (0 for dynamic assignment)
   * @returns A promise that resolves when the handler is started
   * @throws Rethrows any bind error after closing the socket
   */
  public async start(port: number): Promise<void> {
    // Check state first so a redundant call cannot overwrite the port of an
    // already-bound socket.
    if (this.running || this.shuttingDown) {
      return;
    }

    this.port = port;

    try {
      // Bind to specified host and port
      const bindAddress = `tcp://${this.host}:${this.port}`;
      await this.socket.bind(bindAddress);

      // Port 0 requests a dynamically assigned port; read back the real one
      // so getPort() reports where the socket is actually listening.
      if (this.port === 0) {
        this.port = this.resolveBoundPort();
      }

      this.running = true;

      if (this.debug) {
        log.info(
          'handler.started',
          `Handler for ${this.serviceKey} started on ${this.host}:${this.port}`,
        );
      }

      // Start handling messages
      this.startMessageHandling();
    } catch (error) {
      log.error(
        'handler.start-error',
        `Error starting handler: ${RequestHandler.describeError(error)}`,
      );

      try {
        this.socket.close();
      } catch (closeError) {
        // Ignore close errors
      }

      throw error;
    }
  }

  /**
   * Stop the request handler and close the socket. Synchronous; does nothing
   * if the handler is not running. Errors while closing are logged, not thrown.
   */
  public stop(): void {
    if (!this.running) {
      return;
    }

    this.shuttingDown = true;
    this.running = false;

    // Close the socket
    try {
      this.socket.close();

      if (this.debug) {
        log.info('handler.stopped', `Handler for ${this.serviceKey} stopped`);
      }
    } catch (error) {
      log.error(
        'handler.stop-error',
        `Error stopping handler: ${RequestHandler.describeError(error)}`,
      );
    }
  }

  /**
   * Set debug mode
   * @param debug - Enable or disable debug logging
   */
  public setDebug(debug: boolean): void {
    this.debug = debug;
  }

  /**
   * Turn a caught value into a printable message.
   * @param error - Value caught by a `catch` clause
   * @returns `error.message` for Error instances, otherwise `String(error)`
   * @private
   */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Read the port of the most recently bound endpoint from the socket.
   * @returns The bound port, or 0 if it cannot be determined
   * @private
   */
  private resolveBoundPort(): number {
    const endpoint = this.socket.lastEndpoint;

    if (!endpoint) {
      return 0;
    }

    // Endpoint looks like tcp://host:port; the port follows the last colon
    // (this also holds for bracketed IPv6 hosts).
    const parsed = Number.parseInt(
      endpoint.slice(endpoint.lastIndexOf(':') + 1),
      10,
    );

    return Number.isInteger(parsed) && parsed > 0 ? parsed : 0;
  }

  /**
   * Start message handling loop. Each iteration receives one message,
   * processes it and then reschedules itself while the handler is running.
   * @private
   */
  private startMessageHandling(): void {
    const receiveLoop = async (): Promise<void> => {
      if (!this.running || this.shuttingDown) return;

      try {
        const frames = await this.socket.receive();

        if (this.shuttingDown) return;

        await this.processFrames(frames);

        // Only continue the loop if we're still running
        if (this.running && !this.shuttingDown) {
          // Use setImmediate to avoid stack overflow for lots of messages
          setImmediate(() => void receiveLoop());
        }
      } catch (error) {
        // Closing the socket during stop() rejects the pending receive;
        // that is expected and not worth logging.
        if (!this.shuttingDown) {
          log.error(
            'handler.receive-error',
            `Error receiving message: ${RequestHandler.describeError(error)}`,
          );
        }

        // Continue with delay if still running
        if (this.running && !this.shuttingDown) {
          setTimeout(() => void receiveLoop(), 1000);
        }
      }
    };

    // Start receiving
    void receiveLoop();
  }

  /**
   * Decode one received multipart message and dispatch it.
   * Malformed messages are logged and dropped here so that they are not
   * mistaken for socket receive failures (which back off for a second).
   * @param frames - Frames as received: routing identity, then encoded envelope
   * @private
   */
  private async processFrames(frames: Buffer[]): Promise<void> {
    const [identity, envelopeBuffer] = frames;

    if (identity === undefined || envelopeBuffer === undefined) {
      log.error(
        'handler.message-parse-error',
        `Error handling message: expected identity and envelope frames, received ${frames.length} frame(s)`,
      );

      return;
    }

    let envelope: Envelope;

    try {
      envelope = Envelope.decode(new Uint8Array(envelopeBuffer));
    } catch (error) {
      // Without a decoded envelope there is no message id to reply to,
      // so the message can only be logged and dropped.
      log.error(
        'handler.message-parse-error',
        `Error handling message: ${RequestHandler.describeError(error)}`,
      );

      return;
    }

    if (this.debug) {
      log.info(
        'handler.received',
        `Received new message request from ${identity.toString()}: ${JSON.stringify(envelope)}`,
      );
    }

    try {
      await this.handleIncomingMessage(identity, envelope);
    } catch (error) {
      // handleIncomingMessage reports its own failures; this is a last line
      // of defence so an unexpected throw never stalls the receive loop.
      const reason = RequestHandler.describeError(error);

      log.error(
        'handler.message-parse-error',
        `Error handling message: ${reason}`,
      );

      try {
        await this.sendErrorReply(
          identity,
          envelope.messageId,
          envelope.messageType,
          `Error handling message: ${reason}`,
        );
      } catch (sendError) {
        log.error(
          'handler.send-error-failed',
          `Failed to send error response: ${RequestHandler.describeError(sendError)}`,
        );
      }
    }
  }

  /**
   * Handle an incoming message: validate it, run the registered handler and
   * send the reply. Exactly one reply (success or error) is attempted per
   * message.
   * @param identity - Routing identity of the sender, exactly as received
   * @param envelope - Message envelope
   * @private
   */
  private async handleIncomingMessage(
    identity: Buffer,
    envelope: Envelope,
  ): Promise<void> {
    const { messageType, communicationType, payload } = envelope;

    try {
      if (this.debug) {
        log.info('handler.received', `Received message type: ${messageType}`);
      }

      if (communicationType !== CommunicationType.REQUEST) {
        await this.sendErrorReply(
          identity,
          envelope.messageId,
          messageType,
          `Invalid communication type: ${communicationType}`,
        );

        // Stop here: the peer has already been answered, and a non-request
        // must not invoke the handler or trigger a second reply.
        return;
      }

      // Find handler for this message type
      const handlerConfig = this.requestHandlers.get(messageType);

      if (!handlerConfig) {
        log.error(
          'handler.no-handler',
          `No handler for message type: ${messageType}`,
        );

        await this.sendErrorReply(
          identity,
          envelope.messageId,
          messageType,
          `No handler for message type: ${messageType}`,
        );

        return;
      }

      const handlerPayload = handlerConfig.requestType.decode(payload);

      if (this.debug) {
        log.info(
          'handler.payload',
          `Received payload of message type ${messageType}: ${JSON.stringify(handlerPayload)}`,
        );
      }

      // Execute handler
      const result = await handlerConfig.handler(handlerPayload);

      const replyEnvelope = Envelope.encode({
        messageId: envelope.messageId,
        messageType,
        communicationType: CommunicationType.REPLY,
        payload: new Uint8Array(
          Buffer.from(handlerConfig.responseType.encode(result).finish()),
        ),
      }).finish();

      // Send reply. The identity frame is passed through untouched because
      // routing identities are arbitrary bytes, not necessarily valid text.
      await this.socket.send([identity, replyEnvelope]);

      if (this.debug) {
        log.info(
          'handler.reply-sent',
          `Sent reply for ${messageType} to ${identity.toString()}`,
        );
      }
    } catch (error) {
      const reason = RequestHandler.describeError(error);

      log.error('handler.error', `Error handling message: ${reason}`);

      await this.sendErrorReply(
        identity,
        envelope.messageId,
        messageType,
        `Error processing request: ${reason}`,
      );
    }
  }

  /**
   * Send an error reply. Failures to send are logged and not rethrown.
   * @param identity - Routing identity of the recipient, exactly as received
   * @param messageId - Message ID
   * @param messageType - Message type
   * @param errorMessage - Error message to send
   * @private
   */
  private async sendErrorReply(
    identity: Buffer,
    messageId: string,
    messageType: string,
    errorMessage: string,
  ): Promise<void> {
    try {
      const errorEnvelope = Envelope.encode({
        messageId,
        messageType,
        communicationType: CommunicationType.ERROR,
        payload: new Uint8Array(
          Buffer.from(
            ErrorResponse.encode({
              error: new Uint8Array(Buffer.from(errorMessage)),
            }).finish(),
          ),
        ),
      }).finish();

      await this.socket.send([identity, errorEnvelope]);

      log.error(
        'handler.error-sent',
        `Sent error for ${messageId}: ${errorMessage}`,
      );
    } catch (error) {
      log.error(
        'handler.send-error-failed',
        `Failed to send error response: ${RequestHandler.describeError(error)}`,
      );
    }
  }
}
