Skip to content

Commit

Permalink
Migrate from chrono to time (#68)
Browse files Browse the repository at this point in the history
* Migrate from chrono to time

* Migrate from chrono to time

* Migrate from chrono to time

Co-authored-by: Regis Oliveira <[email protected]>
  • Loading branch information
rejao and Regis Oliveira authored Mar 28, 2022
1 parent b9990f0 commit 0b0c37c
Show file tree
Hide file tree
Showing 18 changed files with 276 additions and 165 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Changelog

All notable changes to the time project will be documented in this file.
---



<a name="0.3.0"></a>
### 0.3.0 (2022-03-14)

#### Breaking Changes

* Migrate from chrono to [time](https://docs.rs/time/latest/time/)
* String formatting changed from strftime to [time](https://docs.rs/time/latest/time/format_description/index.html) custom formatting
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
edition = "2018"
name = "flowgger"
version = "0.2.14"
version = "0.3.0"
authors = ["Frank Denis <[email protected]>", "Matteo Bigoi <[email protected]>", "Vivien Chene <[email protected]>", "Francesco Berni <[email protected]>"]
build = "build.rs"
repository = "https://github.com/awslabs/flowgger"
Expand Down Expand Up @@ -38,8 +38,6 @@ optional = true

[dependencies]
capnp = { version = "0.14", optional = true }
chrono = "0.4"
chrono-tz = "0.5"
clap = "3"
flate2 = "1"
glob = { version = "0.3", optional = true }
Expand All @@ -53,7 +51,8 @@ serde = { version = "1", optional = true }
serde_json = { version = "~0.8", optional = true }
may = { version = "~0.3", optional = true }
toml = "0.5"
time = "0.3"
time = { version = "0.3", features = ["parsing", "formatting"] }
time-tz = "0.3"

[dev-dependencies]
tempdir = "0.3"
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@

[![Build Status](https://app.travis-ci.com/awslabs/flowgger.svg?branch=master)](https://app.travis-ci.com/awslabs/flowgger) [![License: BSD2](https://img.shields.io/badge/License-BSD2-brightgreen.svg)](https://github.com/awslabs/flowgger/blob/master/LICENSE)

<a name="0.3.0"></a>
### New major version: 0.3.0 (2022-03-14)

#### Breaking Changes

* Migrate from chrono to [time](https://docs.rs/time/latest/time/) as per https://rustsec.org/advisories/RUSTSEC-2020-0071
* String formatting changed from strftime to [time](https://docs.rs/time/latest/time/format_description/index.html) custom formatting - see ```flowgger.toml``` for examples on change

---

Flowgger is a fast, simple and lightweight data collector written in Rust.

It reads log entries over a given protocol, extracts them, decodes them using a
Expand Down
7 changes: 4 additions & 3 deletions flowgger.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ file_rotation_size = 2048
file_rotation_time = 2

# Optional: When time rotation is enabled, the timestamp format is appended to the filenames.
# Default is set to "%Y%m%dT%H%M%SZ". Format must conform to https://docs.rs/chrono/0.3.1/chrono/format/strftime/index.html
file_rotation_timeformat = "%Y%m%dT%H%M%SZ"
# Default is set to "[year][month][day]T[hour][minute][second]Z".
# Format must conform to https://docs.rs/time/0.3.7/time/format_description/index.html
file_rotation_timeformat = "[year][month][day]T[hour][minute][second]Z"

# Optional, only used if either file_rotation_size or file_rotation_time is set:
# Specifies number of rotation files to use. The default value is 50.
Expand Down Expand Up @@ -158,4 +159,4 @@ framing = "line"
# "rfc3164" or "rfc5424" or "passthrough"
format = "rfc3164"
# Format of the optional timestamp to be prepended to each event
syslog_prepend_timestamp="[%Y-%m-%dT%H:%M:%S%.6fZ]"
syslog_prepend_timestamp="[[[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:6]Z]"
50 changes: 42 additions & 8 deletions src/flowgger/decoder/ltsv_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use super::Decoder;
use crate::flowgger::config::Config;
use crate::flowgger::record::{Record, SDValue, SDValueType, StructuredData};
use crate::flowgger::utils;
use chrono::DateTime;
use std::collections::HashMap;
use time::format_description::well_known::Rfc3339;
use time::{format_description, OffsetDateTime};

#[derive(Clone)]
struct Suffixes {
Expand Down Expand Up @@ -221,23 +222,41 @@ impl Decoder for LTSVDecoder {
}

fn rfc3339_to_unix(rfc3339: &str) -> Result<f64, &'static str> {
match DateTime::parse_from_rfc3339(rfc3339) {
Ok(date) => Ok(utils::PreciseTimestamp::from_datetime(date).as_f64()),
Err(_) => Err("Unable to parse the date"),
match OffsetDateTime::parse(rfc3339, &Rfc3339) {
Ok(date) => Ok(utils::PreciseTimestamp::from_offset_datetime(date).as_f64()),
Err(_) => Err("Unable to parse the date from RFC3339 to Unix in LTSV decoder"),
}
}

fn english_time_to_unix(et: &str) -> Result<f64, &'static str> {
match DateTime::parse_from_str(et, "%e/%b/%Y:%H:%M:%S%.f %z") {
Ok(date) => Ok(utils::PreciseTimestamp::from_datetime(date).as_f64()),
Err(_) => Err("Unable to parse the date"),
english_time_to_unix_with_subsecond(et, false)
.or_else(|_| english_time_to_unix_with_subsecond(et, true))
}

fn english_time_to_unix_with_subsecond(
et: &str,
with_subsecond: bool,
) -> Result<f64, &'static str> {
let mut format_str =
"[day padding:none]/[month repr:short]/[year]:[hour]:[minute]:[second].[subsecond] \
[offset_hour sign:mandatory][offset_minute]"
.to_string();

if !with_subsecond {
format_str = format_str.replace(".[subsecond]", "");
}

let format_item = format_description::parse(&format_str).unwrap();
match OffsetDateTime::parse(&et, &format_item) {
Ok(date) => Ok(utils::PreciseTimestamp::from_offset_datetime(date).as_f64()),
Err(_) => Err("Unable to parse the English to Unix timestamp in LTSV decoder"),
}
}

fn unix_strtime_to_unix(et: &str) -> Result<f64, &'static str> {
match et.parse::<f64>() {
Ok(ts) => Ok(ts),
Err(_) => Err("Unable to parse the date"),
Err(_) => Err("Unable to parse the date from Unix strtime to Unix in LTSV decoder"),
}
}

Expand Down Expand Up @@ -451,3 +470,18 @@ fn test_ltsv_3() {
false
}));
}

#[test]
fn test_ltsv4() {
let config = Config::from_string(
"[input]\n[input.ltsv_schema]\ncounter = \"u64\"\nscore = \
\"i64\"\nmean = \"f64\"\ndone = \"bool\"\n",
);
let ltsv_decoder = LTSVDecoder::new(&config.unwrap());
let msg =
"time:[5/Aug/2015:15:53:45.637824 -0000]\thost:testhostname\tname1:value1\tname 2: value \
2\tn3:v3";
let res = ltsv_decoder.decode(msg).unwrap();
println!("{}", res.ts);
assert!(res.ts == 1_438_790_025.637_824);
}
68 changes: 39 additions & 29 deletions src/flowgger/decoder/rfc3164_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use super::Decoder;
use crate::flowgger::config::Config;
use crate::flowgger::record::Record;
use crate::flowgger::utils;
use chrono::{Datelike, NaiveDateTime, TimeZone, Utc};
use chrono_tz::Tz;
use std::io::{stderr, Write};
use time::{format_description, OffsetDateTime, PrimitiveDateTime};
use time_tz::timezones::get_by_name;
use time_tz::PrimitiveDateTimeExt;

#[derive(Clone)]
pub struct RFC3164Decoder {}
Expand Down Expand Up @@ -169,49 +170,60 @@ fn parse_date<'a>(
// If no year in the string, parse manually add the current year
if has_year {
idx = 4;
ts_str = ts_tokens[0..idx].join(" ");
ts_str = match ts_tokens.get(0..idx) {
Some(str) => str.join(" "),
None => return Err("Unable to parse RFC3164 date with year"),
};
} else {
idx = 3;
let current_year = Utc::now().year();
ts_str = format!("{} {}", current_year, ts_tokens[0..idx].join(" "));
let current_year = OffsetDateTime::now_utc().year();
ts_str = match ts_tokens.get(0..idx) {
Some(str) => format!("{} {}", current_year, str.join(" ")),
None => return Err("Unable to parse RFC3164 date without year"),
};
}

match NaiveDateTime::parse_from_str(&ts_str, "%Y %b %d %H:%M:%S") {
Ok(naive_dt) => {
let format_item = format_description::parse(
"[year] [month repr:short] [day padding:none] [hour]:[minute]:[second]",
)
.unwrap();
match PrimitiveDateTime::parse(&ts_str, &format_item) {
Ok(primitive_date) => {
// See if the next token is a timezone
let mut ts = 0.0;
let tz_res: Result<Tz, String> = if ts_tokens.len() > idx {
ts_tokens[idx].parse()
let ts: f64;
let tz_res = if ts_tokens.len() > idx {
get_by_name(ts_tokens[idx]).ok_or("No timezone".to_string())
} else {
Err("No timezone".to_string())
};

if let Ok(tz) = tz_res {
let dt = tz.from_local_datetime(&naive_dt).single();
if dt.is_some() {
ts = utils::PreciseTimestamp::from_datetime_tz(dt.unwrap()).as_f64();
idx += 1;
}
let dt = primitive_date.assume_timezone(tz);
ts = utils::PreciseTimestamp::from_offset_datetime(dt).as_f64();
idx += 1;
}
// No timezome, give a timestamp without tz
else {
ts = utils::PreciseTimestamp::from_naive_datetime(naive_dt).as_f64();
ts = utils::PreciseTimestamp::from_primitive_datetime(primitive_date).as_f64();
}
Ok((ts, ts_tokens[idx..].to_vec()))
}
Err(_err) => Err("Unable to parse date"),
Err(_) => Err("Unable to parse the date in RFC3164 decoder"),
}
}

#[cfg(test)]
use crate::flowgger::utils::test_utils::rfc_test_utils::{
ts_from_date_time, ts_from_partial_date_time,
};
#[cfg(test)]
use time::Month;

#[test]
fn test_rfc3164_decode_nopri() {
let msg = r#"Aug 6 11:15:24 testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
let expected_ts = ts_from_partial_date_time(8, 6, 11, 15, 24);
let expected_ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24);

let decoder = RFC3164Decoder::new(&cfg);
let res = decoder.decode(msg).unwrap();
Expand All @@ -231,7 +243,7 @@ fn test_rfc3164_decode_nopri() {
fn test_rfc3164_decode_with_pri() {
let msg = r#"<13>Aug 6 11:15:24 testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
let expected_ts = ts_from_partial_date_time(8, 6, 11, 15, 24);
let expected_ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24);

let decoder = RFC3164Decoder::new(&cfg);
let res = decoder.decode(msg).unwrap();
Expand All @@ -251,7 +263,7 @@ fn test_rfc3164_decode_with_pri() {
fn test_rfc3164_decode_with_pri_year() {
let msg = r#"<13>2020 Aug 6 11:15:24 testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
let expected_ts = ts_from_date_time(2020, 8, 6, 11, 15, 24, 0);
let expected_ts = ts_from_date_time(2020, Month::August, 6, 11, 15, 24, 0);

let decoder = RFC3164Decoder::new(&cfg);
let res = decoder.decode(msg).unwrap();
Expand All @@ -269,9 +281,9 @@ fn test_rfc3164_decode_with_pri_year() {

#[test]
fn test_rfc3164_decode_with_pri_year_tz() {
let msg = r#"<13>2020 Aug 6 11:15:24 UTC testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
let msg = r#"<13>2020 Aug 6 05:15:24 America/Sao_Paulo testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
let expected_ts = ts_from_date_time(2020, 8, 6, 11, 15, 24, 0);
let expected_ts = ts_from_date_time(2020, Month::August, 6, 08, 15, 24, 0);

let decoder = RFC3164Decoder::new(&cfg);
let res = decoder.decode(msg).unwrap();
Expand All @@ -291,7 +303,7 @@ fn test_rfc3164_decode_with_pri_year_tz() {
fn test_rfc3164_decode_tz_no_year() {
let msg = r#"Aug 6 11:15:24 UTC testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
let expected_ts = ts_from_partial_date_time(8, 6, 11, 15, 24);
let expected_ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24);

let decoder = RFC3164Decoder::new(&cfg);
let res = decoder.decode(msg).unwrap();
Expand Down Expand Up @@ -329,11 +341,9 @@ fn test_rfc3164_decode_invalid_date() {

#[test]
fn test_rfc3164_decode_custom_with_year() {
// let msg = r#"testhostname: 2019 Mar 27 12:09:39 UTC: appname: test message"#;
let msg = r#"testhostname: 2020 Aug 6 11:15:24 UTC: appname 69 42 some test message"#;
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
// let expected_ts = ts_from_date_time(2019, 3, 27, 12, 9, 39, 0);
let expected_ts = ts_from_date_time(2020, 8, 6, 11, 15, 24, 0);
let expected_ts = ts_from_date_time(2020, Month::August, 6, 11, 15, 24, 0);

let decoder = RFC3164Decoder::new(&cfg);
let res = decoder.decode(msg).unwrap();
Expand All @@ -356,7 +366,7 @@ fn test_rfc3164_decode_custom_with_year() {
fn test_rfc3164_decode_custom_with_year_notz() {
let msg = r#"testhostname: 2019 Mar 27 12:09:39: appname: a test message"#;
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
let expected_ts = ts_from_date_time(2019, 3, 27, 12, 9, 39, 0);
let expected_ts = ts_from_date_time(2019, Month::March, 27, 12, 9, 39, 0);

let decoder = RFC3164Decoder::new(&cfg);
let res = decoder.decode(msg).unwrap();
Expand All @@ -376,7 +386,7 @@ fn test_rfc3164_decode_custom_with_year_notz() {
fn test_rfc3164_decode_custom_with_pri() {
let msg = r#"<13>testhostname: 2019 Mar 27 12:09:39 UTC: appname: test message"#;
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
let expected_ts = ts_from_date_time(2019, 3, 27, 12, 9, 39, 0);
let expected_ts = ts_from_date_time(2019, Month::March, 27, 12, 9, 39, 0);

let decoder = RFC3164Decoder::new(&cfg);
let res = decoder.decode(msg).unwrap();
Expand All @@ -396,7 +406,7 @@ fn test_rfc3164_decode_custom_with_pri() {
fn test_rfc3164_decode_custom_trimed() {
let msg = "<13>testhostname: 2019 Mar 27 12:09:39 UTC: appname: test message \n";
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
let expected_ts = ts_from_date_time(2019, 3, 27, 12, 9, 39, 0);
let expected_ts = ts_from_date_time(2019, Month::March, 27, 12, 9, 39, 0);

let decoder = RFC3164Decoder::new(&cfg);
let res = decoder.decode(msg).unwrap();
Expand Down
9 changes: 5 additions & 4 deletions src/flowgger/decoder/rfc5424_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use super::Decoder;
use crate::flowgger::config::Config;
use crate::flowgger::record::{Record, SDValue, StructuredData};
use crate::flowgger::utils;
use chrono::DateTime;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

#[derive(Clone)]
pub struct RFC5424Decoder;
Expand Down Expand Up @@ -91,9 +92,9 @@ fn parse_pri_version(line: &str) -> Result<Pri, &'static str> {
}

fn rfc3339_to_unix(rfc3339: &str) -> Result<f64, &'static str> {
match DateTime::parse_from_rfc3339(rfc3339) {
Ok(date) => Ok(utils::PreciseTimestamp::from_datetime(date).as_f64()),
Err(_) => Err("Unable to parse the date"),
match OffsetDateTime::parse(rfc3339, &Rfc3339) {
Ok(date) => Ok(utils::PreciseTimestamp::from_offset_datetime(date).as_f64()),
Err(_) => Err("Unable to parse the date from RFC3339 to Unix time in RFC5424 decoder"),
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/flowgger/encoder/ltsv_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ impl Encoder for LTSVEncoder {
use crate::flowgger::record::StructuredData;
#[cfg(test)]
use crate::flowgger::utils::test_utils::rfc_test_utils::ts_from_partial_date_time;
#[cfg(test)]
use time::Month;

#[test]
fn test_ltsv_full_encode_no_sd() {
let full_msg = "<23>Aug 6 11:15:24 testhostname appname[69]: 42 - some test message";
let expected_msg = "host:testhostname\ttime:1659784524\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 - some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42";
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap();
let ts = ts_from_partial_date_time(8, 6, 11, 15, 24);
let ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24);

let record = Record {
ts,
Expand All @@ -159,7 +161,7 @@ fn test_ltsv_full_encode_multiple_sd() {
let full_msg = "<23>Aug 6 11:15:24 testhostname appname[69]: 42 [someid a=\"b\" c=\"123456\"][someid2 a2=\"b2\" c2=\"123456\"] some test message";
let expected_msg = "a:b\tc:123456\ta2:b2\tc2:123456\thost:testhostname\ttime:1659784524\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 [someid a=\"b\" c=\"123456\"][someid2 a2=\"b2\" c2=\"123456\"] some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42";
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap();
let ts = ts_from_partial_date_time(8, 6, 11, 15, 24);
let ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24);

let record = Record {
ts,
Expand Down
Loading

0 comments on commit 0b0c37c

Please sign in to comment.