added configurability and blacklist
This commit is contained in:
parent
d8ebd51ec6
commit
af56e7d67a
1 changed files with 44 additions and 11 deletions
53
src/main.rs
53
src/main.rs
|
@ -8,13 +8,37 @@ use tokio::prelude::*;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use structopt::StructOpt;
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
type Error = Box<dyn std::error::Error>;
|
type Error = Box<dyn std::error::Error>;
|
||||||
|
|
||||||
|
|
||||||
|
/// This application recieves logfiles in ubiquitis weird logfile format from the network,
|
||||||
|
/// splits each line into its fields and writes them to a sql database for further analysis.
|
||||||
|
///
|
||||||
|
/// The main required argument is an postgres connection string, passed in the environment
|
||||||
|
/// variable LOG2DB_PSQL
|
||||||
|
#[derive(StructOpt)]
|
||||||
|
struct Options {
|
||||||
|
/// ip and (TCP) port to bind to
|
||||||
|
#[structopt(short, long, default_value="[::]:514")]
|
||||||
|
addr : String,
|
||||||
|
/// space separated list of daemon/service names to not write to the db
|
||||||
|
#[structopt(default_value="dnsmasq-dhcp")]
|
||||||
|
blacklist: Vec<String>
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Error> {
|
async fn main() -> Result<(), Error> {
|
||||||
//todo: read config from args/env/file
|
let options = Options::from_args();
|
||||||
|
|
||||||
let mut cfg = Config::from_str("postgres://log:log@localhost")?;
|
let postgresurl = std::env::var("LOG2DB_PSQL")
|
||||||
|
.map_err(|_| "please supply the LOG2DB_PSQL environment variable, containting a postgress connection string")?;
|
||||||
|
|
||||||
|
|
||||||
|
let mut cfg = Config::from_str(&postgresurl)?;
|
||||||
cfg.application_name("log2db");
|
cfg.application_name("log2db");
|
||||||
let (client, connection) = cfg.connect(NoTls).await?;
|
let (client, connection) = cfg.connect(NoTls).await?;
|
||||||
|
|
||||||
|
@ -39,14 +63,18 @@ async fn main() -> Result<(), Error> {
|
||||||
let insert_statement = client
|
let insert_statement = client
|
||||||
.prepare("insert into log(rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5)").await?;
|
.prepare("insert into log(rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5)").await?;
|
||||||
|
|
||||||
let addr = "[::]:8080";
|
let mut listener = TcpListener::bind(&options.addr).await
|
||||||
let mut listener = TcpListener::bind(&addr).await?;
|
.map_err(|e| format!("could not bind to {} with error {:?}", &options.addr, e))?;
|
||||||
|
|
||||||
|
let blacklist: HashSet<_> = options.blacklist.into_iter().collect();
|
||||||
|
let blacklist = Arc::new(blacklist);
|
||||||
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match listener.accept().await {
|
match listener.accept().await {
|
||||||
Ok((socket, peer)) => {
|
Ok((socket, peer)) => {
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
handle_peer_and_error(socket, peer, client.clone(), insert_statement.clone())
|
handle_peer_and_error(socket, peer, client.clone(), insert_statement.clone(), blacklist.clone())
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
Err(e) => eprintln!("{:?}", e),
|
Err(e) => eprintln!("{:?}", e),
|
||||||
|
@ -56,13 +84,13 @@ async fn main() -> Result<(), Error> {
|
||||||
|
|
||||||
use chrono::{Local, DateTime, FixedOffset};
|
use chrono::{Local, DateTime, FixedOffset};
|
||||||
|
|
||||||
async fn handle_peer_and_error(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, insert_statement: Statement) {
|
async fn handle_peer_and_error(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, insert_statement: Statement, blacklist: Arc<HashSet<String>>) {
|
||||||
if let Err(e) = handle_peer(stream, peer, db, insert_statement).await {
|
if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await {
|
||||||
eprintln!("{}", e);
|
eprintln!("{}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, insert_statement: Statement)
|
async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, insert_statement: Statement, blacklist: Arc<HashSet<String>>)
|
||||||
-> Result<(), Error> {
|
-> Result<(), Error> {
|
||||||
use tokio::codec::{FramedRead, LinesCodec};
|
use tokio::codec::{FramedRead, LinesCodec};
|
||||||
|
|
||||||
|
@ -73,8 +101,9 @@ async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, inser
|
||||||
match lines.next().await.transpose()? {
|
match lines.next().await.transpose()? {
|
||||||
Some(line) => {
|
Some(line) => {
|
||||||
let (now, date, service, log) = parse_line(&line)?;
|
let (now, date, service, log) = parse_line(&line)?;
|
||||||
// filter out some services
|
if !blacklist.contains(service) {
|
||||||
db.execute(&insert_statement, &[&ip, &now, &date, &service, &log]).await?;
|
db.execute(&insert_statement, &[&ip, &now, &date, &service, &log]).await?;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
|
@ -100,7 +129,11 @@ fn parse_line(line: &'_ str) -> Result<(DateTime<Local>, DateTime<FixedOffset>,
|
||||||
let date = DateTime::parse_from_str(&base, "%Y %z %b %e %H:%M:%S ")?;
|
let date = DateTime::parse_from_str(&base, "%Y %z %b %e %H:%M:%S ")?;
|
||||||
|
|
||||||
let mut parts = line.splitn(2, ':');
|
let mut parts = line.splitn(2, ':');
|
||||||
let service = parts.next().ok_or("could not parse service")?;
|
|
||||||
|
let service_and_pid = parts.next().ok_or("could not parse service")?;
|
||||||
|
let mut service_parts = service_and_pid.splitn(2, '[');
|
||||||
|
let service = service_parts.next().ok_or("could not split pid from service")?.trim();
|
||||||
|
|
||||||
let log = parts.next().ok_or("could not parse logfile")?.trim();
|
let log = parts.next().ok_or("could not parse logfile")?.trim();
|
||||||
Ok((now, date, service, log))
|
Ok((now, date, service, log))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue