Hey guys,
I’ve been stuck on this problem for a while, maybe someone with more tokio/async experience can help me with it.
The goal of the program is this: It’s a TCP server that accepts connections. Any data/text received should be sent to the other connections.
The problem stems from the fact that I need write access to the vector of OwnedWriteHalfs in two places, simultaneously: once when I put a new connection in the vector, and once when I go over the list to send the messages to the other clients.
So when I first lock the RwLock of the vector to listen in a loop for the messages from the MPSC, that lock shuts out the part of the code that should put the connections in that vector.
Anybody got any ideas or pointers? Thanks!
The code:
use std::{io::Error, sync::Arc};
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
    net::{
        tcp::{OwnedReadHalf, OwnedWriteHalf},
        TcpListener,
    },
    sync::{
        mpsc::{self, UnboundedReceiver, UnboundedSender},
        RwLock,
    },
};

async fn stream_handler(stream : OwnedReadHalf, sender : UnboundedSender<(String, String)>)
{
    let addr = (&stream).peer_addr().unwrap().to_string();
    let mut reader = BufReader::new(stream);
    let mut buffer : Vec<u8> = vec![];
    while let Ok(n) = reader.read_until(b'\n', &mut buffer).await
    {
        if n == 0
        {
            break;
        }
        let message = String::from_utf8_lossy(&buffer[..]);
        sender.send((addr.clone(), message.to_string())).unwrap();
        buffer.clear();
    }
}

async fn send_to_others(
    recv : &mut UnboundedReceiver<(String, String)>,
    writes : Arc<RwLock<Vec<OwnedWriteHalf>>>
)
{
    loop
    {
        let write_clone = writes.clone();
        let mut write = write_clone.write().await;
        if let Some(msg) = recv.recv().await
        {
            for stream in write.iter_mut()
            {
                let mut writer = BufWriter::new(stream);
                let message = format!("INCOMING: {} - {}", msg.0, msg.1);
                _ = writer.write_all(message.as_bytes()).await;
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Error>
{
    let listener = TcpListener::bind("0.0.0.0:6667").await?;
    let (send, mut receive) : (
        UnboundedSender<(String, String)>,
        UnboundedReceiver<(String, String)>
    ) = mpsc::unbounded_channel();
    let writes : Arc<RwLock<Vec<OwnedWriteHalf>>> = Arc::new(RwLock::new(vec![]));
    let writes_clone = writes.clone();
    tokio::spawn(async move {
        send_to_others(&mut receive, writes_clone).await;
    });
    loop
    {
        println!("starting loop");
        let send = send.clone();
        let writes = writes.clone();
        // This runs once per connection, and starts a background thread running the 'stream_handler' function
        if let Ok((stream, address)) = listener.accept().await
        {
            let (read, write) = stream.into_split();
            let mut writes = writes.write().await;
            writes.push(write);
            println!("New client: {:#?}", address.to_string());
            tokio::spawn(async move {
                stream_handler(read, send).await;
            });
        }
    }
}
@SuddenlyBlowGreen @brokenix I wouldn’t think you’d need an RwLock at all for that.
Just make an async task that listens on an mpsc channel and sends the data to all the tcp streams. That task can own outright the collection of outgoing streams.
deleted by creator
@SuddenlyBlowGreen But you don’t need &mut to send to an mpsc channel.
That’s a big advantage of an async design!
It’s really pretty rare to need any locking.
deleted by creator
@SuddenlyBlowGreen
I’d need to see the whole unmangled example to fully understand it. Sure, you could send a newly connected TCP stream over the channel as well.
The general principle is that whatever you’d need a mutex or rwlock to coordinate &mut access to, instead just send all that update information as a message over a channel to some async task that exclusively owns the thing needing to be mutated.
Like writing to a database over a network instead of synchronizing access to a shared file.
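To make that principle concrete, here is a minimal sketch of the "one owner, everyone else sends messages" shape. It is deliberately std-only (a thread and `std::sync::mpsc`) so it runs without any crates, and it uses `Vec<u8>` buffers as stand-ins for client sockets. The names `Event`, `broadcaster` and `demo` are made up for illustration and do not come from the thread or from the linked playground. In a tokio version you would presumably swap in `tokio::spawn`, `tokio::sync::mpsc` and `OwnedWriteHalf` with `AsyncWriteExt::write_all`, but the structure should stay the same.

```rust
use std::io::Write;
use std::sync::mpsc;
use std::thread;

/// Everything that previously needed the lock becomes a message (hypothetical type).
enum Event<W> {
    /// A new client connected; its write half is handed to the owner.
    Join { addr: String, writer: W },
    /// A line of text received from the client at `from`.
    Message { from: String, text: String },
}

/// Sole owner of the client list: no Arc, no RwLock.
/// Runs until every sender is dropped, then returns the clients so the demo can inspect them.
fn broadcaster<W: Write>(events: mpsc::Receiver<Event<W>>) -> Vec<(String, W)> {
    let mut clients: Vec<(String, W)> = Vec::new();
    for event in events {
        match event {
            Event::Join { addr, writer } => clients.push((addr, writer)),
            Event::Message { from, text } => {
                let line = format!("INCOMING: {} - {}", from, text);
                // Skip the sender; drop any other client whose write fails.
                clients.retain_mut(|(addr, w)| {
                    *addr == from || w.write_all(line.as_bytes()).is_ok()
                });
            }
        }
    }
    clients
}

/// Two fake clients "a" and "b"; "a" sends one line. Returns what each client received.
fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<Event<Vec<u8>>>();
    let owner = thread::spawn(move || broadcaster(rx));

    // The accept loop would send Join; each per-connection reader would send Message.
    tx.send(Event::Join { addr: "a".into(), writer: Vec::new() }).unwrap();
    tx.send(Event::Join { addr: "b".into(), writer: Vec::new() }).unwrap();
    tx.send(Event::Message { from: "a".into(), text: "hi\n".into() }).unwrap();
    drop(tx); // closing the channel ends the owner's loop

    owner
        .join()
        .unwrap()
        .into_iter()
        .map(|(_, buf)| String::from_utf8(buf).unwrap())
        .collect()
}

fn main() {
    for (i, received) in demo().iter().enumerate() {
        println!("client {} received: {:?}", i, received);
    }
}
```

Because joins and messages arrive through the same channel, the owner handles them one at a time and in order, so a new connection never has to wait for a lock that the broadcast loop is holding.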
deleted by creator
@SuddenlyBlowGreen My experience too! 🙂
Can you use https://play.rust-lang.org/?version=stable&mode=debug&edition=2021 ?
Just remember that when you edit the code you have to make a new share link.
deleted by creator
@SuddenlyBlowGreen How about something like this: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=5aba6de8e48ff97f30197684218d4a44
I know it barely compiles, and I haven’t tested it, but just to give the idea.