use chrono::{DateTime, Local}; use chrono::{Datelike, Month, NaiveDate, NaiveDateTime, NaiveTime, TimeZone}; use futures_util::FutureExt; use futures_util::StreamExt; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use structopt::StructOpt; use tokio::net::UdpSocket; use tokio::net::{TcpListener, TcpStream}; use tokio_postgres::{Client, Config, NoTls, Statement}; // todo: do better errorhandling type Error = Box; /// This application recieves logfiles in ubiquitis weird logfile format from the network, /// splits each line into its fields and writes them to a sql database for further analysis. /// /// The main required argument is an postgres connection string, passed in the environment /// variable LOG2DB_PSQL #[derive(StructOpt)] struct Options { /// ip and (TCP) port to bind to #[structopt(short, long, default_value = "[::]:514")] addr: String, /// space separated list of daemon/service names to not write to the db #[structopt(default_value = "dnsmasq-dhcp")] blacklist: Vec, } #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Error> { let options = Options::from_args(); let postgresurl = std::env::var("LOG2DB_PSQL") .map_err(|_| "please supply the LOG2DB_PSQL environment variable, containting a postgress connection string")?; let mut cfg = Config::from_str(&postgresurl)?; cfg.application_name("log2db"); let (client, connection) = cfg.connect(NoTls).await?; let connection = connection.map(|r| { if let Err(e) = r { eprintln!("could not connect to database: {}", e); } }); tokio::spawn(connection); client .execute( "create table if not exists log( prio smallint, rcv_ip inet, rcv_date timestamptz, date timestamptz, daemon varchar, message varchar ) ", &[], ) .await?; let client = Arc::new(client); let insert_statement = client .prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?; let delete_statement = client .prepare("delete from log where rcv_date < now() - interval '3 months'") .await?; let tcp_listener = TcpListener::bind(&options.addr) .await .map_err(|e| format!("could not bind to tcp:{} with error {:?}", &options.addr, e))?; let udp_socket = UdpSocket::bind(&options.addr) .await .map_err(|e| format!("could not bind to udp:{} with error {:?}", &options.addr, e))?; let blacklist = Arc::new(options.blacklist); // garbage collection let gc = { let client = client.clone(); tokio::spawn(async move { loop { if let Err(e) = client.execute(&delete_statement, &[]).await { eprintln!("error deleting old records {e}"); }; // once per hour tokio::time::sleep(std::time::Duration::from_secs(60 * 60)).await } }) }; // tcp handling let tcp = { let client = client.clone(); let insert_statement = insert_statement.clone(); let blacklist = blacklist.clone(); tokio::spawn(async move { loop { match tcp_listener.accept().await { Ok((socket, peer)) => { handle_peer_and_error( socket, peer, client.clone(), insert_statement.clone(), blacklist.clone(), ) .await } Err(e) => eprintln!("tcp error: {:?}", e), }; } }) }; // udp handling let udp = { let client = client.clone(); let insert_statement = insert_statement.clone(); let blacklist = blacklist.clone(); tokio::spawn(async move { // rfc says max length for messages is 1024 let mut buf = [0; 1024]; loop { match udp_socket.recv_from(&mut buf).await { Ok((len, addr)) => { let line = &buf[0..len]; let line = match std::str::from_utf8(&line) { Ok(l) => l, Err(e) => { eprintln!("udp packet is not valid utf8: {e}"); continue; } }; handle_udp_and_error( line, addr, client.clone(), insert_statement.clone(), blacklist.clone(), ) .await } Err(e) => eprintln!("udp error: {:?}", e), }; } }) }; tcp.await?; udp.await?; gc.await?; // should be unreachable Ok(()) } async fn handle_udp_and_error>( line: S, peer: SocketAddr, db: Arc, insert_statement: Statement, blacklist: Arc>, ) { if let Err(e) = handle_udp(line.as_ref(), peer, db, insert_statement, blacklist).await { eprintln!("udp line error: {e}"); } } /// true if blacklist contains check /// false otherwise // currently linear time operation, but blacklist won't be very big fn blacklist_matches> (blacklist: &[S], check: &str) -> bool{ for b in blacklist { if check.starts_with(b.as_ref()) { return true } } false } async fn handle_udp( line: &'_ str, peer: SocketAddr, db: Arc, insert_statement: Statement, blacklist: Arc>, ) -> Result<(), Error> { let ip = peer.ip(); let ParsedLine { prio, rcvtime, logtime, service, entry, } = parse_line(&line)?; if !blacklist_matches(&blacklist, service) { db.execute( &insert_statement, &[&prio, &ip, &rcvtime, &logtime, &service, &entry], ) .await?; } Ok(()) } async fn handle_peer_and_error( stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement, blacklist: Arc>, ) { if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await { eprintln!("tcp line error: {}", e); } } async fn handle_peer( stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement, blacklist: Arc>, ) -> Result<(), Error> { use tokio_util::codec::{FramedRead, LinesCodec}; let ip = peer.ip(); let mut lines = FramedRead::new(stream, LinesCodec::new()); while let Some(line) = lines.next().await.transpose()? { let ParsedLine { prio, rcvtime, logtime, service, entry, } = parse_line(&line)?; if !blacklist_matches(&blacklist, service) { db.execute( &insert_statement, &[&prio, &ip, &rcvtime, &logtime, &service, &entry], ) .await?; } } Ok(()) } struct ParsedLine<'a> { prio: i16, // maybe this would be more correct, but i don't want to redo the database rn //prio: u16 rcvtime: DateTime, logtime: DateTime, // maybe this would be more correct, but i don't want to redo the database rn //logtime: NaiveDateTime, service: &'a str, entry: &'a str, } #[test] fn tst_timeparse() { let input = "Jul 8 01:20:30"; let year = 2022; let parsed = parse_log_date(year, input).unwrap(); assert_eq!( parsed, NaiveDateTime::parse_from_str("2022-07-08 01:20:30", "%Y-%m-%d %H:%M:%S").unwrap() ) } fn parse_log_date(year: i32, input: &'_ str) -> Result { let (month, input) = input.split_at(3); let (day, time) = input.split_at(4); let day = day.trim(); let month = Month::from_str(month).map_err(|e| format!("month parsing error: {:?}", e))?; let day: u32 = day.parse()?; let date = NaiveDate::from_ymd_opt(year, month.number_from_month(), day).ok_or("invalid day+moth")?; let time = NaiveTime::parse_from_str(time, "%H:%M:%S")?; Ok(NaiveDateTime::new(date, time)) } /// Parses a line of RFC 3164 syslog messages /// lines have 3 parts /// * priority (somewhat irrelevant): /// < followed by up to 3 ascii digits, followed by > /// ex: <123> /// * header, itself containing a timestamp and an identification (hostname or ip) /// * timestamp is Mmm dd hh:mm:ss /// ex: Jan 13 13:13:13 /// ex: Jan 1 01:01:01 /// 123456789012345 /// exactly 15 characters /// * separator: exactly one space //fimxe: ubi-devices don't seem to send a hostname, wrt-ones do, how to fix? /// * hostname: ipv4, ipv6 or (dns)-hostname (containing no spaces /// * separator: exactly one space /// * message: tag (usually daemon name) and message /// * tag: daemon name usually /// alphanum and space and - /// at most 32 characters /// * termination: a non-{alphanum, space, -} character, usually [ or : /// * content: rest of the line fn parse_line(line: &'_ str) -> Result, Error> { let (prio, line) = line.split_once('>').ok_or("log did not contain priority")?; let prio = &prio[1..]; let prio = prio .parse() .map_err(|e| format!("could not parse priority {}: {}", prio, e))?; let (date, line) = line.split_at(15); let rcvtime = chrono::Local::now(); // we need to prepend the current year and add timezone, as that is not stated in the logfile let logtime = parse_log_date(rcvtime.date_naive().year(), date) .map_err(|e| format!("could not parse logtime {}{} {}", date, line, e))?; let logtime = TimeZone::from_local_datetime(&Local, &logtime).unwrap(); // skip seperator let line = &line[1..]; // this slightly differs from the rfc: - is considered non-terminating let (service, entry) = line .split_once(|c: char| !((c == '-') || c.is_alphanumeric())) .ok_or("invalid service or message")?; Ok(ParsedLine { prio, rcvtime, logtime, service, entry, }) }