From 87815efc2466d3d219e36624f61c08f0fb9b006d Mon Sep 17 00:00:00 2001 From: Yannik <> Date: Wed, 26 Apr 2023 19:52:17 +0200 Subject: [PATCH] finished (untested) udp support --- src/main.rs | 101 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 81 insertions(+), 20 deletions(-) diff --git a/src/main.rs b/src/main.rs index b499759..ab77bc8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,9 +7,13 @@ use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use structopt::StructOpt; +use tokio::net::UdpSocket; use tokio::net::{TcpListener, TcpStream}; +use tokio::select; use tokio_postgres::{Client, Config, NoTls, Statement}; +// todo: do better errorhandling + type Error = Box; /// This application recieves logfiles in ubiquitis weird logfile format from the network, @@ -65,29 +69,90 @@ async fn main() -> Result<(), Error> { let insert_statement = client .prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?; - let listener = TcpListener::bind(&options.addr) + let tcp_listener = TcpListener::bind(&options.addr) .await - .map_err(|e| format!("could not bind to {} with error {:?}", &options.addr, e))?; + .map_err(|e| format!("could not bind to tcp:{} with error {:?}", &options.addr, e))?; + let udp_socket = UdpSocket::bind(&options.addr) + .await + .map_err(|e| format!("could not bind to udp:{} with error {:?}", &options.addr, e))?; let blacklist: HashSet<_> = options.blacklist.into_iter().collect(); let blacklist = Arc::new(blacklist); + // rfc says max length for messages is 1024 + let mut buf = [0; 1024]; + loop { - match listener.accept().await { - Ok((socket, peer)) => { - tokio::spawn(handle_peer_and_error( - socket, - peer, - client.clone(), - insert_statement.clone(), - blacklist.clone(), - )); - } - Err(e) => eprintln!("{:?}", e), + //todo: possibly better implemented by just running two tokio::spawn tasks that loop + select! { + tcp = tcp_listener.accept() => match tcp { + Ok((socket, peer)) => { + tokio::spawn(handle_peer_and_error( + socket, + peer, + client.clone(), + insert_statement.clone(), + blacklist.clone(), + )); + } + Err(e) => eprintln!("tcp error: {:?}", e), + }, + udp = udp_socket.recv_from(&mut buf) => match udp { + Ok((len, addr)) => { + let line = &buf[0..len]; + let line = match std::str::from_utf8(&line) { + Ok(l) => l, + Err(e) => {eprintln!("udp packet is not valid utf8: {e}"); continue}, + }; + let line: String = line.into(); + + tokio::spawn(handle_udp_and_error( + line, addr, client.clone(), insert_statement.clone(),blacklist.clone())); + + }, + Err(e) => eprintln!("udp error: {:?}", e), + }, } } } +async fn handle_udp_and_error>( + line: S, + peer: SocketAddr, + db: Arc, + insert_statement: Statement, + blacklist: Arc>, +) { + if let Err(e) = handle_udp(line.as_ref(), peer, db, insert_statement, blacklist).await { + eprintln!("udp line error: {e}"); + } +} + +async fn handle_udp( + line: &'_ str, + peer: SocketAddr, + db: Arc, + insert_statement: Statement, + blacklist: Arc>, +) -> Result<(), Error> { + let ip = peer.ip(); + let ParsedLine { + prio, + rcvtime, + logtime, + service, + entry, + } = parse_line(&line)?; + if !blacklist.contains(service) { + db.execute( + &insert_statement, + &[&prio, &ip, &rcvtime, &logtime, &service, &entry], + ) + .await?; + } + Ok(()) +} + async fn handle_peer_and_error( stream: TcpStream, peer: SocketAddr, @@ -96,7 +161,7 @@ async fn handle_peer_and_error( blacklist: Arc>, ) { if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await { - eprintln!("{}", e); + eprintln!("tcp line error: {}", e); } } @@ -154,7 +219,6 @@ fn tst_timeparse() { } fn parse_log_date(year: i32, input: &'_ str) -> Result { - let mut parts = input.split(" ").map(|p| p.trim()).filter(|p| p.len() > 0); let (month, input) = input.split_at(3); let (day, time) = input.split_at(4); let day = day.trim(); @@ -188,9 +252,7 @@ fn parse_log_date(year: i32, input: &'_ str) -> Result { /// * termination: a non-{alphanum, space, -} character, usually [ or : /// * content: rest of the line fn parse_line(line: &'_ str) -> Result, Error> { - let (prio, line) = line - .split_once('>') - .expect("log did not contain priority")?; + let (prio, line) = line.split_once('>').ok_or("log did not contain priority")?; let prio = &prio[1..]; let prio = prio .parse() @@ -204,9 +266,8 @@ fn parse_line(line: &'_ str) -> Result, Error> { let logtime = TimeZone::from_local_datetime(&Local, &logtime).unwrap(); // this slightly differs from the rfc: - is considered non-terminating - let terminator = |c| !((c == '-') || c.is_alphanumeric()); let (service, entry) = line - .split_once(terminator) + .split_once(|c: char| !((c == '-') || c.is_alphanumeric())) .ok_or("invalid service or message")?; Ok(ParsedLine { prio,