2
votes

I'm experiencing weird behavior with cpu pools:

#[macro_use]
extern crate lazy_static;
extern crate tokio_core;
extern crate futures;
extern crate futures_cpupool;

use std::time::Duration;

use futures_cpupool::{CpuPool, Builder, CpuFuture};
use futures::Stream;
use futures::{Future, future, lazy};
use futures::sync::mpsc;
use futures::Sink;

// Process-wide CPU pool, built on first access. The pool size is set to 10 and
// each worker thread prints a line right after it starts and right before it stops.
lazy_static! {
    static ref CPU_POOL: CpuPool = {
        let mut builder = Builder::new();
        builder.pool_size(10);
        builder.after_start(|| println!("Pool started one thread"));
        builder.before_stop(|| println!("Pool stopped one thread"));
        builder.create()
    };
}

/// Stateless source of names; the work itself is spawned on `CPU_POOL`.
struct Producer {}

impl Producer {
    /// Spawns a task on `CPU_POOL` that pushes `"name"` into a bounded channel up
    /// to ten times (sleeping one second after each send) and returns the
    /// receiving half of that channel as a boxed stream of `String`s.
    ///
    /// NOTE: the `CpuFuture` handle (`producer_cpu`) is never used after it is
    /// created, so it is dropped when this function returns. Dropping a
    /// `CpuFuture` cancels the spawned work, so whether any names are produced
    /// depends on whether the task runs before that drop.
    fn search_names(&self) -> Box<Stream<Item = String, Error = String> + Send> {
        // Channel created with a buffer argument of 1; it carries
        // `Result<String, String>` so a produced item may be a name or an error.
        let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1);

        println!("Creating producer thread...");
        // `lazy` defers the closure until the pool polls the spawned future.
        let producer_cpu: CpuFuture<(), ()> = CPU_POOL.spawn(lazy(move || {
                println!(" -- Begin to produce names");
                for i in 0..10 {
                    // `send` consumes the sender; `wait` blocks this worker until
                    // the send resolves and hands the sender back on success.
                    match tx.send(Ok("name".to_string())).wait() {
                        Ok(t) => {
                            println!(" -- sent the name");
                            tx = t
                        }
                        Err(err) => {
                            // The send failed; stop producing.
                            println!(" -- Error occured sending name! {:?}", err);
                            break;
                        }
                    }
                    std::thread::sleep(Duration::from_secs(1));
                }
                future::ok::<(), ()>(())
            })
            // Log how the producer ended; the chained future always resolves to `Ok(())`.
            .then(|result| {
                match result {
                    Ok(data) => println!("Producer finished with data: {:?}", data),
                    Err(err) => println!("Producer finished with error: {:?}", err),
                }
                future::ok::<(), ()>(())
            }));

        // Unwrap the channel-level result so the inner `Result<String, String>`
        // supplies the stream's item or error.
        rx.then(|r| r.unwrap()).boxed()
    }
}

/// Consumes the producer's stream on `CPU_POOL`, echoing every name, then
/// reports the collected result and blocks until that work has finished.
fn main() {
    let producer = Producer {};
    let incoming = producer.search_names();

    // Echo each name as it arrives, passing it through unchanged.
    let echoed = incoming.map(|item| {
        println!("name = {:?}", item);
        item
    });

    // Gather everything, print the outcome, and always resolve to `Ok(())`.
    let summary = echoed.collect().then(|outcome| {
        match outcome {
            Ok(all) => println!("Finished to read producer {:?}", all),
            Err(failure) => println!("Error reading stream of producer! {:?}", failure),
        }
        future::ok::<(), ()>(())
    });

    let names = CPU_POOL.spawn(summary);
    let _ = names.wait();
}

Here is the corresponding Cargo.toml

[package]
name = "example"
version = "0.1.0"

[dependencies]
lazy_static = "^0.1.*"

tokio-core = "^0.1"
futures = "^0.1"
futures-cpupool = "^0.1"

I'm running on Rust nightly (1.16.0-nightly (df8debf6d 2017-01-25))

I would expect this program to generate the 10 Strings, print each of them via println, and exit. However, most of the time the program exits normally without generating any Strings; only occasionally are the Strings generated correctly.

Here is the output of the first case:

Creating producer thread...
Pool started one thread
Finished to read producer []
Pool started one thread
Pool started one thread
Pool started one thread
Pool started one thread

And the output when Strings get generated

Pool started one thread
Pool started one thread
Pool started one thread
Pool started one thread
Creating producer thread...
 -- Begin to produce names
 -- sent the name
name = "name"
Pool started one thread
 -- sent the name
name = "name"
Producer finished with data: ()
Finished to read producer ["name", "name"]

I have the feeling that, in the first case, the producer thread doesn't get scheduled on the thread pool for whatever reason. I must be missing something, but I don't know what.

1

1 Answer

1
votes

The cause of the problem is the early drop of the producer future.

In the method search_names, the CpuFuture that produces the values is dropped when search_names returns. When dropped, a CpuFuture is cancelled, thus skipping production of the values. The difference in behavior certainly comes from a race between the drop of the future and its execution.

A solution is to keep the producer future alive for as long as the stream is being consumed, for example by returning it to the caller like this:

#[macro_use]
extern crate lazy_static;
extern crate tokio_core;
extern crate futures;
extern crate futures_cpupool;

use std::time::Duration;

use futures_cpupool::{CpuPool, Builder, CpuFuture};
use futures::Stream;
use futures::{Future, future, lazy};
use futures::sync::mpsc;
use futures::Sink;

// Process-wide CPU pool, built on first access. The pool size is set to 5 and
// each worker thread prints a line right after it starts and right before it stops.
lazy_static! {
    static ref CPU_POOL: CpuPool = {
        let mut builder = Builder::new();
        builder.pool_size(5);
        builder.after_start(|| println!("Pool started one thread"));
        builder.before_stop(|| println!("Pool stopped one thread"));
        builder.create()
    };
}

struct Producer {}

impl Producer {
    fn search_names(&self) -> (CpuFuture<(), ()>, Box<Stream<Item = String, Error = String> + Send>) {
        let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1);

        println!("Creating producer thread...");
        let producer_cpu: CpuFuture<(), ()> = CPU_POOL.spawn(
            lazy(move || {
                println!(" -- Begin to produce names");
                for i in 0..2 {
                    match tx.send(Ok("name".to_string())).wait() {
                        Ok(t) => {
                            println!(" -- sent the name");
                            tx = t
                        },
                        Err(err) => {
                            println!(" -- Error occured sending name! {:?}", err);
                            break
                        },
                    }
                    std::thread::sleep(Duration::from_secs(1));
                }
                future::ok::<(), ()>(())
            }).then(|result| {
                match result {
                    Ok(data) => println!("Producer finished with data: {:?}", data),
                    Err(err) => println!("Producer finished with error: {:?}", err),
                }
                future::ok::<(), ()>(())
            })
        );

        (
            producer_cpu,
            rx.then(|r| r.unwrap()).boxed()
        )
    }
}

/// Consumes the producer's stream on `CPU_POOL`, echoing every name, then
/// reports the collected result and blocks until that work has finished.
fn main() {
    let producer = Producer {};

    // The handle is deliberately bound to a named variable so it lives until the
    // end of `main`; it is not otherwise used.
    let (_producer_handle, incoming) = producer.search_names();

    // Echo each name as it arrives, passing it through unchanged.
    let echoed = incoming.map(|item| {
        println!("name = {:?}", item);
        item
    });

    // Gather everything, print the outcome, and always resolve to `Ok(())`.
    let summary = echoed.collect().then(|outcome| {
        match outcome {
            Ok(all) => println!("Finished to read producer {:?}", all),
            Err(failure) => println!("Error reading stream of producer! {:?}", failure),
        }
        future::ok::<(), ()>(())
    });

    let names = CPU_POOL.spawn(summary);
    let _ = names.wait();
}