import { Inject, Injectable } from '@angular/core';
import { LogService } from '@comm-apps/log';
import { cad } from '../../common.module';
import * as _ from 'lodash-es';
import { filter, Observable, Subject } from 'rxjs';

/**
 * Handles connection operations to a web socket.
 * 
 * This service assumes all message payloads from the server will be text/plain and UTF-8. The content-type header from the server specifying
 * content type can be disregarded.
 * 
 * The webSocket used as the Observable for this class will complete once there are no more subscriptions (STOMP subscriptions) to the topic. This does not mean that the 
 * web socket connection to the server itself is closed, even if there are no more listeners to any topics.
 */
@Injectable({
  providedIn: 'root'
})
export class WebSocketService {
  private endpoint: string = 'ws://';
  private webSocketConnection: WebSocket;
  private shouldBeConnected: boolean = false;  //stores the state of if we should be connected or not.

  /** Keeps a list of topics that are being listened to by the frontend. <topicName, subscriptionID>  */
  private subscriptionRegistry: Map<string, number> = new Map<string, number>();

  private webSocket$: Subject<ServerMessageFrame> = new Subject<ServerMessageFrame>();
  baseUrl: string;
  wsUrl: string;

  constructor(@Inject('environment') environment: cad.Environment, private log: LogService) {
    this.baseUrl = environment.baseUrl
    this.wsUrl = environment.wsUrl
  }

  /**
   * Connect to the web socket server and immediately sends a CONNECT frame to obtain a session ID for the current host.
   * All topics that are subscribed to will exist under this session ID as long as the web socket connection is open.
   */
  connect(acceptedVersions?: Array<string>, receiptId?: string, heartbeat?: [number, number]): void {
    if (this.wsUrl) {
      this.webSocketConnection = new WebSocket(this.wsUrl);  
    } else {
      this.webSocketConnection = new WebSocket('ws://' + window.location.host + '/ws');  
    }

    const connectFrame: ClientConnectFrame = new ClientConnectFrame(this.baseUrl, acceptedVersions, receiptId, heartbeat);

    this.webSocketConnection.onclose = (ev:CloseEvent) => this.onClose(ev);
    this.webSocketConnection.onopen = () => this.onOpen(connectFrame); 

    this.shouldBeConnected = true;
  }

  private onOpen(connectFrame: ClientConnectFrame): void {
    //finish establishing the connection
    this.webSocketConnection.send(connectFrame.getEncodedMessage()); 

    //If we were previously open and died, re-establish subscriptions.
    if (this.subscriptionRegistry.size > 0) {
      let registryHold: Map<string, number> = _.clone(this.subscriptionRegistry);
      this.subscriptionRegistry.clear();
      registryHold.forEach((value: number, key: string) => {
        this.subscribeToTopic(key);
      });    
    }
  }

  private onClose(ev: CloseEvent): void {
    if (this.shouldBeConnected) {
      this.log.debug("Websocket was lost, reconnecting...");
      setTimeout(() => {
        this.connect();
      }, 5000);
    }
  }

  /**
   * Disconnects from the WebSocket after attempting to unsubscribe all currently subscribed topics.
   */
  public disconnect(): void {
    this.shouldBeConnected = false;
    this.unsubAllTopics();
    const disconnectFrame = new ClientDisconnectFrame('disconnection-000') //TODO this connection's host ID needs to be unique per client instance.
    this.webSocketConnection.send(disconnectFrame.getEncodedMessage())
    this.webSocketConnection.onmessage = (message) => {
      const response = this.getFrameFromStompMessage(message.data);
      if (!(response instanceof ServerReceiptFrame)) {
        this.log.error("Disconnection receipt not received. Server error possible.");
      } else {
        if (response.getReceiptId() != disconnectFrame.getReceiptId()) {
          this.log.error("Receipt-id of received frame does not match required receipt-id.")
        }
      }
      this.webSocketConnection.close();
      this.subscriptionRegistry.clear();
    }
  }

  /**
   * Returns a ServerStompFrame object corresponding to the server's message type. The STOMP sub-protocol
   * definition defines possible server frames as RECEIPT, MESSAGE, and ERROR.
   * 
   * @remarks Removes the null byte string representation (\x00) from the message payload so that it does not 
   * persist in the ServerStompFrame object.
   * 
   * @param stompMessageText Raw message content from websocket connection.
   * @returns A ServerStompFrame corresponding to the message type.
   */
  private getFrameFromStompMessage(stompMessageText: string): ServerMessageFrame {
    const messageHeadersAndPayload = stompMessageText.split(`\n\n`);
    const messageHeaderLines = messageHeadersAndPayload[0].split(`\n`);

    const stompHeaders = new Map<string, string>();
    stompHeaders.set('command', messageHeaderLines[0]);

    messageHeaderLines.forEach(headerAndValue => {
      stompHeaders.set(headerAndValue.split(':')[0], headerAndValue.split(':')[1]);
    });

    switch(stompHeaders.get('command')) {
      case 'RECEIPT':
        return new ServerMessageFrame(ServerFrameCommand.receipt, null, null, null, null, stompHeaders.get('receipt-id'));
      case 'MESSAGE':
        return new ServerMessageFrame(
          ServerFrameCommand.message,
          stompHeaders.get('destination'),
          stompHeaders.get('message-id'),
          parseInt(stompHeaders.get('subscription')),
          this.getObject(messageHeadersAndPayload[1].split('\x00')[0]),
          stompHeaders.get('receipt-id'));
      case 'ERROR': 
        return new ServerMessageFrame(ServerFrameCommand.error, null, null, null, stompHeaders.get('message'), stompHeaders.get('receipt-id'));
      default:
        this.log.error('Message data does not match known server STOMP frames.');
    }
  }

  private getObject(potentialJson: string): any {
    try {  //attempt to parse the JSON form a string - otherwise the payload IS a string.
      return JSON.parse(potentialJson);
    } catch(e) {}
    return potentialJson;
  }

  /**
   * Returns an Observable that allows subscribers to listen for messages from the server.
   * 
   * When subscribing to a topic, only the specific topic name is required, i.e. 'trades' or 'points.' Prefixing the topic name with '/topic/'
   * or prefixing the topic name with the '/' character is not necessary.
   * 
   * Topics ('trade,' 'point,' etc.) will be considered listened to one at a time. Multiple components listening to a topic will receive the same events and will not
   * be acknowledged by the server as two separate listeners. The application itself (the client frontend) will have one session ID and subscription ID per topic.
   * 
   * @param topicName Name of topic to subscribe to.
   * @param receiptId Unique string to check against the server RECEIPT frame for confirmation that the SUBSCRIPTION frame was processed.
   * 
   * @returns An observable that emits ServerStompFrame objects.
   */
  public subscribeToTopic(topicName: string, receiptId?: string, retryCount: number = 3): Observable<ServerMessageFrame> {

    const id = this.subscriptionRegistry.size === 0 ? 0 : this.subscriptionRegistry.size;
    const subFrame: ClientSubscriptionFrame = new ClientSubscriptionFrame(ClientFrameCommand.subscribe, id, `/topic/${topicName}`);

    if (this.webSocketConnection.readyState == this.webSocketConnection.OPEN) {
      if (!this.subscriptionRegistry.has(topicName)) {
        this.webSocketConnection.send(subFrame.getEncodedMessage());

        this.webSocketConnection.onmessage = (message) => {
          const serverStompFrame = this.getFrameFromStompMessage(message.data);

          if (serverStompFrame.getCommand() !== ServerFrameCommand.message) {
            this.log.error('Server did not consume SUBSCRIBE. Subscription will not be added to registry.');
          } else {
            this.webSocket$.next(serverStompFrame);
          }
        }
        
        this.subscriptionRegistry.set(topicName, id);
      } else {
        this.log.debug("Already have an open sendMessage to the topic " + topicName + " so just holding on to the current onmessage");
      }
    } else if (this.webSocketConnection.readyState == this.webSocketConnection.CLOSED) {
      this.log.warn("Web socket connection is closed.  Attempting to re-open...");
      this.connect();
    } else {
      var retryMessage: string = '';
      if (retryCount > 0) {
        setTimeout(() => { //retry - since we already returned the websocket$ pipe to the caller, just try to re-listen.
          this.subscribeToTopic(topicName, receiptId, retryCount - 1);
        }, 5000);
        retryMessage = ' retrying in 5 seconds...'
      }
      this.log.warn('Web socket connection not in state OPEN while attempting to send message.  Cannot open connection for topic ' + topicName + retryMessage);
    }

    this.log.debug("Returning the Observable for topic " + topicName);
    return this.webSocket$.pipe(
      filter(stompFrame => (stompFrame.getCommand() === ServerFrameCommand.message) && stompFrame.getDestination().includes(topicName))
    );
  }


  /**
   * Unsubscribes from a topic.  You probably do not need to do this.  If you unsub and someone else is subed to the same topic, you will prevent them from getting messages.
   * Subsequent subs to the same topic do not create additional Server subs, they all share the same topic sub.
   * 
   * When unsubscribing from a topic, only the specific topic name is required, i.e. 'trades' or 'points.' Prefixing the topic name with '/topic/'
   * or prefixing the topic name with the '/' character is not necessary.
   * 
   * @param topicName Name of topic being unsubscribed.
   * @param receiptId Unique string to check against the server RECEIPT frame for confirmation that the UNSUBSCRIBE frame was processed.
   */
  public unsubscribeFromTopic(topicName: string, receiptId?: string): void {
    const unsubFrame: ClientSubscriptionFrame = new ClientSubscriptionFrame(ClientFrameCommand.unsubscribe, this.subscriptionRegistry.get(topicName), `/topic/${topicName}`);

    let serverStompFrame;
    if (this.webSocketConnection.readyState == this.webSocketConnection.OPEN) {
      this.webSocketConnection.send(unsubFrame.getEncodedMessage());
      this.webSocketConnection.onmessage = (message) => {
        serverStompFrame = this.getFrameFromStompMessage(message.data)
      }
      this.subscriptionRegistry.delete(topicName);
      if (receiptId) {
        if (serverStompFrame.getReceiptId() != receiptId) {
          this.log.error('Server did not consume UNSUBSCRIBE. Subscription will remain in registry.');
        }
      }
    } else {
      this.log.error('Web socket connection not in state OPEN while attempting to send message.');
    }
  }

  /**
   * Unsubscribes from every subscription the subscriptionRegistry Map has currently stored.
   */
  private unsubAllTopics(): void {
    Array.from(this.subscriptionRegistry.keys()).forEach(sub => {
      this.unsubscribeFromTopic(sub);
    })
  }
}

/**
 * @privateRemarks ACK and NACK may not be required for now.
 */
enum ClientFrameCommand {
  subscribe = 'SUBSCRIBE',
  unsubscribe = 'UNSUBSCRIBE',
  connect = 'CONNECT',
  disconnect = 'DISCONNECT',
  ack = 'ACK',
  nack = 'NACK'
}

enum ServerFrameCommand {
  message = 'MESSAGE',
  receipt = 'RECEIPT',
  error = 'ERROR'
}

/**
 * Represents a STOMP protocol frame that is sent from the server to the client.
 * 
 * @privateRemarks It's possible the library implementation of this service will need to include the ACK and NACK frames for client/server frame consumption
 * acknowledgement of frames sent from a subscription, if that subscription is using the 'ack' header. If the subscription requires acknowledgement,
 * the server will not consider the message consumed until the ACK frame has been sent.
 */
abstract class ServerStompFrame {
  protected command: ServerFrameCommand;
  protected receiptId?: string;

  constructor(command: ServerFrameCommand, receiptId?: string) {
    this.command = command;
    this.receiptId = receiptId;
  }

  public getCommand(): ServerFrameCommand {
    return this.command;
  }

  public getReceiptId(): string {
    return this.receiptId;
  }
} 

class ServerMessageFrame {
  protected command: ServerFrameCommand;
  protected receiptId?: string;
  private destination: string;
  private messageId: string;
  private subscription: number;
  private messagePayload: any;

  constructor(command: ServerFrameCommand, destination: string, messageId: string,
    subscription: number, messagePayload: string, receiptId?: string
  ) {
    this.command = command;
    this.receiptId = receiptId;
    this.destination = destination;
    this.messageId = messageId;
    this.subscription = subscription;
    this.messagePayload = messagePayload;
  }

  public getCommand(): ServerFrameCommand {
    return this.command;
  }

  public getDestination(): string {
    return this.destination;
  }

  public getSubscription(): number {
    return this.subscription;
  }

  public getMessageId(): string {
    return this.messageId;
  }

  public getMessagePayload(): any {
    return this.messagePayload;
  }

  public getReceiptId(): string {
    return this.receiptId;
  }
}

class ServerReceiptFrame extends ServerStompFrame {
  constructor(receiptId: string) {
    super(ServerFrameCommand.receipt, receiptId);
    this.receiptId = receiptId;
  }
}

/**
 * @privateRemarks We may not need this class after all. Have to decide what to do on ERROR frames from the server.
 */
class ServerErrorFrame extends ServerStompFrame {
  private message: string;

  constructor(receiptId?: string, message?: string) {
    super(ServerFrameCommand.error, receiptId);
    this.message = message;
  }

  public getMessage(): string {
    return this.message;
  }
}

/**
 * Represents a STOMP protocol frame that can be sent by the client to a server.
 * 
 * @remarks The STOMP frames available in the protocol specification include more than SUBSCRIBE, UNSUBSCRIBE, and CONNECT, and DISCONNECT.
 * However, applications which only listen for updates on topics of interest (like objects in a backend service) do not need to use many of the other
 * commmands.
 * 
 * @privateRemarks It's possible the library implementation of this service will need to include the ACK and NACK frames for client/server frame consumption
 * acknowledgement.
 */
abstract class ClientStompFrame {
  protected command: ClientFrameCommand;
  protected encodedMessage: Uint8Array;
  protected receiptId?: string;

  constructor(command: ClientFrameCommand, receiptId?: string) {
    this.command = command;
    this.receiptId = receiptId;
  }

  public getCommand(): ClientFrameCommand {
    return this.command;
  }

  public getReceiptId(): string {
    return this.receiptId;
  }

  public getEncodedMessage(): Uint8Array {
    return this.encodedMessage;
  };

  /**
   * Constructs an encoded message to be sent as the payload of the transaction from client to server. The message here contains the entirety of the STOMP frame,
   * i.e, from the command of the frame to the ending of the frame, which is the Null Byte (\x0000).
   */
  protected abstract constructEncodedMessage(): Uint8Array;
}

/**
 * Represents a STOMP CONNECT frame.
 */
class ClientConnectFrame extends ClientStompFrame {
  private acceptedVersions?: Array<string>;
  private host: string;
  private heartbeat: [number, number];

  constructor(
    host: string,
    acceptedVersions?: Array<string>,
    receiptId?: string,
    heartbeat?: [number, number]
    ) {
    super(ClientFrameCommand.connect, receiptId);
    acceptedVersions = acceptedVersions;
    this.host = host;
    this.heartbeat = heartbeat;
    this.encodedMessage = this.constructEncodedMessage();
  }

  constructEncodedMessage(): Uint8Array {
    let message = `${this.command}\n`;
    if (this.acceptedVersions != null) {
      message += `accept-version:`
      this.acceptedVersions.forEach(version => {
        `${version},`
      })
      message += `\n`
    }
    if (this.heartbeat != null) {
      message += `heart-beat: ${this.heartbeat[0]}, ${this.heartbeat[1]}`
    }
    message += `host:${this.host}\n\n${String.fromCharCode(0)}`;

    let utf8Encode = new TextEncoder();
    return utf8Encode.encode(message);
  }
}

/**
 * Represents a STOMP DISCONNECT frame.
 */
class ClientDisconnectFrame extends ClientStompFrame {

  constructor(receiptId?: string) {
    super(ClientFrameCommand.disconnect, receiptId);
  }
  
  constructEncodedMessage(): Uint8Array {
    let message = `${this.command}\n`;
    if (this.receiptId) {
      message += `receipt-id: ${this.receiptId}`;
    }
    message += `\n\n${String.fromCharCode(0)}`;

    let utf8Encode = new TextEncoder();
    return utf8Encode.encode(message);
  }
}

/**
 * Represents a STOMP SUBSCRIBE or UNSUBSCRIBE frame.
 */
class ClientSubscriptionFrame extends ClientStompFrame{
  private id: number;
  private destination: string;

  constructor(command: ClientFrameCommand, id: number, destination: string, receiptId?: string) {
    super(command, receiptId);
    this.id = id;
    this.destination = destination;
    this.encodedMessage = this.constructEncodedMessage();
  }

  public getId(): number {
    return this.id;
  }

  public getDestination(): string {
    return this.destination;
  }

  constructEncodedMessage(): Uint8Array {
    let message = 
    `${this.command}\nid:${this.id}\ndestination:${this.destination}\n\n${String.fromCharCode(0)}`;
    let utf8Encode = new TextEncoder();
    return utf8Encode.encode(message);
  }
}
