use chrono::{DateTime, Local}; use chrono::{Datelike, Month, NaiveDate, NaiveDateTime, NaiveTime, TimeZone}; use futures_util::FutureExt; use futures_util::StreamExt; use std::collections::HashSet; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use structopt::StructOpt; use tokio::net::{TcpListener, TcpStream}; use tokio_postgres::{Client, Config, NoTls, Statement}; type Error = Box; /// This application recieves logfiles in ubiquitis weird logfile format from the network, /// splits each line into its fields and writes them to a sql database for further analysis. /// /// The main required argument is an postgres connection string, passed in the environment /// variable LOG2DB_PSQL #[derive(StructOpt)] struct Options { /// ip and (TCP) port to bind to #[structopt(short, long, default_value = "[::]:514")] addr: String, /// space separated list of daemon/service names to not write to the db #[structopt(default_value = "dnsmasq-dhcp")] blacklist: Vec, } #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Error> { let options = Options::from_args(); let postgresurl = std::env::var("LOG2DB_PSQL") .map_err(|_| "please supply the LOG2DB_PSQL environment variable, containting a postgress connection string")?; let mut cfg = Config::from_str(&postgresurl)?; cfg.application_name("log2db"); let (client, connection) = cfg.connect(NoTls).await?; let connection = connection.map(|r| { if let Err(e) = r { eprintln!("could not connect to database: {}", e); } }); tokio::spawn(connection); client .execute( "create table if not exists log( prio smallint, rcv_ip inet, rcv_date timestamptz, date timestamptz, daemon varchar, message varchar ) ", &[], ) .await?; let client = Arc::new(client); let insert_statement = client .prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?; let listener = TcpListener::bind(&options.addr) .await .map_err(|e| format!("could not bind to {} with error {:?}", &options.addr, e))?; let blacklist: HashSet<_> = options.blacklist.into_iter().collect(); let blacklist = Arc::new(blacklist); loop { match listener.accept().await { Ok((socket, peer)) => { tokio::spawn(handle_peer_and_error( socket, peer, client.clone(), insert_statement.clone(), blacklist.clone(), )); } Err(e) => eprintln!("{:?}", e), } } } async fn handle_peer_and_error( stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement, blacklist: Arc>, ) { if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await { eprintln!("{}", e); } } async fn handle_peer( stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement, blacklist: Arc>, ) -> Result<(), Error> { use tokio_util::codec::{FramedRead, LinesCodec}; let ip = peer.ip(); let mut lines = FramedRead::new(stream, LinesCodec::new()); while let Some(line) = lines.next().await.transpose()? { let ParsedLine { prio, rcvtime, logtime, service, entry, } = parse_line(&line)?; if !blacklist.contains(service) { db.execute( &insert_statement, &[&prio, &ip, &rcvtime, &logtime, &service, &entry], ) .await?; } } Ok(()) } struct ParsedLine<'a> { prio: i16, rcvtime: DateTime, logtime: DateTime, // maybe this would be more correct, but i don't want to redo the database rn //logtime: NaiveDateTime, service: &'a str, entry: &'a str, } #[test] fn tst_timeparse() { let input = "Jul 8 01:20:30"; let year = 2022; let parsed = parse_log_date(year, input).unwrap(); assert_eq!( parsed, NaiveDateTime::parse_from_str("2022-07-08 01:20:30", "%Y-%m-%d %H:%M:%S").unwrap() ) } fn parse_log_date(year: i32, input: &'_ str) -> Result { let mut parts = input.split(" ").map(|p| p.trim()).filter(|p| p.len() > 0); let month = Month::from_str(parts.next().ok_or("no month")?) .map_err(|e| format!("month parsing error: {:?}", e))?; let day: u32 = parts.next().ok_or("no day")?.parse()?; let date = NaiveDate::from_ymd_opt(year, month.number_from_month(), day).ok_or("invalid day+moth")?; let time = NaiveTime::parse_from_str(parts.next().ok_or("no time")?, "%H:%M:%S")?; Ok(NaiveDateTime::new(date, time)) } fn parse_line(line: &'_ str) -> Result, Error> { let mut prio_and_remainder = line.splitn(2, '>'); let prio = prio_and_remainder .next() .ok_or("log did not contain priority")?; let prio = &prio[1..]; let prio = prio .parse() .map_err(|e| format!("could not parse priority {}: {}", prio, e))?; let line = prio_and_remainder .next() .expect("splitn should always return a second part"); let (date, line) = line.split_at(16); let rcvtime = chrono::Local::now(); // we need to prepend the current year and add timezone, as that is not stated in the logfile let logtime = parse_log_date(rcvtime.date_naive().year(), date) .map_err(|e| format!("could not parse logtime {}{} {}", date, line, e))?; let logtime = TimeZone::from_local_datetime(&Local, &logtime).unwrap(); let mut parts = line.splitn(2, ':'); let service_and_pid = parts.next().ok_or("could not parse service")?; let mut service_parts = service_and_pid.splitn(2, '['); let service = service_parts .next() .ok_or("could not split pid from service")? .trim(); let entry = parts.next().ok_or("could not parse logfile")?.trim(); Ok(ParsedLine { prio, rcvtime, logtime, service, entry, }) }