I recently started coding in Rust and I'm loving it. I'm working on a project where I want to wrap a C API. In one case I have to define callbacks in Rust that the C code can call. I let bindgen generate the bindings for the callbacks. Since the code needs to run somewhat asynchronously, I'm using tokio for that.
What I want to achieve
I declare the main function with #[tokio::main]. In main I create two async tasks: one listens on channels, the other drives the message queue in the C API. When messages are available, I want to send them through a channel from the callback function, so that I can receive them in the task that is listening for events. Later I want to forward these messages via SSE or a GraphQL subscription to several clients.
I can't change the C callbacks, because they have to be passable to the C API, and I have to use callbacks, otherwise I don't get the messages.
My latest approach, simplified, looks like this:
use lazy_static::lazy_static;
use tokio::sync::{
    mpsc::{channel, Receiver, Sender},
    Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};

lazy_static! {
    static ref BROADCAST_CONNECT: Mutex<(Sender<bool>, Receiver<bool>)> = Mutex::new(channel(128));
    static ref BROADCAST_CONNECTIONSTATE: Mutex<(Sender<u32>, Receiver<u32>)> = Mutex::new(channel(128));
}
#[tokio::main]
async fn main() {
    unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API

    // wait for a channel to have a message
    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                // waiting for a channel to receive a message
                Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
                Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
            }
        }
    });

    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });

    handle.await.unwrap();
    handle2.await.unwrap();
}
// the callback function that gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
    // C-API is not async, so use synchronous lock
    match BROADCAST_CONNECT.try_lock() {
        Ok(value) => match value.0.blocking_send(is_connected) {
            Ok(_) => {}
            Err(e) => {
                eprintln!("{}", e)
            }
        },
        Err(e) => {
            eprintln!("{}", e)
        }
    }
}

unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
    match BROADCAST_CONNECTIONSTATE.try_lock() {
        Ok(value) => match value.0.blocking_send(connectionstate) {
            Ok(_) => {}
            Err(e) => {
                eprintln!("{}", e)
            }
        },
        Err(e) => {
            eprintln!("{}", e)
        }
    }
}
The problem:
error[E0716]: temporary value dropped while borrowed
  --> src/main.rs:37:29
   |
35 | /             tokio::select! {
36 | |                 Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
37 | |                 Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
   | |                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ creates a temporary which is freed while still in use
38 | |             }
   | |             -
   | |             |
   | |_____________temporary value is freed at the end of this statement
   |               borrow later captured here by closure
   |
   = note: consider using a `let` binding to create a longer lived value
I understand the message and why this happens, but I can't think of a solution that would work.
I have a working example that uses crossbeam channels, but I'd rather use the async channels from tokio, so that I have fewer dependencies and everything is async.
Working example:
use lazy_static::lazy_static;
use crossbeam::{
    channel::{bounded, Receiver, Sender},
    select,
};
use bindgen::{notify_connect, notify_connectionstate};

lazy_static! {
    static ref BROADCAST_CONNECT: (Sender<bool>, Receiver<bool>) = bounded(128);
    static ref BROADCAST_CONNECTIONSTATE: (Sender<u32>, Receiver<u32>) = bounded(128);
}
#[tokio::main]
async fn main() {
    unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API

    let handle1 = tokio::spawn(async move {
        loop {
            select! {
                recv(&BROADCAST_CONNECT.1) -> msg => println!("is_connected: {:?}", msg.unwrap()),
                recv(&BROADCAST_CONNECTIONSTATE.1) -> msg => println!("connectionstate: {:?}", msg.unwrap()),
            }
        }
    });

    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });

    handle1.await.unwrap();
    handle2.await.unwrap();
}
// the callback function that gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
    match BROADCAST_CONNECT.0.send(is_connected) {
        Ok(_) => {}
        Err(e) => eprintln!("{}", e),
    }
}

unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
    match BROADCAST_CONNECTIONSTATE.0.send(connectionstate) {
        Ok(_) => {}
        Err(e) => eprintln!("{}", e),
    }
}
Alternative
One alternative, which I have not gotten to work either, would be to use some sort of local function or closure. But I'm not sure whether this is possible and, if so, how it would work. Maybe someone has an idea. It would be nice if something like this worked, so I don't have to use lazy_static (I'd rather not have global/static variables in my code).
use tokio::sync::{
    mpsc::{channel, Receiver, Sender},
    Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};
#[tokio::main]
async fn main() {
    let app = app::App::new();
    let mut broadcast_connect = channel::<bool>(128);
    let mut broadcast_connectionstate = channel::<u32>(128);

    let notify_connect = {
        unsafe extern "C" fn _notify_connect(is_connected: bool) {
            match broadcast_connect.0.blocking_send(is_connected) {
                Ok(_) => {}
                Err(e) => {
                    eprintln!("{}", e)
                }
            }
        }
    };

    let notify_connectionstate = {
        unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
            match broadcast_connectionstate.0.blocking_send(connectionstate) {
                Ok(_) => {}
                Err(e) => {
                    eprintln!("{}", e)
                }
            }
        }
    };

    unsafe { notify_connect(Some(notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(notify_connectionstate)) } // pass the callback function to the C-API

    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                Some(msg) = broadcast_connect.1.recv() => println!("{}", msg),
                Some(msg) = broadcast_connectionstate.1.recv() => println!("{}", msg),
            }
        }
    });

    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });

    handle.await.unwrap();
    handle2.await.unwrap();
}
The problem with this approach
can't capture dynamic environment in a fn item
  --> src/main.rs:47:19
   |
47 |             match broadcast_connectionstate.0.blocking_send(connectionstate) {
   |                   ^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = help: use the `|| { ... }` closure form instead
It would be nice if someone had a solution to either of my problems. A completely new approach would be fine too. If channels or tokio or whatever is not the way to go, that's also fine. I mainly used tokio because a crate I'm using also depends on tokio, so I don't need many more dependencies.
Thank you already for reading this far.
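To make the question more concrete, here is the shape I suspect a solution might take, though I have not verified it against my real setup: keep only the sending half in a static and hand the receiving half to the consumer, so the consumer never has to borrow anything out of a lock guard. The sketch below uses only std channels and a thread so that it runs on its own. All names in it (CONNECT_TX, on_connect, install_connect_channel, simulate_c_events, run_demo) are made up for illustration, and simulate_c_events merely stands in for the C-API calling the callback. It still needs one static for the sender, which I would have liked to avoid.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Mutex;
use std::thread;

// Only the sending half is global; the callback needs nothing else.
static CONNECT_TX: Mutex<Option<SyncSender<bool>>> = Mutex::new(None);

/// Same shape as a C callback: it captures nothing and only touches the static.
unsafe extern "C" fn on_connect(is_connected: bool) {
    // The lock is held only for a non-blocking send, never across a wait.
    if let Ok(guard) = CONNECT_TX.lock() {
        if let Some(tx) = guard.as_ref() {
            if let Err(e) = tx.try_send(is_connected) {
                eprintln!("dropping connect event: {e}");
            }
        }
    }
}

/// Creates a bounded channel, publishes the sender, and returns the receiver.
fn install_connect_channel(capacity: usize) -> Receiver<bool> {
    let (tx, rx) = sync_channel(capacity);
    *CONNECT_TX.lock().unwrap() = Some(tx);
    rx
}

/// Hypothetical stand-in for the C-API invoking the registered callback.
fn simulate_c_events(callback: unsafe extern "C" fn(bool), events: &[bool]) {
    for &event in events {
        unsafe { callback(event) };
    }
}

/// Wires everything together and returns what the consumer received.
/// Assumes events.len() does not exceed the channel capacity of 128.
fn run_demo(events: &[bool]) -> Vec<bool> {
    let rx = install_connect_channel(128);
    let expected = events.len();
    // The consumer owns the receiver outright, so no guard is borrowed in its loop.
    let consumer = thread::spawn(move || {
        (0..expected).map(|_| rx.recv().unwrap()).collect::<Vec<bool>>()
    });
    simulate_c_events(on_connect, events);
    consumer.join().unwrap()
}

fn main() {
    println!("{:?}", run_demo(&[true, false, true]));
}
```

With tokio I assume the same split would apply: the static would hold a tokio::sync::mpsc::Sender, the callback would call its try_send (which is not async), and each Receiver would be moved into the spawned task and polled directly inside tokio::select!. Whether that is sound when the callback fires on a runtime thread is exactly the part I am unsure about.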