diff --git a/src/main.rs b/src/main.rs index 32d29aa..bdbdf18 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,13 +8,37 @@ use tokio::prelude::*; use std::net::SocketAddr; use std::sync::Arc; +use structopt::StructOpt; + +use std::collections::HashSet; + type Error = Box; + +/// This application recieves logfiles in ubiquitis weird logfile format from the network, +/// splits each line into its fields and writes them to a sql database for further analysis. +/// +/// The main required argument is an postgres connection string, passed in the environment +/// variable LOG2DB_PSQL +#[derive(StructOpt)] +struct Options { + /// ip and (TCP) port to bind to + #[structopt(short, long, default_value="[::]:514")] + addr : String, + /// space separated list of daemon/service names to not write to the db + #[structopt(default_value="dnsmasq-dhcp")] + blacklist: Vec +} + #[tokio::main] async fn main() -> Result<(), Error> { - //todo: read config from args/env/file + let options = Options::from_args(); - let mut cfg = Config::from_str("postgres://log:log@localhost")?; + let postgresurl = std::env::var("LOG2DB_PSQL") + .map_err(|_| "please supply the LOG2DB_PSQL environment variable, containting a postgress connection string")?; + + + let mut cfg = Config::from_str(&postgresurl)?; cfg.application_name("log2db"); let (client, connection) = cfg.connect(NoTls).await?; @@ -39,14 +63,18 @@ async fn main() -> Result<(), Error> { let insert_statement = client .prepare("insert into log(rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5)").await?; - let addr = "[::]:8080"; - let mut listener = TcpListener::bind(&addr).await?; + let mut listener = TcpListener::bind(&options.addr).await + .map_err(|e| format!("could not bind to {} with error {:?}", &options.addr, e))?; + + let blacklist: HashSet<_> = options.blacklist.into_iter().collect(); + let blacklist = Arc::new(blacklist); + loop { match listener.accept().await { Ok((socket, peer)) => { tokio::spawn( - handle_peer_and_error(socket, peer, client.clone(), insert_statement.clone()) + handle_peer_and_error(socket, peer, client.clone(), insert_statement.clone(), blacklist.clone()) ); }, Err(e) => eprintln!("{:?}", e), @@ -56,13 +84,13 @@ async fn main() -> Result<(), Error> { use chrono::{Local, DateTime, FixedOffset}; -async fn handle_peer_and_error(stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement) { - if let Err(e) = handle_peer(stream, peer, db, insert_statement).await { +async fn handle_peer_and_error(stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement, blacklist: Arc>) { + if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await { eprintln!("{}", e); } } -async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement) +async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement, blacklist: Arc>) -> Result<(), Error> { use tokio::codec::{FramedRead, LinesCodec}; @@ -73,8 +101,9 @@ async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc, inser match lines.next().await.transpose()? { Some(line) => { let (now, date, service, log) = parse_line(&line)?; - // filter out some services - db.execute(&insert_statement, &[&ip, &now, &date, &service, &log]).await?; + if !blacklist.contains(service) { + db.execute(&insert_statement, &[&ip, &now, &date, &service, &log]).await?; + } }, None => break, } @@ -100,7 +129,11 @@ fn parse_line(line: &'_ str) -> Result<(DateTime, DateTime, let date = DateTime::parse_from_str(&base, "%Y %z %b %e %H:%M:%S ")?; let mut parts = line.splitn(2, ':'); - let service = parts.next().ok_or("could not parse service")?; + + let service_and_pid = parts.next().ok_or("could not parse service")?; + let mut service_parts = service_and_pid.splitn(2, '['); + let service = service_parts.next().ok_or("could not split pid from service")?.trim(); + let log = parts.next().ok_or("could not parse logfile")?.trim(); Ok((now, date, service, log)) }