redid the event loop

This commit is contained in:
Yannik 2023-04-26 20:13:05 +02:00
parent 87815efc24
commit 369c4ccdbe

View file

@ -9,7 +9,6 @@ use std::sync::Arc;
use structopt::StructOpt; use structopt::StructOpt;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::select;
use tokio_postgres::{Client, Config, NoTls, Statement}; use tokio_postgres::{Client, Config, NoTls, Statement};
// todo: do better errorhandling // todo: do better errorhandling
@ -79,41 +78,70 @@ async fn main() -> Result<(), Error> {
let blacklist: HashSet<_> = options.blacklist.into_iter().collect(); let blacklist: HashSet<_> = options.blacklist.into_iter().collect();
let blacklist = Arc::new(blacklist); let blacklist = Arc::new(blacklist);
// rfc says max length for messages is 1024 // tcp handling
let mut buf = [0; 1024]; let tcp = {
let client = client.clone();
let insert_statement = insert_statement.clone();
let blacklist = blacklist.clone();
tokio::spawn(async move {
loop { loop {
//todo: possibly better implemented by just running two tokio::spawn tasks that loop match tcp_listener.accept().await {
select! {
tcp = tcp_listener.accept() => match tcp {
Ok((socket, peer)) => { Ok((socket, peer)) => {
tokio::spawn(handle_peer_and_error( handle_peer_and_error(
socket, socket,
peer, peer,
client.clone(), client.clone(),
insert_statement.clone(), insert_statement.clone(),
blacklist.clone(), blacklist.clone(),
)); )
.await
} }
Err(e) => eprintln!("tcp error: {:?}", e), Err(e) => eprintln!("tcp error: {:?}", e),
}, };
udp = udp_socket.recv_from(&mut buf) => match udp { }
})
};
// udp handling
let udp = {
let client = client.clone();
let insert_statement = insert_statement.clone();
let blacklist = blacklist.clone();
tokio::spawn(async move {
// rfc says max length for messages is 1024
let mut buf = [0; 1024];
loop {
match udp_socket.recv_from(&mut buf).await {
Ok((len, addr)) => { Ok((len, addr)) => {
let line = &buf[0..len]; let line = &buf[0..len];
let line = match std::str::from_utf8(&line) { let line = match std::str::from_utf8(&line) {
Ok(l) => l, Ok(l) => l,
Err(e) => {eprintln!("udp packet is not valid utf8: {e}"); continue}, Err(e) => {
eprintln!("udp packet is not valid utf8: {e}");
continue;
}
}; };
let line: String = line.into(); let line: String = line.into();
tokio::spawn(handle_udp_and_error( handle_udp_and_error(
line, addr, client.clone(), insert_statement.clone(),blacklist.clone())); line,
addr,
}, client.clone(),
insert_statement.clone(),
blacklist.clone(),
)
.await
}
Err(e) => eprintln!("udp error: {:?}", e), Err(e) => eprintln!("udp error: {:?}", e),
}, };
}
} }
})
};
tcp.await?;
udp.await?;
// should be unreachable
Ok(())
} }
async fn handle_udp_and_error<S: AsRef<str>>( async fn handle_udp_and_error<S: AsRef<str>>(