Hey guys,
I've been stuck on this problem for a while, maybe someone with more tokio/async experience can help me with it.
The goal of the program is this:
It's a TCP server that accepts connections. Any data/text received should be sent to the other connections.
The problem stems prom the face that I need write access to the vector of ```OwnedWriteHalf```s in two places, simultaneously.
So when I first lock the RwLock of the vector to listen in a loop for the messages from the MPSC, that locks the vector to the part of the code that should put the connections in that vector.
Once, when I put it in the vector of ```OwnedWriteHalf```s, and once when I go over the list to send the messages to the other clients.
Anybody got any ideas or pointers? Thanks!
The code:
```
use std::{io::Error, sync::Arc};
use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
net::{tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener},
sync::{mpsc::{self, UnboundedReceiver, UnboundedSender},
RwLock}};
async fn stream_handler(stream : OwnedReadHalf, sender : UnboundedSender<(String, String)>)
{
let addr = (&stream).peer_addr().unwrap().to_string();
let mut reader = BufReader::new(stream);
let mut buffer : Vec = vec![];
while let Ok(n) = reader.read_until(b'\n', &mut buffer).await
{
if n == 0
{
break;
}
let message = String::from_utf8_lossy(&buffer[..]);
sender.send((addr.clone(), message.to_string())).unwrap();
buffer.clear();
}
}
async fn send_to_others(
recv : &mut UnboundedReceiver<(String, String)>,
writes : Arc>>
)
{
loop
{
let write_clone = writes.clone();
let mut write = write_clone.write().await;
if let Some(msg) = recv.recv().await
{
for stream in write.iter_mut()
{
let mut writer = BufWriter::new(stream);
let message = format!("INCOMING: {} - {}", msg.0, msg.1);
_ = writer.write_all(message.as_bytes()).await;
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Error>
{
let listener = TcpListener::bind("0.0.0.0:6667").await?;
let (send, mut receive) : (
UnboundedSender<(String, String)>,
UnboundedReceiver<(String, String)>
) = mpsc::unbounded_channel();
let writes : Arc>> = Arc::new(RwLock::new(vec![]));
let writes_clone = writes.clone();
tokio::spawn(async move {
send_to_others(&mut receive, writes_clone).await;
});
loop
{
println!("starting loop");
let send = send.clone();
let writes = writes.clone();
// This runs once per connection, and starts a background thread running the 'stream_handler' function
if let Ok((stream, address)) = listener.accept().await
{
let (read, write) = stream.into_split();
let mut writes = writes.write().await;
writes.push(write);
println!("New client: {:#?}", address.to_string());
tokio::spawn(async move {
stream_handler(read, send).await;
});
}
}
}
```
You could schedule a task at 1 PM, then generate a random number between 0-60 inside that task, wait that many minutes, then launch the actual task.