3
votes

I want to leverage Tokio's runtime to handle a variable amount of async futures. Since the count of futures is unknown at compile time, it seems FuturesUnordered is my best option (macros such as select! require specifying your branches at compile time; join_all might be possible but the docs recommend FuturesUnordered "in a lot of cases" when order doesn't matter).

The logic of this snippet is a recv() loop getting pushed to the bucket of futures, which should always run. When new data arrives, its parsing/processing gets pushed to the futures bucket too (instead of being processed immediately). This ensures the receiver maintains low latency in responding to new events, and data processing (potentially computationally expensive decryption) occurs concurrently with all other data processing async blocks (plus the listening receiver).

This thread explains why the futures get .boxed(), by the way.

The problem is this cryptic error:

error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
  --> src/main.rs:27:8
   |
27 |     }).boxed());
   |        ^^^^^ `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
   = note: required because of the requirements on the impl of `Sync` for `Unique<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Box<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>`
   = note: required because of the requirements on the impl of `Sync` for `FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `&FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because it appears within the type `[static generator@src/main.rs:16:25: 27:6 _]`
   = note: required because it appears within the type `from_generator::GenFuture<[static generator@src/main.rs:16:25: 27:6 _]>`
   = note: required because it appears within the type `impl futures::Future`

It looks like pushing to an UnorderedFutures "recursively" (not really I guess, but what else would you call it?) doesn't work, but I'm not sure why. This error indicates some Sync trait requirement isn't met for the Box'd & Pin'd async blocks being tended to by the FuturesUnordered -- a requirement I guess is only imposed because &FuturesUnordered (used during futures.push(...) because that method borrows &self) needs it for its Send trait... or something?

use std::error::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::FutureExt;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let mut futures = FuturesUnordered::new();
    let (tx, rx) = mpsc::channel(32);
    
    tokio::spawn( foo(tx) );    // Only the receiver is relevant; its transmitter is
                                // elsewhere, occasionally sending data.
    futures.push((async {                               // <--- NOTE: futures.push()
        loop {
            match rx.recv().await {
                Some(data) => {
                    futures.push((async move {          // <--- NOTE: nested futures.push()
                        let _ = data; // TODO: replace with code that processes 'data'
                    }).boxed());
                },
                None => {}
            }
        }
    }).boxed());
    
    while let Some(_) = futures.next().await {}

    Ok(())
}
2

2 Answers

3
votes

I will leave the low-level error for another answer, but I believe a more idiomatic way to solve the high-level problem here would be to combine the use of FuturesUnordered with something like tokio::select! as follows:

use tokio::sync::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;

#[tokio::main]
pub async fn main() {
    let mut futures = FuturesUnordered::new();
    let (tx, mut rx) = mpsc::channel(32);
    
    //turn foo into something more concrete
    tokio::spawn(async move {
        let _ = tx.send(42i32).await;
    });

    loop {
        tokio::select! {
            Some(data) = rx.recv() => {
                futures.push(async move {
                    data.to_string()
                });
            },
            Some(result) = futures.next() => {
                println!("{}", result)
            },
            else => break,
        }
    }
}

You can read more about the select macro here: https://tokio.rs/tokio/tutorial/select

2
votes

When you box the future created by the async block with the boxed method, you are trying to coerce it to a dyn Future + Send:

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

However, the created future is not Send. Why? Because inside of it, you try to push to the FuturesUnordered, which borrows it:

pub fn push(&self, future: Fut)

This means that the async block captures a &FuturesUnordered. For a type to be Send, all it's fields must be Send, so for the generated future to be Send, &FuturesUnordered must be Send.

For a reference to be Send, the type must also be Sync:

impl<'_, T> Send for &'_ T where
    T: Sync

And for FuturesUnordered to be Sync, the stored futures must also be Sync:

impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}

However, the future returned by boxed is not necessarily Sync:

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

Which means that the async generator is not Send, so you cannot coerce it to a dyn Future + Send, and you get a confusing error message.

The solution is to add a Sync bound to the future, and Box::pin manually:

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;

let mut futures = FuturesUnordered::<BoxedFuture>::new();

futures.push(Box::pin(async {
    loop {
        match rx.recv().await {
            Some(data) => {
                futures.push(Box::pin(async move {
                    let _ = data;
                }));
            }
            None => {}
        }
    }
}));

However, you will then run into a bunch of borrowing issues. A better solution would be to use tokio::select! instead of the outer push, as explained by Michael's answer.