Say I have a type T:
class T {
public int identifier; //Arbitrary but unique for each character (Guids in real-life)
public char character; //In real life not a char, but I chose char here for easy demo purposes
}
And I have a predefined ordered sequence of identifiers:
int[] identifierSequence = new int[]{
9, 3, 4, 4, 7
};
I now need to order an IObservable<T> which produces the following sequence of objects:
{identifier: 3, character 'e'},
{identifier: 9, character 'h'},
{identifier: 4, character 'l'},
{identifier: 4, character 'l'},
{identifier: 7, character 'o'}
So that the resulting IObservable produces hello.
I don't want to use ToArray, as I want to receive objects as soon as they arrive and not wait until everything is observed.
More specifically, I would like to receive them like this:
Input: e h l l o
Output: he l l o
What would be the proper reactive way to do this? The best I could come up with is this:
Dictionary<int, T> buffer = new Dictionary<int, T>();
int curIndex = 0;
inputObserable.SelectMany(item =>
{
buffer[item.identifier] = item;
IEnumerable<ReportTemplate> GetReadyElements()
{
while (true)
{
int nextItemIdentifier = identifierSequence[curIndex];
T nextItem;
if (buffer.TryGetValue(nextItemIdentifier, out nextItem))
{
buffer.Remove(nextItem.identifier);
curIndex++;
yield return nextItem;
}
else
{
break;
}
}
}
return GetReadyElements();
});
EDIT:
Schlomo raised some very valid issues with my code, which is why I marked his answer as correct. I made some modifications to his to code for it to be usable:
- Generic identifier and object type
- Iteration instead of recursion to prevent potential stackoverflow on very large observables
- Convert the anonymous type to a real class for readability
- Wherever possible, lookup a value in a dictionary only once and store as variable instead of looking it up multiple times
- Fixed type
This gives me the following code:
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());
return source.Scan(initialState, (oldState, item) =>
{
//Function to be called upon receiving new item
//If we can pattern match the first item, then it is moved into Output, and concatted continuously with the next possible item
//Otherwise, if nothing is available yet, just return the input state
OrderByIdentifierSequenceState<T, TId> GetOutput(OrderByIdentifierSequenceState<T, TId> state)
{
int index = state.Index;
ImmutableDictionary<TId, ImmutableList<T>> buffer = state.Buffer;
IList<T> output = new List<T>();
while (index < identifierSequence.Count)
{
TId key = identifierSequence[index];
ImmutableList<T> nextValues;
if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty)
{
//No values available yet
break;
}
T toOutput = nextValues[nextValues.Count - 1];
output.Add(toOutput);
buffer = buffer.SetItem(key, nextValues.RemoveAt(nextValues.Count - 1));
index++;
}
return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output);
}
//Before calling the recursive function, add the new item to the buffer
TId itemIdentifier = identifierFunc(item);
ImmutableList<T> valuesList;
if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList))
{
valuesList = ImmutableList<T>.Empty;
}
var remodifiedBuffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item));
return GetOutput(new OrderByIdentifierSequenceState<T, TId>(oldState.Index, remodifiedBuffer, Enumerable.Empty<T>()));
})
// Use Dematerialize/Notifications to detect and emit end of stream.
.SelectMany(output =>
{
var notifications = output.Output
.Select(item => Notification.CreateOnNext(item))
.ToList();
if (output.Index == identifierSequence.Count)
{
notifications.Add(Notification.CreateOnCompleted<T>());
}
return notifications;
})
.Dematerialize();
}
class OrderByIdentifierSequenceState<T, TId>
{
//Index shows what T we're waiting on
public int Index { get; }
//Buffer holds T that have arrived that we aren't ready yet for
public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }
//Output holds T that can be safely emitted.
public IEnumerable<T> Output { get; }
public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
{
this.Index = index;
this.Buffer = buffer;
this.Output = output;
}
}
However, this code still has a couple of problems:
- Constant copying of the state (mainly the
ImmutableDictionary), which can be very expensive. Possible solution: maintain a separate state per observer, instead of per item received. - When one or more of the elements in
identifierSequenceare not present in the source observable a problem appears. This currently blocks the ordered observable and it will never finish. Possible solutions: Timeout, throw exception when source observable is completed, return all available items when source observable is completed, ... - When the source observable contains more elements than
identifierSequence, we get a memory leak. Items that are in the source observable, but not inidentifierSequencecurrently get added to the dictionary, but will not be deleted before the source observable completes. This is a potential memory leak. Possible solutions: check whether the item is inidentifierSequencebefore adding it to the dictionary, bypass code and immediately output the item, ...
MY SOLUTION:
/// <summary>
/// Takes the items from the source observable, and returns them in the order specified in identifierSequence.
/// If an item is missing from the source observable, the returned obserable returns items up until the missing item and then blocks until the source observable is completed.
/// All available items are then returned in order. Note that this means that while a correct order is guaranteed, there might be missing items in the result observable.
/// If there are items in the source observable that are not in identifierSequence, these items will be ignored.
/// </summary>
/// <typeparam name="T">The type that is produced by the source observable</typeparam>
/// <typeparam name="TId">The type of the identifiers used to uniquely identify a T</typeparam>
/// <param name="source">The source observable</param>
/// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered</param>
/// <param name="identifierFunc">A function that takes a T and outputs its unique identifier</param>
/// <returns>An observable with the same elements as the source, but ordered by the sequence of items in identifierSequence</returns>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}
if (identifierSequence == null)
{
throw new ArgumentNullException(nameof(identifierSequence));
}
if (identifierFunc == null)
{
throw new ArgumentNullException(nameof(identifierFunc));
}
if (identifierSequence.Count == 0)
{
return Observable.Empty<T>();
}
HashSet<TId> identifiersInSequence = new HashSet<TId>(identifierSequence);
return Observable.Create<T>(observer =>
{
//current index of pending item in identifierSequence
int index = 0;
//buffer of items we have received but are not ready for yet
Dictionary<TId, List<T>> buffer = new Dictionary<TId, List<T>>();
return source.Select(
item =>
{
//Function to be called upon receiving new item
//We search for the current pending item in the buffer. If it is available, we yield return it and repeat.
//If it is not available yet, stop.
IEnumerable<T> GetAvailableOutput()
{
while (index < identifierSequence.Count)
{
TId key = identifierSequence[index];
List<T> nextValues;
if (!buffer.TryGetValue(key, out nextValues) || nextValues.Count == 0)
{
//No values available yet
break;
}
yield return nextValues[nextValues.Count - 1];
nextValues.RemoveAt(nextValues.Count - 1);
index++;
}
}
//Get the identifier for this item
TId itemIdentifier = identifierFunc(item);
//If this item is not in identifiersInSequence, we ignore it.
if (!identifiersInSequence.Contains(itemIdentifier))
{
return Enumerable.Empty<T>();
}
//Add the new item to the buffer
List<T> valuesList;
if (!buffer.TryGetValue(itemIdentifier, out valuesList))
{
valuesList = new List<T>();
buffer[itemIdentifier] = valuesList;
}
valuesList.Add(item);
//Return all available items
return GetAvailableOutput();
})
.Subscribe(output =>
{
foreach (T cur in output)
{
observer.OnNext(cur);
}
if (index == identifierSequence.Count)
{
observer.OnCompleted();
}
},(ex) =>
{
observer.OnError(ex);
}, () =>
{
//When source observable is completed, return the remaining available items
while (index < identifierSequence.Count)
{
TId key = identifierSequence[index];
List<T> nextValues;
if (!buffer.TryGetValue(key, out nextValues) || nextValues.Count == 0)
{
//No values available
index++;
continue;
}
observer.OnNext(nextValues[nextValues.Count - 1]);
nextValues.RemoveAt(nextValues.Count - 1);
index++;
}
//Mark observable as completed
observer.OnCompleted();
});
});
}