import * as zmq from 'zeromq';

import { makeNamespacedLog } from '@sb/log';

import type { MessageFns } from './auto_generated/messenger/messenger';
import {
  CommunicationType,
  Envelope,
  ErrorResponse,
} from './auto_generated/messenger/messenger';
import { TimeoutError } from './utils/TimeoutError';

// Module-level logger created with the 'messenger-request-sender' namespace;
// every log call in this file goes through it.
const log = makeNamespacedLog('messenger-request-sender');

/**
 * Bookkeeping for a request that has been issued but not yet settled.
 * Entries are stored in RequestSender's pending-request map, keyed by the
 * message ID of the outgoing envelope.
 */
interface PendingRequest {
  // Settles the caller's promise; invoked with the decoded response payload.
  resolve: (value: any) => void;
  // Rejects the caller's promise (send failure, error response, decode failure, or stop()).
  reject: (reason: any) => void;
  // Timer that rejects the request on timeout; cleared when the request is settled another way.
  timeout: NodeJS.Timeout;
  // Codec passed to sendRequest for the request payload (stored here; not read back in this file).
  requestType: MessageFns<any>;
  // Codec whose decode() is applied to the payload of a REPLY envelope.
  responseType: MessageFns<any>;
}

/**
 * RequestSender - Sends requests to other services and handles responses.
 * This acts as the "client" side of the node.
 *
 * One `zmq.Dealer` socket is kept per endpoint and created lazily on the first
 * request to that endpoint. Each socket has its own receive loop which matches
 * incoming envelopes to pending requests by message ID. Every pending request
 * is settled exactly once: by a reply, an error response, a send failure, a
 * timeout, or `stop()`.
 */
export class RequestSender {
  // Node information
  private readonly serviceKey: string;

  // Connection sockets by endpoint (host:port)
  private readonly connections: Map<string, zmq.Dealer> = new Map();

  // Pending requests by message ID
  private readonly pendingRequests: Map<string, PendingRequest> = new Map();

  // State
  private running: boolean = false;

  private shuttingDown: boolean = false;

  private debug: boolean = false;

  private requestTimeout: number = 2000;

  private messageCounter: number = 0;

  /**
   * Create a new RequestSender
   * @param serviceKey - Service key for this sender; also used as the routing ID of every socket
   * @param timeout - Request timeout in milliseconds
   */
  public constructor(serviceKey: string, timeout: number = 2000) {
    this.serviceKey = serviceKey;
    this.requestTimeout = timeout;
  }

  /**
   * Start the request sender. Calling it while already running is a no-op.
   */
  public start(): void {
    if (this.running) {
      return;
    }

    // stop() leaves shuttingDown set; clear it so that a restarted sender's
    // receive loops are allowed to run again.
    this.shuttingDown = false;
    this.running = true;

    if (this.debug) {
      log.info(
        'sender.started',
        `RequestSender for ${this.serviceKey} started`,
      );
    }
  }

  /**
   * Stop the request sender: close every socket and reject every request that
   * is still pending. Calling it while not running is a no-op.
   */
  public stop(): void {
    if (!this.running) {
      return;
    }

    this.shuttingDown = true;
    this.running = false;

    // Detach all sockets from the map before closing them so their receive
    // loops recognise the close as intentional.
    const openConnections = Array.from(this.connections.entries());
    this.connections.clear();

    for (const [endpoint, socket] of openConnections) {
      try {
        socket.close();

        if (this.debug) {
          log.info(
            'sender.connection-closed',
            `Closed connection to ${endpoint}`,
          );
        }
      } catch (error: unknown) {
        log.error(
          'sender.connection-close-error',
          `Error closing connection to ${endpoint}: ${error instanceof Error ? error.message : String(error)}`,
        );
      }
    }

    const abandonedRequests = Array.from(this.pendingRequests.values());
    this.pendingRequests.clear();

    for (const pendingRequest of abandonedRequests) {
      clearTimeout(pendingRequest.timeout);
      pendingRequest.reject(new Error('RequestSender stopped'));
    }

    if (this.debug) {
      log.info(
        'sender.stopped',
        `RequestSender for ${this.serviceKey} stopped`,
      );
    }
  }

  /**
   * Set debug mode
   * @param debug - Enable or disable debug logging
   */
  public setDebug(debug: boolean): void {
    this.debug = debug;
  }

  /**
   * Set the request timeout. Only requests issued after the call are affected.
   * @param timeout - Timeout in milliseconds
   */
  public setTimeout(timeout: number): void {
    this.requestTimeout = timeout;
  }

  /**
   * Send a request to a service
   * @param endpoint - Endpoint string (host:port)
   * @param messageType - Message type
   * @param payload - Message payload
   * @param requestType - Codec used to encode `payload`
   * @param responseType - Codec used to decode the reply payload
   * @returns A promise that resolves with the decoded response. It rejects
   *   with a TimeoutError when no response arrives in time, or with an Error
   *   when sending fails, the peer answers with an error, the reply cannot be
   *   decoded, or the sender is stopped.
   * @throws Error if the sender is not running
   */
  public async sendRequest(
    endpoint: string,
    messageType: string,
    payload: any,
    requestType: MessageFns<any>,
    responseType: MessageFns<any>,
  ): Promise<any> {
    if (!this.running) {
      throw new Error('RequestSender is not running');
    }

    const messageId = this.generateMessageId().toString();

    const requestEnvelope = Envelope.encode({
      messageId,
      messageType,
      communicationType: CommunicationType.REQUEST,
      payload: new Uint8Array(
        Buffer.from(requestType.encode(payload).finish()),
      ),
    }).finish();

    // Register the pending request before sending so the response cannot be
    // received before the entry exists.
    const requestPromise = new Promise<any>((resolve, reject) => {
      const timeout = setTimeout(() => {
        this.pendingRequests.delete(messageId);

        reject(
          new TimeoutError(`Request timed out after ${this.requestTimeout}ms`, {
            endpoint,
            method: messageType,
            timeout: this.requestTimeout,
          }),
        );
      }, this.requestTimeout);

      this.pendingRequests.set(messageId, {
        resolve,
        reject,
        timeout,
        requestType,
        responseType,
      });
    });

    try {
      // getSocket is inside the try: a failure to create/connect the socket
      // must settle the pending request as well.
      const targetSocket = this.getSocket(endpoint);

      await targetSocket.send([Buffer.from(requestEnvelope)]);
    } catch (error: unknown) {
      log.error(
        'sender.send-error',
        `Error sending request to ${endpoint}: ${error instanceof Error ? error.message : String(error)}`,
      );

      const pendingRequest = this.pendingRequests.get(messageId);

      if (pendingRequest) {
        // Release the timer too; otherwise it lingers until the timeout fires.
        clearTimeout(pendingRequest.timeout);
        this.pendingRequests.delete(messageId);
        pendingRequest.reject(new Error('Error sending request'));
      }

      // cleanupConnection handles its own errors and never rejects.
      void this.cleanupConnection(endpoint);
    }

    // Return promise that will settle when the response is received
    return requestPromise;
  }

  /**
   * Return the socket for an endpoint, creating and connecting it (and
   * starting its receive loop) on first use.
   * @param endpoint - Endpoint string (host:port)
   */
  private getSocket(endpoint: string): zmq.Dealer {
    const existing = this.connections.get(endpoint);

    if (existing) {
      return existing;
    }

    const tcpEndpoint = `tcp://${endpoint}`;
    const socket = new zmq.Dealer();

    socket.routingId = this.serviceKey;
    socket.connect(tcpEndpoint);
    this.connections.set(endpoint, socket);

    if (this.debug) {
      log.info('sender.connected', `Connected to ${tcpEndpoint}`);
    }

    this.startResponseHandling(socket, endpoint);

    return socket;
  }

  /**
   * Start the background receive loop for a socket.
   * @param socket - Socket to read replies from
   * @param targetEndpoint - Endpoint string (host:port) the socket is connected to
   */
  private startResponseHandling(
    socket: zmq.Dealer,
    targetEndpoint: string,
  ): void {
    // The loop handles all of its own errors, so it is safe to not await it.
    void this.receiveReplies(socket, targetEndpoint);
  }

  /**
   * Whether `socket` is still the registered socket for `endpoint` on a
   * running sender. Becomes false after stop() or cleanupConnection().
   */
  private isActiveSocket(socket: zmq.Dealer, endpoint: string): boolean {
    return (
      this.running &&
      !this.shuttingDown &&
      this.connections.get(endpoint) === socket
    );
  }

  /**
   * Receive loop for one socket. Runs until the socket is detached, the
   * sender stops, or receiving fails. A failed receive ends the loop; the next
   * request to the endpoint creates a fresh socket with its own loop.
   */
  private async receiveReplies(
    socket: zmq.Dealer,
    targetEndpoint: string,
  ): Promise<void> {
    while (this.isActiveSocket(socket, targetEndpoint)) {
      let envelopeBuffer: Uint8Array | undefined;

      try {
        [envelopeBuffer] = await socket.receive();
      } catch (error: unknown) {
        // receive is expected to throw once the socket has been closed. Only
        // treat it as a fault when this socket is still the registered one;
        // otherwise it was closed on purpose and a newer socket may already
        // exist for the endpoint, which must not be touched.
        if (this.isActiveSocket(socket, targetEndpoint)) {
          log.error(
            'client.reply-receive-error',
            `Error receiving reply from ${targetEndpoint}: ${error instanceof Error ? error.message : String(error)}`,
          );

          await this.cleanupConnection(targetEndpoint);
        }

        return;
      }

      if (!this.isActiveSocket(socket, targetEndpoint)) {
        return;
      }

      // A malformed message is logged and skipped; it must not tear down the
      // connection.
      try {
        if (!envelopeBuffer) {
          throw new Error('Received message without frames');
        }

        const envelope: Envelope = Envelope.decode(
          new Uint8Array(envelopeBuffer),
        );

        if (this.debug) {
          log.info(
            'client.received-message',
            `Received message from ${targetEndpoint}`,
          );
        }

        this.handleResponse(envelope);
      } catch (parseError: unknown) {
        log.error(
          'client.reply-parse-error',
          `Error parsing message: ${parseError instanceof Error ? parseError.message : String(parseError)}`,
        );
      }
    }
  }

  /**
   * Handle a response message: look up the pending request by message ID and
   * settle it. The pending entry is always removed and its promise always
   * settled, whatever the envelope contains.
   * @param envelope - Response envelope
   * @private
   */
  private handleResponse(envelope: Envelope): void {
    const { messageId, payload, communicationType } = envelope;

    if (!messageId) {
      log.error(
        'sender.missing-message-id',
        'Received response without message ID',
      );

      return;
    }

    const pending = this.pendingRequests.get(messageId);

    if (!pending) {
      if (this.debug) {
        log.info(
          'sender.unknown-response',
          `Received response for unknown request ${messageId}`,
        );
      }

      return;
    }

    // From here on the request is settled, so release its timer and its slot
    // up front; no later failure can then leave it dangling.
    clearTimeout(pending.timeout);
    this.pendingRequests.delete(messageId);

    if (communicationType === CommunicationType.ERROR) {
      let errorMessage: string;

      try {
        errorMessage = ErrorResponse.decode(payload).error.toString();
      } catch (error: unknown) {
        errorMessage = `Error decoding error response: ${error instanceof Error ? error.message : String(error)}`;
      }

      log.error('sender.error-response', errorMessage);
      pending.reject(new Error(errorMessage));
    } else if (communicationType === CommunicationType.REPLY) {
      try {
        const decodedPayload = pending.responseType.decode(payload);
        pending.resolve(decodedPayload);
      } catch (error: unknown) {
        log.error(
          'sender.response-error',
          `[${messageId}] Error parsing response: ${error}`,
          {
            messageId,
            messageType: envelope.messageType,
            error: String(error),
          },
        );

        pending.reject(error);
      }
    } else {
      const unknownTypeMessage = `Received unknown response type ${communicationType}`;

      log.error('sender.unknown-response-type', unknownTypeMessage);
      // Reject rather than drop: the caller would otherwise wait forever.
      pending.reject(new Error(unknownTypeMessage));
    }

    if (this.debug) {
      log.info(
        'sender.response-received',
        `Received response for ${messageId}`,
      );
    }
  }

  /**
   * Cleanup a connection: detach the endpoint's socket and close it. Close
   * errors are logged, never thrown, so the returned promise does not reject.
   * @param endpoint - Endpoint string (host:port)
   * @private
   */
  private async cleanupConnection(endpoint: string): Promise<void> {
    const socket = this.connections.get(endpoint);

    if (!socket) {
      return;
    }

    // Detach before closing so getSocket never hands out a closed socket and
    // the socket's receive loop sees the close as intentional.
    this.connections.delete(endpoint);

    try {
      await socket.close();

      if (this.debug) {
        log.info(
          'sender.connection-closed',
          `Closed connection to ${endpoint}`,
        );
      }
    } catch (error: unknown) {
      log.error(
        'sender.connection-close-error',
        `Error closing connection to ${endpoint}: ${error instanceof Error ? error.message : String(error)}`,
      );
    }
  }

  /**
   * Produce the next message ID from a per-instance counter (starts at 1).
   */
  private generateMessageId(): number {
    this.messageCounter += 1;

    return this.messageCounter;
  }
}
