Hey guys,

I’ve been stuck on this problem for a while, maybe someone with more tokio/async experience can help me with it.

The goal of the program is this: It’s a TCP server that accepts connections. Any data/text received should be sent to the other connections.

The problem stems prom the face that I need write access to the vector of OwnedWriteHalfs in two places, simultaneously.

So when I first lock the RwLock of the vector to listen in a loop for the messages from the MPSC, that locks the vector to the part of the code that should put the connections in that vector.

Once, when I put it in the vector of OwnedWriteHalfs, and once when I go over the list to send the messages to the other clients.

Anybody got any ideas or pointers? Thanks!

The code:

use std::{io::Error, sync::Arc};

use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
            net::{tcp::{OwnedReadHalf, OwnedWriteHalf},
                  TcpListener},
            sync::{mpsc::{self, UnboundedReceiver, UnboundedSender},
                   RwLock}};

async fn stream_handler(stream : OwnedReadHalf, sender : UnboundedSender<(String, String)>)
{
    let addr = (&stream).peer_addr().unwrap().to_string();

    let mut reader = BufReader::new(stream);

    let mut buffer : Vec = vec![];

    while let Ok(n) = reader.read_until(b'\n', &mut buffer).await
    {
        if n == 0
        {
            break;
        }

        let message = String::from_utf8_lossy(&buffer[..]);

        sender.send((addr.clone(), message.to_string())).unwrap();

        buffer.clear();
    }
}

async fn send_to_others(
    recv : &mut UnboundedReceiver<(String, String)>,
    writes : Arc>>
)
{
    loop
    {
        let write_clone = writes.clone();
        let mut write = write_clone.write().await;
        if let Some(msg) = recv.recv().await
        {
            for stream in write.iter_mut()
            {
                let mut writer = BufWriter::new(stream);

                let message = format!("INCOMING: {} - {}", msg.0, msg.1);

                _ = writer.write_all(message.as_bytes()).await;
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Error>
{
    let listener = TcpListener::bind("0.0.0.0:6667").await?;

    let (send, mut receive) : (
        UnboundedSender<(String, String)>,
        UnboundedReceiver<(String, String)>
    ) = mpsc::unbounded_channel();

    let writes : Arc>> = Arc::new(RwLock::new(vec![]));

    let writes_clone = writes.clone();

    tokio::spawn(async move {
        send_to_others(&mut receive, writes_clone).await;
    });

    loop
    {
        println!("starting loop");
        let send = send.clone();
        let writes = writes.clone();

        // This runs once per connection, and starts a background thread running the 'stream_handler' function
        if let Ok((stream, address)) = listener.accept().await
        {
            let (read, write) = stream.into_split();

            let mut writes = writes.write().await;

            writes.push(write);

            println!("New client: {:#?}", address.to_string());

            tokio::spawn(async move {
                stream_handler(read, send).await;
            });
        }
    }
}

Marsh Ray
link
fedilink
1•1Y

@SuddenlyBlowGreen @brokenix I wouldn’t think you’d need an RWLock at all for that.
Just make an async task that listens on an mpsc channel and sends the data to all the tcp streams. That task can own outright the collection of outgoing streams.

@SuddenlyBlowGreen@lemmy.world
creator
link
fedilink
1•
edit-2
1Y

deleted by creator

Marsh Ray
link
fedilink
1•1Y

@SuddenlyBlowGreen But you don’t need &mut to send to an mpsc channel.

That’s a big advantage of an async design !

It’s really pretty rare to need any locking.

@SuddenlyBlowGreen@lemmy.world
creator
link
fedilink
1•
edit-2
1Y

deleted by creator

Marsh Ray
link
fedilink
1•1Y

@SuddenlyBlowGreen
I’d need to see the whole unmangled example to fully understand it. Sure you could send a newly connected Tcp stream.

The general principle is that whatever you’d need a mutex or rwlock to coordinate &mut access to, instead just send all that update information as a message over a channel to some async task that exclusively owns the thing needing to be mutated.

Like writing to a database over a network instead of synchronizing access to a shared file.

@SuddenlyBlowGreen@lemmy.world
creator
link
fedilink
1•
edit-2
1Y

deleted by creator

Marsh Ray
link
fedilink
1•
edit-2
1Y

@SuddenlyBlowGreen My experience too! 🙂

Can you use https://play.rust-lang.org/?version=stable&mode=debug&edition=2021 ?

Just remember that when you edit the code you have to make a new share link.

@SuddenlyBlowGreen@lemmy.world
creator
link
fedilink
1•
edit-2
1Y

deleted by creator

Marsh Ray
link
fedilink
1•1Y

@SuddenlyBlowGreen How about something like this: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=5aba6de8e48ff97f30197684218d4a44

I know it barely compiles, and I haven’t tested it, but just to give the idea.

@SuddenlyBlowGreen@lemmy.world
creator
link
fedilink
2•
edit-2
1Y

deleted by creator

@SuddenlyBlowGreen@lemmy.world
creator
link
fedilink
1•
edit-2
1Y

deleted by creator

@SuddenlyBlowGreen@lemmy.world
creator
link
fedilink
2•
edit-2
1Y

deleted by creator

Rust Programming
!rust@lemmy.ml
    • 0 users online
    • 2 users / day
    • 2 users / week
    • 2 users / month
    • 74 users / 6 months
    • 1 subscriber
    • 258 Posts
    • 707 Comments
    • Modlog