vector/transforms/
mod.rs

1#![allow(missing_docs)]
2#[allow(unused_imports)]
3use std::collections::HashSet;
4
5pub mod dedupe;
6pub mod reduce;
7#[cfg(feature = "transforms-impl-sample")]
8pub mod sample;
9
10#[cfg(feature = "transforms-aggregate")]
11pub mod aggregate;
12#[cfg(feature = "transforms-aws_ec2_metadata")]
13pub mod aws_ec2_metadata;
14#[cfg(feature = "transforms-delay")]
15pub mod delay;
16#[cfg(feature = "transforms-exclusive-route")]
17mod exclusive_route;
18#[cfg(feature = "transforms-filter")]
19pub mod filter;
20#[cfg(feature = "transforms-incremental_to_absolute")]
21pub mod incremental_to_absolute;
22#[cfg(feature = "transforms-log_to_metric")]
23pub mod log_to_metric;
24#[cfg(feature = "transforms-lua")]
25pub mod lua;
26#[cfg(feature = "transforms-metric_to_log")]
27pub mod metric_to_log;
28#[cfg(feature = "transforms-remap")]
29pub mod remap;
30#[cfg(feature = "transforms-route")]
31pub mod route;
32#[cfg(feature = "transforms-tag_cardinality_limit")]
33pub mod tag_cardinality_limit;
34#[cfg(feature = "transforms-throttle")]
35pub mod throttle;
36#[cfg(feature = "transforms-trace_to_log")]
37pub mod trace_to_log;
38#[cfg(feature = "transforms-window")]
39pub mod window;
40
41pub use vector_lib::transform::{
42    FunctionTransform, OutputBuffer, SyncTransform, TaskTransform, Transform, TransformOutputs,
43    TransformOutputsBuf,
44};
45
46#[cfg(test)]
47mod test {
48    use futures::Stream;
49    use futures_util::SinkExt;
50    use tokio::sync::mpsc;
51    use tokio_util::sync::PollSender;
52    use vector_lib::transform::FunctionTransform;
53
54    use crate::{
55        config::{
56            ConfigBuilder, TransformConfig,
57            unit_test::{UnitTestStreamSinkConfig, UnitTestStreamSourceConfig},
58        },
59        event::Event,
60        test_util::start_topology,
61        topology::RunningTopology,
62        transforms::OutputBuffer,
63    };
64
65    /// Transform a single `Event` through the `FunctionTransform`
66    ///
67    /// # Panics
68    ///
69    /// If `ft` attempts to emit more than one `Event` on transform this
70    /// function will panic.
71    // We allow dead_code here to avoid unused warnings when we compile our
72    // benchmarks as tests. It's a valid warning -- the benchmarks don't use
73    // this function -- but flagging this function off for bench flags will
74    // issue a unused warnings about the import above.
75    #[allow(dead_code)]
76    pub fn transform_one(ft: &mut dyn FunctionTransform, event: Event) -> Option<Event> {
77        let mut buf = OutputBuffer::with_capacity(1);
78        ft.transform(&mut buf, event);
79        assert!(buf.len() <= 1);
80        buf.into_events().next()
81    }
82
83    #[allow(dead_code)]
84    pub async fn create_topology<T: TransformConfig + 'static>(
85        events: impl Stream<Item = Event> + Send + 'static,
86        transform_config: T,
87    ) -> (RunningTopology, mpsc::Receiver<Event>) {
88        let mut builder = ConfigBuilder::default();
89
90        let (tx, rx) = mpsc::channel(1);
91
92        // TODO: Use non-hard-coded names to improve tests.
93        builder.add_source("in", UnitTestStreamSourceConfig::new(events));
94        builder.add_transform("transform", &["in"], transform_config);
95        builder.add_sink(
96            "out",
97            &["transform"],
98            UnitTestStreamSinkConfig::new(
99                PollSender::new(tx).sink_map_err(|error| panic!("{}", error)),
100            ),
101        );
102
103        let config = builder.build().expect("building config should not fail");
104        let (topology, _) = start_topology(config, false).await;
105
106        (topology, rx)
107    }
108}