Compare commits
2 commits
bcacc8b664
...
42c318a1b8
Author | SHA1 | Date | |
---|---|---|---|
|
42c318a1b8 | ||
|
23733caed8 |
1 changed files with 68 additions and 57 deletions
125
src/main.rs
125
src/main.rs
|
@ -1,18 +1,14 @@
|
||||||
use std::str::FromStr;
|
use chrono::{DateTime, Local};
|
||||||
|
use chrono::{Datelike, Month, NaiveDate, NaiveDateTime, NaiveTime, TimeZone};
|
||||||
use tokio_postgres::{Client, Config, NoTls, Statement};
|
|
||||||
|
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use structopt::StructOpt;
|
|
||||||
|
|
||||||
use std::collections::HashSet;
|
|
||||||
|
|
||||||
use futures_util::FutureExt;
|
use futures_util::FutureExt;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use structopt::StructOpt;
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use tokio_postgres::{Client, Config, NoTls, Statement};
|
||||||
|
|
||||||
type Error = Box<dyn std::error::Error>;
|
type Error = Box<dyn std::error::Error>;
|
||||||
|
|
||||||
|
@ -92,8 +88,6 @@ async fn main() -> Result<(), Error> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
use chrono::{DateTime, FixedOffset, Local};
|
|
||||||
|
|
||||||
async fn handle_peer_and_error(
|
async fn handle_peer_and_error(
|
||||||
stream: TcpStream,
|
stream: TcpStream,
|
||||||
peer: SocketAddr,
|
peer: SocketAddr,
|
||||||
|
@ -118,44 +112,57 @@ async fn handle_peer(
|
||||||
let ip = peer.ip();
|
let ip = peer.ip();
|
||||||
|
|
||||||
let mut lines = FramedRead::new(stream, LinesCodec::new());
|
let mut lines = FramedRead::new(stream, LinesCodec::new());
|
||||||
loop {
|
while let Some(line) = lines.next().await.transpose()? {
|
||||||
match lines.next().await.transpose()? {
|
let ParsedLine {
|
||||||
Some(line) => {
|
prio,
|
||||||
let (prio, now, date, service, log) = parse_line(&line)?;
|
rcvtime,
|
||||||
if !blacklist.contains(service) {
|
logtime,
|
||||||
db.execute(
|
service,
|
||||||
&insert_statement,
|
entry,
|
||||||
&[&prio, &ip, &now, &date, &service, &log],
|
} = parse_line(&line)?;
|
||||||
)
|
if !blacklist.contains(service) {
|
||||||
.await?;
|
db.execute(
|
||||||
}
|
&insert_statement,
|
||||||
}
|
&[&prio, &ip, &rcvtime, &logtime, &service, &entry],
|
||||||
None => break,
|
)
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/** parses a line, returning
|
struct ParsedLine<'a> {
|
||||||
* ( prio
|
prio: i16,
|
||||||
* , time the log was recieved
|
rcvtime: DateTime<Local>,
|
||||||
* , time the log was written according to logger
|
logtime: DateTime<Local>,
|
||||||
* , name of the service that wrote the log
|
// maybe this would be more correct, but i don't want to redo the database rn
|
||||||
* , log entry
|
//logtime: NaiveDateTime,
|
||||||
* )
|
service: &'a str,
|
||||||
*/
|
entry: &'a str,
|
||||||
fn parse_line(
|
}
|
||||||
line: &'_ str,
|
#[test]
|
||||||
) -> Result<
|
fn tst_timeparse() {
|
||||||
(
|
let input = "Jul 8 01:20:30";
|
||||||
i16,
|
let year = 2022;
|
||||||
DateTime<Local>,
|
let parsed = parse_log_date(year, input).unwrap();
|
||||||
DateTime<FixedOffset>,
|
assert_eq!(
|
||||||
&'_ str,
|
parsed,
|
||||||
&'_ str,
|
NaiveDateTime::parse_from_str("2022-07-08 01:20:30", "%Y-%m-%d %H:%M:%S").unwrap()
|
||||||
),
|
)
|
||||||
Error,
|
}
|
||||||
> {
|
|
||||||
|
fn parse_log_date(year: i32, input: &'_ str) -> Result<NaiveDateTime, Error> {
|
||||||
|
let mut parts = input.split(" ").map(|p| p.trim()).filter(|p| p.len() > 0);
|
||||||
|
let month = Month::from_str(parts.next().ok_or("no month")?)
|
||||||
|
.map_err(|e| format!("month parsing error: {:?}", e))?;
|
||||||
|
let day: u32 = parts.next().ok_or("no day")?.parse()?;
|
||||||
|
let date =
|
||||||
|
NaiveDate::from_ymd_opt(year, month.number_from_month(), day).ok_or("invalid day+moth")?;
|
||||||
|
let time = NaiveTime::parse_from_str(parts.next().ok_or("no time")?, "%H:%M:%S")?;
|
||||||
|
Ok(NaiveDateTime::new(date, time))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_line(line: &'_ str) -> Result<ParsedLine<'_>, Error> {
|
||||||
let mut prio_and_remainder = line.splitn(2, '>');
|
let mut prio_and_remainder = line.splitn(2, '>');
|
||||||
let prio = prio_and_remainder
|
let prio = prio_and_remainder
|
||||||
.next()
|
.next()
|
||||||
|
@ -170,13 +177,11 @@ fn parse_line(
|
||||||
.expect("splitn should always return a second part");
|
.expect("splitn should always return a second part");
|
||||||
let (date, line) = line.split_at(16);
|
let (date, line) = line.split_at(16);
|
||||||
|
|
||||||
// we need to prepend the current year and timezone, as that is not stated in the logfile
|
let rcvtime = chrono::Local::now();
|
||||||
let now = chrono::Local::now();
|
// we need to prepend the current year and add timezone, as that is not stated in the logfile
|
||||||
let mut base = format!("{}", now.format("%Y %z "));
|
let logtime = parse_log_date(rcvtime.date_naive().year(), date)
|
||||||
base.push_str(date);
|
.map_err(|e| format!("could not parse logtime {}{} {}", date, line, e))?;
|
||||||
|
let logtime = TimeZone::from_local_datetime(&Local, &logtime).unwrap();
|
||||||
let date = DateTime::parse_from_str(&base, "%Y %z %b %e %H:%M:%S ")
|
|
||||||
.map_err(|e| format!("could not parse {}{} {}", date, line, e))?;
|
|
||||||
|
|
||||||
let mut parts = line.splitn(2, ':');
|
let mut parts = line.splitn(2, ':');
|
||||||
|
|
||||||
|
@ -187,6 +192,12 @@ fn parse_line(
|
||||||
.ok_or("could not split pid from service")?
|
.ok_or("could not split pid from service")?
|
||||||
.trim();
|
.trim();
|
||||||
|
|
||||||
let log = parts.next().ok_or("could not parse logfile")?.trim();
|
let entry = parts.next().ok_or("could not parse logfile")?.trim();
|
||||||
Ok((prio, now, date, service, log))
|
Ok(ParsedLine {
|
||||||
|
prio,
|
||||||
|
rcvtime,
|
||||||
|
logtime,
|
||||||
|
service,
|
||||||
|
entry,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue