Compare commits


3 commits

Author   SHA1         Message                                       Date
Yannik   bcacc8b664   added readme                                  2022-10-26 22:03:08 +02:00
Yannik   f317d9be58   edition upgrade                               2022-10-26 21:54:33 +02:00
Yannik   731b94b046   upgrade to less ancient versions, cargo fmt   2022-10-26 21:51:54 +02:00
4 changed files with 713 additions and 843 deletions

Cargo.lock (generated): 1415 changed lines

File diff suppressed because it is too large.

Cargo.toml

@@ -2,20 +2,15 @@
 name = "log2db"
 version = "0.1.0"
 authors = ["Yannik"]
-edition = "2018"
+edition = "2021"

 [dependencies]
 structopt = "*"
-tokio = "0.2.0-alpha.6"
+tokio = {version="*", features=["rt", "macros"]}
+tokio-util = {version="*", features=["codec"]}
+tokio-postgres = {version = "*", features =["with-chrono-0_4"]}
 chrono = "*"
-
-[dependencies.tokio-postgres]
-git = "https://github.com/sfackler/rust-postgres.git"
-features = ["with-chrono-0_4"]
-
-[dependencies.futures-preview]
-version = "=0.3.0-alpha.19"
-features = [ "async-await" ]
+futures-util = "*"

 [profile.release]
 lto = "fat"

readme.md (new file): 4 lines

@@ -0,0 +1,4 @@
+Takes syslog from antennas and puts it into postgres.
+For less glibc version dependency, build with:
+cargo build --release --target=x86_64-unknown-linux-musl
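
The musl target typically produces a statically linked binary, which is why it avoids the glibc version coupling; on a rustup-managed toolchain (an assumption on my part, not stated in the readme) the target has to be installed once first with rustup target add x86_64-unknown-linux-musl.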

src/main.rs

@@ -1,9 +1,8 @@
 use std::str::FromStr;
-use tokio_postgres::{Config, NoTls, Client, Statement};
+use tokio_postgres::{Client, Config, NoTls, Statement};
 use tokio::net::{TcpListener, TcpStream};
-use tokio::prelude::*;
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -12,8 +11,10 @@ use structopt::StructOpt;
 use std::collections::HashSet;
-type Error = Box<dyn std::error::Error>;
+use futures_util::FutureExt;
+use futures_util::StreamExt;
+
+type Error = Box<dyn std::error::Error>;

 /// This application recieves logfiles in ubiquitis weird logfile format from the network,
 /// splits each line into its fields and writes them to a sql database for further analysis.
@@ -23,21 +24,20 @@ type Error = Box<dyn std::error::Error>;
 #[derive(StructOpt)]
 struct Options {
 /// ip and (TCP) port to bind to
-#[structopt(short, long, default_value="[::]:514")]
-addr : String,
+#[structopt(short, long, default_value = "[::]:514")]
+addr: String,
 /// space separated list of daemon/service names to not write to the db
-#[structopt(default_value="dnsmasq-dhcp")]
-blacklist: Vec<String>
+#[structopt(default_value = "dnsmasq-dhcp")]
+blacklist: Vec<String>,
 }

-#[tokio::main]
+#[tokio::main(flavor = "current_thread")]
 async fn main() -> Result<(), Error> {
 let options = Options::from_args();
 let postgresurl = std::env::var("LOG2DB_PSQL")
 .map_err(|_| "please supply the LOG2DB_PSQL environment variable, containting a postgress connection string")?;
 let mut cfg = Config::from_str(&postgresurl)?;
 cfg.application_name("log2db");
 let (client, connection) = cfg.connect(NoTls).await?;
@@ -49,7 +49,9 @@ async fn main() -> Result<(), Error> {
 });
 tokio::spawn(connection);

-client.execute("create table if not exists log(
+client
+.execute(
+"create table if not exists log(
 prio smallint,
 rcv_ip inet,
 rcv_date timestamptz,
@@ -57,43 +59,61 @@ async fn main() -> Result<(), Error> {
 daemon varchar,
 message varchar
 )
-", &[]).await?;
+",
+&[],
+)
+.await?;
 let client = Arc::new(client);
 let insert_statement = client
 .prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?;

-let mut listener = TcpListener::bind(&options.addr).await
+let listener = TcpListener::bind(&options.addr)
+.await
 .map_err(|e| format!("could not bind to {} with error {:?}", &options.addr, e))?;
 let blacklist: HashSet<_> = options.blacklist.into_iter().collect();
 let blacklist = Arc::new(blacklist);

 loop {
 match listener.accept().await {
 Ok((socket, peer)) => {
-tokio::spawn(
-handle_peer_and_error(socket, peer, client.clone(), insert_statement.clone(), blacklist.clone())
-);
-},
+tokio::spawn(handle_peer_and_error(
+socket,
+peer,
+client.clone(),
+insert_statement.clone(),
+blacklist.clone(),
+));
+}
 Err(e) => eprintln!("{:?}", e),
 }
 }
 }

-use chrono::{Local, DateTime, FixedOffset};
+use chrono::{DateTime, FixedOffset, Local};

-async fn handle_peer_and_error(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, insert_statement: Statement, blacklist: Arc<HashSet<String>>) {
+async fn handle_peer_and_error(
+stream: TcpStream,
+peer: SocketAddr,
+db: Arc<Client>,
+insert_statement: Statement,
+blacklist: Arc<HashSet<String>>,
+) {
 if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await {
 eprintln!("{}", e);
 }
 }

-async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, insert_statement: Statement, blacklist: Arc<HashSet<String>>)
--> Result<(), Error> {
-use tokio::codec::{FramedRead, LinesCodec};
+async fn handle_peer(
+stream: TcpStream,
+peer: SocketAddr,
+db: Arc<Client>,
+insert_statement: Statement,
+blacklist: Arc<HashSet<String>>,
+) -> Result<(), Error> {
+use tokio_util::codec::{FramedRead, LinesCodec};
 let ip = peer.ip();
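
The codec types moved from the old tokio::codec module (which only ever existed in the 0.2 alphas) into tokio-util, and iterating over the resulting stream now goes through futures-util's StreamExt, which is why that crate was added above. A minimal sketch of the pattern, my illustration rather than the repository's exact code:

use futures_util::StreamExt;
use tokio::net::TcpStream;
use tokio_util::codec::{FramedRead, LinesCodec};

// Sketch: read a connection line by line with tokio-util's codec.
async fn read_lines(stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    // FramedRead + LinesCodec turn the raw byte stream into a Stream of Strings.
    let mut lines = FramedRead::new(stream, LinesCodec::new());
    while let Some(line) = lines.next().await {
        let line = line?; // a decode error aborts this connection handler
        println!("{}", line); // the real handler parses the line and inserts it into postgres
    }
    Ok(())
}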
@@ -103,9 +123,13 @@ async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, inser
 Some(line) => {
 let (prio, now, date, service, log) = parse_line(&line)?;
 if !blacklist.contains(service) {
-db.execute(&insert_statement, &[&prio, &ip, &now, &date, &service, &log]).await?;
+db.execute(
+&insert_statement,
+&[&prio, &ip, &now, &date, &service, &log],
+)
+.await?;
 }
-},
+}
 None => break,
 }
 }
@@ -120,15 +144,30 @@ async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, inser
 * , log entry
 * )
 */
-fn parse_line(line: &'_ str) -> Result<(i16, DateTime<Local>, DateTime<FixedOffset>, &'_ str, &'_ str), Error> {
+fn parse_line(
+line: &'_ str,
+) -> Result<
+(
+i16,
+DateTime<Local>,
+DateTime<FixedOffset>,
+&'_ str,
+&'_ str,
+),
+Error,
+> {
 let mut prio_and_remainder = line.splitn(2, '>');
-let prio = prio_and_remainder.next().ok_or("log did not contain priority")?;
+let prio = prio_and_remainder
+.next()
+.ok_or("log did not contain priority")?;
 let prio = &prio[1..];
-let prio = prio.parse()
-.map_err(|e| format!("could not parse priority {}: {}",prio, e))?;
+let prio = prio
+.parse()
+.map_err(|e| format!("could not parse priority {}: {}", prio, e))?;
-let line = prio_and_remainder.next().expect("splitn should always return a second part");
+let line = prio_and_remainder
+.next()
+.expect("splitn should always return a second part");
 let (date, line) = line.split_at(16);

 // we need to prepend the current year and timezone, as that is not stated in the logfile
@@ -136,14 +175,17 @@ fn parse_line(line: &'_ str) -> Result<(i16, DateTime<Local>, DateTime<FixedOffs
 let mut base = format!("{}", now.format("%Y %z "));
 base.push_str(date);
-let date = DateTime::parse_from_str(&base, "%Y %z %b %e %H:%M:%S ").
-map_err(|e| format!("could not parse {}{} {}", date, line, e))?;
+let date = DateTime::parse_from_str(&base, "%Y %z %b %e %H:%M:%S ")
+.map_err(|e| format!("could not parse {}{} {}", date, line, e))?;

 let mut parts = line.splitn(2, ':');
 let service_and_pid = parts.next().ok_or("could not parse service")?;
 let mut service_parts = service_and_pid.splitn(2, '[');
-let service = service_parts.next().ok_or("could not split pid from service")?.trim();
+let service = service_parts
+.next()
+.ok_or("could not split pid from service")?
+.trim();
 let log = parts.next().ok_or("could not parse logfile")?.trim();

 Ok((prio, now, date, service, log))
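
For orientation, a hypothetical input of the shape <30>Oct 26 22:03:08 dnsmasq-dhcp[1234]: DHCPACK(eth0) 192.168.1.2 (my example, not taken from the repository) would parse to priority 30, a timestamp built from the 16 characters after the priority with the current year and local UTC offset prepended, the service dnsmasq-dhcp with the bracketed pid stripped off, and everything after the first colon, trimmed, as the message.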