1
votes

I'm trying to implement a pool of 10 Redis of conections using a SyncArbiter for different actors to use. Say that we have an actor named Bob that has to use a Redis actor to accomplish it's task.

While this is achievable in the following manner:

// crate, use and mod statements have been omitted to lessen clutter

/// FILE main.rs
pub struct AppState {
    pub redis: Addr<Redis>,
    pub bob: Addr<Bob>
}

fn main() {
    let system = actix::System::new("theatre");

    server::new(move || {
        let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
        let bob_addr = SyncArbiter::start(10, || Bob::new());

        let state = AppState {
            redis: redis_addr,
            bob: bob_addr
        };

        App::with_state(state).resource("/bob/eat", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::bob::eat)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

/// FILE controllers/bob.rs
pub struct Food {
  name: String,
  kcal: u64
}

pub fn eat(
    (req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
    state
        .bob
        .send(Eat::new(req.into_inner()))
        .from_err()
        .and_then(|res| match res {
            Ok(val) => {
                println!("==== BODY ==== {:?}", val);
                Ok(HttpResponse::Ok().into())
            }
            Err(_) => Ok(HttpResponse::InternalServerError().into()),
        })
}

/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
    pub client: Client
}

pub struct RunCommand(Cmd);

impl RunCommand {
    pub fn new(cmd: Cmd) -> Self {
        RunCommand(cmd)
    }
}

impl Message for RunCommand {
    type Result = Result<RedisResult<String>, ()>;
}

impl Actor for Redis {
    type Context = SyncContext<Self>;
}

impl Handler<RunCommand> for Redis {
    type Result = Result<RedisResult<String>, ()>;

    fn handle(&mut self, msg: RunCommand, _context: &mut Self::Context) -> Self::Result {
        println!("Redis received command!");
        Ok(Ok("OK".to_string()))
    }
}

impl Redis {
    pub fn new(url: &str) -> Result<Self, RedisError> {
        let client = match Client::open(url) {
            Ok(client) => client,
            Err(error) => return Err(error)
        };

        let redis = Redis {
            client: client,
        };

        Ok(redis)
    }
}

/// FILE actors/bob.rs
pub struct Bob;

pub struct Eat(Food);

impl Message for Eat {
    type Result = Result<Bob, ()>;
}

impl Actor for Eat {
    type Context = SyncContext<Self>;
}

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        println!("Bob received {:?}", &msg);

        // How to get a Redis actor and pass data to it here?

        Ok(msg.datapoint)
    }
}

impl Bob {
    pub fn new() -> () {
        Bob {}
    }
}

From the above handle implementation in Bob, it's unclear how Bob could get the address of an Redis actor. Or send any message to any Actor running in a SyncArbiter.

The same could be achieved using a regular Arbiter and a Registry but, as far as I am aware of, Actix doesn't allow multiple same actors (e.g. we can't start 10 Redis actors using a regular Arbiter).

To formalize my questions:

  • Is there a Registry for SyncArbiter actors
  • Can I start multiple same type actors in a regular Arbiter
  • Is there a better / more canonical way to implement a connection pool

EDIT

Versions:

  • actix 0.7.9
  • actix_web 0.7.19
  • futures = "0.1.26"
  • rust 1.33.0
1

1 Answers

2
votes

I found the answer myself.

Out-of-the box there is no way for an Actor with a SyncContext to be retrieved from the registry.

Given my above example. For the actor Bob to send any kind of message to the Redis actor it needs to know the address of the Redis actor. Bob can get the address of Redis explicitly - contained in a message sent to it or read from some kind of shared state.

I wanted a system similar to Erlang's so I decided against passing the addresses of actors through messages as it seemed too laborious, error prone and in my mind it defeats the purpose of having an actor based concurrency model (since no one actor can message any other actor).

Therefore I investigated the idea of a shared state, and decided to implement my own SyncRegistry that would be an analog to the Actix standard Registry - whic does exactly what I want but not for Actors with a SyncContext.

Here is the naive solution i coded up: https://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177

With the following setup:

fn main() {
    let system = actix::System::new("theatre");

    let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
    SyncRegistry::set(addr);
    let addr = SyncArbiter::start(10, || Bob::new());
    SyncRegistry::set(addr);


    server::new(move || {
        let state = AppState {};

        App::with_state(state).resource("/foo", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::foo::create)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

The actor Bob can get the address of Redis in the following manner, from any point in the program:

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        let redis = match SyncRegistry::<Redis>::get() {
            Some(redis) => redis,
            _ => return Err(())
        };

        let cmd = redis::cmd("XADD")
            .arg("things_to_eat")
            .arg("*")
            .arg("data")
            .arg(&msg.0)
            .to_owned();

        redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
    }
}