It is not possible to spawn a non-'static future from async Rust. This is because any async function might be cancelled at any time, so there is no way to guarantee that the caller really outlives the spawned tasks.
It is true that there are various crates that allow scoped spawns of async tasks, but these crates cannot be used from async code. What they do allow is spawning scoped async tasks from non-async code. This avoids the problem above, because the non-async code that spawned them is not a future and therefore cannot be cancelled partway through.
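To make the cancellation problem concrete, here is a minimal sketch that uses only the standard library. The names NoopWaker, YieldOnce and cancelled_caller_finishes are hypothetical and exist only for this illustration. The future is polled once by hand, suspends at its await point, and is then dropped, which is all that cancellation amounts to.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for polling a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Returns `Pending` on the first poll and `Ready` on every later poll.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Suspends a "caller" future at its await point, then drops it.
/// Returns whether the code after the await ever ran.
fn cancelled_caller_finishes() -> bool {
    let finished = Cell::new(false);
    {
        let mut caller = Box::pin(async {
            YieldOnce(false).await;
            // A scoped spawn would have to join its tasks around here,
            // but this line is never reached if the future is dropped first.
            finished.set(true);
        });
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        // First poll: the future suspends at the await point.
        assert!(caller.as_mut().poll(&mut cx).is_pending());
        // Leaving this scope drops `caller`: it has now been cancelled.
    }
    finished.get()
}
```

Calling cancelled_caller_finishes() returns false: nothing after the await point ran, so a caller suspended there has no chance to wait for tasks that borrow from it.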
Generally there are two approaches to working around this limitation:
- Spawn a 'static task by using Arc rather than ordinary references.
- Use the concurrency primitives from the futures crate instead of spawning.
Note that this answer applies to both Tokio 0.2.x and 0.3.x. The examples use the 0.2.x name delay_for; Tokio 0.3.x renamed that function to sleep.
Generally, to spawn a 'static task and use Arc, you must have ownership of the values in question. Since your function takes its argument by reference, you cannot use this technique without cloning the data.

    use std::sync::Arc;
    use std::time::Duration;
    use tokio::task::{spawn, JoinHandle};
    use tokio::time::delay_for;

    async fn do_sth(with: Arc<[u64]>, idx: usize) {
        delay_for(Duration::new(with[idx], 0)).await;
        println!("{}", with[idx]);
    }

    async fn parallel_stuff(array: &[u64]) {
        // Make a clone of the data so we can share it across tasks.
        let shared: Arc<[u64]> = Arc::from(array);

        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
        for i in 0..array.len() {
            // Cloning an Arc does not clone the data.
            let shared_clone = shared.clone();
            let task = spawn(do_sth(shared_clone, i));
            tasks.push(task);
        }

        for task in tasks {
            // Awaiting a JoinHandle yields an Err if the task panicked.
            task.await.unwrap();
        }
    }

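The claim that cloning an Arc does not clone the data can be checked without Tokio. The following is a sketch under the assumption that std::thread::spawn is an acceptable stand-in for a task spawn, since it has the same 'static requirement. The helper names sum_on_threads and clone_shares_allocation are made up for this example.

```rust
use std::sync::Arc;
use std::thread;

/// Sums `data` on `workers` threads; each thread owns its own `Arc` handle.
fn sum_on_threads(data: &[u64], workers: usize) -> u64 {
    // The only copy of the elements: slice -> reference-counted allocation.
    let shared: Arc<[u64]> = Arc::from(data);

    let handles: Vec<thread::JoinHandle<u64>> = (0..workers)
        .map(|w| {
            // Bumps the reference count; the elements are not copied again.
            let shared = Arc::clone(&shared);
            // The closure owns its handle, so it satisfies the 'static bound.
            thread::spawn(move || shared.iter().skip(w).step_by(workers).sum::<u64>())
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

/// Returns true if a cloned handle points at the same allocation.
fn clone_shares_allocation(data: &[u64]) -> bool {
    let a: Arc<[u64]> = Arc::from(data);
    let b = Arc::clone(&a);
    Arc::ptr_eq(&a, &b) && Arc::strong_count(&a) == 2
}
```

For example, sum_on_threads(&[1, 2, 3, 4, 5], 2) gives 15, and clone_shares_allocation returns true for any input slice.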
Note that if you have a mutable reference to the data, and the data is Sized, i.e. not a slice, it is possible to temporarily take ownership of it. The example below does this with std::mem::take, which requires the type to implement Default.

    use std::sync::Arc;
    use std::time::Duration;
    use tokio::task::{spawn, JoinHandle};
    use tokio::time::delay_for;

    async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
        delay_for(Duration::new(with[idx], 0)).await;
        println!("{}", with[idx]);
    }

    async fn parallel_stuff(array: &mut Vec<u64>) {
        // Swap the array with an empty one to temporarily take ownership.
        let vec = std::mem::take(array);
        let shared = Arc::new(vec);

        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
        // Use shared.len() here: `array` is empty after the take above.
        for i in 0..shared.len() {
            // Cloning an Arc does not clone the data.
            let shared_clone = shared.clone();
            let task = spawn(do_sth(shared_clone, i));
            tasks.push(task);
        }

        for task in tasks {
            // Awaiting a JoinHandle yields an Err if the task panicked.
            task.await.unwrap();
        }

        // Put back the vector where we took it from.
        // This works because there is only one Arc left.
        *array = Arc::try_unwrap(shared).unwrap();
    }

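The take-and-put-back steps can be tried in isolation with the standard library alone. This is a small sketch with hypothetical function names (with_temporary_ownership, unwrap_fails_while_shared). It shows that the original vector is empty while the Arc exists, and that Arc::try_unwrap only succeeds once every other handle has been dropped.

```rust
use std::sync::Arc;

/// Moves the vector into an `Arc`, uses it through a second handle,
/// then restores it. Returns the length seen through the shared handle.
fn with_temporary_ownership(array: &mut Vec<u64>) -> usize {
    let vec = std::mem::take(array);
    // After the take, `array` holds the empty default vector.
    assert!(array.is_empty());

    let shared = Arc::new(vec);
    let seen = {
        let handle = Arc::clone(&shared);
        handle.len()
    }; // `handle` is dropped here, leaving `shared` as the only owner.

    *array = Arc::try_unwrap(shared).expect("another Arc handle is still alive");
    seen
}

/// Returns true: `try_unwrap` refuses while a second handle exists.
fn unwrap_fails_while_shared() -> bool {
    let shared = Arc::new(vec![1u64, 2, 3]);
    let _other = Arc::clone(&shared);
    Arc::try_unwrap(shared).is_err()
}
```

The first assertion is the reason a loop bound has to come from the Arc rather than from the original reference once the data has been taken.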
Another option is to use the concurrency primitives from the futures crate. These have the advantage of working with non-'static data, but the disadvantage that the tasks will not be able to run on multiple threads at the same time.
For many workflows this is perfectly fine, as async code should spend most of its time waiting for IO anyway.
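The reason such primitives can accept borrowed data is that the futures are owned and polled by the caller itself, on the caller's thread. The following toy sketch illustrates that idea with the standard library only. It is not how the futures crate is implemented: run_all busy-polls, and NoopWaker, YieldTimes, run_all and completion_order are names invented for this example.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; the loop below simply polls again.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Returns `Pending` the given number of times before completing.
struct YieldTimes(u64);

impl Future for YieldTimes {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

/// Borrows both arguments; neither reference is 'static.
async fn do_sth(with: &u64, log: &RefCell<Vec<u64>>) {
    YieldTimes(*with).await;
    log.borrow_mut().push(*with);
}

/// Polls every future round-robin on the current thread until all are done.
fn run_all<'a>(mut tasks: Vec<Pin<Box<dyn Future<Output = ()> + 'a>>>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        // Keep only the futures that are still pending.
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
}

/// Runs one future per element and returns the values in completion order.
fn completion_order(array: &[u64]) -> Vec<u64> {
    let log = RefCell::new(Vec::new());
    let tasks: Vec<Pin<Box<dyn Future<Output = ()> + '_>>> = array
        .iter()
        .map(|i| Box::pin(do_sth(i, &log)) as Pin<Box<dyn Future<Output = ()> + '_>>)
        .collect();
    run_all(tasks);
    log.into_inner()
}
```

Here completion_order(&[2, 0, 1]) returns [0, 1, 2]: the futures make progress in an interleaved fashion, yet everything runs on one thread and borrows local data freely.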
One approach is to use FuturesUnordered. This is a special collection that can store many futures, and it has a next function that runs all of them concurrently and returns as soon as one of them finishes. (The next function is only available when StreamExt is imported.)
You can use it like this:

    use std::time::Duration;
    use futures::stream::{FuturesUnordered, StreamExt};
    use tokio::time::delay_for;

    async fn do_sth(with: &u64) {
        delay_for(Duration::new(*with, 0)).await;
        println!("{}", with);
    }

    async fn parallel_stuff(array: &[u64]) {
        let mut tasks = FuturesUnordered::new();
        for i in array {
            let task = do_sth(i);
            tasks.push(task);
        }

        // This loop runs everything concurrently, and waits until they have
        // all finished.
        while let Some(()) = tasks.next().await { }
    }

Note: The FuturesUnordered must be defined after the shared value. Otherwise you will get a borrow error, because local variables are dropped in reverse order of declaration and the collection would outlive the value its futures borrow.
Another approach is to use a Stream. With streams, you can use buffer_unordered. This is a utility that uses FuturesUnordered internally.

    use std::time::Duration;
    use futures::stream::StreamExt;
    use tokio::time::delay_for;

    async fn do_sth(with: &u64) {
        delay_for(Duration::new(*with, 0)).await;
        println!("{}", with);
    }

    async fn parallel_stuff(array: &[u64]) {
        // Create a stream going through the array.
        futures::stream::iter(array)
            // For each item in the stream, create a future.
            .map(|i| do_sth(i))
            // Run at most 10 of the futures concurrently.
            .buffer_unordered(10)
            // Since Streams are lazy, we must use for_each or collect to run them.
            // Here we use for_each and do nothing with the return value from do_sth.
            .for_each(|()| async {})
            .await;
    }

Note that in both cases, importing StreamExt is important, as it provides various methods that are not available on streams without importing the extension trait.
async_scoped – Ibraheem Ahmed
spawn do not need to have a 'static lifetime; they only need to be bounded by a 'static lifetime. This is a common Rust lifetime misconception. – pretzelhammer