/*! @license
* Shaka Player
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
goog.provide('shaka.msf.TracksManager');
goog.require('shaka.log');
goog.require('shaka.msf.Reader');
goog.require('shaka.msf.Utils');
goog.require('shaka.util.IReleasable');
goog.require('shaka.util.Timer');
goog.requireType('shaka.msf.ControlStream');
goog.requireType('shaka.msf.MSFTransport');
/**
* Tracks manager to handle incoming data streams
*
* @implements {shaka.util.IReleasable}
*/
shaka.msf.TracksManager = class {
/**
* @param {!WebTransport} webTransport
* @param {!shaka.msf.ControlStream} controlStream
* @param {!shaka.msf.MSFTransport} msfTransport
*/
constructor(webTransport, controlStream, msfTransport) {
/** @private {!WebTransport} */
this.webTransport_ = webTransport;
/** @private {!shaka.msf.ControlStream} */
this.controlStream_ = controlStream;
/** @private {!shaka.msf.MSFTransport} */
this.msfTransport_ = msfTransport;
/** @private {shaka.msf.TrackAliasRegistry} */
this.trackRegistry_ = new shaka.msf.TrackAliasRegistry();
/** @private {number} */
this.nextRequestId_ = 0;
/** @private {!Set<shaka.util.Timer>} */
this.timersSet_ = new Set();
/** @private {boolean} */
this.isClosing_ = false;
this.startListeningForStreams_();
}
/**
* Get the next request ID (even numbers for client requests)
*
* @return {number}
*/
getNextRequestId() {
const requestId = this.nextRequestId_;
this.nextRequestId_ += 2;
return requestId;
}
/**
* Start listening for incoming unidirectional streams
*
* @return {!Promise}
* @private
*/
async startListeningForStreams_() {
shaka.log.debug('Starting to listen for incoming unidirectional streams');
try {
const reader =
this.webTransport_.incomingUnidirectionalStreams.getReader();
while (true) {
// eslint-disable-next-line no-await-in-loop
const {value: stream, done} = await reader.read();
if (done) {
shaka.log.debug('Incoming stream reader is done');
break;
}
// Handle the stream in a separate task
this.handleIncomingStream_(stream).catch((error) => {
shaka.log.error('Error handling incoming stream:', error);
});
}
} catch (error) {
shaka.log.error('Error listening for incoming streams:', error);
}
}
/**
* Handle an incoming unidirectional stream
*
* @param {!ReadableStream} stream
* @return {!Promise}
* @private
*/
async handleIncomingStream_(stream) {
shaka.log.debug('Received new incoming unidirectional stream');
const reader = new shaka.msf.Reader(new Uint8Array([]), stream);
try {
// Read the stream type
const streamType = await reader.u62();
shaka.log.debug(`Incoming Unidirectional Stream. Type: ${streamType}`);
// Check if this is a SUBGROUP_HEADER stream
if (streamType >= shaka.msf.TracksManager.SUBGROUP_HEADER_START_BIGINT &&
streamType <= shaka.msf.TracksManager.SUBGROUP_HEADER_END_BIGINT) {
await this.handleSubgroupStream_(reader, streamType);
} else if (streamType === shaka.msf.TracksManager.FETCH_HEADER_BIGINT) {
// Handle FETCH_HEADER streams if needed
shaka.log.debug('Received FETCH_HEADER stream (not implemented yet)');
} else {
shaka.log.warning(`Unknown stream type: ${streamType}`);
}
} catch (error) {
// Suppress errors during shutdown - they are expected
if (!this.isClosing_) {
shaka.log.error('Error processing incoming stream:', error);
} else {
shaka.log.debug('Stream processing ended during shutdown');
}
} finally {
reader.close();
}
}
/**
* Handle a SUBGROUP_HEADER stream with automatic buffering and retry.
*
* @param {!shaka.msf.Reader} reader
* @param {number} streamType
* @return {!Promise}
* @private
*/
async handleSubgroupStream_(reader, streamType) {
try {
// Read the track alias
const trackAlias = await reader.u62();
// Read the group ID
const groupId = await reader.u62();
shaka.log.debug(`Track alias: ${trackAlias} Group ID: ${groupId}`);
// Determine subgroup ID based on the stream type
// According to section 9.4.2, there are 6 defined Type values for
// SUBGROUP_HEADER (0x08-0x0D)
let subgroupId = null;
const hasExtensions =
streamType === 0x09 || streamType === 0x0b || streamType === 0x0d;
if (streamType === 0x08 || streamType === 0x09) {
// Type 0x08-0x09: Subgroup ID is implicitly 0
subgroupId = 0;
shaka.log.debug(`Subgroup ID: ${subgroupId} (implicit)`);
} else if (streamType === 0x0a || streamType === 0x0b) {
// Type 0x0A-0x0B: Subgroup ID is the first Object ID
// (will be set when first object is read)
shaka.log.debug('Subgroup ID will be set to the first Object ID');
} else if (streamType === 0x0c || streamType === 0x0d) {
// Type 0x0C-0x0D: Subgroup ID is explicitly provided
subgroupId = await reader.u62();
shaka.log.debug(`Subgroup ID: ${subgroupId} (explicit)`);
}
// Read the Publisher Priority (as specified in the SUBGROUP_HEADER)
const publisherPriority = await reader.u8();
shaka.log.debug(`Publisher Priority: ${publisherPriority}`);
// Buffer for objects while waiting for track registration
/** @type {!Array<!shaka.msf.Utils.MoQObject>} */
const bufferedObjects = [];
const retryInterval = 0.1;
const maxRetries = 5;
const maxBufferedObjects = 50;
// Process objects in the stream
let isFirstObject = true;
// eslint-disable-next-line no-await-in-loop
while (!(await reader.done())) {
// Read the object ID
// eslint-disable-next-line no-await-in-loop
const objectId = await reader.u62();
shaka.log.debug(`Object ID: ${objectId}`);
// If this is the first object and subgroupId is null
// (types 0x0A-0x0B), set the subgroupId to the objectId
if (isFirstObject && subgroupId === null) {
subgroupId = objectId;
shaka.log.debug(`Subgroup ID set to first Object ID: ${subgroupId}`);
}
isFirstObject = false;
// Handle extension headers if present
let extensions = null;
if (hasExtensions) {
// eslint-disable-next-line no-await-in-loop
const extensionHeadersLength = await reader.u62();
if (extensionHeadersLength > 0) {
// eslint-disable-next-line no-await-in-loop
extensions = await reader.read(extensionHeadersLength);
shaka.log.debug(
`Read ${extensionHeadersLength} bytes of extension headers`);
}
}
// Read the object payload length
// eslint-disable-next-line no-await-in-loop
const payloadLength = await reader.u62();
shaka.log.debug(`Object payload length: ${payloadLength}`);
// Read object status if payload length is zero
let objectStatus = null;
if (payloadLength === 0) {
// eslint-disable-next-line no-await-in-loop
objectStatus = await reader.u62();
shaka.log.debug(`Object status: ${objectStatus}`);
}
// Read the object data
const data = payloadLength > 0 ?
// eslint-disable-next-line no-await-in-loop
await reader.read(Number(payloadLength)) : new Uint8Array([]);
if (payloadLength > 0) {
shaka.log.debug(`Read ${data.byteLength} bytes of object data`);
}
/** @type {shaka.msf.Utils.MoQObject} */
const obj = {
trackAlias,
location: {
group: groupId,
object: objectId,
subgroup: subgroupId,
},
data,
extensions,
status: objectStatus,
};
// Try to deliver immediately with retry logic
let delivered = false;
let retryCount = 0;
while (!delivered && retryCount < maxRetries) {
if (this.isClosing_) {
shaka.log.debug(`Track ${trackAlias} data discarded during shutdown
(buffered ${bufferedObjects.length} objects)`);
return;
}
const trackInfo =
this.trackRegistry_.getTrackInfoFromAlias(trackAlias);
if (trackInfo && trackInfo.callbacks.length > 0) {
// Track registered! Deliver buffered objects first
if (bufferedObjects.length > 0) {
shaka.log.info(`Track ${trackAlias} now registered, delivering
${bufferedObjects.length} buffered objects`);
for (const bufferedObj of bufferedObjects) {
for (const callback of trackInfo.callbacks) {
callback(bufferedObj);
}
}
bufferedObjects.length = 0;
}
// Deliver current object
for (const callback of trackInfo.callbacks) {
callback(obj);
}
delivered = true;
} else {
// Track not registered yet, buffer and retry
if (retryCount === 0) {
shaka.log.debug(`Track ${trackAlias} not registered yet, buffering
object (group=${groupId}, obj=${objectId})`);
bufferedObjects.push(obj);
// Enforce buffer size limit
if (bufferedObjects.length > maxBufferedObjects) {
shaka.log.warning(`Buffer overflow for track ${trackAlias},
dropping oldest object
(buffered: ${bufferedObjects.length})`);
bufferedObjects.shift();
}
}
retryCount++;
if (retryCount < maxRetries) {
shaka.log.debug(`Retry ${retryCount}/${maxRetries} for track
${trackAlias} (buffered: ${bufferedObjects.length})`);
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
new shaka.util.Timer(resolve).tickAfter(retryInterval);
});
// Check again after waiting in case close() was called during
// sleep
if (this.isClosing_) {
shaka.log.debug(`Track ${trackAlias} data discarded during
shutdown (buffered ${bufferedObjects.length} objects)`);
return;
}
} else {
if (this.isClosing_) {
// During shutdown, this is expected - just log and discard
shaka.log.debug(`Track ${trackAlias} data discarded during
shutdown (buffered ${bufferedObjects.length} objects)`);
return;
} else {
// Connection is broken, fail the stream
const errorMsg = `Track ${trackAlias} not registered after
${maxRetries * retryInterval}s. SUBSCRIBE_OK not received in
time. Connection may be broken. (buffered
${bufferedObjects.length} objects that will be discarded)`;
throw new Error(errorMsg);
}
}
}
}
}
shaka.log.debug(`Finished processing SUBGROUP_HEADER stream for track
${trackAlias}`);
} catch (error) {
// Suppress errors during shutdown - they are expected
if (!this.isClosing_) {
shaka.log.error('Error processing SUBGROUP_HEADER stream:', error);
throw error;
} else {
shaka.log.debug(
'SUBGROUP_HEADER stream processing ended during shutdown');
}
}
}
/**
* Notify all callbacks registered for a track
*
* @param {number} trackAlias
* @param {shaka.msf.Utils.MoQObject} obj
* @private
*/
notifyCallbacks_(trackAlias, obj) {
const key = trackAlias.toString();
shaka.log.debug(`Notifying callbacks for track ${trackAlias} (key: ${key}),
object ID: ${obj.location.object}`);
const trackInfo = this.trackRegistry_.getTrackInfoFromAlias(trackAlias);
if (trackInfo && trackInfo.callbacks.length > 0) {
shaka.log.debug(`Found ${trackInfo.callbacks.length} callbacks in
registry for track ${trackAlias}` );
for (let i = 0; i < trackInfo.callbacks.length; i++) {
try {
shaka.log.debug(`Executing registry callback #${i + 1} for track
${trackAlias}`);
trackInfo.callbacks[i](obj);
shaka.log.debug(`Successfully executed registry callback #${i + 1}
for track ${trackAlias}`);
} catch (error) {
shaka.log.error(`Error in registry object callback #${i + 1} for
track ${trackAlias}:`, error);
}
}
}
}
/**
* @override
*/
release() {
shaka.log.debug('Release tracks manager');
// Set closing flag to suppress errors from ongoing streams
this.isClosing_ = true;
this.trackRegistry_.clear();
for (const timer of this.timersSet_) {
timer.stop();
}
this.timersSet_.clear();
}
/**
* Subscribe to a track by namespace and track name
* Returns the track alias that can be used to unsubscribe later
*
* @param {string} namespace
* @param {string} trackName
* @param {shaka.msf.Utils.ObjectCallback} callback
* @return {!Promise<number>}
*/
async subscribeTrack(namespace, trackName, callback) {
shaka.log.debug(`Subscribing to track ${namespace}:${trackName}`);
// Generate a request ID for this subscription
const requestId = this.getNextRequestId();
/** @type {shaka.msf.Utils.Subscribe} */
const subscribeMsg = {
kind: shaka.msf.Utils.MessageType.SUBSCRIBE,
requestId,
namespace: [namespace],
name: trackName,
// Default priority
subscriberPriority: 0,
// Use publisher's order by default
groupOrder: shaka.msf.Utils.GroupOrder.PUBLISHER,
// Forward mode by default
forward: true,
// No filtering by default
filterType: shaka.msf.Utils.FilterType.NEXT_GROUP_START,
params: [],
};
shaka.log.debug(`Sending subscribe message for ${namespace}:${trackName}
with requestId ${requestId}`);
try {
// Store msfTransport reference for use in Promise callbacks
const msfTransport = this.msfTransport_;
// Set up Promise for SUBSCRIBE_OK response
const subscribePromise = new Promise((resolve, reject) => {
// Register handler for SUBSCRIBE_OK
const unregisterOk = msfTransport.registerMessageHandler(
shaka.msf.Utils.MessageType.SUBSCRIBE_OK,
requestId,
(response) => {
const msg =
/** @type {shaka.msf.Utils.SubscribeOk} */(response);
shaka.log.debug(`Received SubscribeOk for
${namespace}:${trackName} with requestId ${requestId},
trackAlias ${msg.trackAlias}`);
resolve(msg.trackAlias);
});
// Register handler for SUBSCRIBE_ERROR
const unregisterErr = msfTransport.registerMessageHandler(
shaka.msf.Utils.MessageType.SUBSCRIBE_ERROR,
requestId,
(response) => {
unregisterOk();
shaka.log.error(`Received SubscribeError for
${namespace}:${trackName}: ${JSON.stringify(response)}`);
reject(
new Error(`Subscribe failed: ${JSON.stringify(response)}`));
});
// Timeout after 2 seconds
const timer = new shaka.util.Timer(() => {
unregisterOk();
unregisterErr();
reject(new Error(`Subscribe timeout (2000ms) for
${namespace}:${trackName} with requestId ${requestId}`));
});
timer.tickAfter(/* seconds= */ 2);
});
// Send the subscribe message
await this.controlStream_.send(subscribeMsg);
// Wait for the SUBSCRIBE_OK
const trackAlias = await subscribePromise;
// Register the callback
// Stream handler will immediately find it and deliver any buffered
// objects
this.trackRegistry_.registerTrackWithAlias(
namespace, trackName, requestId, trackAlias);
this.trackRegistry_.registerCallback(trackAlias, callback);
shaka.log.debug(`Successfully subscribed to
${namespace}:${trackName} with trackAlias ${trackAlias}`);
return trackAlias;
} catch (error) {
shaka.log.error(`Error subscribing to track ${namespace}:${trackName}:`,
error);
// We'll keep the registration in the registry even if the subscription
// fails. This allows for retry attempts without creating new aliases
throw error;
}
}
/**
* Unsubscribe from a track by track alias
*
* @param {number} trackAlias
* @return {!Promise}
*/
async unsubscribeTrack(trackAlias) {
shaka.log.debug(`Unsubscribing from track with alias ${trackAlias}`);
// Get track info from registry if available
const trackInfo = this.trackRegistry_.getTrackInfoFromAlias(trackAlias);
if (!trackInfo) {
throw new Error(`Cannot unsubscribe: No track info found for alias
${trackAlias}`);
}
const trackDescription = `${trackInfo.namespace}:${trackInfo.trackName}`;
// According to MoQ Transport draft-14, the unsubscribe message must use the
// same request ID that was used in the original subscribe message
const requestId = trackInfo.requestId;
/** @type {shaka.msf.Utils.Message} */
const unsubscribeMsg = {
kind: shaka.msf.Utils.MessageType.UNSUBSCRIBE,
requestId,
};
shaka.log.debug(`Sending unsubscribe message for track ${trackDescription}
with original requestId ${requestId}`);
try {
// Create a Promise that will be resolved after a short delay
// Note: The MoQ spec doesn't require an acknowledgment for unsubscribe
// messages, so we'll just wait a short time to allow the message to be
// sent
const unsubscribePromise = new Promise((resolve) => {
new shaka.util.Timer(() => {
resolve();
}).tickAfter(/* seconds= */ 0.5);
});
// Send the unsubscribe message
await this.controlStream_.send(unsubscribeMsg);
// Wait for the unsubscribe to complete (or timeout)
await unsubscribePromise;
// Unregister all callbacks for this track
this.trackRegistry_.unregisterAllCallbacks(trackAlias);
shaka.log.debug(
`Successfully unsubscribed from track ${trackDescription}`);
} catch (error) {
shaka.log.error(`Error unsubscribing from track ${trackDescription}:`,
error);
throw error;
}
}
};
/**
* @private @const {number}
*/
shaka.msf.TracksManager.FETCH_HEADER_BIGINT = 0x05;
/**
* @private @const {number}
*/
shaka.msf.TracksManager.SUBGROUP_HEADER_START_BIGINT = 0x08;
/**
* @private @const {number}
*/
shaka.msf.TracksManager.SUBGROUP_HEADER_END_BIGINT = 0x0d;
/**
* Registry for track aliases that maps between namespace+trackName and
* trackAlias and stores callbacks for incoming objects.
*/
shaka.msf.TrackAliasRegistry = class {
constructor() {
/** @private {Map<string, shaka.msf.Utils.TrackInfo>} */
this.trackNameToInfo_ = new Map();
/** @private {Map<string, shaka.msf.Utils.TrackInfo>} */
this.trackAliasToInfo_ = new Map();
/** @private {number} */
this.nextTrackAlias_ = 1;
}
/**
* Generate a key for namespace+trackName
*
* @param {string} namespace
* @param {string} trackName
* @return {string}
* @private
*/
getNamespaceTrackKey_(namespace, trackName) {
return `${namespace}:${trackName}`;
}
/**
* Register a track with a specific alias (draft-14: server assigns the alias)
* @param {string} namespace
* @param {string} trackName
* @param {number} requestId
* @param {number} trackAlias
*/
registerTrackWithAlias(namespace, trackName, requestId, trackAlias) {
const key = this.getNamespaceTrackKey_(namespace, trackName);
// Check if the track is already registered
if (this.trackNameToInfo_.has(key)) {
shaka.log.warning( `Track ${namespace}:${trackName} already registered,
updating with new alias ${trackAlias}`);
}
/** @type {shaka.msf.Utils.TrackInfo} */
const info = {
namespace,
trackName,
trackAlias,
requestId,
callbacks: [],
};
// Store in both maps
this.trackNameToInfo_.set(key, info);
this.trackAliasToInfo_.set(trackAlias.toString(), info);
shaka.log.debug(`Registered track ${namespace}:${trackName} with
server-assigned alias ${trackAlias} and request ID ${requestId}`);
}
/**
* Get track info from namespace+trackName
*
* @param {string} namespace
* @param {string} trackName
* @return {shaka.msf.Utils.TrackInfo}
*/
getTrackInfoFromName(namespace, trackName) {
const key = this.getNamespaceTrackKey_(namespace, trackName);
return this.trackNameToInfo_.get(key);
}
/**
* Get track info from trackAlias
*
* @param {number} trackAlias
* @return {shaka.msf.Utils.TrackInfo}
*/
getTrackInfoFromAlias(trackAlias) {
return this.trackAliasToInfo_.get(trackAlias.toString());
}
/**
* Register a callback for a track
*
* @param {number} trackAlias
* @param {shaka.msf.Utils.ObjectCallback} callback
*/
registerCallback(trackAlias, callback) {
const info = this.trackAliasToInfo_.get(trackAlias.toString());
if (!info) {
shaka.log.warning(`Attempted to register callback for unknown track
alias ${trackAlias}`);
return;
}
info.callbacks.push(callback);
shaka.log.debug(`Registered callback for track
${info.namespace}:${info.trackName} (alias: ${trackAlias}),
total callbacks: ${info.callbacks.length}`);
}
/**
* Unregister a specific callback for a track
*
* @param {number} trackAlias
* @param {shaka.msf.Utils.ObjectCallback} callback
*/
unregisterCallback(trackAlias, callback) {
const info = this.trackAliasToInfo_.get(trackAlias.toString());
if (!info) {
shaka.log.warning(`Attempted to unregister callback for unknown track
alias ${trackAlias}`);
return;
}
const index = info.callbacks.indexOf(callback);
if (index !== -1) {
info.callbacks.splice(index, 1);
shaka.log.debug(`Unregistered callback for track
${info.namespace}:${info.trackName} (alias: ${trackAlias}),
remaining callbacks: ${info.callbacks.length}`);
}
}
/**
* Unregister all callbacks for a track
*
* @param {number} trackAlias
*/
unregisterAllCallbacks(trackAlias) {
const info = this.trackAliasToInfo_.get(trackAlias.toString());
if (!info) {
shaka.log.warning(`Attempted to unregister callback for unknown track
alias ${trackAlias}`);
return;
}
const callbackCount = info.callbacks.length;
info.callbacks = [];
shaka.log.debug(`Unregistered all ${callbackCount} callbacks for track
${info.namespace}:${info.trackName} (alias: ${trackAlias})`);
}
/**
* Get all callbacks for a track
*
* @param {number} trackAlias
* @return {!Array<shaka.msf.Utils.ObjectCallback>}
*/
getCallbacks(trackAlias) {
const info = this.trackAliasToInfo_.get(trackAlias.toString());
return info ? [...info.callbacks] : [];
}
/**
* Clear all registered tracks and callbacks
*/
clear() {
this.trackNameToInfo_.clear();
this.trackAliasToInfo_.clear();
this.nextTrackAlias_ = 1;
shaka.log.debug('Cleared all track registrations');
}
};