Compare commits
3 commits
a25c788832
...
bcacc8b664
Author | SHA1 | Date | |
---|---|---|---|
|
bcacc8b664 | ||
|
f317d9be58 | ||
|
731b94b046 |
4 changed files with 713 additions and 843 deletions
1415
Cargo.lock
generated
1415
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
15
Cargo.toml
15
Cargo.toml
|
@ -2,20 +2,15 @@
|
||||||
name = "log2db"
|
name = "log2db"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
authors = ["Yannik"]
|
authors = ["Yannik"]
|
||||||
edition = "2018"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
structopt = "*"
|
structopt = "*"
|
||||||
tokio = "0.2.0-alpha.6"
|
tokio = {version="*", features=["rt", "macros"]}
|
||||||
|
tokio-util = {version="*", features=["codec"]}
|
||||||
|
tokio-postgres = {version = "*", features =["with-chrono-0_4"]}
|
||||||
chrono = "*"
|
chrono = "*"
|
||||||
|
futures-util = "*"
|
||||||
[dependencies.tokio-postgres]
|
|
||||||
git = "https://github.com/sfackler/rust-postgres.git"
|
|
||||||
features = ["with-chrono-0_4"]
|
|
||||||
|
|
||||||
[dependencies.futures-preview]
|
|
||||||
version = "=0.3.0-alpha.19"
|
|
||||||
features = [ "async-await" ]
|
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
lto = "fat"
|
lto = "fat"
|
||||||
|
|
4
readme.md
Normal file
4
readme.md
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
nimmt syslog von antennen und tuts in postgress.
|
||||||
|
für weniger glibc-versionsabhängingkeit bauen mit:
|
||||||
|
|
||||||
|
cargo build --release --target=x86_64-unknown-linux-musl
|
100
src/main.rs
100
src/main.rs
|
@ -1,9 +1,8 @@
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
use tokio_postgres::{Config, NoTls, Client, Statement};
|
use tokio_postgres::{Client, Config, NoTls, Statement};
|
||||||
|
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::prelude::*;
|
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -12,8 +11,10 @@ use structopt::StructOpt;
|
||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
type Error = Box<dyn std::error::Error>;
|
use futures_util::FutureExt;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
|
||||||
|
type Error = Box<dyn std::error::Error>;
|
||||||
|
|
||||||
/// This application recieves logfiles in ubiquitis weird logfile format from the network,
|
/// This application recieves logfiles in ubiquitis weird logfile format from the network,
|
||||||
/// splits each line into its fields and writes them to a sql database for further analysis.
|
/// splits each line into its fields and writes them to a sql database for further analysis.
|
||||||
|
@ -27,17 +28,16 @@ struct Options {
|
||||||
addr: String,
|
addr: String,
|
||||||
/// space separated list of daemon/service names to not write to the db
|
/// space separated list of daemon/service names to not write to the db
|
||||||
#[structopt(default_value = "dnsmasq-dhcp")]
|
#[structopt(default_value = "dnsmasq-dhcp")]
|
||||||
blacklist: Vec<String>
|
blacklist: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main(flavor = "current_thread")]
|
||||||
async fn main() -> Result<(), Error> {
|
async fn main() -> Result<(), Error> {
|
||||||
let options = Options::from_args();
|
let options = Options::from_args();
|
||||||
|
|
||||||
let postgresurl = std::env::var("LOG2DB_PSQL")
|
let postgresurl = std::env::var("LOG2DB_PSQL")
|
||||||
.map_err(|_| "please supply the LOG2DB_PSQL environment variable, containting a postgress connection string")?;
|
.map_err(|_| "please supply the LOG2DB_PSQL environment variable, containting a postgress connection string")?;
|
||||||
|
|
||||||
|
|
||||||
let mut cfg = Config::from_str(&postgresurl)?;
|
let mut cfg = Config::from_str(&postgresurl)?;
|
||||||
cfg.application_name("log2db");
|
cfg.application_name("log2db");
|
||||||
let (client, connection) = cfg.connect(NoTls).await?;
|
let (client, connection) = cfg.connect(NoTls).await?;
|
||||||
|
@ -49,7 +49,9 @@ async fn main() -> Result<(), Error> {
|
||||||
});
|
});
|
||||||
tokio::spawn(connection);
|
tokio::spawn(connection);
|
||||||
|
|
||||||
client.execute("create table if not exists log(
|
client
|
||||||
|
.execute(
|
||||||
|
"create table if not exists log(
|
||||||
prio smallint,
|
prio smallint,
|
||||||
rcv_ip inet,
|
rcv_ip inet,
|
||||||
rcv_date timestamptz,
|
rcv_date timestamptz,
|
||||||
|
@ -57,43 +59,61 @@ async fn main() -> Result<(), Error> {
|
||||||
daemon varchar,
|
daemon varchar,
|
||||||
message varchar
|
message varchar
|
||||||
)
|
)
|
||||||
", &[]).await?;
|
",
|
||||||
|
&[],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let client = Arc::new(client);
|
let client = Arc::new(client);
|
||||||
|
|
||||||
let insert_statement = client
|
let insert_statement = client
|
||||||
.prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?;
|
.prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?;
|
||||||
|
|
||||||
let mut listener = TcpListener::bind(&options.addr).await
|
let listener = TcpListener::bind(&options.addr)
|
||||||
|
.await
|
||||||
.map_err(|e| format!("could not bind to {} with error {:?}", &options.addr, e))?;
|
.map_err(|e| format!("could not bind to {} with error {:?}", &options.addr, e))?;
|
||||||
|
|
||||||
let blacklist: HashSet<_> = options.blacklist.into_iter().collect();
|
let blacklist: HashSet<_> = options.blacklist.into_iter().collect();
|
||||||
let blacklist = Arc::new(blacklist);
|
let blacklist = Arc::new(blacklist);
|
||||||
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match listener.accept().await {
|
match listener.accept().await {
|
||||||
Ok((socket, peer)) => {
|
Ok((socket, peer)) => {
|
||||||
tokio::spawn(
|
tokio::spawn(handle_peer_and_error(
|
||||||
handle_peer_and_error(socket, peer, client.clone(), insert_statement.clone(), blacklist.clone())
|
socket,
|
||||||
);
|
peer,
|
||||||
},
|
client.clone(),
|
||||||
|
insert_statement.clone(),
|
||||||
|
blacklist.clone(),
|
||||||
|
));
|
||||||
|
}
|
||||||
Err(e) => eprintln!("{:?}", e),
|
Err(e) => eprintln!("{:?}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
use chrono::{Local, DateTime, FixedOffset};
|
use chrono::{DateTime, FixedOffset, Local};
|
||||||
|
|
||||||
async fn handle_peer_and_error(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, insert_statement: Statement, blacklist: Arc<HashSet<String>>) {
|
async fn handle_peer_and_error(
|
||||||
|
stream: TcpStream,
|
||||||
|
peer: SocketAddr,
|
||||||
|
db: Arc<Client>,
|
||||||
|
insert_statement: Statement,
|
||||||
|
blacklist: Arc<HashSet<String>>,
|
||||||
|
) {
|
||||||
if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await {
|
if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await {
|
||||||
eprintln!("{}", e);
|
eprintln!("{}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, insert_statement: Statement, blacklist: Arc<HashSet<String>>)
|
async fn handle_peer(
|
||||||
-> Result<(), Error> {
|
stream: TcpStream,
|
||||||
use tokio::codec::{FramedRead, LinesCodec};
|
peer: SocketAddr,
|
||||||
|
db: Arc<Client>,
|
||||||
|
insert_statement: Statement,
|
||||||
|
blacklist: Arc<HashSet<String>>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
use tokio_util::codec::{FramedRead, LinesCodec};
|
||||||
|
|
||||||
let ip = peer.ip();
|
let ip = peer.ip();
|
||||||
|
|
||||||
|
@ -103,9 +123,13 @@ async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, inser
|
||||||
Some(line) => {
|
Some(line) => {
|
||||||
let (prio, now, date, service, log) = parse_line(&line)?;
|
let (prio, now, date, service, log) = parse_line(&line)?;
|
||||||
if !blacklist.contains(service) {
|
if !blacklist.contains(service) {
|
||||||
db.execute(&insert_statement, &[&prio, &ip, &now, &date, &service, &log]).await?;
|
db.execute(
|
||||||
|
&insert_statement,
|
||||||
|
&[&prio, &ip, &now, &date, &service, &log],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -120,15 +144,30 @@ async fn handle_peer(stream: TcpStream, peer: SocketAddr, db: Arc<Client>, inser
|
||||||
* , log entry
|
* , log entry
|
||||||
* )
|
* )
|
||||||
*/
|
*/
|
||||||
fn parse_line(line: &'_ str) -> Result<(i16, DateTime<Local>, DateTime<FixedOffset>, &'_ str, &'_ str), Error> {
|
fn parse_line(
|
||||||
|
line: &'_ str,
|
||||||
|
) -> Result<
|
||||||
|
(
|
||||||
|
i16,
|
||||||
|
DateTime<Local>,
|
||||||
|
DateTime<FixedOffset>,
|
||||||
|
&'_ str,
|
||||||
|
&'_ str,
|
||||||
|
),
|
||||||
|
Error,
|
||||||
|
> {
|
||||||
let mut prio_and_remainder = line.splitn(2, '>');
|
let mut prio_and_remainder = line.splitn(2, '>');
|
||||||
let prio = prio_and_remainder.next().ok_or("log did not contain priority")?;
|
let prio = prio_and_remainder
|
||||||
|
.next()
|
||||||
|
.ok_or("log did not contain priority")?;
|
||||||
let prio = &prio[1..];
|
let prio = &prio[1..];
|
||||||
let prio = prio.parse()
|
let prio = prio
|
||||||
|
.parse()
|
||||||
.map_err(|e| format!("could not parse priority {}: {}", prio, e))?;
|
.map_err(|e| format!("could not parse priority {}: {}", prio, e))?;
|
||||||
|
|
||||||
|
let line = prio_and_remainder
|
||||||
let line = prio_and_remainder.next().expect("splitn should always return a second part");
|
.next()
|
||||||
|
.expect("splitn should always return a second part");
|
||||||
let (date, line) = line.split_at(16);
|
let (date, line) = line.split_at(16);
|
||||||
|
|
||||||
// we need to prepend the current year and timezone, as that is not stated in the logfile
|
// we need to prepend the current year and timezone, as that is not stated in the logfile
|
||||||
|
@ -136,14 +175,17 @@ fn parse_line(line: &'_ str) -> Result<(i16, DateTime<Local>, DateTime<FixedOffs
|
||||||
let mut base = format!("{}", now.format("%Y %z "));
|
let mut base = format!("{}", now.format("%Y %z "));
|
||||||
base.push_str(date);
|
base.push_str(date);
|
||||||
|
|
||||||
let date = DateTime::parse_from_str(&base, "%Y %z %b %e %H:%M:%S ").
|
let date = DateTime::parse_from_str(&base, "%Y %z %b %e %H:%M:%S ")
|
||||||
map_err(|e| format!("could not parse {}{} {}", date, line, e))?;
|
.map_err(|e| format!("could not parse {}{} {}", date, line, e))?;
|
||||||
|
|
||||||
let mut parts = line.splitn(2, ':');
|
let mut parts = line.splitn(2, ':');
|
||||||
|
|
||||||
let service_and_pid = parts.next().ok_or("could not parse service")?;
|
let service_and_pid = parts.next().ok_or("could not parse service")?;
|
||||||
let mut service_parts = service_and_pid.splitn(2, '[');
|
let mut service_parts = service_and_pid.splitn(2, '[');
|
||||||
let service = service_parts.next().ok_or("could not split pid from service")?.trim();
|
let service = service_parts
|
||||||
|
.next()
|
||||||
|
.ok_or("could not split pid from service")?
|
||||||
|
.trim();
|
||||||
|
|
||||||
let log = parts.next().ok_or("could not parse logfile")?.trim();
|
let log = parts.next().ok_or("could not parse logfile")?.trim();
|
||||||
Ok((prio, now, date, service, log))
|
Ok((prio, now, date, service, log))
|
||||||
|
|
Loading…
Reference in a new issue