// Use namespaced logging
import net from 'net';

import { makeNamespacedLog } from '@sb/log';

import type { DefaultService } from './auto_generated/messenger/messenger';
import type { RegistryService } from './auto_generated/messenger/registry_service';
import {
  LivePeersRequest,
  LivePeersResponse,
  LiveServicesRequest,
  LiveServicesResponse,
  RegisterRequest,
  RegisterResponse,
  UnregisterRequest,
  UnregisterResponse,
  RegistryRequests,
  protobufPackage as Registry,
} from './auto_generated/messenger/registry_service';
import { DefaultConnector } from './connectors/generated';
import { MessengerNode } from './node';

// Module-level logger; every log call in this file is emitted under the
// 'messenger-registry' namespace.
const log = makeNamespacedLog('messenger-registry');

/**
 * MessengerRegistry - Extends MessengerNode to track services across hosts
 */
export class MessengerRegistry
  extends MessengerNode
  implements RegistryService
{
  // Port allocation range (inclusive on both ends): ports handed out to
  // registering services are taken from [minPort, maxPort].
  private minPort: number = 5556;

  private maxPort: number = 5585;

  // Port at which the next allocation scan starts; wraps back to minPort
  // after maxPort.
  private nextPort: number = this.minPort;

  // Health check settings
  // NOTE(review): checkInterval is only read and cleared in this file, never
  // assigned - it looks like a leftover from before the active/inactive split.
  private checkInterval: NodeJS.Timeout | null = null;

  // Timer that polls services currently marked active (every checkIntervalMs).
  private activeCheckInterval: NodeJS.Timeout | null = null;

  // Timer that polls services currently marked inactive
  // (every inactiveCheckIntervalMs).
  private inactiveCheckInterval: NodeJS.Timeout | null = null;

  // How often to check active services (1 second)
  private checkIntervalMs: number = 1000;

  // Number of failed health checks after which an active service is marked
  // inactive.
  private maxHealthCheckAttempts: number = 5;

  // Connector used to ping each registered service, keyed by service key.
  private serviceClients: Map<string, DefaultService> = new Map();

  // Active/inactive flag per service key. The registry's own key is set to
  // true in the constructor.
  private serviceActive: Map<string, boolean> = new Map();

  // Track failed health check attempts per service
  private serviceFailedAttempts: Map<string, number> = new Map();

  // How often to check inactive services (30 seconds)
  private inactiveCheckIntervalMs: number = 30000;

  // Ports in the allocation range that answered a TCP probe; the allocator
  // skips these instead of probing them again.
  private externalUsedPorts: Set<number> = new Set<number>();

  /**
   * Build a registry node.
   *
   * Passes the registry protobuf package name and `options` to the
   * MessengerNode constructor, switches this node's port to the registry
   * port, and flags the registry's own service key as active.
   * @param options - Configuration options, forwarded unchanged to the base class
   */
  public constructor(options: any = {}) {
    super(Registry, options);

    // The registry always listens on the dedicated registry port.
    const { registryPort } = this;
    this.port = registryPort;

    const summary = `Registry created on port ${this.port} at ${this.host}`;
    log.info('registry.constructor', summary);

    // The registry counts itself as a live service from the start.
    this.serviceActive.set(this.serviceKey, true);
  }

  /**
   * Start the registry.
   *
   * Calling this while the registry is already running is a no-op that
   * resolves to `true` (it does not throw). Otherwise the method, in order:
   * marks the node registered and running, starts the request sender, runs
   * the startup health check to recover services that are already listening,
   * starts the request handler on the registry port, registers the default
   * and registry-specific request handlers, and starts the periodic health
   * checks.
   * @returns A promise that resolves to true once the registry is running
   * @throws {Error} Re-throws any error raised during startup, after logging
   *   it and clearing the running flag
   */
  public async start(): Promise<boolean> {
    if (this.running) {
      return true;
    }

    try {
      log.info('registry.starting', 'Starting messenger registry');

      this.registered = true; // Registry is always registered
      // Set before the first await so a concurrent start() call returns early.
      this.running = true;

      this.requestSender.start();

      await this.performStartupHealthCheck(); // Recover any services that were active but not responding

      this.requestHandler.start(this.port);

      this.registerDefaultHandlers();

      this.requestHandler.onRequest(
        RegistryRequests[RegistryRequests.RegisterClient],
        {
          handler: this.RegisterClient.bind(this),
          requestType: RegisterRequest,
          responseType: RegisterResponse,
        },
      );

      this.requestHandler.onRequest(
        RegistryRequests[RegistryRequests.UnregisterClient],
        {
          handler: this.UnregisterClient.bind(this),
          requestType: UnregisterRequest,
          responseType: UnregisterResponse,
        },
      );

      this.requestHandler.onRequest(
        RegistryRequests[RegistryRequests.GetLiveServices],
        {
          handler: this.GetLiveServices.bind(this),
          requestType: LiveServicesRequest,
          responseType: LiveServicesResponse,
        },
      );

      this.requestHandler.onRequest(
        RegistryRequests[RegistryRequests.GetLivePeers],
        {
          handler: this.GetLivePeers.bind(this),
          requestType: LivePeersRequest,
          responseType: LivePeersResponse,
        },
      );

      // Start health check interval for monitoring clients
      this.startHealthCheck();

      return true;
    } catch (error) {
      log.error(
        'registry.start-error',
        `Error starting registry: ${error instanceof Error ? error.message : String(error)}`,
      );

      // Allow a later start() attempt.
      // NOTE(review): the request sender/handler may already have been
      // started at this point and are not stopped here - confirm whether the
      // base class needs an explicit teardown on a failed start.
      this.running = false;
      throw error;
    }
  }

  /**
   * Stop the registry and health check.
   *
   * The health-check timers are cleared first, then the base class `stop()`
   * is awaited.
   * @returns A promise that resolves when the registry has been stopped
   */
  public async stop(): Promise<void> {
    // Clear the timers before the base class shuts down.
    this.stopHealthCheck();
    await super.stop();
  }

  /**
   * Start periodic health checks.
   *
   * Two timers are created: one polls the services currently marked active
   * every `checkIntervalMs`, the other polls the services currently marked
   * inactive every `inactiveCheckIntervalMs`. The call is idempotent: if the
   * timers are already running it returns without touching them.
   * @private
   */
  private startHealthCheck(): void {
    // Guard on the timers this method actually creates. `checkInterval` is
    // never assigned in this class, so testing it alone never detected a
    // running health check.
    if (
      this.activeCheckInterval ||
      this.inactiveCheckInterval ||
      this.checkInterval
    ) {
      return;
    }

    log.info(
      'registry.health-check.starting',
      'Starting health check for registered services',
      {
        activeCheckIntervalMs: this.checkIntervalMs,
        inactiveCheckIntervalMs: this.inactiveCheckIntervalMs,
      },
    );

    // Both timers do the same thing for a different subset of services; the
    // list is re-read on every tick so status changes are picked up.
    const schedule = (isActive: boolean, label: string, intervalMs: number) =>
      setInterval(() => {
        const services = this.getServicesByStatus(isActive);

        this.checkHealth(services).catch((error) => {
          log.error(
            'registry.health-check-error',
            `Error checking ${label} services: ${error instanceof Error ? error.message : String(error)}`,
          );
        });
      }, intervalMs);

    // Check active services every 1 second (checkIntervalMs)
    this.activeCheckInterval = schedule(true, 'active', this.checkIntervalMs);

    // Check inactive services every 30 seconds (inactiveCheckIntervalMs)
    this.inactiveCheckInterval = schedule(
      false,
      'inactive',
      this.inactiveCheckIntervalMs,
    );
  }

  /**
   * Cancel every health-check timer owned by the registry and reset the
   * corresponding fields to null. Safe to call when no timer is running.
   * @private
   */
  private stopHealthCheck(): void {
    // Clears the timer if one is set; always yields null for the field.
    const release = (timer: NodeJS.Timeout | null): null => {
      if (timer) {
        clearInterval(timer);
      }

      return null;
    };

    this.activeCheckInterval = release(this.activeCheckInterval);
    this.inactiveCheckInterval = release(this.inactiveCheckInterval);
    this.checkInterval = release(this.checkInterval);
  }

  /**
   * Collect the registered services whose active flag equals `isActive`.
   * The registry's own entry is never included, and services without an
   * active flag match neither `true` nor `false`.
   * @private
   * @param isActive - Whether to get active or inactive services
   * @returns Array of services with the specified active status
   */
  private getServicesByStatus(isActive: boolean): Array<any> {
    const matches: Array<any> = [];

    for (const candidate of Array.from(this.serviceRegistry.values())) {
      const isSelf = candidate.serviceKey === this.serviceKey;

      if (
        !isSelf &&
        this.serviceActive.get(candidate.serviceKey) === isActive
      ) {
        matches.push(candidate);
      }
    }

    return matches;
  }

  /**
   * Health-check the given services one after another (not in parallel).
   * Each service's active flag is read immediately before its own check.
   * @private
   * @param services - Array of services to check; an empty array is a no-op
   */
  private async checkHealth(services: Array<any>): Promise<void> {
    for (let index = 0; index < services.length; index += 1) {
      const current = services[index];
      const previouslyActive =
        this.serviceActive.get(current.serviceKey) === true;

      await this.checkServiceHealth(current, previouslyActive);
    }
  }

  /**
   * Check health of a single service and update its tracking state.
   *
   * - Ping succeeds: the failure counter is reset and, if the service was
   *   inactive, it is marked active again.
   * - Ping fails (or no client exists for the service) while the service was
   *   active: the failure counter is incremented; once it reaches
   *   `maxHealthCheckAttempts` the service is marked inactive.
   * - Ping fails while the service was already inactive: nothing changes.
   *
   * The ping is awaited, so the registry can change in the meantime. If the
   * service was unregistered or re-registered at a different address while
   * the ping was in flight, the result is discarded rather than written back.
   * @private
   * @param service - The service to check
   * @param wasActive - Whether the service was previously active
   */
  private async checkServiceHealth(
    service: any,
    wasActive: boolean,
  ): Promise<void> {
    const { serviceKey, host, port } = service;

    let isActive = false;
    let failureReason: string | undefined;

    try {
      const client = this.serviceClients.get(serviceKey);

      if (!client) {
        throw new Error(`Client not found for ${serviceKey}`);
      }

      isActive = await this.pingService(client);
    } catch (error) {
      failureReason = error instanceof Error ? error.message : String(error);
    }

    // Re-read the registry after the await: writing the pre-ping `service`
    // object back would resurrect an unregistered service or overwrite a
    // newer registration.
    const current = this.serviceRegistry.get(serviceKey);

    if (!current || current.host !== host || current.port !== port) {
      return;
    }

    if (!isActive) {
      // Only track failures for active services
      if (wasActive) {
        this.recordFailedHealthCheck(service, failureReason);
      }

      return;
    }

    // Service is active
    this.serviceFailedAttempts.set(serviceKey, 0);
    this.serviceRegistry.set(serviceKey, current);

    // If service was inactive before, log the change and update status
    if (!wasActive) {
      log.info(
        'registry.service-active',
        `Service ${serviceKey} at ${host}:${port} is active again`,
      );

      this.serviceActive.set(serviceKey, true);
    }
  }

  /**
   * Count one failed health check for an active service and mark the service
   * inactive once `maxHealthCheckAttempts` is reached (the counter is then
   * reset to 0).
   * @private
   * @param service - The service whose health check failed
   * @param reason - Optional error text appended to the "inactive" log line
   */
  private recordFailedHealthCheck(service: any, reason?: string): void {
    const { serviceKey, host, port } = service;
    const attempts = (this.serviceFailedAttempts.get(serviceKey) ?? 0) + 1;

    // `>=` rather than `===` so the threshold cannot be stepped over.
    if (attempts < this.maxHealthCheckAttempts) {
      this.serviceFailedAttempts.set(serviceKey, attempts);

      return;
    }

    const suffix = reason ? `: ${reason}` : '';

    log.info(
      'registry.service-inactive',
      `Service ${serviceKey} at ${host}:${port} is inactive after ${attempts} attempts${suffix}`,
    );

    this.serviceActive.set(serviceKey, false);
    this.serviceFailedAttempts.set(serviceKey, 0);
  }

  /**
   * Send a Ping request through the given client and report the outcome.
   * Never rejects: any error raised by the ping is treated as "not active".
   * @private
   * @param client - The client to use for the ping
   * @returns A promise that resolves to the `pong` value of the reply, or false if the ping failed
   */
  private async pingService(client: DefaultService): Promise<boolean> {
    try {
      const { pong } = await client.Ping({});

      return pong;
    } catch {
      // An unreachable or erroring service counts as inactive.
      return false;
    }
  }

  /**
   * Get the next available port in [minPort, maxPort].
   *
   * Ports are tried in order starting at `nextPort`, wrapping around at
   * `maxPort`. A port is handed out only if no registered service uses it and
   * a TCP probe of `this.host:port` finds nothing listening. Ports that
   * answer the probe are cached in `externalUsedPorts` and skipped on the
   * first pass; if the first pass finds nothing, the ports that were cached
   * before this call are probed again, so a port released by an external
   * process becomes allocatable instead of being blocked forever.
   *
   * The registry is consulted live, both before and after each (awaited)
   * probe, so a port claimed by another registration while this call was
   * waiting is not handed out a second time.
   * @private
   * @returns The next available port
   * @throws {Error} When all ports are allocated
   * @throws {Error} When unable to find an available port
   */
  private async getNextPort(): Promise<number> {
    const portCount = this.maxPort - this.minPort + 1;

    // Live lookup instead of a snapshot: the registry can change while a
    // probe is awaited.
    const isRegistered = (candidate: number): boolean =>
      Array.from(this.serviceRegistry.values()).some(
        (service) => service.port === candidate,
      );

    // All ports of the range, in scan order (starting at nextPort, wrapping).
    const start =
      this.nextPort >= this.minPort && this.nextPort <= this.maxPort
        ? this.nextPort
        : this.minPort;
    const candidates: number[] = [];

    for (let offset = 0; offset < portCount; offset += 1) {
      candidates.push(
        this.minPort + ((start - this.minPort + offset) % portCount),
      );
    }

    // Check if we've allocated all ports in our registry
    if (candidates.every((candidate) => isRegistered(candidate))) {
      throw new Error('All ports are allocated in registry');
    }

    // Ports already cached as externally used before this call; these are
    // re-probed only as a last resort.
    const cachedExternal = candidates.filter(
      (candidate) =>
        this.externalUsedPorts.has(candidate) && !isRegistered(candidate),
    );

    // Probe one port and claim it if it is free. Resolves to true when the
    // port was claimed for the caller.
    const claim = async (port: number): Promise<boolean> => {
      let isPortInUse: boolean;

      try {
        // Check if the port is in use by directly pinging it
        isPortInUse = await this.pingPort(this.host, port);
      } catch (error) {
        log.error(
          'registry.port-check-error',
          `Error checking port ${port}: ${error instanceof Error ? error.message : String(error)}`,
        );

        return false;
      }

      if (isPortInUse) {
        // Port is in use by some external process
        log.debug(
          'registry.port-in-use',
          `Port ${port} is in use by an external process, skipping`,
        );

        // Add to external used ports set for future reference
        this.externalUsedPorts.add(port);

        return false;
      }

      // Another registration may have taken the port while we were probing.
      if (isRegistered(port)) {
        return false;
      }

      // Port is available! Not in registry and not responding to TCP
      this.externalUsedPorts.delete(port);

      // Update nextPort for next allocation
      this.nextPort = port + 1 > this.maxPort ? this.minPort : port + 1;

      log.info('registry.port-allocated', `Allocated available port ${port}`);

      return true;
    };

    // First pass: ports that are neither registered nor cached as in use.
    for (const port of candidates) {
      if (isRegistered(port) || this.externalUsedPorts.has(port)) {
        continue;
      }

      if (await claim(port)) {
        return port;
      }
    }

    // Second pass: the cache may be stale, so give previously cached ports
    // one fresh probe before reporting exhaustion.
    for (const port of cachedExternal) {
      if (isRegistered(port)) {
        continue;
      }

      if (await claim(port)) {
        return port;
      }
    }

    // If we get here, we tried all ports and none were available
    throw new Error(
      'Unable to find available port - all ports in range are in use',
    );
  }

  /**
   * Handle REGISTER_CLIENT request.
   *
   * Assigns a port to the service, stores it in the service registry under
   * `this.host`, creates a connector for health checks and marks the service
   * active. A service that is already in the registry but currently inactive
   * gets its previous port back when one is recorded; every other case
   * allocates a fresh port. Failures are reported in the response
   * (`success: false`) rather than thrown.
   * @param payload - Registration request data
   * @returns A promise resolving to the registration response
   */
  public async RegisterClient(
    payload: RegisterRequest,
  ): Promise<RegisterResponse> {
    const { serviceKey, host, messageTypes } = payload;

    log.info(
      'registry.register-client',
      `Registering client ${serviceKey} at ${host}`,
      { serviceKey, host },
    );

    if (!serviceKey) {
      return {
        success: false,
        error: 'Invalid registration data: missing serviceKey',
      };
    }

    try {
      const currentlyActive = this.serviceActive.get(serviceKey);

      log.info(
        'registry.register-client-check-active-result',
        `Service ${serviceKey} is active: ${currentlyActive}`,
        {
          serviceKey,
        },
      );

      const alreadyKnown = this.serviceRegistry.has(serviceKey);

      log.info(
        'registry.register-client-check-port-result',
        `Service ${serviceKey} is in registry: ${alreadyKnown}`,
        {
          serviceKey,
        },
      );

      let port: number;

      if (alreadyKnown && !currentlyActive) {
        // Known but inactive service: try to recover the old port
        const previousPort = this.serviceRegistry.get(serviceKey)?.port;

        port = previousPort || (await this.getNextPort());

        log.info(
          'registry.service-recovered',
          `Recovering service ${serviceKey} at ${this.host}:${port}`,
        );
      } else {
        port = await this.getNextPort();
      }

      // Register the service in the shared serviceRegistry
      const entry = {
        serviceKey,
        host: this.host,
        port,
        messageTypes,
      };

      this.serviceRegistry.set(serviceKey, entry);

      // Connector used by the periodic health checks
      const connector = new DefaultConnector(this, serviceKey);

      this.serviceClients.set(serviceKey, connector);

      // A (re-)registered service starts with a clean health record
      this.serviceFailedAttempts.set(serviceKey, 0);
      this.serviceActive.set(serviceKey, true);

      log.info(
        'registry.service-registered',
        `Registered service ${serviceKey} at ${this.host} with port ${port}`,
      );

      return {
        success: true,
        port,
        message: `Registered service ${serviceKey} at ${host}:${port}`,
      };
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);

      log.error(
        'registry.registration-error',
        `Error processing registration: ${reason}`,
      );

      return {
        success: false,
        error: `Registration failed: ${reason}`,
      };
    }
  }

  /**
   * Handle UNREGISTER_CLIENT request.
   *
   * Removes every piece of state the registry keeps for the service: its
   * registry entry, its health-check client, its active flag and its
   * failed-attempt counter. The service's port is also dropped from the
   * external-port cache (the startup scan caches every open port, including
   * those of services it then recovers) so the port can be allocated again.
   * Failures are reported in the response (`success: false`), not thrown.
   * @param payload - Unregistration request data
   * @returns A promise resolving to the unregistration response
   */
  public UnregisterClient(
    payload: UnregisterRequest,
  ): Promise<UnregisterResponse> {
    const { serviceKey } = payload;

    log.info(
      'registry.unregister-client-request',
      `Unregistration request from ${serviceKey}`,
    );

    if (!serviceKey) {
      return Promise.resolve({
        success: false,
        error: 'Invalid unregistration data: missing serviceKey',
      });
    }

    try {
      // Entries are stored under their own serviceKey, so a direct lookup
      // replaces the linear scan.
      if (!this.serviceRegistry.has(serviceKey)) {
        return Promise.resolve({
          success: false,
          error: `Service ${serviceKey} not found`,
        });
      }

      const releasedPort = this.serviceRegistry.get(serviceKey)?.port;

      this.serviceRegistry.delete(serviceKey);
      this.serviceClients.delete(serviceKey);
      this.serviceActive.delete(serviceKey);
      this.serviceFailedAttempts.delete(serviceKey);

      // The port belonged to a registered service, not an external process.
      // The allocator still probes a port before handing it out.
      if (typeof releasedPort === 'number') {
        this.externalUsedPorts.delete(releasedPort);
      }

      log.info(
        'registry.service-unregistered',
        `Unregistered service ${serviceKey}`,
      );

      return Promise.resolve({
        success: true,
        message: `Unregistered service ${serviceKey}`,
      });
    } catch (error) {
      log.error(
        'registry.unregistration-error',
        `Error processing unregistration: ${error instanceof Error ? error.message : String(error)}`,
      );

      return Promise.resolve({
        success: false,
        error: `Unregistration failed: ${error instanceof Error ? error.message : String(error)}`,
      });
    }
  }

  /**
   * Handle LIVE_SERVICES request.
   *
   * The request payload is ignored. Only registry entries whose active flag
   * is currently truthy are returned.
   * @returns A promise resolving to the list of active services
   */
  public GetLiveServices(
    _: LiveServicesRequest,
  ): Promise<LiveServicesResponse> {
    const everyService = Array.from(this.serviceRegistry.values());
    const services = everyService.filter((entry) =>
      this.serviceActive.get(entry.serviceKey),
    );

    return Promise.resolve({ services });
  }

  /**
   * Handle LIVE_PEERS request.
   *
   * The request payload is ignored. A host is reported once, and only if at
   * least one service registered on it is currently marked active - the same
   * liveness rule GetLiveServices applies - so hosts whose services have all
   * gone inactive are not advertised as live peers.
   * @returns A promise resolving to the list of unique peer hosts
   */
  public GetLivePeers(_: LivePeersRequest): Promise<LivePeersResponse> {
    // TODO: track peers as map of host -> services, keep updated with live services

    // Get unique set of hosts from the active services
    const peers = new Set<string>();

    for (const service of Array.from(this.serviceRegistry.values())) {
      if (this.serviceActive.get(service.serviceKey)) {
        peers.add(service.host);
      }
    }

    return Promise.resolve({
      peers: Array.from(peers),
    });
  }

  /**
   * Perform the initial scan that recovers services already running when the
   * registry starts.
   *
   * Phase 1 TCP-probes every port in [minPort, maxPort] except the registry's
   * own port; the probes are grouped into batches for progress logging and
   * all run concurrently. Phase 2 sends a service-info request to each port
   * that answered and registers the services that reply. Finally, ports that
   * now belong to a registered service are removed from the external-port
   * cache: the registry entry already reserves them, and leaving them cached
   * would keep the port blocked after the service unregisters.
   * @private
   */
  private async performStartupHealthCheck(): Promise<void> {
    log.info(
      'registry.startup-health-check',
      'Performing startup health check to recover active services',
    );

    const openPorts: number[] = [];
    const batchSize = 30; // Scan more ports simultaneously since TCP ping is lightweight

    log.info('registry.startup-tcp-scan', 'Scanning for open ports');

    // Probe a single port and remember it if something is listening.
    const probePort = async (port: number): Promise<void> => {
      if (await this.pingPort(this.host, port)) {
        openPorts.push(port);
        // Provisionally treat the port as externally used so it is not
        // assigned; corrected below for ports that turn out to be services.
        this.externalUsedPorts.add(port);
      }
    };

    // Probe one batch of consecutive ports and log when it is done.
    const scanBatch = async (batchStart: number): Promise<void> => {
      const batchEnd = Math.min(batchStart + batchSize - 1, this.maxPort);
      const probes: Promise<void>[] = [];

      for (let port = batchStart; port <= batchEnd; port += 1) {
        if (port === this.port) continue; // Skip registry's own port

        probes.push(probePort(port));
      }

      await Promise.all(probes);

      log.debug(
        'registry.startup-tcp-scan-progress',
        `Scanned ports ${batchStart} to ${batchEnd}`,
      );
    };

    // Start every batch, then wait for all TCP probes to complete
    const batches: Promise<void>[] = [];

    for (
      let batchStart = this.minPort;
      batchStart <= this.maxPort;
      batchStart += batchSize
    ) {
      batches.push(scanBatch(batchStart));
    }

    await Promise.all(batches);

    log.info(
      'registry.startup-tcp-scan-complete',
      `TCP scan complete. Found ${openPorts.length} potentially active ports.`,
    );

    // Now check only the open ports with the full service protocol
    if (openPorts.length > 0) {
      await Promise.allSettled(
        openPorts.map((port) => this.checkPortForService(port)),
      );
    }

    const recovered = Array.from(this.serviceRegistry.values()).filter(
      (service) => service.serviceKey !== this.serviceKey,
    );

    // Ports owned by registered services are tracked through the registry,
    // not through the external-port cache.
    for (const service of recovered) {
      this.externalUsedPorts.delete(service.port);
    }

    const recoveredCount = recovered.length;

    log.info(
      'registry.startup-health-check-complete',
      `Startup health check complete. Recovered ${recoveredCount} services.`,
    );
  }

  /**
   * Ask whatever is listening on `port` for its service info and, if it
   * answers, register it as a recovered service.
   *
   * A temporary `registry-probe-<port>` entry is placed in the registry so a
   * connector can address the port; the entry is always removed again before
   * the method returns. Services whose key is already registered are left
   * untouched. This method never rejects: probe failures are logged at debug
   * level.
   * @private
   * @param port - Port that answered the TCP probe
   */
  private async checkPortForService(port: number): Promise<void> {
    const probeServiceKey = `registry-probe-${port}`;

    try {
      // Temporarily add the probe service to the registry
      this.serviceRegistry.set(probeServiceKey, {
        serviceKey: probeServiceKey,
        host: this.host,
        port,
        messageTypes: [],
      });

      try {
        // The connector resolves its target through the temporary entry
        const probe = new DefaultConnector(this, probeServiceKey);
        const reply = await probe.GetServiceInfo({});
        const info = reply ? reply.serviceInfo : undefined;

        if (!info) {
          return;
        }

        const { serviceKey, host, messageTypes } = info;

        // Skip if already registered (and it's not our probe)
        const alreadyKnown =
          this.serviceRegistry.has(serviceKey) &&
          !serviceKey.startsWith('registry-probe-');

        if (alreadyKnown) {
          return;
        }

        log.info(
          'registry.service-recovered',
          `Recovered service ${serviceKey} at ${host}:${port} during startup`,
        );

        // Register the rediscovered service
        this.serviceRegistry.set(serviceKey, {
          serviceKey,
          host,
          port,
          messageTypes,
        });

        // Create a client for future health checks
        this.serviceClients.set(
          serviceKey,
          new DefaultConnector(this, serviceKey),
        );

        // Mark as active with a clean failure count
        this.serviceFailedAttempts.set(serviceKey, 0);
        this.serviceActive.set(serviceKey, true);

        // Move the allocation cursor past this port to avoid collisions
        if (port >= this.nextPort) {
          this.nextPort = port + 1 > this.maxPort ? this.minPort : port + 1;
        }
      } catch {
        // No service or unresponsive service at this port, even though TCP responded
        log.debug(
          'registry.service-probe-failed',
          `Port ${port} is open but not running a compatible service`,
        );
      } finally {
        // Clean up the temporary registry entry on every path
        this.serviceRegistry.delete(probeServiceKey);
      }
    } catch (error) {
      // Handle unexpected errors
      log.debug(
        'registry.port-check-error',
        `Error checking port ${port}: ${error instanceof Error ? error.message : String(error)}`,
      );
    }
  }

  /**
   * Try to open a TCP connection to `host:port` to see whether anything is
   * listening there. The socket is destroyed as soon as the outcome is known,
   * and the returned promise never rejects.
   * @param host - Host to check
   * @param port - Port to check
   * @param timeout - Connection timeout in milliseconds
   * @returns Promise resolving to true if the connection succeeded, false on error or timeout
   */
  private async pingPort(
    host: string,
    port: number,
    timeout: number = 500,
  ): Promise<boolean> {
    return new Promise<boolean>((resolve) => {
      const socket = new net.Socket();
      let settled = false;

      // Report the first outcome only; later socket events are ignored.
      const settle = (isOpen: boolean): void => {
        if (settled) {
          return;
        }

        settled = true;
        socket.destroy();
        resolve(isOpen);
      };

      socket.setTimeout(timeout);

      socket.on('connect', () => settle(true)); // something is listening
      socket.on('error', () => settle(false)); // refused / unreachable
      socket.on('timeout', () => settle(false)); // no answer within `timeout`

      // Attempt connection
      socket.connect(port, host);
    });
  }
}
