• 1 Post
  • 3 Comments
Joined 1Y ago
cake
Cake day: Aug 08, 2023

help-circle
rss

You could schedule a task at 1 PM, then generate a random number between 0-60 inside that task, wait that many minutes, then launch the actual task.










[HELP] Sending data to multiple TcpStreams upon receiving data from another TcpStream
Hey guys, I've been stuck on this problem for a while, maybe someone with more tokio/async experience can help me with it. The goal of the program is this: It's a TCP server that accepts connections. Any data/text received should be sent to the other connections. The problem stems prom the face that I need write access to the vector of ```OwnedWriteHalf```s in two places, simultaneously. So when I first lock the RwLock of the vector to listen in a loop for the messages from the MPSC, that locks the vector to the part of the code that should put the connections in that vector. Once, when I put it in the vector of ```OwnedWriteHalf```s, and once when I go over the list to send the messages to the other clients. Anybody got any ideas or pointers? Thanks! The code: ``` use std::{io::Error, sync::Arc}; use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, net::{tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener}, sync::{mpsc::{self, UnboundedReceiver, UnboundedSender}, RwLock}}; async fn stream_handler(stream : OwnedReadHalf, sender : UnboundedSender<(String, String)>) { let addr = (&stream).peer_addr().unwrap().to_string(); let mut reader = BufReader::new(stream); let mut buffer : Vec = vec![]; while let Ok(n) = reader.read_until(b'\n', &mut buffer).await { if n == 0 { break; } let message = String::from_utf8_lossy(&buffer[..]); sender.send((addr.clone(), message.to_string())).unwrap(); buffer.clear(); } } async fn send_to_others( recv : &mut UnboundedReceiver<(String, String)>, writes : Arc>> ) { loop { let write_clone = writes.clone(); let mut write = write_clone.write().await; if let Some(msg) = recv.recv().await { for stream in write.iter_mut() { let mut writer = BufWriter::new(stream); let message = format!("INCOMING: {} - {}", msg.0, msg.1); _ = writer.write_all(message.as_bytes()).await; } } } } #[tokio::main] async fn main() -> Result<(), Error> { let listener = TcpListener::bind("0.0.0.0:6667").await?; let (send, mut receive) : ( UnboundedSender<(String, String)>, UnboundedReceiver<(String, String)> ) = mpsc::unbounded_channel(); let writes : Arc>> = Arc::new(RwLock::new(vec![])); let writes_clone = writes.clone(); tokio::spawn(async move { send_to_others(&mut receive, writes_clone).await; }); loop { println!("starting loop"); let send = send.clone(); let writes = writes.clone(); // This runs once per connection, and starts a background thread running the 'stream_handler' function if let Ok((stream, address)) = listener.accept().await { let (read, write) = stream.into_split(); let mut writes = writes.write().await; writes.push(write); println!("New client: {:#?}", address.to_string()); tokio::spawn(async move { stream_handler(read, send).await; }); } } } ```
fedilink

let mut scheduler = Scheduler::with_tz(chrono::Utc);

scheduler.every(10.minutes()).plus(30.seconds()).run(|| println!("Periodic task"));

scheduler.every(1.day()).at("3:20 pm").run(|| println!("Daily task")); 

scheduler.every(Tuesday).at("14:20:17").and_every(Thursday).at("15:00").run(|| println!("Biweekly task")); 

Damn, that a really ingenious and intuitive use of the builder pattern.

Kudos to the devs!