Skip to content

Commit e32661a

Browse files
feat(aws_s3 sink): add Parquet encoder with schema_file and auto infer schema support (#25156)
* feat(codecs): add Parquet encoder with schema_file and schema_mode support * fix broken tests * remove benches stuff * fix the integration tests * Add compression level to compression type * update teh aws_s3.cue file * Minor updates based on my own PR review of szibis initial PR * fix minor issue from batch encoder * fix tests from szibis PR * Fix the tests * Fix cargo fmt and deny issues * fix cargo markdown and license errors * Revert cargo.lock serde_json back to what it was * rebuild cargo.lock, don't want to pull in non needed upgrades * update warning and licenses * revert weird test updates * merge internal_events * cargo fmt fix * Throw a validation error if someone tries to use arrow encoding with amazon s3 sink * update licenses after re-installing dd-rust-license-tool * Short circuit when there's a serialization error instead of just emitting an error * PR comments feedback * emit dropped events metric when there's an error in encoding * put JsonSerializationError under parquet feature * emit ArrowWriterError and dropped events * remove formatting changes in cargo.toml * Apply suggestions from May's code review Co-authored-by: May Lee <may.lee@datadoghq.com> * remove whitespace updates from lib/codecs/cargo.toml --------- Co-authored-by: May Lee <may.lee@datadoghq.com>
1 parent 16458a8 commit e32661a

21 files changed

Lines changed: 1818 additions & 43 deletions

File tree

.github/actions/spelling/expect.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,7 @@ transitioning
604604
Trauring
605605
Treemap
606606
trialled
607+
Trino
607608
Trivago
608609
trivy
609610
trl

.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ cargo-timing*.html
44
target
55
tests/data/wasm/*/target
66
bench_output.txt
7+
rust-analyzer.toml
78

89
# Python
910
*.pyc
@@ -64,3 +65,9 @@ volumes/
6465

6566
# LLM tools
6667
copilot-instructions.md
68+
69+
# local files not checked in
70+
local/
71+
72+
# vscode
73+
.vscode/

Cargo.lock

Lines changed: 64 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ tonic-build = { version = "0.11", default-features = false, features = ["transpo
204204
tonic-health = { version = "0.11", default-features = false }
205205
tonic-reflection = { version = "0.11", default-features = false, features = ["server"] }
206206
tracing = { version = "0.1.44", default-features = false }
207-
tracing-subscriber = { version = "0.3.22", default-features = false, features = ["fmt"] }
207+
tracing-subscriber = { version = "0.3.22", default-features = false, features = ["fmt"] }
208208
url = { version = "2.5.4", default-features = false, features = ["serde"] }
209209
uuid = { version = "1.22.0", features = ["v4", "v7", "serde", "fast-rng"] }
210210
vector-config = { path = "lib/vector-config" }
@@ -353,6 +353,13 @@ async-compression = { version = "0.4.27", default-features = false, features = [
353353
apache-avro = { version = "0.16.0", default-features = false, optional = true }
354354
arrow = { version = "56.2.0", default-features = false, features = ["ipc"], optional = true }
355355
arrow-schema = { version = "56.2.0", default-features = false, optional = true }
356+
parquet = { version = "56.2.0", default-features = false, features = [
357+
"arrow",
358+
"snap",
359+
"zstd",
360+
"lz4",
361+
"flate2-rust_backened",
362+
], optional = true }
356363
axum = { version = "0.6.20", default-features = false }
357364
base64 = { workspace = true, optional = true }
358365
bloomy = { version = "1.2.0", default-features = false, optional = true }
@@ -609,6 +616,7 @@ enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"]
609616

610617
# Codecs
611618
codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"]
619+
codecs-parquet = ["dep:parquet", "codecs-arrow", "vector-lib/parquet"]
612620
codecs-opentelemetry = ["vector-lib/opentelemetry"]
613621
codecs-syslog = ["vector-lib/syslog"]
614622

LICENSE-3rdparty.csv

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ inotify,https://github.com/hannobraun/inotify,ISC,"Hanno Braun <mail@hannobraun.
382382
inotify-sys,https://github.com/hannobraun/inotify-sys,ISC,Hanno Braun <hb@hannobraun.de>
383383
inout,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers
384384
instability,https://github.com/ratatui-org/instability,MIT,"Stephen M. Coakley <me@stephencoakley.com>, Joshka"
385+
integer-encoding,https://github.com/dermesser/integer-encoding-rs,MIT,Lewin Bormann <lbo@spheniscida.de>
385386
inventory,https://github.com/dtolnay/inventory,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
386387
ipconfig,https://github.com/liranringel/ipconfig,MIT OR Apache-2.0,Liran Ringel <liranringel@gmail.com>
387388
ipcrypt-rs,https://github.com/jedisct1/rust-ipcrypt2,ISC,Frank Denis <github@pureftpd.org>
@@ -533,8 +534,10 @@ pad,https://github.com/ogham/rust-pad,MIT,Ben S <ogham@bsago.me>
533534
parking,https://github.com/smol-rs/parking,Apache-2.0 OR MIT,"Stjepan Glavina <stjepang@gmail.com>, The Rust Project Developers"
534535
parking_lot,https://github.com/Amanieu/parking_lot,MIT OR Apache-2.0,Amanieu d'Antras <amanieu@gmail.com>
535536
parking_lot_core,https://github.com/Amanieu/parking_lot,MIT OR Apache-2.0,Amanieu d'Antras <amanieu@gmail.com>
537+
parquet,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow <dev@arrow.apache.org>
536538
parse-size,https://github.com/kennytm/parse-size,MIT,kennytm <kennytm@gmail.com>
537539
passt,https://github.com/kevingimbel/passt,MIT OR Apache-2.0,Kevin Gimbel <hallo@kevingimbel.com>
540+
paste,https://github.com/dtolnay/paste,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
538541
pastey,https://github.com/as1100k/pastey,MIT OR Apache-2.0,"Aditya Kumar <git@adityais.dev>, David Tolnay <dtolnay@gmail.com>"
539542
pbkdf2,https://github.com/RustCrypto/password-hashes/tree/master/pbkdf2,MIT OR Apache-2.0,RustCrypto Developers
540543
peeking_take_while,https://github.com/fitzgen/peeking_take_while,MIT OR Apache-2.0,Nick Fitzgerald <fitzgen@gmail.com>
@@ -674,6 +677,7 @@ secrecy,https://github.com/iqlusioninc/crates/tree/main/secrecy,Apache-2.0 OR MI
674677
security-framework,https://github.com/kornelski/rust-security-framework,MIT OR Apache-2.0,"Steven Fackler <sfackler@gmail.com>, Kornel <kornel@geekhood.net>"
675678
security-framework-sys,https://github.com/kornelski/rust-security-framework,MIT OR Apache-2.0,"Steven Fackler <sfackler@gmail.com>, Kornel <kornel@geekhood.net>"
676679
semver,https://github.com/dtolnay/semver,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
680+
seq-macro,https://github.com/dtolnay/seq-macro,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
677681
serde,https://github.com/serde-rs/serde,MIT OR Apache-2.0,"Erick Tryzelaar <erick.tryzelaar@gmail.com>, David Tolnay <dtolnay@gmail.com>"
678682
serde-aux,https://github.com/iddm/serde-aux,MIT,Victor Polevoy <maintainer@vpolevoy.com>
679683
serde-toml-merge,https://github.com/jdrouet/serde-toml-merge,MIT,Jeremie Drouet <jeremie.drouet@gmail.com>
@@ -756,6 +760,7 @@ terminal_size,https://github.com/eminence/terminal-size,MIT OR Apache-2.0,Andrew
756760
thiserror,https://github.com/dtolnay/thiserror,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
757761
thiserror-impl,https://github.com/dtolnay/thiserror,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
758762
thread_local,https://github.com/Amanieu/thread_local-rs,MIT OR Apache-2.0,Amanieu d'Antras <amanieu@gmail.com>
763+
thrift,https://github.com/apache/thrift/tree/master/lib/rs,Apache-2.0,Apache Thrift Developers <dev@thrift.apache.org>
759764
tikv-jemalloc-sys,https://github.com/tikv/jemallocator,MIT OR Apache-2.0,"Alex Crichton <alex@alexcrichton.com>, Gonzalo Brito Gadeschi <gonzalobg88@gmail.com>, The TiKV Project Developers"
760765
tikv-jemallocator,https://github.com/tikv/jemallocator,MIT OR Apache-2.0,"Alex Crichton <alex@alexcrichton.com>, Gonzalo Brito Gadeschi <gonzalobg88@gmail.com>, Simon Sapin <simon.sapin@exyr.org>, Steven Fackler <sfackler@gmail.com>, The TiKV Project Developers"
761766
time,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt <open-source@jhpratt.dev>, Time contributors"
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Add Apache Parquet batch encoding support for the `aws_s3` sink with flexible schema definitions.
2+
3+
Events can now be encoded as Parquet columnar files with multiple schema input options:
4+
5+
- **Native Parquet schema** — automatically generate a schema or supply `.schema` file
6+
- **Configurable compression** - (Snappy, ZSTD, GZIP, LZ4, None).
7+
8+
Enable the `codecs-parquet` feature and configure `batch_encoding` with `codec = "parquet"` in the S3 sink configuration.
9+
10+
authors: szibis petere-datadog

deny.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,5 @@ ignore = [
4444
{ id = "RUSTSEC-2024-0388", reason = "derivative is unmaintained (https://github.com/vectordotdev/vector/issues/24940)" },
4545
{ id = "RUSTSEC-2025-0134", reason = "rustls-pemfile is unmaintained - unpatched crate (https://github.com/bytebeamio/rumqtt/issues/1010) & tonic/reqwest upgrade (https://github.com/vectordotdev/vector/issues/19179)" },
4646
{ id = "RUSTSEC-2026-0049", reason = "rustls-webpki 0.102 is vulnerable - tonic upgrade (https://github.com/vectordotdev/vector/issues/19179)" },
47+
{ id = "RUSTSEC-2024-0436", reason = "paste crate is unmaintained - transitive dependency via parquet v56.2.0, no safe upgrade available" },
4748
]

lib/codecs/Cargo.toml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,17 @@ path = "tests/bin/generate-avro-fixtures.rs"
1515
[dependencies]
1616
apache-avro = { version = "0.20.0", default-features = false }
1717
arrow = { version = "56.2.0", default-features = false, features = ["ipc", "json"], optional = true }
18+
parquet = { version = "56.2.0", default-features = false, features = [
19+
"arrow",
20+
"snap",
21+
"zstd",
22+
"lz4",
23+
"flate2-rust_backened",
24+
], optional = true }
1825
async-trait.workspace = true
1926
bytes.workspace = true
2027
chrono.workspace = true
28+
chrono-tz.workspace = true
2129
rust_decimal.workspace = true
2230
csv-core = { version = "0.1.13", default-features = false }
2331
derivative.workspace = true
@@ -69,7 +77,8 @@ uuid.workspace = true
6977
vrl.workspace = true
7078

7179
[features]
72-
arrow = ["dep:arrow"]
80+
arrow = ["dep:arrow", "arrow/chrono-tz"]
81+
parquet = ["dep:parquet", "arrow"]
7382
opentelemetry = ["dep:opentelemetry-proto"]
7483
syslog = ["dep:syslog_loose", "dep:strum", "dep:derive_more", "dep:serde-aux", "dep:toml"]
7584
test = []

lib/codecs/src/encoding/encoder.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use vector_core::event::Event;
55

66
#[cfg(feature = "arrow")]
77
use crate::encoding::ArrowStreamSerializer;
8+
#[cfg(feature = "parquet")]
9+
use crate::encoding::ParquetSerializer;
810
use crate::{
911
encoding::{Error, Framer, Serializer},
1012
internal_events::{EncoderFramingError, EncoderSerializeError},
@@ -16,6 +18,9 @@ pub enum BatchSerializer {
1618
/// Arrow IPC stream format serializer.
1719
#[cfg(feature = "arrow")]
1820
Arrow(ArrowStreamSerializer),
21+
/// Parquet format serializer.
22+
#[cfg(feature = "parquet")]
23+
Parquet(Box<ParquetSerializer>),
1924
}
2025

2126
/// An encoder that encodes batches of events.
@@ -36,10 +41,12 @@ impl BatchEncoder {
3641
}
3742

3843
/// Get the HTTP content type.
39-
#[cfg(feature = "arrow")]
44+
#[cfg(any(feature = "arrow", feature = "parquet"))]
4045
pub const fn content_type(&self) -> &'static str {
4146
match &self.serializer {
4247
BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
48+
#[cfg(feature = "parquet")]
49+
BatchSerializer::Parquet(_) => "application/vnd.apache.parquet",
4350
}
4451
}
4552
}
@@ -63,6 +70,11 @@ impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
6370
}
6471
})
6572
}
73+
#[cfg(feature = "parquet")]
74+
BatchSerializer::Parquet(serializer) => serializer
75+
.encode(events, buffer)
76+
.map_err(Error::SerializingError),
77+
#[allow(unreachable_patterns)]
6678
_ => unreachable!("BatchSerializer cannot be constructed without encode()"),
6779
}
6880
}
@@ -74,7 +86,7 @@ pub enum EncoderKind {
7486
/// Uses framing to encode individual events
7587
Framed(Box<Encoder<Framer>>),
7688
/// Encodes events in batches without framing
77-
#[cfg(feature = "arrow")]
89+
#[cfg(any(feature = "arrow", feature = "parquet"))]
7890
Batch(BatchEncoder),
7991
}
8092

0 commit comments

Comments
 (0)