0
votes

I'd like a function which asynchronously processes a variable amount of (Sink, Stream) tuples.

use futures::channel::mpsc;
use futures::{Sink, Stream, SinkExt, StreamExt};

async fn foo(v: Vec<(Box<dyn Sink<Error = std::io::Error>>, Box<dyn Stream<Item = u8>>)>) {
    for (mut tx, mut rx) in v {
        let _ = tx.send(0);
        let _ = rx.next().await;
    }
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = mpsc::channel(32);
    foo(vec![(Box::new(tx), Box::new(rx))]).await;
    
    Ok(())
}

But I get this compilation error:

error[E0107]: wrong number of type arguments: expected 1, found 0
 --> src/main.rs:4:30
  |
4 | async fn foo(v: Vec<(Box<dyn Sink<Error = std::io::Error>>, Box<dyn Stream<Item = u8>>)>) {
  |                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected 1 type argument

I was prompted to declare the associated type for the trait object that way by the compiler itself. I'm unsure why it does not accept it.

1

1 Answers

1
votes

The compiler wants you to specify the "type argument" of the Sink. This is not the error type, but the type of the item being sent down the sink, as in Sink<Foo>. You specify u8 as the type of the stream, and are sending the value unchanged between one and the other, so you probably want a Sink<u8>.

Once you do that, the compiler will next complain that you need to specify the Error associated type (this time for real). However if you specify std::io::Error, the call to foo() from main() won't compile because the implementation of Sink for mpsc::Sender specifies its own mpsc::SendError as the error type.

Finally, both the sink and the stream need to be pinned so they can live across await points. This is done by using Pin<Box<...>> instead of Box<...> and Box::pin(...) instead of Box::new(...).

With the above changes, a version that compiles looks like this:

use futures::channel::mpsc;
use futures::{Sink, SinkExt, Stream, StreamExt};
use std::pin::Pin;

async fn foo(
    v: Vec<(
        Pin<Box<dyn Sink<u8, Error = mpsc::SendError>>>,
        Pin<Box<dyn Stream<Item = u8>>>,
    )>,
) {
    for (mut tx, mut rx) in v {
        let _ = tx.send(0);
        let _ = rx.next().await;
    }
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = mpsc::channel(32);
    foo(vec![(Box::pin(tx), Box::pin(rx))]).await;

    Ok(())
}