use std::str::FromStr; use tokio_postgres::{Config, NoTls, Client, Statement}; use tokio::net::{TcpListener, TcpStream}; use tokio::prelude::*; use std::net::SocketAddr; use std::sync::Arc; type Error = Box; #[tokio::main] async fn main() -> Result<(), Error> { //todo: read config from args/env/file let mut cfg = Config::from_str("postgres://log:log@localhost")?; cfg.application_name("log2db"); let (client, connection) = cfg.connect(NoTls).await?; let connection = connection.map(|r| { if let Err(e) = r { eprintln!("could not connect to database: {}", e); } }); tokio::spawn(connection); client.execute("create table if not exists log( rcv_ip inet, rcv_date timestamptz, date timestamptz, daemon varchar, message varchar ) ", &[]).await?; let client = Arc::new(client); let insert_statement = client .prepare("insert into log(rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5)").await?; let addr = "[::]:8080"; let mut listener = TcpListener::bind(&addr).await?; loop { match listener.accept().await { Ok((socket, peer)) => { tokio::spawn( handle_peer_and_error(socket, peer, client.clone(), insert_statement.clone()) ); }, Err(e) => eprintln!("{:?}", e), } } } use chrono::{Local, DateTime, FixedOffset}; async fn handle_peer_and_error(stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement) { if let Err(e) = handle_peer(stream, peer, db, insert_statement).await { eprintln!("{}", e); } } async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc, insert_statement: Statement) -> Result<(), Error> { use tokio::codec::{FramedRead, LinesCodec}; let ip = peer.ip(); let mut lines = FramedRead::new(stream, LinesCodec::new()); loop { match lines.next().await.transpose()? { Some(line) => { let (now, date, service, log) = parse_line(&line)?; // filter out some services db.execute(&insert_statement, &[&ip, &now, &date, &service, &log]).await?; }, None => break, } } Ok(()) } /** parses a line, returning * ( time the log was recieved * , time the log was written according to logger * , name of the service that wrote the log * , log entry * ) */ fn parse_line(line: &'_ str) -> Result<(DateTime, DateTime, &'_ str, &'_ str), Error> { let (date, line) = line.split_at(16); // we need to prepend the current year and timezone, as that is not stated in the logfile let now = chrono::Local::now(); let mut base = format!("{}", now.format("%Y %z ")); base.push_str(date); let date = DateTime::parse_from_str(&base, "%Y %z %b %e %H:%M:%S ")?; let mut parts = line.splitn(2, ':'); let service = parts.next().ok_or("could not parse service")?; let log = parts.next().ok_or("could not parse logfile")?.trim(); Ok((now, date, service, log)) }