
A common pattern for Node.js apps is to split them into many "sub-apps" that share some state. Of course, all the "sub-apps" should be handled asynchronously.

Here's a simple example of such a Node app, with three "sub-apps":

  1. An interval timer => Every 10 seconds, a shared itv_counter is incremented
  2. A TCP server => For every TCP message received, a shared tcp_counter is incremented
  3. A UDP server => For every UDP message received, a shared udp_counter is incremented

Every time one of the counters is incremented, all three counters must be printed (which is why the "sub-apps" need to share state).

Here's an implementation in Node. The nice thing about Node is that you can assume that pretty much all I/O operations are handled asynchronously by default; there's no extra cognitive overhead for the developer.

const dgram = require('dgram');
const net = require('net');
const tcp_port = 3000;
const udp_port = 3001;
const tcp_listener = net.createServer();
const udp_listener = dgram.createSocket('udp4');

// state shared by the 3 asynchronous applications
const shared_state = {
    itv_counter: 0,
    tcp_counter: 0,
    udp_counter: 0,
};

// itv async app: increment itv_counter every 10 seconds and print shared state
setInterval(() => {
    shared_state.itv_counter += 1;
    console.log(`itv async app: ${JSON.stringify(shared_state)}`);
}, 10_000);

// tcp async app: increment tcp_counter every time a TCP message is received and print shared state
tcp_listener.on('connection', (client) => {
    client.on('data', (_data) => {
        shared_state.tcp_counter += 1;
        console.log(`tcp async app: ${JSON.stringify(shared_state)}`);
    });
});
tcp_listener.listen(tcp_port, () => {
    console.log(`TCP listener on port ${tcp_port}`);
});

// udp async app: increment udp_counter every time a UDP message is received and print shared state
udp_listener.on('message', (_message, _client) => {
    shared_state.udp_counter += 1;
    console.log(`udp async app: ${JSON.stringify(shared_state)}`);
});
udp_listener.on('listening', () => {
    console.log(`UDP listener on port ${udp_port}`);
});
udp_listener.bind(udp_port);

Now, here's an implementation in Rust with Tokio as the asynchronous runtime.

use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::{TcpListener, UdpSocket};
use tokio::io::AsyncReadExt;

// state shared by the 3 asynchronous applications
#[derive(Clone, Debug)]
struct SharedState {
    state: Arc<Mutex<State>>,
}

#[derive(Debug)]
struct State {
    itv_counter: usize,
    tcp_counter: usize,
    udp_counter: usize,
}

impl SharedState {
    fn new() -> SharedState {
        SharedState {
            state: Arc::new(Mutex::new(State {
                itv_counter: 0,
                tcp_counter: 0,
                udp_counter: 0,
            })),
        }
    }
}

#[tokio::main]
async fn main() {
    let shared_state = SharedState::new();
    // itv async app: increment itv_counter every 10 seconds and print shared state
    let itv_shared_state = shared_state.clone();
    let itv_handle = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(10));
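        // the first tick of an Interval completes immediately, so consume it
        // here to match setInterval's "wait first, then fire" behavior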
        interval.tick().await;
        loop {
            interval.tick().await;
            let mut state = itv_shared_state.state.lock().unwrap();
            state.itv_counter += 1;
            println!("itv async app: {:?}", state);
        }
    });
    // tcp async app: increment tcp_counter every time a TCP message is received and print shared state
    let tcp_shared_state = shared_state.clone();
    let tcp_handle = tokio::spawn(async move {
        let tcp_listener = TcpListener::bind("127.0.0.1:3000").await.unwrap();
        println!("TCP listener on port 3000");
        while let Ok((mut tcp_stream, _)) = tcp_listener.accept().await {
            let tcp_shared_state = tcp_shared_state.clone();
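            // spawn one task per client connection, the rough equivalent of
            // Node's on('connection') callback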
            tokio::spawn(async move {
                let mut buffer = [0; 1024];
                while let Ok(byte_count) = tcp_stream.read(&mut buffer).await {
                    if byte_count == 0 {
                        break;
                    }
                    let mut state = tcp_shared_state.state.lock().unwrap();
                    state.tcp_counter += 1;
                    println!("tcp async app: {:?}", state);
                }
            });
        }
    });
    // udp async app: increment udp_counter every time a UDP message is received and print shared state
    let udp_shared_state = shared_state.clone();
    let udp_handle = tokio::spawn(async move {
        let udp_listener = UdpSocket::bind("127.0.0.1:3001").await.unwrap();
        println!("UDP listener on port 3001");
        let mut buffer = [0; 1024];
        while let Ok(_byte_count) = udp_listener.recv(&mut buffer).await {
            let mut state = udp_shared_state.state.lock().unwrap();
            state.udp_counter += 1;
            println!("udp async app: {:?}", state);
        }
    });
    itv_handle.await.unwrap();
    tcp_handle.await.unwrap();
    udp_handle.await.unwrap();
}

First of all, as I'm not super comfortable with Tokio and async Rust yet, there might be things in this implementation that are dead wrong or bad practice. Please let me know if that's the case (e.g. I have no clue whether the three JoinHandle .await calls at the very end are necessary). That said, it behaves the same as the Node implementation in my simple tests.

But I'm still not sure it's equivalent under the hood in terms of asynchronicity. Should there be a tokio::spawn for every callback in the Node app? If so, I should wrap tcp_stream.read() and udp_listener.recv() in another tokio::spawn to mimic the Node callbacks for TCP's on('data') and UDP's on('message'), respectively. Not sure...
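
To make that concrete, here's roughly what I have in mind for the UDP side (an untested sketch, reusing the names from the code above):

        // hypothetical variant: spawn a task per received UDP message,
        // mimicking Node's per-event 'message' callback
        while let Ok(_byte_count) = udp_listener.recv(&mut buffer).await {
            let udp_shared_state = udp_shared_state.clone();
            tokio::spawn(async move {
                let mut state = udp_shared_state.state.lock().unwrap();
                state.udp_counter += 1;
                println!("udp async app: {:?}", state);
            });
        }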

What would a Tokio implementation look like that is fully equivalent to the Node.js app in terms of asynchronicity? And in general, what's a good rule of thumb for when something should be wrapped in a tokio::spawn?


1 Answer


Since each of your three tasks has its own counter, I think there is a meaningful way to use your state struct as a token and pass it around between the tasks, so that every task is responsible for updating only its own counter. My suggestion is to use tokio::sync::mpsc::channel and set up three channels, each one directed from one task to the next, forming a ring.

Of course, if the tasks update at different rates, there is a risk that some values are updated a little late, but in most cases that can be ignored.
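
Here's a rough sketch of the idea, with the real event sources (timer, TCP, UDP) stubbed out as comments; all names are hypothetical:

use std::time::Duration;
use tokio::sync::mpsc;

// the state travels around the ring as a token; each task touches only its own field
#[derive(Debug)]
struct State {
    itv_counter: usize,
    tcp_counter: usize,
    udp_counter: usize,
}

#[tokio::main]
async fn main() {
    // one channel per hop in the ring: itv -> tcp -> udp -> back to itv
    let (itv_tx, mut tcp_rx) = mpsc::channel::<State>(1);
    let (tcp_tx, mut udp_rx) = mpsc::channel::<State>(1);
    let (udp_tx, mut itv_rx) = mpsc::channel::<State>(1);

    // inject the token into the ring before the tasks take ownership of the senders
    udp_tx
        .send(State { itv_counter: 0, tcp_counter: 0, udp_counter: 0 })
        .await
        .unwrap();

    let itv = tokio::spawn(async move {
        while let Some(mut state) = itv_rx.recv().await {
            // stand-in for the 10-second timer event
            tokio::time::sleep(Duration::from_secs(10)).await;
            state.itv_counter += 1;
            println!("itv task: {:?}", state);
            if itv_tx.send(state).await.is_err() {
                break;
            }
        }
    });
    let tcp = tokio::spawn(async move {
        while let Some(mut state) = tcp_rx.recv().await {
            state.tcp_counter += 1; // stand-in for a received TCP message
            println!("tcp task: {:?}", state);
            if tcp_tx.send(state).await.is_err() {
                break;
            }
        }
    });
    let udp = tokio::spawn(async move {
        while let Some(mut state) = udp_rx.recv().await {
            state.udp_counter += 1; // stand-in for a received UDP message
            println!("udp task: {:?}", state);
            if udp_tx.send(state).await.is_err() {
                break;
            }
        }
    });
    let _ = tokio::join!(itv, tcp, udp);
}

In your real app, each task would wait for its own event (timer tick, TCP read, UDP datagram) before taking the token and incrementing its counter, which is where the small update lag mentioned above comes from.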