I'm rolling my own (simple) HLS implementation, and for that I need to download a list of MPEG Transport Stream video files.
For now, I have this, and it works:
public async Task<IObservable<(int Current, int Total, Stream Stream)>> FetchVideoSegments()
{
    var playlist = await GetPlaylist();
    var total = playlist.PlaylistItems.Count();
    return playlist.PlaylistItems.ToObservable()
        //=> Get the video file path, relative to the current playlist URI
        .Select(item => item.Uri)
        //=> Convert the relative URI to an absolute one
        .Select(MakeRelativeAbsoluteUrl)
        //=> Download the video transport file, sequentially
        .Select(uri => Observable.Defer(() => DownloadVideoSegment(uri)))
        .Concat()
        //=> Return the progress info tuple with the video file stream
        .Select((stream, index) => (index, total, stream));
}
The subscriber is notified, one stream at a time, in the right order.
When downloading files, a concurrency of 2-3 is often ideal. I'd like to add this to my observable pipeline, but I don't see any concise way to do it while preserving the original order of the URIs in the emitted streams.
Take this:
return playlist.PlaylistItems.ToObservable()
    //=> Get the video file path, relative to the current playlist URI
    .Select(item => item.Uri)
    //=> Convert the relative URI to an absolute one
    .Select(MakeRelativeAbsoluteUrl)
    //=> Download the video transport file
    .Select(uri => Observable.Defer(() => DownloadVideoSegment(uri)))
    //=> Limit concurrent requests to a reasonable number
    .Merge(FetchSegmentsMaxConcurrency)
    //=> Return the progress info tuple with the video file stream
    .Select((stream, index) => (index, total, stream));
Notice that .Concat() has been replaced with .Merge(maxConcurrency).
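This is the naïve solution and of course it doesn't work: video streams are emitted in the order in which the downloads complete, which is non-deterministic, so the index in the final Select no longer matches the segment's position in the playlist.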
What's the canonical way to achieve that? Should I maintain another "index" value that I keep through the observable pipeline?
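To make the "index" idea concrete, here is a rough sketch of what I have in mind. ReorderByIndex is a hypothetical helper of my own, not something that ships with Rx; it assumes every index from 0 upward arrives exactly once, and that Dictionary.Remove(key, out value) is available (.NET Core 2.0 or later). It buffers out-of-order items until their predecessors have arrived:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class OrderedMergeExtensions
{
    // Hypothetical helper (not part of Rx): re-emits the values of
    // (Index, Value) pairs in ascending index order, starting at 0.
    // Values that arrive early are held back until every lower index has arrived.
    public static IObservable<T> ReorderByIndex<T>(
        this IObservable<(int Index, T Value)> source)
    {
        // Defer so that each subscription gets its own buffer and counter.
        return Observable.Defer(() =>
        {
            var pending = new Dictionary<int, T>();
            var next = 0;

            return source.SelectMany(pair =>
            {
                pending[pair.Index] = pair.Value;

                // Release the longest run of consecutive indices now available.
                var ready = new List<T>();
                while (pending.Remove(next, out var value))
                {
                    ready.Add(value);
                    next++;
                }
                return ready;
            });
        });
    }
}

// Intended use in the pipeline above (untested against real downloads):
//
//   .Select((uri, index) => Observable.Defer(() => DownloadVideoSegment(uri))
//       .Select(stream => (index, stream)))
//   .Merge(FetchSegmentsMaxConcurrency)
//   .ReorderByIndex()
//   .Select((stream, index) => (index, total, stream));
```

One drawback I can already see: if the first segment is slow, Merge keeps starting later downloads as slots free up, so the buffer can grow to hold many completed streams while waiting for that one item.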