/*! @license
* Shaka Player
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
goog.provide('shaka.msf.ControlStream');
goog.require('shaka.log');
goog.require('shaka.msf.BufferControlWriter');
goog.require('shaka.msf.Utils');
goog.require('shaka.util.Mutex');
goog.requireType('shaka.msf.Reader');
goog.requireType('shaka.msf.Writer');
shaka.msf.ControlStream = class {
  /**
   * Pairs a decoder over the incoming side with an encoder over the outgoing
   * side of a control stream.
   *
   * @param {!shaka.msf.Reader} reader Handed to the decoder.
   * @param {!shaka.msf.Writer} writer Handed to the encoder.
   */
  constructor(reader, writer) {
    /** @private {!shaka.msf.ControlStreamDecoder} */
    this.decoder_ = new shaka.msf.ControlStreamDecoder(reader);

    /** @private {!shaka.msf.ControlStreamEncoder} */
    this.encoder_ = new shaka.msf.ControlStreamEncoder(writer);

    /**
     * Held for the duration of each send() call.
     * @private {!shaka.util.Mutex}
     */
    this.mutex_ = new shaka.util.Mutex();
  }

  /**
   * Decodes the next control message from the reader.
   *
   * Unlike send(), this does not take the mutex; callers are expected not to
   * have two reads in flight at once.
   *
   * @return {!Promise<shaka.msf.Utils.Message>}
   */
  async receive() {
    shaka.log.debug('Attempting to receive a control message...');
    const received = await this.decoder_.message();
    shaka.log.debug('Received control message:', received);
    return received;
  }

  /**
   * Encodes and writes one control message while holding the mutex, so that
   * overlapping send() calls are processed one at a time.
   *
   * @param {shaka.msf.Utils.Message} msg
   * @return {!Promise}
   */
  async send(msg) {
    await this.mutex_.acquire('ControlStream.send');
    try {
      await this.writeMessage_(msg);
    } finally {
      // Released on success and on failure alike.
      this.mutex_.release();
    }
  }

  /**
   * Logs the outgoing message and passes it to the encoder.  Only called by
   * send() with the mutex held.
   *
   * @param {shaka.msf.Utils.Message} msg
   * @return {!Promise}
   * @private
   */
  async writeMessage_(msg) {
    shaka.log.debug('Sending control message:', msg);
    await this.encoder_.message(msg);
  }
};
shaka.msf.ControlStreamDecoder = class {
  /**
   * @param {!shaka.msf.Reader} reader Source the control messages are decoded
   *   from.
   */
  constructor(reader) {
    /** @private {!shaka.msf.Reader} */
    this.reader_ = reader;
  }

  /**
   * Reads a message header: the numeric type id, followed by a two-byte
   * big-endian length, and maps the id to its MessageType value.
   *
   * The length is only logged here; it is not used to bound or validate the
   * reads performed by the per-message parsers.
   *
   * @return {!Promise<shaka.msf.Utils.MessageType>}
   * @throws {Error} If the type id is not one of the ids handled below.
   * @private
   */
  async messageType_() {
    shaka.log.debug('Reading message type...');
    const type = await this.reader_.u53();
    const typeHex = `0x${type.toString(16)}`;
    shaka.log.debug(`Raw message type: ${typeHex}`);

    // Read the 16-bit MSB length field
    const lengthBytes = await this.reader_.read(2);
    const messageLength = (lengthBytes[0] << 8) | lengthBytes[1]; // MSB format
    // Built from two single-line pieces so that no raw newline or source
    // indentation leaks into the log message.
    shaka.log.debug(
        `Message length (16-bit MSB): ${messageLength} bytes, ` +
        `actual length: ${this.reader_.getByteLength()}`);

    let msgType;
    switch (type) {
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE;
        break;
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE_OK:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE_OK;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_DONE:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_DONE;
        break;
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE_ERROR:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE_ERROR;
        break;
      case shaka.msf.Utils.MessageTypeId.UNSUBSCRIBE:
        msgType = shaka.msf.Utils.MessageType.UNSUBSCRIBE;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_NAMESPACE:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_NAMESPACE_OK:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_OK;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_NAMESPACE_ERROR:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_ERROR;
        break;
      case shaka.msf.Utils.MessageTypeId.UNPUBLISH_NAMESPACE:
        msgType = shaka.msf.Utils.MessageType.UNPUBLISH_NAMESPACE;
        break;
      case shaka.msf.Utils.MessageTypeId.REQUESTS_BLOCKED:
        msgType = shaka.msf.Utils.MessageType.REQUESTS_BLOCKED;
        break;
      default:
        // NOTE(review): the payload announced by messageLength is not
        // consumed before throwing; confirm callers do not keep reading from
        // the same reader afterwards.
        throw new Error(`Unknown message type: ${typeHex}`);
    }
    shaka.log.debug(`Parsed message type: ${msgType} (${typeHex})`);
    return msgType;
  }

  /**
   * Decodes one complete control message: reads the header, then dispatches
   * to the parser for that message type.
   *
   * @return {!Promise<shaka.msf.Utils.Message>}
   * @throws {Error} If the header names an unknown type or no parser exists
   *   for the resolved type.
   */
  async message() {
    shaka.log.debug('Parsing control message...');
    const type = await this.messageType_();

    /** @type {shaka.msf.Utils.Message} */
    let result;
    switch (type) {
      case shaka.msf.Utils.MessageType.SUBSCRIBE:
        result = await this.subscribe_();
        break;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_OK:
        result = await this.subscribeOk_();
        break;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_ERROR:
        result = await this.subscribeError_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_DONE:
        result = await this.publishDone_();
        break;
      case shaka.msf.Utils.MessageType.UNSUBSCRIBE:
        result = await this.unsubscribe_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE:
        result = await this.publishNamespace_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_OK:
        result = await this.publishNamespaceOk_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_ERROR:
        result = await this.publishNamespaceError_();
        break;
      case shaka.msf.Utils.MessageType.UNPUBLISH_NAMESPACE:
        result = await this.unpublishNamespace_();
        break;
      case shaka.msf.Utils.MessageType.REQUESTS_BLOCKED:
        result = await this.requestsBlocked_();
        break;
      default:
        // Guards against messageType_() returning a type without a parser.
        throw new Error(`Unsupported message type: ${type}`);
    }
    shaka.log.debug('Successfully parsed control message:', result);
    return result;
  }

  /**
   * Parses the body of a SUBSCRIBE message.  Fields are read in this order:
   * request id, namespace tuple, name, subscriber priority, group order,
   * forward flag, filter type, optional start location, optional end group,
   * parameters.
   *
   * startLocation is only read for the ABSOLUTE_START and ABSOLUTE_RANGE
   * filters, and endGroup only for ABSOLUTE_RANGE; otherwise they are left
   * undefined in the result.
   *
   * @return {!Promise<shaka.msf.Utils.Subscribe>}
   * @private
   */
  async subscribe_() {
    shaka.log.debug('Parsing Subscribe message...');
    const requestId = await this.reader_.u62();
    shaka.log.debug(`RequestID: ${requestId}`);
    const namespace = await this.reader_.tuple();
    shaka.log.debug(`Namespace: ${namespace.join('/')}`);
    const name = await this.reader_.string();
    shaka.log.debug(`Name: ${name}`);
    const subscriberPriority = await this.reader_.u8();
    shaka.log.debug(`Subscriber priority: ${subscriberPriority}`);
    const groupOrder = await this.decodeGroupOrder_();
    shaka.log.debug(`Group order: ${groupOrder}`);
    const forward = await this.reader_.u8Bool();
    shaka.log.debug(`Forward: ${forward}`);
    const filterType = /** @type {shaka.msf.Utils.FilterType} */(
      await this.reader_.u8());
    shaka.log.debug(`Filter type: ${filterType}`);

    let startLocation;
    if (filterType === shaka.msf.Utils.FilterType.ABSOLUTE_START ||
        filterType === shaka.msf.Utils.FilterType.ABSOLUTE_RANGE) {
      startLocation = await this.location_();
      // Format the fields directly, as subscribeOk_() does for "largest".
      // JSON.stringify() throws a TypeError on BigInt values, so this stays
      // safe should u62() ever yield them.
      shaka.log.debug(
          `Start Location: group ${startLocation.group}, ` +
          `object ${startLocation.object}`);
    }

    let endGroup;
    if (filterType === shaka.msf.Utils.FilterType.ABSOLUTE_RANGE) {
      endGroup = await this.reader_.u62();
      shaka.log.debug(`End group: ${endGroup}`);
    }

    const params = await this.reader_.keyValuePairs();
    shaka.log.debug(`Parameters: ${params.length}`);

    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE,
      requestId,
      namespace,
      name,
      subscriberPriority,
      groupOrder,
      forward,
      filterType,
      startLocation,
      endGroup,
      params,
    };
  }

  /**
   * Reads a one-byte group-order code and maps 0, 1 and 2 to PUBLISHER,
   * ASCENDING and DESCENDING respectively.
   *
   * @return {!Promise<shaka.msf.Utils.GroupOrder>}
   * @throws {Error} If the code is anything other than 0, 1 or 2.
   * @private
   */
  async decodeGroupOrder_() {
    const orderCode = await this.reader_.u8();
    shaka.log.debug(`Raw group order code: ${orderCode}`);
    switch (orderCode) {
      case 0:
        return shaka.msf.Utils.GroupOrder.PUBLISHER;
      case 1:
        return shaka.msf.Utils.GroupOrder.ASCENDING;
      case 2:
        return shaka.msf.Utils.GroupOrder.DESCENDING;
      default:
        throw new Error(`Invalid GroupOrder value: ${orderCode}`);
    }
  }

  /**
   * Reads a location as two consecutive values: group first, then object.
   *
   * @return {!Promise<shaka.msf.Utils.Location>}
   * @private
   */
  async location_() {
    return {
      group: await this.reader_.u62(),
      object: await this.reader_.u62(),
    };
  }

  /**
   * Parses the body of a SUBSCRIBE_OK message.  Fields are read in this
   * order: request id, track alias, expires, group order, content-exists
   * flag, optional largest location, parameters.
   *
   * The largest location is only read when the content-exists flag is true;
   * otherwise it is left undefined in the result.
   *
   * @return {!Promise<shaka.msf.Utils.SubscribeOk>}
   * @private
   */
  async subscribeOk_() {
    shaka.log.debug('Parsing SubscribeOk message...');
    const requestId = await this.reader_.u62();
    shaka.log.debug(`Request ID: ${requestId}`);
    const trackAlias = await this.reader_.u62();
    shaka.log.debug(`Track Alias: ${trackAlias}`);
    const expires = await this.reader_.u62();
    shaka.log.debug(`Expires: ${expires}`);
    const groupOrder = await this.decodeGroupOrder_();
    shaka.log.debug(`Group order: ${groupOrder}`);
    const contentExists = await this.reader_.u8Bool();
    shaka.log.debug(`Content exists: ${contentExists}`);

    let largest;
    if (contentExists) {
      largest = await this.location_();
      shaka.log.debug(
          `Largest: group ${largest.group}, object ${largest.object}`);
    }

    const params = await this.reader_.keyValuePairs();

    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE_OK,
      requestId,
      trackAlias,
      expires,
      groupOrder,
      contentExists,
      largest,
      params,
    };
  }

  /**
   * Parses the body of a SUBSCRIBE_ERROR message.  Fields are read in this
   * order: request id, error code, reason string, track alias.
   *
   * NOTE(review): confirm that the MoQ Transport draft targeted by the peer
   * still carries a trailing track alias in this message.
   *
   * @return {!Promise<shaka.msf.Utils.SubscribeError>}
   * @private
   */
  async subscribeError_() {
    shaka.log.debug('Parsing SubscribeError message...');
    const requestId = await this.reader_.u62();
    shaka.log.debug(`Request ID: ${requestId}`);
    const code = await this.reader_.u62();
    shaka.log.debug(`Code: ${code}`);
    const reason = await this.reader_.string();
    shaka.log.debug(`Reason: ${reason}`);
    const trackAlias = await this.reader_.u62();
    shaka.log.debug(`Track Alias: ${trackAlias}`);

    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE_ERROR,
      requestId,
      code,
      reason,
      trackAlias,
    };
  }

  /**
   * Parses the body of a PUBLISH_DONE message.  Fields are read in this
   * order: request id, status code, reason string, stream count.
   *
   * NOTE(review): the reason string is read before the stream count here;
   * confirm this matches the field order of the MoQ Transport draft the peer
   * implements.
   *
   * @return {!Promise<shaka.msf.Utils.PublishDone>}
   * @private
   */
  async publishDone_() {
    shaka.log.debug('Parsing PublishDone message...');
    const requestId = await this.reader_.u62();
    shaka.log.debug(`Request ID: ${requestId}`);
    const code = await this.reader_.u62();
    shaka.log.debug(`Code: ${code}`);
    const reason = await this.reader_.string();
    shaka.log.debug(`Reason: ${reason}`);
    // Read the stream count
    const streamCount = await this.reader_.u53();
    shaka.log.debug(`Stream count: ${streamCount}`);

    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_DONE,
      requestId,
      code,
      streamCount,
      reason,
    };
  }

  /**
   * Parses the body of an UNSUBSCRIBE message, which consists of the request
   * id only.
   *
   * @return {!Promise<shaka.msf.Utils.Unsubscribe>}
   * @private
   */
  async unsubscribe_() {
    shaka.log.debug('Parsing Unsubscribe message...');
    const requestId = await this.reader_.u62();
    shaka.log.debug(`Request ID: ${requestId}`);

    return {
      kind: shaka.msf.Utils.MessageType.UNSUBSCRIBE,
      requestId,
    };
  }

  /**
   * Parses the body of a PUBLISH_NAMESPACE message.  Fields are read in this
   * order: request id, namespace tuple, parameters.
   *
   * @return {!Promise<shaka.msf.Utils.PublishNamespace>}
   * @private
   */
  async publishNamespace_() {
    shaka.log.debug('Parsing PublishNamespace message...');
    const requestId = await this.reader_.u62();
    shaka.log.debug(`Request ID: ${requestId}`);
    const namespace = await this.reader_.tuple();
    shaka.log.debug(`Namespace: ${namespace.join('/')}`);
    const params = await this.reader_.keyValuePairs();
    shaka.log.debug(`Parameters: ${params.length}`);

    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE,
      requestId,
      namespace,
      params,
    };
  }

  /**
   * Parses the body of a PUBLISH_NAMESPACE_OK message.  Fields are read in
   * this order: request id, namespace tuple.
   *
   * @return {!Promise<shaka.msf.Utils.PublishNamespaceOk>}
   * @private
   */
  async publishNamespaceOk_() {
    shaka.log.debug('Parsing PublishNamespaceOk message...');
    const requestId = await this.reader_.u62();
    shaka.log.debug(`Request ID: ${requestId}`);
    const namespace = await this.reader_.tuple();
    shaka.log.debug(`Namespace: ${namespace.join('/')}`);

    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_OK,
      requestId,
      namespace,
    };
  }

  /**
   * Parses the body of a PUBLISH_NAMESPACE_ERROR message.  Fields are read in
   * this order: request id, error code, reason string.
   *
   * @return {!Promise<shaka.msf.Utils.PublishNamespaceError>}
   * @private
   */
  async publishNamespaceError_() {
    shaka.log.debug('Parsing PublishNamespaceError message...');
    const requestId = await this.reader_.u62();
    shaka.log.debug(`Request ID: ${requestId}`);
    const code = await this.reader_.u62();
    shaka.log.debug(`Error code: ${code}`);
    const reason = await this.reader_.string();
    shaka.log.debug(`Error reason: ${reason}`);

    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_ERROR,
      requestId,
      code,
      reason,
    };
  }

  /**
   * Parses the body of an UNPUBLISH_NAMESPACE message, which consists of the
   * namespace tuple only.
   *
   * @return {!Promise<shaka.msf.Utils.UnpublishNamespace>}
   * @private
   */
  async unpublishNamespace_() {
    shaka.log.debug('Parsing UnpublishNamespace message...');
    const namespace = await this.reader_.tuple();
    shaka.log.debug(`Namespace: ${namespace.join('/')}`);

    return {
      kind: shaka.msf.Utils.MessageType.UNPUBLISH_NAMESPACE,
      namespace,
    };
  }

  /**
   * Parses the body of a REQUESTS_BLOCKED message, which consists of the
   * maximum request id only.
   *
   * @return {!Promise<shaka.msf.Utils.RequestsBlocked>}
   * @private
   */
  async requestsBlocked_() {
    shaka.log.debug('Parsing REQUESTS_BLOCKED message...');
    const maximumRequestId = await this.reader_.u62();
    // Concatenated rather than wrapped inside one template literal, so the
    // message is emitted on a single line.
    shaka.log.debug(
        'Server sent REQUESTS_BLOCKED: maximum request ID is ' +
        `${maximumRequestId}`);

    return {
      kind: shaka.msf.Utils.MessageType.REQUESTS_BLOCKED,
      maximumRequestId,
    };
  }
};
shaka.msf.ControlStreamEncoder = class {
  /**
   * @param {!shaka.msf.Writer} writer Destination the encoded messages are
   *   written to.
   */
  constructor(writer) {
    /** @private {!shaka.msf.Writer} */
    this.writer_ = writer;
  }

  /**
   * Serializes a control message into a fresh BufferControlWriter, using the
   * marshal method that matches msg.kind, and then writes the resulting bytes
   * to the underlying writer in a single write() call.
   *
   * Kinds without a branch below are rejected before anything is written.
   * That includes REQUESTS_BLOCKED, which the decoder in this file parses
   * but which has no marshal branch here.
   *
   * @param {shaka.msf.Utils.Message} msg
   * @return {!Promise}
   * @throws {Error} If msg.kind has no marshal branch.
   */
  async message(msg) {
    const kind = msg.kind;
    shaka.log.debug(`Encoding message of type: ${kind}`);

    // Scratch buffer that the marshal call fills in.
    const buffer = new shaka.msf.BufferControlWriter();
    const MessageType = shaka.msf.Utils.MessageType;

    if (kind === MessageType.SUBSCRIBE) {
      buffer.marshalSubscribe(
          /** @type {!shaka.msf.Utils.Subscribe} */ (msg));
    } else if (kind === MessageType.SUBSCRIBE_OK) {
      buffer.marshalSubscribeOk(
          /** @type {!shaka.msf.Utils.SubscribeOk} */ (msg));
    } else if (kind === MessageType.SUBSCRIBE_ERROR) {
      buffer.marshalSubscribeError(
          /** @type {!shaka.msf.Utils.SubscribeError} */ (msg));
    } else if (kind === MessageType.PUBLISH_DONE) {
      buffer.marshalPublishDone(
          /** @type {!shaka.msf.Utils.PublishDone} */ (msg));
    } else if (kind === MessageType.UNSUBSCRIBE) {
      buffer.marshalUnsubscribe(
          /** @type {!shaka.msf.Utils.Unsubscribe} */ (msg));
    } else if (kind === MessageType.PUBLISH_NAMESPACE) {
      buffer.marshalPublishNamespace(
          /** @type {!shaka.msf.Utils.PublishNamespace} */ (msg));
    } else if (kind === MessageType.PUBLISH_NAMESPACE_OK) {
      buffer.marshalPublishNamespaceOk(
          /** @type {!shaka.msf.Utils.PublishNamespaceOk} */ (msg));
    } else if (kind === MessageType.PUBLISH_NAMESPACE_ERROR) {
      buffer.marshalPublishNamespaceError(
          /** @type {!shaka.msf.Utils.PublishNamespaceError} */ (msg));
    } else if (kind === MessageType.UNPUBLISH_NAMESPACE) {
      buffer.marshalUnpublishNamespace(
          /** @type {!shaka.msf.Utils.UnpublishNamespace} */ (msg));
    } else {
      throw new Error(`Unsupported message type for encoding: ${kind}`);
    }

    // Hand the whole marshaled message to the output stream in one write.
    const payload = buffer.getBytes();
    shaka.log.debug(
        `Marshaled ${payload.length} bytes for message type: ${kind}`);
    await this.writer_.write(payload);
  }
};