Source: lib/msf/msf_transport.js

/*! @license
 * Shaka Player
 * Copyright 2025 Google LLC
 * SPDX-License-Identifier: Apache-2.0
 */


goog.provide('shaka.msf.MSFTransport');

goog.require('shaka.log');
goog.require('shaka.msf.ControlStream');
goog.require('shaka.msf.Reader');
goog.require('shaka.msf.Receiver');
goog.require('shaka.msf.Sender');
goog.require('shaka.msf.TracksManager');
goog.require('shaka.msf.Utils');
goog.require('shaka.msf.Writer');
goog.require('shaka.util.Error');
goog.require('shaka.util.IReleasable');


/**
 * MoQT (Media over QUIC Transport) client.
 *
 * Opens a WebTransport session, performs the client/server setup exchange on
 * a bidirectional control stream, and then dispatches incoming control
 * messages to the registered handlers and callbacks.
 *
 * @implements {shaka.util.IReleasable}
 * @export
 */
shaka.msf.MSFTransport = class {
  constructor() {
    /** @private {?WebTransport} */
    this.webTransport_ = null;

    /**
     * Next request ID handed out by getNextRequestId().
     * @private {number}
     */
    this.nextRequestId_ = 0;

    /** @private {!Set<function(Array<string>)>} */
    this.publishNamespaceCallbacks_ = new Set();

    /**
     * One-shot handlers, keyed by message kind and then by request ID.
     *
     * @private {!Map<shaka.msf.Utils.MessageType,
     *                Map<number, shaka.msf.Utils.MessageHandler>>}
     */
    this.messageHandlers_ = new Map();

    /** @private {shaka.msf.TracksManager} */
    this.tracksManager_ = null;
  }

  /**
   * Open the WebTransport session, exchange setup messages with the server
   * and start the control message listener.
   *
   * If anything fails after the session object has been created, the session
   * is closed before the error is rethrown, so a failed connect does not
   * leave a connection open.
   *
   * @param {string} uri
   * @param {?Uint8Array} fingerprint SHA-256 hash of the server certificate,
   *   passed as serverCertificateHashes when provided.
   * @return {!Promise<!shaka.msf.MSFConnection>}
   */
  async connect(uri, fingerprint) {
    if (!('WebTransport' in window)) {
      throw new shaka.util.Error(
          shaka.util.Error.Severity.CRITICAL,
          shaka.util.Error.Category.MANIFEST,
          shaka.util.Error.Code.WEBTRANSPORT_NOT_AVAILABLE);
    }

    const options = {
      allowPooling: false,
      congestionControl: 'low-latency',
    };
    if (fingerprint) {
      options.serverCertificateHashes = [
        {
          algorithm: 'sha-256',
          value: fingerprint,
        },
      ];
    }

    // Work on a local reference: release() clears this.webTransport_, and
    // the handshake below spans several awaits.
    const webTransport = new WebTransport(uri, options);
    this.webTransport_ = webTransport;

    try {
      await webTransport.ready;
      shaka.log.debug('WebTransport connection established');

      /** @type {!WebTransportBidirectionalStream} */
      const stream = await webTransport.createBidirectionalStream();
      shaka.log.debug('Bidirectional stream created', stream);

      const writer = new shaka.msf.Writer(stream.writable);
      const reader = new shaka.msf.Reader(new Uint8Array([]), stream.readable);

      const sender = new shaka.msf.Sender(writer);
      // Send the client setup message
      shaka.log.debug('Sending client setup message');
      const versions = [
        shaka.msf.Utils.Version.DRAFT_14,
      ];
      const params = [
        {
          // MAX_REQUEST_ID parameter type
          type: 0x02,
          // Allow server to send up to 64 request-type messages
          value: 64,
        },
      ];
      await sender.client({versions, params});

      const receiver = new shaka.msf.Receiver(reader);
      // Receive the server setup message
      shaka.log.debug('Waiting for server setup message');
      const server = await receiver.server();
      shaka.log.debug('Received server setup:', server);

      if (!versions.includes(server.version)) {
        throw new Error(`Unsupported server version: ${server.version}`);
      }

      // Create control stream for handling control messages
      const controlStream = new shaka.msf.ControlStream(reader, writer);
      shaka.log.debug('Control stream established');

      // Create tracks manager for handling data streams
      this.tracksManager_ = new shaka.msf.TracksManager(
          webTransport, controlStream, this);
      shaka.log.debug(
          'Tracks manager created with control stream and client reference');

      // Create a Connection object with the client instance to access
      // request ID management
      const connection = new shaka.msf.MSFConnection(
          webTransport, controlStream, this);

      // Start listening for control messages. Intentionally not awaited:
      // the listener runs until the stream fails and logs its own errors.
      this.listenForControlMessages_(controlStream);

      return connection;
    } catch (error) {
      // Setup failed part-way through; do not leave the session open.
      try {
        webTransport.close();
      } catch (closeError) {
        shaka.log.debug(
            'Error closing WebTransport after failed setup:', closeError);
      }
      if (this.webTransport_ === webTransport) {
        this.webTransport_ = null;
      }
      throw error;
    }
  }

  /**
   * Drop all registered callbacks and handlers and release the tracks
   * manager. The WebTransport session itself is not closed here; that is
   * done through MSFConnection.close().
   *
   * @override
   */
  release() {
    shaka.log.debug('Closing client connection');
    this.publishNamespaceCallbacks_.clear();
    // Pending one-shot handlers will never fire for a released transport.
    this.messageHandlers_.clear();

    this.tracksManager_?.release();
    this.tracksManager_ = null;
    this.webTransport_ = null;
  }

  /**
   * Get the next available request ID and increment for future use.
   * According to the MoQ Transport spec, client request IDs are even numbers
   * starting at 0 and increment by 2 for each new request.
   *
   * @return {number}
   */
  getNextRequestId() {
    const requestId = this.nextRequestId_;
    this.nextRequestId_ += 2;
    shaka.log.debug(`Generated new request ID: ${requestId}`);
    return requestId;
  }

  /**
   * Listen for control messages and dispatch them to registered handlers.
   *
   * PublishNamespace messages go to every PublishNamespace callback. Any
   * other message carrying a requestId goes to the one-shot handler
   * registered for its (kind, requestId) pair, if there is one. The loop
   * ends when receiving from the control stream throws; that error is
   * logged, not rethrown.
   *
   * @param {shaka.msf.ControlStream} controlStream
   * @return {!Promise}
   * @private
   */
  async listenForControlMessages_(controlStream) {
    shaka.log.debug('Starting to listen for control messages');
    try {
      while (true) {
        // eslint-disable-next-line no-await-in-loop
        const msg = await controlStream.receive();
        if (msg.kind === shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE) {
          const publishNamespaceMessage =
          /** @type {shaka.msf.Utils.PublishNamespace} */(msg);
          shaka.log.info(
              'Received PublishNamespace message with namespace: ' +
              publishNamespaceMessage.namespace.join('/'));

          // Notify all registered PublishNamespace callbacks. A failing
          // callback must not prevent the remaining ones from running.
          for (const callback of this.publishNamespaceCallbacks_) {
            try {
              callback(publishNamespaceMessage.namespace);
            } catch (error) {
              const reason =
                  error instanceof Error ? error.message : String(error);
              shaka.log.error(
                  `Error in PublishNamespace callback: ${reason}`);
            }
          }
        } else if ('requestId' in msg) {
          // For messages with request IDs, check if we have a handler
          // registered.
          // Since not all messages have a requestId, we have to use
          // this notation.
          const requestId = msg['requestId'];
          const handlersForKind = this.messageHandlers_.get(msg.kind);

          if (handlersForKind && handlersForKind.has(requestId)) {
            shaka.log.debug(
                `Found handler for message kind ${msg.kind} ` +
                `with requestId ${requestId}`);
            const handler = handlersForKind.get(requestId);
            // Handlers are one-time use. Remove the entry before invoking
            // it so that it is dropped even when the handler throws, and so
            // that a handler registered from inside the call is kept.
            handlersForKind.delete(requestId);
            if (handler) {
              try {
                handler(msg);
              } catch (error) {
                const reason =
                    error instanceof Error ? error.message : String(error);
                shaka.log.error(
                    `Error in message handler for kind ${msg.kind} ` +
                    `with requestId ${requestId}: ${reason}`);
              }
            } else {
              shaka.log.warning(
                  `Handler for message kind ${msg.kind} ` +
                  `with requestId ${requestId} was null`);
            }
          } else {
            shaka.log.debug(
                `No handler found for message kind ${msg.kind} ` +
                `with requestId ${requestId}`);
          }
        } else {
          shaka.log.debug(
              `Received message of kind ${msg.kind} without a request ID`);
        }
      }
    } catch (error) {
      // Check if this is a WebTransportError due to session closure
      if (error instanceof Error &&
        error.message.includes('session is closed')) {
        shaka.log.debug('Control message listener stopped: connection closed');
      } else {
        shaka.log.error('Error while listening for control messages:', error);
      }
    }
  }

  /**
   * Register a one-shot handler for a specific message kind and request ID.
   * Registering again for the same pair replaces the previous handler.
   *
   * @param {shaka.msf.Utils.MessageType} kind
   * @param {number} requestId
   * @param {shaka.msf.Utils.MessageHandler} handler
   * @return {function()} A function to unregister the handler. It only
   *   removes the entry if it still holds this handler.
   */
  registerMessageHandler(kind, requestId, handler) {
    shaka.log.debug(
        `Registering handler for message kind ${kind} ` +
        `with requestId ${requestId}`);

    // Create the per-kind map on first use.
    let handlersForKind = this.messageHandlers_.get(kind);
    if (!handlersForKind) {
      handlersForKind = new Map();
      this.messageHandlers_.set(kind, handlersForKind);
    }

    // Register the handler for this request ID
    handlersForKind.set(requestId, handler);

    // Return a function to unregister the handler
    return () => {
      shaka.log.debug(
          `Unregistering handler for message kind ${kind} ` +
          `with requestId ${requestId}`);
      // Look the map up again: release() may have cleared it meanwhile.
      const handlersMap = this.messageHandlers_.get(kind);
      if (handlersMap && handlersMap.get(requestId) === handler) {
        handlersMap.delete(requestId);
      }
    };
  }

  /**
   * Subscribe to a track by namespace and track name. Delegates to the
   * tracks manager created by connect().
   *
   * @param {string} namespace
   * @param {string} trackName
   * @param {shaka.msf.Utils.ObjectCallback} callback
   * @return {!Promise<number>}
   * @throws {Error} Synchronously, if there is no tracks manager (connect()
   *   has not completed, or release() was called).
   */
  subscribeTrack(namespace, trackName, callback) {
    if (!this.tracksManager_) {
      throw new Error('Cannot subscribe: Tracks manager not initialized');
    }

    shaka.log.debug(`Client subscribing to track ${namespace}:${trackName}`);
    return this.tracksManager_.subscribeTrack(namespace, trackName, callback);
  }

  /**
   * Unsubscribe from a track by track alias. Rejects if there is no tracks
   * manager (connect() has not completed, or release() was called).
   *
   * @param {number} trackAlias
   * @return {!Promise}
   */
  async unsubscribeTrack(trackAlias) {
    if (!this.tracksManager_) {
      throw new Error('Cannot unsubscribe: Tracks manager not initialized');
    }

    shaka.log.debug(`Client unsubscribing from track with alias ${trackAlias}`);
    await this.tracksManager_.unsubscribeTrack(trackAlias);
  }

  /**
   * Register a callback to be notified when a PublishNamespace message is
   * received.
   * @param {function(Array<string>)} callback
   * @return {function()} A function to unregister the callback
   */
  registerPublishNamespaceCallback(callback) {
    shaka.log.debug('Registering PublishNamespace callback');
    this.publishNamespaceCallbacks_.add(callback);

    // Return a function to unregister the callback
    return () => {
      shaka.log.debug('Unregistering PublishNamespace callback');
      this.publishNamespaceCallbacks_.delete(callback);
    };
  }
};


/**
 * Handle for an established session: bundles the WebTransport session, its
 * control stream and the transport that hands out request IDs.
 */
shaka.msf.MSFConnection = class {
  /**
   * @param {!WebTransport} webTransport
   * @param {!shaka.msf.ControlStream} controlStream
   * @param {!shaka.msf.MSFTransport} moqtTransport
   */
  constructor(webTransport, controlStream, moqtTransport) {
    /** @private {!shaka.msf.MSFTransport} */
    this.msfTransport_ = moqtTransport;

    /** @private {!shaka.msf.ControlStream} */
    this.controlStream_ = controlStream;

    /** @private {!WebTransport} */
    this.webTransport_ = webTransport;
  }

  /**
   * Accessor for the control stream used to send messages.
   * @return {!shaka.msf.ControlStream}
   */
  control() {
    const stream = this.controlStream_;
    return stream;
  }

  /**
   * Ask the owning transport for the next request ID.
   * @return {number}
   */
  getNextRequestId() {
    const owner = this.msfTransport_;
    return owner.getNextRequestId();
  }

  /**
   * Close the WebTransport session and wait until it reports closed.
   *
   * @param {number=} code Close code sent with the close info (default 0).
   * @param {string=} reason Reason sent with the close info (default '').
   * @return {!Promise}
   */
  async close(code = 0, reason = '') {
    const session = this.webTransport_;
    shaka.log.debug(`Closing connection with code ${code}: ${reason}`);

    const closeInfo = {closeCode: code, reason};
    session.close(closeInfo);

    // Resolve with no value once the session has finished closing.
    await session.closed;
  }
};