
I am currently sending closures/functions across threads.

This works perfectly fine for sync functions.

I am specifically passing pub type WSMethod<T> = Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync>;

Example function being sent

pub fn update_league(req: WSReq, conn: PgConn, _: &mut WSConnections_, _: Uuid) -> Result<String, BoxError>{
    let deserialized = serde_json::from_value(req.data)?;
    let league = db::update_league(&conn, deserialized)?;
    let resp_msg = WSMsgOut::resp(req.message_id, req.method, league);
    serde_json::to_string(&resp_msg).map_err(|e| e.into())
}
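
For reference, the working sync setup can be sketched with only the standard library. This is a minimal sketch, not the code from the project: `Req`, `Conns`, `echo`, `build_methods` and `call_on_other_thread` are hypothetical stand-ins for `WSReq`, `WSConnections<T>` and the real handlers.

```rust
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use std::thread;

type BoxError = Box<dyn Error + Send + Sync>;
// Hypothetical stand-ins for WSReq and WSConnections<T>.
type Req = String;
type Conns = Vec<String>;
// Same shape as WSMethod<T>: a boxed sync function that is Send + Sync.
type Method = Box<dyn Fn(Req, &mut Conns) -> Result<String, BoxError> + Send + Sync>;

// A plain sync handler; its signature matches the Fn in `Method`.
fn echo(req: Req, conns: &mut Conns) -> Result<String, BoxError> {
    conns.push(req.clone());
    Ok(format!("echo: {req}"))
}

fn build_methods() -> HashMap<&'static str, Method> {
    let mut methods: HashMap<&'static str, Method> = HashMap::new();
    // A sync fn item coerces directly into the boxed trait object.
    methods.insert("echo", Box::new(echo));
    methods
}

pub fn call_on_other_thread(name: &'static str, req: Req) -> Result<String, BoxError> {
    let methods = Arc::new(build_methods());
    // The table can move to another thread because every entry is Send + Sync.
    let handle = thread::spawn(move || {
        let mut conns = Conns::new();
        match methods.get(name) {
            Some(method) => method(req, &mut conns),
            None => Err(format!("unknown method: {name}").into()),
        }
    });
    handle.join().expect("worker thread panicked")
}
```

Calling `call_on_other_thread("echo", "hi".to_string())` looks the handler up by name and runs it on the spawned thread.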

However, I would now like to switch to sending async functions, e.g.:

pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
    let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data)?;
    let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec())?;
    if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
        sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
    }
    publish_competitions(ws_conns, &competitions_out).await;
    let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
    serde_json::to_string(&resp_msg).map_err(|e| e.into())
}

It's the exact same function signature, it's just async.

Where I box the functions so they can be sent around, I get this error

Box::new(upsert_competitions))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected enum `std::result::Result`, found opaque type

full:

288 | pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
    |                                                                                                                ------------------------ the `Output` of this `async fn`'s found opaque type
    |
    = note:     expected enum `std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>`
            found opaque type `impl core::future::future::Future`
    = note: required for the cast to the object type `dyn for<'r> std::ops::Fn(warp_ws_server::WSReq, diesel::r2d2::PooledConnection<diesel::r2d2::ConnectionManager<diesel::PgConnection>>, &'r mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>, uuid::Uuid) -> std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>> + std::marker::Send + std::marker::Sync`
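
The "found opaque type" part of this error reflects how an `async fn` is compiled: calling it does not run the body, it returns an anonymous future whose `Output` is the declared return type. A minimal sketch with simplified, hypothetical signatures (`upsert`, `upsert_desugared`, and the tiny `block_on` helper are not from the project):

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Written as an async fn: the declared return type is the future's Output.
async fn upsert(req: String) -> Result<String, BoxError> {
    Ok(format!("upserted {req}"))
}

// Roughly equivalent plain fn: it returns an unnameable type implementing Future.
fn upsert_desugared(req: String) -> impl Future<Output = Result<String, BoxError>> {
    async move { Ok::<String, BoxError>(format!("upserted {req}")) }
}

// Minimal executor for this sketch only; it busy-polls and is not for real use.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo() -> (String, String) {
    // Nothing has run yet: `fut` is a future, not a Result.
    let fut = upsert("a".to_string());
    let a = block_on(fut).unwrap();
    let b = block_on(upsert_desugared("b".to_string())).unwrap();
    (a, b)
}
```

Because the function returns a future rather than a `Result`, it cannot be coerced to a `dyn Fn(...) -> Result<String, BoxError>`.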

I have tried adding .await at the call site of the passed method: method(req, conn, ws_conns, user_ws_id).await.

This causes a compiler error there, because Future is not implemented for Result.

Therefore I changed the type from: Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync> -> Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Future<Output=Result<String, BoxError>>) + Send + Sync>

It complains about the size of the future, so I box the future; then I get another error (about Unpin), so I pin the future.

Eventually leading to Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync >>) + Send + Sync>

The error is now:

Box::new(upsert_competitions)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `std::pin::Pin`, found opaque type

expected struct `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>> + std::marker::Send + std::marker::Sync>>`
            found opaque type `impl core::future::future::Future`

I don't understand where to go from here. I don't think I should be pinning/boxing the function itself; I want to pin/box the future returned when the function is called, but I don't think I can do this here, as surely I want to box/pin the future after it is created by calling the function, not earlier.

I also tried things like

Box::new(Pin::new(Box::new(upsert_competitions))) based on the above error,

and it tells me it expected an Fn<blah>... rather than a Pin<Box<...>>.

Source of full up-to-date code:

Closure type-def

closure being successfully passed as a regular function

closure being unsuccessfully passed as an async func

closure being called

Edit:

Latest updates (progressed the error)

pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
    async fn hmmm(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
        let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data).expect("failed to deserialize request data");
        println!("{:?}", &deserialized);
        let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec()).expect("failed to upsert competitions");
        // assume anything upserted the user wants to subscribe to
        if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
            sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
        }
        // TODO ideally would return response before awaiting publishing going out
        publish_competitions(ws_conns, &competitions_out).await;
        println!("{:?}", &competitions_out);
        let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
        let out = serde_json::to_string(&resp_msg).map_err(|e| e.into());
        out
    }
    Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
}

305 |     Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by `hmmm` is not `Sync`

So now I just need to work out how to make this future Sync. The compiler note gives a good clue:

note: future is not `Sync` as this value is used across an await

299 |         publish_competitions(ws_conns, &competitions_out).await;
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `conn` maybe used later

I worked out that I have to keep all use of conn outside the inner async function, so that it is never held across an await.
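
The effect can be reproduced in isolation. In this sketch, `Cell` is a hypothetical stand-in for any value that is not `Sync` (it is not a claim about `PgConn` itself), and `publish`, `assert_sync` and `block_on` are illustration-only helpers. Anything still alive at an `.await` is stored inside the future, so it affects whether the future is `Sync`.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

async fn publish() {}

// `conn` is still alive at the await, so it is stored in the future: not Sync.
async fn holds_across_await() -> u32 {
    let conn = Cell::new(1u32);
    publish().await;
    conn.get()
}

// `conn` is dropped before the await, so the future only stores a u32: Sync.
async fn uses_before_await() -> u32 {
    let value = {
        let conn = Cell::new(1u32);
        conn.get()
    };
    publish().await;
    value
}

// Compile-time check: only accepts values whose type is Sync.
fn assert_sync<T: Sync>(value: T) -> T {
    value
}

// Minimal executor for this sketch only; it busy-polls and is not for real use.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo() -> (u32, u32) {
    // let _ = assert_sync(holds_across_await()); // rejected: future is not `Sync`
    let a = block_on(holds_across_await());
    let b = block_on(assert_sync(uses_before_await()));
    (a, b)
}
```

Both functions compute the same value; only the second one passes the `Sync` check, which is the same restructuring described above for `conn`.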

After fixing variables across await, I now arrive at

error[E0621]: explicit lifetime required in the type of `ws_conns`
   --> src/handlers.rs:305:5
    |
289 | pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
    |                                                                ------------------- help: add explicit lifetime `'static` to the type of `ws_conns`: `&'static mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>`
...
305 |     Box::pin(hmmm(req, competitions_out, ws_conns, user_ws_id))
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ lifetime `'static` required

I tried making the references &'static, but eventually I get to a point where that is not OK.

I also tried using a generic type instead, upsert_competitions<U: lock_api::RawMutex + 'static>,

but then I get: the trait lock_api::mutex::RawMutex is not implemented for std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>

I need to find a U that provides .lock() but is also a trait that Arc implements.


2 Answers

2
votes

When an async function is used as an Fn, its return type is an anonymous type implementing Future, not a pinned, boxed future: a future only needs to be pinned once you start polling it. Creating it pinned from the start would make composing futures from multiple async functions less efficient and more complicated.

So the correct type would be pub type WSMethod<T> = Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> [[UNNAMED TYPE implementing Future<Output = Result<String, BoxError>>]] + Send + Sync>; but you can't name that [[UNNAMED TYPE implementing Future]], so you need to box the future manually. The easiest way of doing this is with the boxed method from FutureExt in the futures crate.

So you need to combine changing the type to Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send>>) + Send + Sync> with replacing the direct reference to the function with a closure that boxes the returned future: Box::new(|req, conn, connections, uuid| upsert_competitions(req, conn, connections, uuid).boxed()). Note that .boxed() returns Pin<Box<dyn Future<Output = T> + Send>>, without Sync, so the Sync bound on the future itself has to be dropped from the type for this to type-check.
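
A standard-library-only sketch of the same idea follows. It swaps `.boxed()` for `Box::pin` and uses a named wrapper function instead of a closure, so it is a variation on the suggestion above rather than the exact code. `Req`, `Conns`, `BoxFut`, `upsert_boxed`, `build_methods`, `dispatch` and `block_on` are hypothetical names for illustration.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxError = Box<dyn std::error::Error + Send + Sync>;
// Hypothetical stand-ins for WSReq and WSConnections<T>.
type Req = String;
type Conns = Vec<String>;
// A boxed, pinned future that may borrow from the handler's arguments for 'a.
type BoxFut<'a> = Pin<Box<dyn Future<Output = Result<String, BoxError>> + Send + 'a>>;
// The handler type: a sync Fn whose return value is the boxed future.
type Method = Box<dyn for<'a> Fn(Req, &'a mut Conns) -> BoxFut<'a> + Send + Sync>;

async fn upsert(req: Req, conns: &mut Conns) -> Result<String, BoxError> {
    conns.push(req.clone());
    Ok(format!("upserted {req}"))
}

// The wrapper calls the async fn and boxes the future at call time.
fn upsert_boxed<'a>(req: Req, conns: &'a mut Conns) -> BoxFut<'a> {
    Box::pin(upsert(req, conns))
}

fn build_methods() -> HashMap<&'static str, Method> {
    let mut methods: HashMap<&'static str, Method> = HashMap::new();
    methods.insert("upsert", Box::new(upsert_boxed));
    methods
}

// Minimal executor for this sketch only; it busy-polls and is not for real use.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn dispatch(name: &str, req: Req) -> Result<String, BoxError> {
    let methods = build_methods();
    let mut conns = Conns::new();
    let method = methods
        .get(name)
        .ok_or_else(|| format!("unknown method: {name}"))?;
    // In real code this would be `method(req, &mut conns).await` inside a runtime.
    block_on(method(req, &mut conns))
}
```

The stored value stays an ordinary `Fn`; only its return value is boxed and pinned, and that happens when the handler is called.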

0
votes

user1937's simple answer likely works (I will test it later).

However, overnight I realised that the approach of putting functions into a hashmap and passing around references to them was a bit overkill.

This is really a use case for traits: in one place I don't know the implementation but can define an interface, and in the other place I implement that interface.

So instead I defined an async trait in my lib (this currently requires the async-trait crate):

#[async_trait]
pub trait WSHandler<T: Subscriptions>{
    async fn ws_req_resp(
        msg: String, conn: PgConn, ws_conns: &mut WSConnections<T>, user_ws_id: Uuid
    ) -> Result<String, BoxError>;
}

And told its functions to expect a generic WSHandler:

async fn handle_ws_msg<T: Subscriptions, U: WSHandler<T>>(
    msg: ws::Message, conn: PgConn, ws_conns: &mut WSConnections<T>, user_ws_id: Uuid
) -> ws::Message{
    match msg.to_str(){
        // Can't get await inside `and_then`/`map` function chains to work properly
        Ok(msg_str) => match U::ws_req_resp(msg_str.to_string(), conn, ws_conns, user_ws_id).await{
            Ok(text) => ws::Message::text(text),
            Err(e) => ws_error_resp(e.to_string())
        },
        Err(_) => ws_error_resp(String::from("wtf. How does msg.to_str fail?"))
    }
}

then in my main program I was able to impl the trait

struct A{
}

#[async_trait]
impl WSHandler<subscriptions::Subscriptions> for A{

    async fn ws_req_resp(
        msg: String, conn: PgConn, ws_conns: &mut WSConnections<subscriptions::Subscriptions>, user_ws_id: Uuid
    ) -> Result<String, BoxError>{
        let req: WSReq = serde_json::from_str(&msg)?;
        println!("{}", &req.data);
        let stringybob = String::from("upsert_competitions");
        match req.method.clone(){
            a if a == stringybob => upsert_competitions2(req, conn, ws_conns, user_ws_id).await,
            // imagine the other methods here
            uwotm8 => Err(Box::new(InvalidRequestError{description: uwotm8.to_string()}))
        }
    }
}

and pass A as the handler type when upgrading the connection:

ws.on_upgrade(move |socket| warp_ws_server::handle_ws_conn::<subscriptions::Subscriptions, A>(socket, pool, ws_conns))
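
For readers without the async-trait crate, the pattern can be approximated by hand: the trait method is a normal fn returning a boxed, pinned future, which is roughly the shape the macro produces. This is a simplified sketch with hypothetical types (`Conns`, `BoxFut`, a non-generic `WSHandler`, `block_on`), not the project's code.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxError = Box<dyn std::error::Error + Send + Sync>;
// Hypothetical stand-in for WSConnections<T>.
type Conns = Vec<String>;
type BoxFut<'a> = Pin<Box<dyn Future<Output = Result<String, BoxError>> + Send + 'a>>;

// Hand-written "async" trait: a sync method that returns a boxed future.
pub trait WSHandler {
    fn ws_req_resp<'a>(msg: String, conns: &'a mut Conns) -> BoxFut<'a>;
}

async fn upsert(msg: String, conns: &mut Conns) -> Result<String, BoxError> {
    conns.push(msg.clone());
    Ok(format!("handled {msg}"))
}

pub struct A;

impl WSHandler for A {
    fn ws_req_resp<'a>(msg: String, conns: &'a mut Conns) -> BoxFut<'a> {
        Box::pin(async move {
            if msg == "upsert_competitions" {
                upsert(msg, conns).await
            } else {
                Err(format!("invalid method: {msg}").into())
            }
        })
    }
}

// The library side only knows the interface, not the implementation.
async fn handle_ws_msg<U: WSHandler>(msg: String, conns: &mut Conns) -> String {
    match U::ws_req_resp(msg, conns).await {
        Ok(text) => text,
        Err(e) => format!("error: {e}"),
    }
}

// Minimal executor for this sketch only; it busy-polls and is not for real use.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo() -> (String, String) {
    let mut conns = Conns::new();
    let ok = block_on(handle_ws_msg::<A>("upsert_competitions".to_string(), &mut conns));
    let err = block_on(handle_ws_msg::<A>("nope".to_string(), &mut conns));
    (ok, err)
}
```

Dispatch is static here: `handle_ws_msg::<A>` is resolved at compile time, so no table of boxed functions is needed.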

after 14 hours it finally runs. hooray :D