338 lines
10 KiB
Rust
338 lines
10 KiB
Rust
use chrono::{DateTime, Local};
|
|
use chrono::{Datelike, Month, NaiveDate, NaiveDateTime, NaiveTime, TimeZone};
|
|
use futures_util::FutureExt;
|
|
use futures_util::StreamExt;
|
|
use std::net::SocketAddr;
|
|
use std::str::FromStr;
|
|
use std::sync::Arc;
|
|
use structopt::StructOpt;
|
|
use tokio::net::UdpSocket;
|
|
use tokio::net::{TcpListener, TcpStream};
|
|
use tokio_postgres::{Client, Config, NoTls, Statement};
|
|
|
|
// TODO: improve error handling
|
|
|
|
/// Catch-all boxed error type used as the `Err` variant of the fallible functions in this file.
/// Note that the trait object carries no `Send`/`Sync` bound.
type Error = Box<dyn std::error::Error>;
|
|
|
|
/// This application recieves logfiles in ubiquitis weird logfile format from the network,
|
|
/// splits each line into its fields and writes them to a sql database for further analysis.
|
|
///
|
|
/// The main required argument is an postgres connection string, passed in the environment
|
|
/// variable LOG2DB_PSQL
|
|
#[derive(StructOpt)]
|
|
struct Options {
|
|
/// ip and (TCP) port to bind to
|
|
#[structopt(short, long, default_value = "[::]:514")]
|
|
addr: String,
|
|
/// space separated list of daemon/service names to not write to the db
|
|
#[structopt(default_value = "dnsmasq-dhcp")]
|
|
blacklist: Vec<String>,
|
|
}
|
|
|
|
#[tokio::main(flavor = "current_thread")]
|
|
async fn main() -> Result<(), Error> {
|
|
let options = Options::from_args();
|
|
|
|
let postgresurl = std::env::var("LOG2DB_PSQL")
|
|
.map_err(|_| "please supply the LOG2DB_PSQL environment variable, containting a postgress connection string")?;
|
|
|
|
let mut cfg = Config::from_str(&postgresurl)?;
|
|
cfg.application_name("log2db");
|
|
let (client, connection) = cfg.connect(NoTls).await?;
|
|
|
|
let connection = connection.map(|r| {
|
|
if let Err(e) = r {
|
|
eprintln!("could not connect to database: {}", e);
|
|
}
|
|
});
|
|
tokio::spawn(connection);
|
|
|
|
client
|
|
.execute(
|
|
"create table if not exists log(
|
|
prio smallint,
|
|
rcv_ip inet,
|
|
rcv_date timestamptz,
|
|
date timestamptz,
|
|
daemon varchar,
|
|
message varchar
|
|
)
|
|
",
|
|
&[],
|
|
)
|
|
.await?;
|
|
|
|
let client = Arc::new(client);
|
|
|
|
let insert_statement = client
|
|
.prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?;
|
|
|
|
let delete_statement = client
|
|
.prepare("delete from log where rcv_date < now() - interval '3 months'")
|
|
.await?;
|
|
|
|
let tcp_listener = TcpListener::bind(&options.addr)
|
|
.await
|
|
.map_err(|e| format!("could not bind to tcp:{} with error {:?}", &options.addr, e))?;
|
|
let udp_socket = UdpSocket::bind(&options.addr)
|
|
.await
|
|
.map_err(|e| format!("could not bind to udp:{} with error {:?}", &options.addr, e))?;
|
|
|
|
let blacklist = Arc::new(options.blacklist);
|
|
|
|
// garbage collection
|
|
let gc = {
|
|
let client = client.clone();
|
|
tokio::spawn(async move {
|
|
loop {
|
|
if let Err(e) = client.execute(&delete_statement, &[]).await {
|
|
eprintln!("error deleting old records {e}");
|
|
};
|
|
// once per hour
|
|
tokio::time::sleep(std::time::Duration::from_secs(60 * 60)).await
|
|
}
|
|
})
|
|
};
|
|
|
|
// tcp handling
|
|
let tcp = {
|
|
let client = client.clone();
|
|
let insert_statement = insert_statement.clone();
|
|
let blacklist = blacklist.clone();
|
|
|
|
tokio::spawn(async move {
|
|
loop {
|
|
match tcp_listener.accept().await {
|
|
Ok((socket, peer)) => {
|
|
handle_peer_and_error(
|
|
socket,
|
|
peer,
|
|
client.clone(),
|
|
insert_statement.clone(),
|
|
blacklist.clone(),
|
|
)
|
|
.await
|
|
}
|
|
Err(e) => eprintln!("tcp error: {:?}", e),
|
|
};
|
|
}
|
|
})
|
|
};
|
|
|
|
// udp handling
|
|
let udp = {
|
|
let client = client.clone();
|
|
let insert_statement = insert_statement.clone();
|
|
let blacklist = blacklist.clone();
|
|
tokio::spawn(async move {
|
|
// rfc says max length for messages is 1024
|
|
let mut buf = [0; 1024];
|
|
loop {
|
|
match udp_socket.recv_from(&mut buf).await {
|
|
Ok((len, addr)) => {
|
|
let line = &buf[0..len];
|
|
let line = match std::str::from_utf8(&line) {
|
|
Ok(l) => l,
|
|
Err(e) => {
|
|
eprintln!("udp packet is not valid utf8: {e}");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
handle_udp_and_error(
|
|
line,
|
|
addr,
|
|
client.clone(),
|
|
insert_statement.clone(),
|
|
blacklist.clone(),
|
|
)
|
|
.await
|
|
}
|
|
Err(e) => eprintln!("udp error: {:?}", e),
|
|
};
|
|
}
|
|
})
|
|
};
|
|
tcp.await?;
|
|
udp.await?;
|
|
gc.await?;
|
|
// should be unreachable
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_udp_and_error<S: AsRef<str>>(
|
|
line: S,
|
|
peer: SocketAddr,
|
|
db: Arc<Client>,
|
|
insert_statement: Statement,
|
|
blacklist: Arc<Vec<String>>,
|
|
) {
|
|
if let Err(e) = handle_udp(line.as_ref(), peer, db, insert_statement, blacklist).await {
|
|
eprintln!("udp line error: {e}");
|
|
}
|
|
}
|
|
|
|
/// Returns `true` if `check` starts with any entry of `blacklist`, `false` otherwise.
///
/// This is a prefix match, not an equality test: the entry `cron` also matches `crond`.
/// As a consequence an empty entry matches every `check`, and an empty blacklist
/// matches nothing.
// currently linear time operation, but blacklist won't be very big
fn blacklist_matches<S: AsRef<str>>(blacklist: &[S], check: &str) -> bool {
    blacklist
        .iter()
        .any(|prefix| check.starts_with(prefix.as_ref()))
}
|
|
|
|
async fn handle_udp(
|
|
line: &'_ str,
|
|
peer: SocketAddr,
|
|
db: Arc<Client>,
|
|
insert_statement: Statement,
|
|
blacklist: Arc<Vec<String>>,
|
|
) -> Result<(), Error> {
|
|
let ip = peer.ip();
|
|
let ParsedLine {
|
|
prio,
|
|
rcvtime,
|
|
logtime,
|
|
service,
|
|
entry,
|
|
} = parse_line(&line)?;
|
|
if !blacklist_matches(&blacklist, service) {
|
|
db.execute(
|
|
&insert_statement,
|
|
&[&prio, &ip, &rcvtime, &logtime, &service, &entry],
|
|
)
|
|
.await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_peer_and_error(
|
|
stream: TcpStream,
|
|
peer: SocketAddr,
|
|
db: Arc<Client>,
|
|
insert_statement: Statement,
|
|
blacklist: Arc<Vec<String>>,
|
|
) {
|
|
if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await {
|
|
eprintln!("tcp line error: {}", e);
|
|
}
|
|
}
|
|
|
|
async fn handle_peer(
|
|
stream: TcpStream,
|
|
peer: SocketAddr,
|
|
db: Arc<Client>,
|
|
insert_statement: Statement,
|
|
blacklist: Arc<Vec<String>>,
|
|
) -> Result<(), Error> {
|
|
use tokio_util::codec::{FramedRead, LinesCodec};
|
|
|
|
let ip = peer.ip();
|
|
|
|
let mut lines = FramedRead::new(stream, LinesCodec::new());
|
|
while let Some(line) = lines.next().await.transpose()? {
|
|
let ParsedLine {
|
|
prio,
|
|
rcvtime,
|
|
logtime,
|
|
service,
|
|
entry,
|
|
} = parse_line(&line)?;
|
|
if !blacklist_matches(&blacklist, service) {
|
|
db.execute(
|
|
&insert_statement,
|
|
&[&prio, &ip, &rcvtime, &logtime, &service, &entry],
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
struct ParsedLine<'a> {
|
|
prio: i16,
|
|
// maybe this would be more correct, but i don't want to redo the database rn
|
|
//prio: u16
|
|
rcvtime: DateTime<Local>,
|
|
logtime: DateTime<Local>,
|
|
// maybe this would be more correct, but i don't want to redo the database rn
|
|
//logtime: NaiveDateTime,
|
|
service: &'a str,
|
|
entry: &'a str,
|
|
}
|
|
/// Checks that a timestamp with a single digit day is parsed correctly.
#[test]
fn tst_timeparse() {
    // The timestamp format is fixed width (15 characters): a single digit day is padded
    // with a space, so there are two spaces between the month and the day.
    let input = "Jul  8 01:20:30";
    let year = 2022;
    let parsed = parse_log_date(year, input).unwrap();
    assert_eq!(
        parsed,
        NaiveDateTime::parse_from_str("2022-07-08 01:20:30", "%Y-%m-%d %H:%M:%S").unwrap()
    )
}
|
|
|
|
fn parse_log_date(year: i32, input: &'_ str) -> Result<NaiveDateTime, Error> {
|
|
let (month, input) = input.split_at(3);
|
|
let (day, time) = input.split_at(4);
|
|
let day = day.trim();
|
|
|
|
let month = Month::from_str(month).map_err(|e| format!("month parsing error: {:?}", e))?;
|
|
let day: u32 = day.parse()?;
|
|
let date =
|
|
NaiveDate::from_ymd_opt(year, month.number_from_month(), day).ok_or("invalid day+moth")?;
|
|
let time = NaiveTime::parse_from_str(time, "%H:%M:%S")?;
|
|
Ok(NaiveDateTime::new(date, time))
|
|
}
|
|
|
|
/// Parses a line of RFC 3164 syslog messages
|
|
/// lines have 3 parts
|
|
/// * priority (somewhat irrelevant):
|
|
/// < followed by up to 3 ascii digits, followed by >
|
|
/// ex: <123>
|
|
/// * header, itself containing a timestamp and an identification (hostname or ip)
|
|
/// * timestamp is Mmm dd hh:mm:ss
|
|
/// ex: Jan 13 13:13:13
|
|
/// ex: Jan 1 01:01:01
|
|
/// 123456789012345
|
|
/// exactly 15 characters
|
|
/// * separator: exactly one space
|
|
//fimxe: ubi-devices don't seem to send a hostname, wrt-ones do, how to fix?
|
|
/// * hostname: ipv4, ipv6 or (dns)-hostname (containing no spaces
|
|
/// * separator: exactly one space
|
|
/// * message: tag (usually daemon name) and message
|
|
/// * tag: daemon name usually
|
|
/// alphanum and space and -
|
|
/// at most 32 characters
|
|
/// * termination: a non-{alphanum, space, -} character, usually [ or :
|
|
/// * content: rest of the line
|
|
fn parse_line(line: &'_ str) -> Result<ParsedLine<'_>, Error> {
|
|
let (prio, line) = line.split_once('>').ok_or("log did not contain priority")?;
|
|
let prio = &prio[1..];
|
|
let prio = prio
|
|
.parse()
|
|
.map_err(|e| format!("could not parse priority {}: {}", prio, e))?;
|
|
|
|
let (date, line) = line.split_at(15);
|
|
let rcvtime = chrono::Local::now();
|
|
// we need to prepend the current year and add timezone, as that is not stated in the logfile
|
|
let logtime = parse_log_date(rcvtime.date_naive().year(), date)
|
|
.map_err(|e| format!("could not parse logtime {}{} {}", date, line, e))?;
|
|
let logtime = TimeZone::from_local_datetime(&Local, &logtime).unwrap();
|
|
|
|
// skip seperator
|
|
let line = &line[1..];
|
|
|
|
// this slightly differs from the rfc: - is considered non-terminating
|
|
let (service, entry) = line
|
|
.split_once(|c: char| !((c == '-') || c.is_alphanumeric()))
|
|
.ok_or("invalid service or message")?;
|
|
Ok(ParsedLine {
|
|
prio,
|
|
rcvtime,
|
|
logtime,
|
|
service,
|
|
entry,
|
|
})
|
|
}
|