now deletes old entries every hour
This commit is contained in:
parent
6fb7b1c142
commit
37da2a4999
1 changed files with 37 additions and 9 deletions
46
src/main.rs
46
src/main.rs
|
@ -2,7 +2,6 @@ use chrono::{DateTime, Local};
|
||||||
use chrono::{Datelike, Month, NaiveDate, NaiveDateTime, NaiveTime, TimeZone};
|
use chrono::{Datelike, Month, NaiveDate, NaiveDateTime, NaiveTime, TimeZone};
|
||||||
use futures_util::FutureExt;
|
use futures_util::FutureExt;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -68,6 +67,10 @@ async fn main() -> Result<(), Error> {
|
||||||
let insert_statement = client
|
let insert_statement = client
|
||||||
.prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?;
|
.prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?;
|
||||||
|
|
||||||
|
let delete_statement = client
|
||||||
|
.prepare("delete from log where rcv_date < now() - interval '3 months'")
|
||||||
|
.await?;
|
||||||
|
|
||||||
let tcp_listener = TcpListener::bind(&options.addr)
|
let tcp_listener = TcpListener::bind(&options.addr)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("could not bind to tcp:{} with error {:?}", &options.addr, e))?;
|
.map_err(|e| format!("could not bind to tcp:{} with error {:?}", &options.addr, e))?;
|
||||||
|
@ -75,8 +78,21 @@ async fn main() -> Result<(), Error> {
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("could not bind to udp:{} with error {:?}", &options.addr, e))?;
|
.map_err(|e| format!("could not bind to udp:{} with error {:?}", &options.addr, e))?;
|
||||||
|
|
||||||
let blacklist: HashSet<_> = options.blacklist.into_iter().collect();
|
let blacklist = Arc::new(options.blacklist);
|
||||||
let blacklist = Arc::new(blacklist);
|
|
||||||
|
// garbage collection
|
||||||
|
let gc = {
|
||||||
|
let client = client.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
if let Err(e) = client.execute(&delete_statement, &[]).await {
|
||||||
|
eprintln!("error deleting old records {e}");
|
||||||
|
};
|
||||||
|
// once per hour
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(60 * 60)).await
|
||||||
|
}
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
// tcp handling
|
// tcp handling
|
||||||
let tcp = {
|
let tcp = {
|
||||||
|
@ -140,6 +156,7 @@ async fn main() -> Result<(), Error> {
|
||||||
};
|
};
|
||||||
tcp.await?;
|
tcp.await?;
|
||||||
udp.await?;
|
udp.await?;
|
||||||
|
gc.await?;
|
||||||
// should be unreachable
|
// should be unreachable
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -149,19 +166,29 @@ async fn handle_udp_and_error<S: AsRef<str>>(
|
||||||
peer: SocketAddr,
|
peer: SocketAddr,
|
||||||
db: Arc<Client>,
|
db: Arc<Client>,
|
||||||
insert_statement: Statement,
|
insert_statement: Statement,
|
||||||
blacklist: Arc<HashSet<String>>,
|
blacklist: Arc<Vec<String>>,
|
||||||
) {
|
) {
|
||||||
if let Err(e) = handle_udp(line.as_ref(), peer, db, insert_statement, blacklist).await {
|
if let Err(e) = handle_udp(line.as_ref(), peer, db, insert_statement, blacklist).await {
|
||||||
eprintln!("udp line error: {e}");
|
eprintln!("udp line error: {e}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// true if blacklist contains check
|
||||||
|
/// false otherwise
|
||||||
|
// currently linear time operation, but blacklist won't be very big
|
||||||
|
fn blacklist_matches<S: AsRef<str>> (blacklist: &[S], check: &str) -> bool{
|
||||||
|
for b in blacklist {
|
||||||
|
if check.starts_with(b.as_ref()) { return true }
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_udp(
|
async fn handle_udp(
|
||||||
line: &'_ str,
|
line: &'_ str,
|
||||||
peer: SocketAddr,
|
peer: SocketAddr,
|
||||||
db: Arc<Client>,
|
db: Arc<Client>,
|
||||||
insert_statement: Statement,
|
insert_statement: Statement,
|
||||||
blacklist: Arc<HashSet<String>>,
|
blacklist: Arc<Vec<String>>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let ip = peer.ip();
|
let ip = peer.ip();
|
||||||
let ParsedLine {
|
let ParsedLine {
|
||||||
|
@ -171,7 +198,7 @@ async fn handle_udp(
|
||||||
service,
|
service,
|
||||||
entry,
|
entry,
|
||||||
} = parse_line(&line)?;
|
} = parse_line(&line)?;
|
||||||
if !blacklist.contains(service) {
|
if !blacklist_matches(&blacklist, service) {
|
||||||
db.execute(
|
db.execute(
|
||||||
&insert_statement,
|
&insert_statement,
|
||||||
&[&prio, &ip, &rcvtime, &logtime, &service, &entry],
|
&[&prio, &ip, &rcvtime, &logtime, &service, &entry],
|
||||||
|
@ -186,7 +213,7 @@ async fn handle_peer_and_error(
|
||||||
peer: SocketAddr,
|
peer: SocketAddr,
|
||||||
db: Arc<Client>,
|
db: Arc<Client>,
|
||||||
insert_statement: Statement,
|
insert_statement: Statement,
|
||||||
blacklist: Arc<HashSet<String>>,
|
blacklist: Arc<Vec<String>>,
|
||||||
) {
|
) {
|
||||||
if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await {
|
if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await {
|
||||||
eprintln!("tcp line error: {}", e);
|
eprintln!("tcp line error: {}", e);
|
||||||
|
@ -198,7 +225,7 @@ async fn handle_peer(
|
||||||
peer: SocketAddr,
|
peer: SocketAddr,
|
||||||
db: Arc<Client>,
|
db: Arc<Client>,
|
||||||
insert_statement: Statement,
|
insert_statement: Statement,
|
||||||
blacklist: Arc<HashSet<String>>,
|
blacklist: Arc<Vec<String>>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
use tokio_util::codec::{FramedRead, LinesCodec};
|
use tokio_util::codec::{FramedRead, LinesCodec};
|
||||||
|
|
||||||
|
@ -213,7 +240,7 @@ async fn handle_peer(
|
||||||
service,
|
service,
|
||||||
entry,
|
entry,
|
||||||
} = parse_line(&line)?;
|
} = parse_line(&line)?;
|
||||||
if !blacklist.contains(service) {
|
if !blacklist_matches(&blacklist, service) {
|
||||||
db.execute(
|
db.execute(
|
||||||
&insert_statement,
|
&insert_statement,
|
||||||
&[&prio, &ip, &rcvtime, &logtime, &service, &entry],
|
&[&prio, &ip, &rcvtime, &logtime, &service, &entry],
|
||||||
|
@ -271,6 +298,7 @@ fn parse_log_date(year: i32, input: &'_ str) -> Result<NaiveDateTime, Error> {
|
||||||
/// 123456789012345
|
/// 123456789012345
|
||||||
/// exactly 15 characters
|
/// exactly 15 characters
|
||||||
/// * separator: exactly one space
|
/// * separator: exactly one space
|
||||||
|
//fimxe: ubi-devices don't seem to send a hostname, wrt-ones do, how to fix?
|
||||||
/// * hostname: ipv4, ipv6 or (dns)-hostname (containing no spaces
|
/// * hostname: ipv4, ipv6 or (dns)-hostname (containing no spaces
|
||||||
/// * separator: exactly one space
|
/// * separator: exactly one space
|
||||||
/// * message: tag (usually daemon name) and message
|
/// * message: tag (usually daemon name) and message
|
||||||
|
|
Loading…
Reference in a new issue