0
votes

I'm trying to create async Reader and Writer in tokio, these require Send, and must be thread safe. (does not seem to be a way to write single threaded tokio code that avoids mutexts)

The reader and writer both need to interact, e.g read data might result in a response.

I would like both Reader and Writer to have a thread safe pointer to a Session that can ensure communication between the two.

/// function on the 
impl Session {

pub fn split(&mut self, sock: TcpStream) -> (Reader, Writer) {
    let (read_half, write_half) = sock.split();

    let session = Arc::new(RwLock::new(self)); // <- expected lifetime

    ( Reader::new(session, read_half), Writer::new(Arc::clone(session), write_half) )
}
...
}

pub struct Reader {
    session: Arc<RwLock<&mut StompSession>>,
    read_half: ReadHalf<TcpStream>,
    ...

pub struct Writer {
    session: Arc<RwLock<&mut StompSession>>,
    write_half: WriteHalf<TcpStream>,
    ....

I dont really understand the difference between Arc<RwLock<&mut StompSession>> and Arc<RwLock<StompSession>> both can only be talking about pointers.

Naturally come at this by stuggling with the borrow checker and rust book only has examples for RRwLock with integers, not mutable "objects".

1
Do you know MPSC (or the crossbeam crate)? - hellow

1 Answers

2
votes

Let's start by clearing a few things:

does not seem to be a way to write single threaded tokio code that avoids mutexes

The Mutex requirement has nothing to do with single-threadedness but with mutable borrows. Whenever you spawn a future, that future is its own entity; it isn't magically part of your struct and most definitely does not know how to keep a &mut self. That is the point of the Mutex - it allows you to dynamically acquire a mutable reference to inner state - and the Arc allows you to have access to the Mutex itself in multiple places.

Their non-synchronized equivalents are Rc and Cell/RefCell, by the way, and their content (whether synchronized or unsynchronized) should be an owned type.

The Send requirement actually shows up when you use futures on top of tokio, as an Executor requires futures spawned on it to be Send (for obvious reasons - you could make a spawn_local method, but that'd cause more problems than it solves).

Now, back to your problem. I'm going to give you the shortest path to an answer to your problem. This, however, will not be completely the right way of doing things; however, since I do not know what protocol you're going to lay on top of TcpStream or what kind of requirements you have, I cannot point you in the right direction (yet). Comments are there for that reason - give me more requirements and I'll happily edit this!

Anyway. Back to the problem. Since Mutex<_> is better used with an owned type, we're going to do that right now and "fix" both your Reader and Writer:

pub struct Reader {
    session: Arc<RwLock<Session>>,
    read_half: ReadHalf<TcpStream>,
}

pub struct Writer {
    session: Arc<RwLock<StompSession>>,
    write_half: WriteHalf<TcpStream>,
}

Since we changed this, we also need to reflect it elsewhere, but to be able to do this we're going to need to consume self. It is fine, however, as we're going to have an Arc<Mutex<_>> copy in either object we return:

impl Session {

    pub fn split(self, sock: TcpStream) -> (Reader, Writer) {
        let (read_half, write_half) = sock.split();

        let session = Arc::new(RwLock::new(self));
        ( Reader::new(session.clone(), read_half), Writer::new(session, write_half) )
    }
}

And lo and behold, it compiles and each Writer/Reader pair now has its own borrowable (mutably and non-mutably) reference to our session!

The playground snippet highlights the changes made. As I said, it works now, but it's going to bite you in the ass the moment you try to do something as you're going to need something on top of both ReadHalf and WriteHalf to be able to use them properly.