I'm rolling my own (simple) HLS implementation, and for that I need to download a list of MPEG Transport Stream video files.

For now, I have this, and it works:

public async Task<IObservable<(int Current, int Total, Stream Stream)>> FetchVideoSegments()
{
    var playlist = await GetPlaylist();

    var total = playlist.PlaylistItems.Count();

    return playlist.PlaylistItems.ToObservable()
        //=> Get the video file path, relative to the current playlist URI
        .Select(item => item.Uri)
        //=> Convert the relative URI to an absolute one
        .Select(MakeRelativeAbsoluteUrl)
        //=> Download the video transport file, sequentially
        .Select(uri => Observable.Defer(() => DownloadVideoSegment(uri)))
        .Concat()
        //=> Return the progress info tuple with the video file stream
        .Select((stream, index) => (index, total, stream));
}

The subscriber is notified, one stream at a time, in the right order.

When downloading files, a concurrency of 2-3 is often ideal. I'd like to add this to my observable pipeline, but I don't see any concise way to do it while preserving the original order of the URIs in the emitted streams.

Take this:

return playlist.PlaylistItems.ToObservable()
    //=> Get the video file path, relative to the current playlist URI
    .Select(item => item.Uri)
    //=> Convert the relative URI to an absolute one
    .Select(MakeRelativeAbsoluteUrl)
    //=> Download the video transport file
    .Select(uri => Observable.Defer(() => DownloadVideoSegment(uri)))
    //=> Limit concurrent requests to a reasonable number
    .Merge(FetchSegmentsMaxConcurrency)
    //=> Return the progress info tuple with the video file stream
    .Select((stream, index) => (index, total, stream));

Notice that .Concat() has been replaced with .Merge(maxConcurrency).

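This is the naïve solution, and of course it doesn't work: video streams are emitted in a non-deterministic order (whichever download finishes first), so the stream emitted at position i is not necessarily segment i of the playlist.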

What's the canonical way to achieve that? Should I carry an extra "index" value through the observable pipeline?
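One way to answer the last question is "yes": tag each download with its playlist position before merging, then hold back results that arrive early until every earlier one has been emitted. Below is a minimal sketch of that reordering step in plain C# with no Rx dependency, assuming each segment produces exactly one stream; `ReorderBuffer<T>` and `Push` are hypothetical names, not part of Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: accepts (index, value) pairs in any order and releases
// values strictly in index order, as soon as the next expected index is present.
public sealed class ReorderBuffer<T>
{
    // Values that arrived before one of their predecessors.
    private readonly Dictionary<int, T> _pending = new Dictionary<int, T>();

    // Index of the next value allowed out.
    private int _next;

    public IReadOnlyList<T> Push(int index, T value)
    {
        if (index < _next || _pending.ContainsKey(index))
            throw new ArgumentException($"Index {index} was already pushed.", nameof(index));

        _pending.Add(index, value);

        // Drain every value that is now contiguous with what was already released.
        var ready = new List<T>();
        while (_pending.TryGetValue(_next, out var item))
        {
            _pending.Remove(_next);
            ready.Add(item);
            _next++;
        }
        return ready;
    }
}
```

In the pipeline above, an untested sketch would tag each inner observable with Rx's indexed `Select`, e.g. `.Select((uri, i) => Observable.Defer(() => DownloadVideoSegment(uri)).Select(s => (i, s)))`, keep `.Merge(FetchSegmentsMaxConcurrency)`, then flatten with `.SelectMany(t => buffer.Push(t.i, t.s))`, creating `buffer` inside an outer `Observable.Defer` so every subscription gets its own. Early results stay in memory, so one slow segment can let several later streams pile up.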

What you're asking for is for the observable to produce values as soon as possible, so long as they are in the same order as the source? - Enigmativity
@Enigmativity Exactly! - Morgan Touverey Quilling
I had written exactly that operator at one point. I'll see if I can find it for you. - Enigmativity
Shouldn't this be using .Buffer or .Window on the file list to achieve your concurrency? - Sentinel
I didn't know these operators, thanks! I won't look at them closely right now, though: as it turns out, downloading the video files like this is no longer necessary, and I'm refactoring the code to download a single big blob (the whole MP4 file) with one HTTP request. Less interesting, but much simpler. I'd still be very interested in a solution, since I'll probably hit this use case again, so @anyone, feel free to post a (tested) answer that I can accept. - Morgan Touverey Quilling
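The `Buffer`/`Window` suggestion amounts to downloading in fixed-size batches. Outside Rx, the same idea can be sketched with `Enumerable.Chunk` (.NET 6 and later) and `Task.WhenAll`, whose result array follows the order of the input tasks, so ordering comes for free. `FetchInBatchesAsync` and its `fetch` delegate are hypothetical stand-ins for `DownloadVideoSegment`, not code from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchedDownloads
{
    // Hypothetical helper: starts `fetch` for up to `batchSize` items at once,
    // waits for the whole batch, then yields that batch's results in source order.
    public static async IAsyncEnumerable<TResult> FetchInBatchesAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, Task<TResult>> fetch,
        int batchSize)
    {
        foreach (TItem[] batch in items.Chunk(batchSize))
        {
            // Task.WhenAll returns results in the same order as the tasks passed in,
            // regardless of which task finished first.
            TResult[] results = await Task.WhenAll(batch.Select(fetch));
            foreach (var result in results)
                yield return result;
        }
    }
}
```

The cost of this design is that each batch waits for its slowest download before the next batch starts, so one slow segment leaves the other slots idle.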

1 Answer


Would something like this be what you're looking for? Of course, replace the letters with whatever your DownloadVideoSegment returns.

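using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

var random = new Random();

Observable
    .Range(65, 26)                  // code points of 'A'..'Z'
    .Select(char.ConvertFromUtf32)  // each code point -> one-letter string
    .Select((letter, index) =>
        Observable
            .FromAsync(() =>
                // Simulated download of random duration that keeps its source index
                Task.Delay(random.Next(5000))
                    .ContinueWith(t => (letter, index))))
    .Merge()                        // unbounded; e.g. .Merge(3) would cap concurrent downloads
    .Do(tuple => Console.WriteLine($"{tuple.letter}:{tuple.index}")) // completion order
    .Buffer(Observable.Never<Unit>()) // like .ToList(): one list, emitted when the source completes
    .Subscribe(tuples =>
        Console.WriteLine(string.Join(",", tuples.OrderBy(t => t.index).Select(t => t.letter))));

Console.ReadLine(); // keep a console app alive until the output is printed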
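This approach collects every result before sorting, so the subscriber hears nothing until the last download has finished, which is not quite the "as soon as possible, in source order" behavior discussed in the comments. Below is a minimal sketch of a streaming alternative, assuming a plain TPL/`IAsyncEnumerable` setting (C# 8, .NET Core 3.0 or later) rather than Rx: keep a queue of at most `maxConcurrency` started tasks and always await the oldest one. `FetchInOrderAsync` and `fetch` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class OrderedDownloads
{
    // Hypothetical helper: keeps at most `maxConcurrency` calls to `fetch` running
    // and yields results strictly in source order, each one as soon as it and
    // all of its predecessors have completed.
    public static async IAsyncEnumerable<TResult> FetchInOrderAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, Task<TResult>> fetch,
        int maxConcurrency)
    {
        if (maxConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

        // Started requests, oldest first.
        var inFlight = new Queue<Task<TResult>>();
        foreach (var item in items)
        {
            // Window is full: wait for the oldest request before starting another.
            if (inFlight.Count == maxConcurrency)
                yield return await inFlight.Dequeue();

            inFlight.Enqueue(fetch(item));
        }

        // Drain the remaining requests, still in source order.
        while (inFlight.Count > 0)
            yield return await inFlight.Dequeue();
    }
}
```

Compared with sorting at the end, each result is yielded as soon as it and everything before it are done. The trade-off is head-of-line blocking: while the oldest request is still running, no new request starts, even if the others have already finished.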