Skip to content

Add ffmpeg-wasm-demuxer sample #549

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions samples/ffmpeg-wasm-demuxer/_headers
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/*
Cross-Origin-Opener-Policy: same-origin
Cross-Origin-Embedder-Policy: require-corp
64 changes: 64 additions & 0 deletions samples/ffmpeg-wasm-demuxer/blocking_demuxer_worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// The "blocking demuxer worker" is where FFmpeg demuxing actually takes place.
// Blocking occurs via Atomics.wait() within FFmpegDemuxerBlockingHelper.
// Blocking is required because FFmpeg's AVIO read callback is synchronous,
// while the data-to-be-read is being asynchronously fetched from the network.

self.debugLog = function(msg) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This is the same as function debugLog(msg) {

console.debug('[blocking worker]' + msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: , msg) would allow more message types.

}

debugLog(` -- worker started`);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: -- not required with above change.
Nit: Template string literal not necessary for static string.


import {FFmpegDemuxerBlockingHelper} from './ffmpeg_demuxer_blocking_helper.js';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work? When did it start working? (Even recently I was forced to use importSync() in worker contexts.)

import {AUDIO_STREAM_TYPE} from '../library/pull_demuxer_base.js'

let blockingDemuxer = null;
let messagePort = null;

self.addEventListener('message', onMessage);

async function onMessage(e) {
// debugLog(`Blocking demuxer worker message: ${JSON.stringify(e.data)}`);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove commented code.


switch (e.data.command) {
case 'initialize':
// Use the message port to pass messages after initialization.
messagePort = e.data.messagePort;
messagePort.onmessage = onMessage;

blockingDemuxer = new FFmpegDemuxerBlockingHelper(e.data.fileUri);
await blockingDemuxer.initialize(AUDIO_STREAM_TYPE);

messagePort.postMessage({
command: 'initialize-done',
decoderConfig: blockingDemuxer.getDecoderConfig()
});
break;

case 'get-next-chunk':
let chunk = await blockingDemuxer.getNextChunk();

if (chunk == null) {
console.error('FIXME! Proper EOF handling');
return;
}

// TODO: Avoid this copy by making chunks transferable!
let chunkData = new Uint8Array(chunk.byteLength);
chunk.copyTo(chunkData);

messagePort.postMessage({
command: 'get-next-chunk-done',
chunkType: chunk.type,
chunkTimestamp: chunk.timestamp,
chunkData: chunkData
},
{
transfer: [chunkData.buffer]
});
break;

default:
console.error(`Worker bad message: ${e.data}`);
}
}
105 changes: 105 additions & 0 deletions samples/ffmpeg-wasm-demuxer/download_reader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import {SharedReadBuffer} from './shared_read_buffer.js'

class DownloadSink {
#onChunk = null;
#onEOF = null;

constructor(onChunk, onEOF) {
this.#onChunk = onChunk;
this.#onEOF = onEOF;
}

write(chunk) {
this.#onChunk(chunk);
}

close() {
this.#onEOF();
}
}

export class DownloadReader {
#fileStorage = null;
#writePtr = 0;
#readPtr = 0;
#eof = false;
#pendingReadNumBytes = 0;
#sharedReadBuffer = null;
#enableLogging = false;

async initialize(fileUri, sab) {
this.#sharedReadBuffer = new SharedReadBuffer(sab);

// Fetch the file and pipe the data through.
const downloadSink = new DownloadSink(this._onChunk.bind(this), this._onEOF.bind(this));
this._log('starting fetch');
fetch(fileUri).then(response => {
// highWaterMark should be large enough for smooth streaming, but lower is
// better for memory usage.
response.body.pipeTo(new WritableStream(downloadSink, {highWaterMark: 2}));
});
}

read(numBytes) {
this._log(`read(${numBytes})`);
let bytesUnread = this.#writePtr - this.#readPtr;
let cappedForEOF = false;
if (numBytes > bytesUnread) {
if (!this.#eof) {
this._log(`waiting for more chunks`);
// Read will complete in _onChunk() once enough bytes become available.
this.#pendingReadNumBytes = numBytes;
return;
} else {
this._log(`capping read for EOF`);
cappedForEOF = true;
numBytes = bytesUnread;
}
}

this._log(`read() - fulfilling now!`);

this.#pendingReadNumBytes = 0;
this.#sharedReadBuffer.write(new Uint8Array(this.#fileStorage.buffer,
this.#readPtr, numBytes), cappedForEOF);
this.#readPtr += numBytes;
}

_onChunk(chunk) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Private methods can also just be # (although there may be cases where that is impossible because a "caller" is external; # private is very private).

this._log(`got chunk w/ ${chunk.byteLength} bytes`);

if (this.#fileStorage == null) {
// Arbitrary. Most files will be bigger than this. We'll grow it by 2x as
// needed.
const INIT_CAPACITY = 100_000;
this.#fileStorage = new Uint8Array(INIT_CAPACITY);
}

let remainingBytes = this.#fileStorage.byteLength - this.#writePtr;
if (remainingBytes < chunk.byteLength) {
let oldStorage = this.#fileStorage;
let newSize = 2 * Math.max(chunk.byteLength, oldStorage.byteLength);
this.#fileStorage = new Uint8Array(newSize);
this.#fileStorage.set(oldStorage);
}

this.#fileStorage.set(chunk, this.#writePtr);
this.#writePtr += chunk.byteLength;

if (this.#pendingReadNumBytes > 0) {
this._log('Trying to complete pending read');
this.read(this.#pendingReadNumBytes);
}
}

_onEOF() {
this._log(`got EOF, buffered ${this.#writePtr} total bytes`);
this.#eof = true;
}


_log(msg) {
if (this.#enableLogging)
debugLog('[DownloadReader]: ' + msg);
}
}
31 changes: 31 additions & 0 deletions samples/ffmpeg-wasm-demuxer/download_worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// The "download worker" is where the download fetch and buffering occurs. This
// worker is connected to the "blocking demuxer worker" which will send us
// "read" requests to fulfill.

self.debugLog = function(msg) {
console.debug('[download worker]' + msg);
}

debugLog(` -- worker started`);

import {DownloadReader} from './download_reader.js';

let downloadReader = null;

self.addEventListener('message', async function(e) {
// debugLog(`Download worker message: ${JSON.stringify(e.data)}`);

switch (e.data.command) {
case 'initialize':
downloadReader = new DownloadReader();
await downloadReader.initialize(e.data.file, e.data.sab);
postMessage({command: 'initialize-done'});
break;
case 'read':
downloadReader.read(e.data.numBytes);
break;
default:
console.error(`Download worker bad message: ${e.data}`);
}

});
16 changes: 16 additions & 0 deletions samples/ffmpeg-wasm-demuxer/exported_functions.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
_calloc
_malloc
_free
_avformat_version
_avformat_open_input
_av_find_best_stream
_av_packet_alloc
_av_read_frame
_av_rescale_q
_avformat_free_context
_av_packet_free
_av_log_set_level
_avformat_find_stream_info
_avformat_alloc_context
_avio_alloc_context
_av_malloc
8 changes: 8 additions & 0 deletions samples/ffmpeg-wasm-demuxer/exported_runtime_methods.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FS
cwrap
getValue
setValue
allocate
intArrayFromString
ALLOC_NORMAL
addFunction
96 changes: 96 additions & 0 deletions samples/ffmpeg-wasm-demuxer/ffmpeg_demuxer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// FFmpegDemuxer is a simple proxy to FFmpegDemuxerBlocking, which lives in a
// separate worker where actual FFmpeg API calls (and blocking) take place.
// This design allows the "media worker" to use the FFmpeg demuxer without being
// blocked.

import {PullDemuxerBase, AUDIO_STREAM_TYPE} from '../library/pull_demuxer_base.js'

let Module = null;

export class FFmpegDemuxer extends PullDemuxerBase {
#fileUri = null;
#decoderConfig = null;
#initResolver = null;
#chunkResolver = null;
#blockingWorker = null;
#messageChannel = null;

constructor(fileUri) {
super();
this.#fileUri = fileUri
}

async initialize(streamType) {
// It would be easy to support video, but this is intended as a demo, not a
// full featured library.
console.assert(streamType == AUDIO_STREAM_TYPE,
'This demuxer currently supports audio');

// Message channel is used to facilitate defining message handling within
// the scope of this class (vs the global 'message' event).
this.#messageChannel = new MessageChannel();
this.#messageChannel.port1.onmessage = this._onMessage.bind(this);

this.#blockingWorker = new Worker('./blocking_demuxer_worker.js',
{type: 'module'});

this.#blockingWorker.postMessage({
command: 'initialize',
fileUri: this.#fileUri,
messagePort: this.#messageChannel.port2
},
{ transfer: [this.#messageChannel.port2] });

// Wait to for worker to message 'initialize-done'.
let promise = new Promise((resolver) => {this.#initResolver = resolver});
await promise;
}


getDecoderConfig() {
return this.#decoderConfig;
}

async getNextChunk() {
this.#messageChannel.port1.postMessage({command: 'get-next-chunk'});

// Wait for worker to read and send the chunk.
let promise = new Promise((resolver) => {this.#chunkResolver = resolver});
let chunk = await promise;

return chunk;
}

_onMessage(e) {
// console.log(`got message ${JSON.stringify(e.data)}`);

switch (e.data.command) {
case 'initialize-done':
this.#decoderConfig = e.data.decoderConfig;
console.assert(this.#initResolver != null);
this.#initResolver();
this.#initResolver = null;
break;

case 'get-next-chunk-done':
let chunk = new EncodedAudioChunk({
type: e.data.chunkType,
timestamp: e.data.chunkTimestamp,
data: e.data.chunkData
});
console.assert(this.#chunkResolver != null);
this.#chunkResolver(chunk);
this.#chunkResolver = null;
break;

case 'get-next-chunk-done-EOF':
console.assert(this.#chunkResolver != null);
this.#chunkResolver(null);
this.#chunkResolver = null;
break;

default:
console.error(`Unexpected message: ${JSON.stringify(e.data)}`);
}
}
}
Loading