import {
  ABORT,
  CLOSE,
  ERROR,
  PULL,
  WRITE,
} from './MessagePortSignals';


export class MessagePortSink {
  public _controller;
  public _port: MessagePort;
  public _readyPromise: Promise<void>;
  public _readyResolve: () => void;
  public _readyReject: (reason) => void;
  public _readyPending: boolean;

  constructor(port: MessagePort) {
    this._port = port;
    this._resetReady();
    this._port.onmessage = event => this._onMessage(event.data);
  }

  start(controller) {
    this._controller = controller;
    // Apply initial backpressure
    return this._readyPromise;
  }

  write(chunk) {
    const message = {type: WRITE, chunk};

    // Send chunk
    this._port.postMessage(message, [chunk.buffer]);

    // Assume backpressure after every write, until sender pulls
    this._resetReady();

    // Apply backpressure
    return this._readyPromise;
  }

  close() {
    this._port.postMessage({type: CLOSE});
    this._port.close();
  }

  abort(reason) {
    this._port.postMessage({type: ABORT, reason});
    this._port.close();
  }

  _onMessage(message) {
    if (message.type === PULL) this._resolveReady();
    if (message.type === ERROR) this._onError(message.reason);
  }

  _onError(reason) {
    this._controller.error(reason);
    this._rejectReady(reason);
    this._port.close();
  }

  _resetReady() {
    this._readyPromise = new Promise((resolve, reject) => {
      this._readyResolve = resolve;
      this._readyReject = reject;
    });
    this._readyPending = true;
  }

  _resolveReady() {
    this._readyResolve();
    this._readyPending = false;
  }

  _rejectReady(reason) {
    if (!this._readyPending) this._resetReady();
    // eslint-disable-next-line lodash/prefer-noop
    this._readyPromise.catch(() => {
    });
    this._readyReject(reason);
    this._readyPending = false;
  }
}
