diff --git a/src/main.rs b/src/main.rs index 088ad77..8562c63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,6 @@ use chrono::{DateTime, Local}; use chrono::{Datelike, Month, NaiveDate, NaiveDateTime, NaiveTime, TimeZone}; use futures_util::FutureExt; use futures_util::StreamExt; -use std::collections::HashSet; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; @@ -68,6 +67,10 @@ async fn main() -> Result<(), Error> { let insert_statement = client .prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?; + let delete_statement = client + .prepare("delete from log where rcv_date < now() - interval '3 months'") + .await?; + let tcp_listener = TcpListener::bind(&options.addr) .await .map_err(|e| format!("could not bind to tcp:{} with error {:?}", &options.addr, e))?; @@ -75,8 +78,21 @@ async fn main() -> Result<(), Error> { .await .map_err(|e| format!("could not bind to udp:{} with error {:?}", &options.addr, e))?; - let blacklist: HashSet<_> = options.blacklist.into_iter().collect(); - let blacklist = Arc::new(blacklist); + let blacklist = Arc::new(options.blacklist); + + // garbage collection + let gc = { + let client = client.clone(); + tokio::spawn(async move { + loop { + if let Err(e) = client.execute(&delete_statement, &[]).await { + eprintln!("error deleting old records {e}"); + }; + // once per hour + tokio::time::sleep(std::time::Duration::from_secs(60 * 60)).await + } + }) + }; // tcp handling let tcp = { @@ -140,6 +156,7 @@ async fn main() -> Result<(), Error> { }; tcp.await?; udp.await?; + gc.await?; // should be unreachable Ok(()) } @@ -149,19 +166,29 @@ async fn handle_udp_and_error>( peer: SocketAddr, db: Arc, insert_statement: Statement, - blacklist: Arc>, + blacklist: Arc>, ) { if let Err(e) = handle_udp(line.as_ref(), peer, db, insert_statement, blacklist).await { eprintln!("udp line error: {e}"); } } +/// true if blacklist contains check +/// false otherwise +// currently linear time operation, but blacklist won't be very big +fn blacklist_matches> (blacklist: &[S], check: &str) -> bool{ + for b in blacklist { + if check.starts_with(b.as_ref()) { return true } + } + false +} + async fn handle_udp( line: &'_ str, peer: SocketAddr, db: Arc, insert_statement: Statement, - blacklist: Arc>, + blacklist: Arc>, ) -> Result<(), Error> { let ip = peer.ip(); let ParsedLine { @@ -171,7 +198,7 @@ async fn handle_udp( service, entry, } = parse_line(&line)?; - if !blacklist.contains(service) { + if !blacklist_matches(&blacklist, service) { db.execute( &insert_statement, &[&prio, &ip, &rcvtime, &logtime, &service, &entry], @@ -186,7 +213,7 @@ async fn handle_peer_and_error( peer: SocketAddr, db: Arc, insert_statement: Statement, - blacklist: Arc>, + blacklist: Arc>, ) { if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await { eprintln!("tcp line error: {}", e); @@ -198,7 +225,7 @@ async fn handle_peer( peer: SocketAddr, db: Arc, insert_statement: Statement, - blacklist: Arc>, + blacklist: Arc>, ) -> Result<(), Error> { use tokio_util::codec::{FramedRead, LinesCodec}; @@ -213,7 +240,7 @@ async fn handle_peer( service, entry, } = parse_line(&line)?; - if !blacklist.contains(service) { + if !blacklist_matches(&blacklist, service) { db.execute( &insert_statement, &[&prio, &ip, &rcvtime, &logtime, &service, &entry], @@ -271,6 +298,7 @@ fn parse_log_date(year: i32, input: &'_ str) -> Result { /// 123456789012345 /// exactly 15 characters /// * separator: exactly one space +//fimxe: ubi-devices don't seem to send a hostname, wrt-ones do, how to fix? /// * hostname: ipv4, ipv6 or (dns)-hostname (containing no spaces /// * separator: exactly one space /// * message: tag (usually daemon name) and message