finished (untested) udp support
This commit is contained in:
parent
dc73d26b01
commit
87815efc24
1 changed files with 81 additions and 20 deletions
101
src/main.rs
101
src/main.rs
|
@ -7,9 +7,13 @@ use std::net::SocketAddr;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
use tokio::net::UdpSocket;
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use tokio::select;
|
||||||
use tokio_postgres::{Client, Config, NoTls, Statement};
|
use tokio_postgres::{Client, Config, NoTls, Statement};
|
||||||
|
|
||||||
|
// todo: do better errorhandling
|
||||||
|
|
||||||
type Error = Box<dyn std::error::Error>;
|
type Error = Box<dyn std::error::Error>;
|
||||||
|
|
||||||
/// This application recieves logfiles in ubiquitis weird logfile format from the network,
|
/// This application recieves logfiles in ubiquitis weird logfile format from the network,
|
||||||
|
@ -65,29 +69,90 @@ async fn main() -> Result<(), Error> {
|
||||||
let insert_statement = client
|
let insert_statement = client
|
||||||
.prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?;
|
.prepare("insert into log(prio, rcv_ip, rcv_date, date, daemon, message) values ($1, $2, $3, $4, $5, $6)").await?;
|
||||||
|
|
||||||
let listener = TcpListener::bind(&options.addr)
|
let tcp_listener = TcpListener::bind(&options.addr)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("could not bind to {} with error {:?}", &options.addr, e))?;
|
.map_err(|e| format!("could not bind to tcp:{} with error {:?}", &options.addr, e))?;
|
||||||
|
let udp_socket = UdpSocket::bind(&options.addr)
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("could not bind to udp:{} with error {:?}", &options.addr, e))?;
|
||||||
|
|
||||||
let blacklist: HashSet<_> = options.blacklist.into_iter().collect();
|
let blacklist: HashSet<_> = options.blacklist.into_iter().collect();
|
||||||
let blacklist = Arc::new(blacklist);
|
let blacklist = Arc::new(blacklist);
|
||||||
|
|
||||||
|
// rfc says max length for messages is 1024
|
||||||
|
let mut buf = [0; 1024];
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match listener.accept().await {
|
//todo: possibly better implemented by just running two tokio::spawn tasks that loop
|
||||||
Ok((socket, peer)) => {
|
select! {
|
||||||
tokio::spawn(handle_peer_and_error(
|
tcp = tcp_listener.accept() => match tcp {
|
||||||
socket,
|
Ok((socket, peer)) => {
|
||||||
peer,
|
tokio::spawn(handle_peer_and_error(
|
||||||
client.clone(),
|
socket,
|
||||||
insert_statement.clone(),
|
peer,
|
||||||
blacklist.clone(),
|
client.clone(),
|
||||||
));
|
insert_statement.clone(),
|
||||||
}
|
blacklist.clone(),
|
||||||
Err(e) => eprintln!("{:?}", e),
|
));
|
||||||
|
}
|
||||||
|
Err(e) => eprintln!("tcp error: {:?}", e),
|
||||||
|
},
|
||||||
|
udp = udp_socket.recv_from(&mut buf) => match udp {
|
||||||
|
Ok((len, addr)) => {
|
||||||
|
let line = &buf[0..len];
|
||||||
|
let line = match std::str::from_utf8(&line) {
|
||||||
|
Ok(l) => l,
|
||||||
|
Err(e) => {eprintln!("udp packet is not valid utf8: {e}"); continue},
|
||||||
|
};
|
||||||
|
let line: String = line.into();
|
||||||
|
|
||||||
|
tokio::spawn(handle_udp_and_error(
|
||||||
|
line, addr, client.clone(), insert_statement.clone(),blacklist.clone()));
|
||||||
|
|
||||||
|
},
|
||||||
|
Err(e) => eprintln!("udp error: {:?}", e),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_udp_and_error<S: AsRef<str>>(
|
||||||
|
line: S,
|
||||||
|
peer: SocketAddr,
|
||||||
|
db: Arc<Client>,
|
||||||
|
insert_statement: Statement,
|
||||||
|
blacklist: Arc<HashSet<String>>,
|
||||||
|
) {
|
||||||
|
if let Err(e) = handle_udp(line.as_ref(), peer, db, insert_statement, blacklist).await {
|
||||||
|
eprintln!("udp line error: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_udp(
|
||||||
|
line: &'_ str,
|
||||||
|
peer: SocketAddr,
|
||||||
|
db: Arc<Client>,
|
||||||
|
insert_statement: Statement,
|
||||||
|
blacklist: Arc<HashSet<String>>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let ip = peer.ip();
|
||||||
|
let ParsedLine {
|
||||||
|
prio,
|
||||||
|
rcvtime,
|
||||||
|
logtime,
|
||||||
|
service,
|
||||||
|
entry,
|
||||||
|
} = parse_line(&line)?;
|
||||||
|
if !blacklist.contains(service) {
|
||||||
|
db.execute(
|
||||||
|
&insert_statement,
|
||||||
|
&[&prio, &ip, &rcvtime, &logtime, &service, &entry],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_peer_and_error(
|
async fn handle_peer_and_error(
|
||||||
stream: TcpStream,
|
stream: TcpStream,
|
||||||
peer: SocketAddr,
|
peer: SocketAddr,
|
||||||
|
@ -96,7 +161,7 @@ async fn handle_peer_and_error(
|
||||||
blacklist: Arc<HashSet<String>>,
|
blacklist: Arc<HashSet<String>>,
|
||||||
) {
|
) {
|
||||||
if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await {
|
if let Err(e) = handle_peer(stream, peer, db, insert_statement, blacklist).await {
|
||||||
eprintln!("{}", e);
|
eprintln!("tcp line error: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,7 +219,6 @@ fn tst_timeparse() {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_log_date(year: i32, input: &'_ str) -> Result<NaiveDateTime, Error> {
|
fn parse_log_date(year: i32, input: &'_ str) -> Result<NaiveDateTime, Error> {
|
||||||
let mut parts = input.split(" ").map(|p| p.trim()).filter(|p| p.len() > 0);
|
|
||||||
let (month, input) = input.split_at(3);
|
let (month, input) = input.split_at(3);
|
||||||
let (day, time) = input.split_at(4);
|
let (day, time) = input.split_at(4);
|
||||||
let day = day.trim();
|
let day = day.trim();
|
||||||
|
@ -188,9 +252,7 @@ fn parse_log_date(year: i32, input: &'_ str) -> Result<NaiveDateTime, Error> {
|
||||||
/// * termination: a non-{alphanum, space, -} character, usually [ or :
|
/// * termination: a non-{alphanum, space, -} character, usually [ or :
|
||||||
/// * content: rest of the line
|
/// * content: rest of the line
|
||||||
fn parse_line(line: &'_ str) -> Result<ParsedLine<'_>, Error> {
|
fn parse_line(line: &'_ str) -> Result<ParsedLine<'_>, Error> {
|
||||||
let (prio, line) = line
|
let (prio, line) = line.split_once('>').ok_or("log did not contain priority")?;
|
||||||
.split_once('>')
|
|
||||||
.expect("log did not contain priority")?;
|
|
||||||
let prio = &prio[1..];
|
let prio = &prio[1..];
|
||||||
let prio = prio
|
let prio = prio
|
||||||
.parse()
|
.parse()
|
||||||
|
@ -204,9 +266,8 @@ fn parse_line(line: &'_ str) -> Result<ParsedLine<'_>, Error> {
|
||||||
let logtime = TimeZone::from_local_datetime(&Local, &logtime).unwrap();
|
let logtime = TimeZone::from_local_datetime(&Local, &logtime).unwrap();
|
||||||
|
|
||||||
// this slightly differs from the rfc: - is considered non-terminating
|
// this slightly differs from the rfc: - is considered non-terminating
|
||||||
let terminator = |c| !((c == '-') || c.is_alphanumeric());
|
|
||||||
let (service, entry) = line
|
let (service, entry) = line
|
||||||
.split_once(terminator)
|
.split_once(|c: char| !((c == '-') || c.is_alphanumeric()))
|
||||||
.ok_or("invalid service or message")?;
|
.ok_or("invalid service or message")?;
|
||||||
Ok(ParsedLine {
|
Ok(ParsedLine {
|
||||||
prio,
|
prio,
|
||||||
|
|
Loading…
Reference in a new issue