vector/transforms/
delay.rs

1use std::{num::NonZeroUsize, pin::Pin, time::Duration};
2
3use async_stream::stream;
4use futures::{Stream, StreamExt};
5use serde_with::serde_as;
6use snafu::Snafu;
7use tokio_util::time::DelayQueue;
8use vector_lib::configurable::configurable_component;
9use vector_lib::internal_event::INTENTIONAL;
10use vector_lib::{config::clone_input_definitions, internal_event::ComponentEventsDropped};
11
12use crate::{
13    conditions::{AnyCondition, Condition},
14    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
15    event::Event,
16    schema,
17    transforms::{TaskTransform, Transform},
18};
19
// NOTE: the `///` comments on this type and its fields are picked up by
// `configurable_component`, so they are user-facing text. Implementation notes are kept in
// plain `//` comments instead.
//
// `deny_unknown_fields` makes deserialization fail on any key that is not declared below.
/// Configuration for the `delay` transform.
#[serde_as]
#[configurable_component(transform("delay", "Slow down events passing through a topology."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DelayConfig {
    // Read from an integer number of milliseconds (`DurationMilliSeconds<u64>`). It has no
    // serde default, and a value of zero is rejected by `build` with
    // `BuildError::ZeroDelayDuration`.
    /// Time to delay each event, in milliseconds.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Delay in milliseconds", docs::example = 200))]
    delay_ms: Duration,

    // Falls back to `default_queue_capacity()` when the key is omitted.
    /// Limit for number of items in the delay queue.
    #[serde(default = "default_queue_capacity")]
    queue_capacity: NonZeroUsize,

    // Falls back to `OverflowStrategy::default()`, i.e. `Block`, when the key is omitted.
    /// Strategy to handle full queue capacity.
    #[serde(default)]
    overflow_strategy: OverflowStrategy,

    // When `None`, `Delay::check_condition` holds every event back for exactly one delay
    // period. When set, an event is released as soon as the condition passes, which can be
    // immediately on arrival.
    /// Delay events in provided delay periods until the condition is met.
    condition: Option<AnyCondition>,
}
42
/// Returns the queue capacity used when the configuration does not set one: 500 events.
///
/// Serves as the serde default for `queue_capacity` and is reused by
/// `Default for DelayConfig`.
const fn default_queue_capacity() -> NonZeroUsize {
    match NonZeroUsize::new(500) {
        Some(capacity) => capacity,
        // Unreachable: the literal above is non-zero.
        None => panic!("static non-zero number"),
    }
}
46
47impl Default for DelayConfig {
48    fn default() -> Self {
49        Self {
50            delay_ms: Default::default(),
51            queue_capacity: default_queue_capacity(),
52            overflow_strategy: Default::default(),
53            condition: Default::default(),
54        }
55    }
56}
57
// NOTE: the `///` comments below are picked up by `configurable_component`, so they are
// user-facing text. Implementation notes are kept in plain `//` comments.
//
// Variants are (de)serialized in snake case: `block`, `drop_newest`, `forward`.
/// Event handling behavior when delay queue is full.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OverflowStrategy {
    // In `Delay::transform` this stops reading new input and keeps servicing the queue
    // until its length drops below the capacity; only then is the new event queued.
    /// Wait for free space in the queue.
    ///
    /// This applies backpressure up the topology, signalling that sources should slow down
    /// the acceptance/consumption of events. This may cause the system to degenerate if this
    /// component blocks for too long.
    #[default]
    Block,

    // In `Delay::transform` this emits `ComponentEventsDropped::<INTENTIONAL>` with a count
    // of one and discards the incoming event; events already queued are unaffected.
    /// Drops the event instead of waiting for free space in the queue.
    ///
    /// The event will be intentionally dropped. This mode is typically used when performance is the
    /// highest priority, and it is preferable to temporarily lose events rather than cause a
    /// slowdown in the acceptance/consumption of events.
    DropNewest,

    // In `Delay::transform` the incoming event is yielded immediately and never enters the
    // queue, so it can overtake events that are still being delayed.
    /// Forward the event without any delay to next component.
    Forward,
}
81
// NOTE(review): presumably implements config generation for `DelayConfig` on top of its
// `Default` impl; the macro is defined outside this file, so confirm there. The
// `generate_config` test in this file exercises the result.
impl_generate_config_from_default!(DelayConfig);
83
84#[async_trait::async_trait]
85#[typetag::serde(name = "delay")]
86impl TransformConfig for DelayConfig {
87    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
88        if self.delay_ms.as_millis() == 0 {
89            return Err(Box::new(BuildError::ZeroDelayDuration));
90        }
91        Ok(Transform::event_task(Delay::new(self, context)?))
92    }
93
94    fn input(&self) -> Input {
95        Input::all()
96    }
97
98    fn outputs(
99        &self,
100        _: &TransformContext,
101        input_definitions: &[(OutputId, schema::Definition)],
102    ) -> Vec<TransformOutput> {
103        // The event is not modified, so the definition is passed through as-is
104        vec![TransformOutput::new(
105            DataType::all_bits(),
106            clone_input_definitions(input_definitions),
107        )]
108    }
109}
110
/// Runtime state of the `delay` transform, created from a `DelayConfig` by `Delay::new`.
pub struct Delay {
    /// How long an event sits in `queue` before its condition is checked (again).
    delay: Duration,
    /// Events currently being held back; each entry expires `delay` after it was inserted.
    queue: DelayQueue<Event>,
    /// Number of queued events at which `transform` applies `overflow_strategy` to a new
    /// event instead of queueing it directly.
    queue_capacity: NonZeroUsize,
    /// What `transform` does with a new event that arrives while the queue is full.
    overflow_strategy: OverflowStrategy,
    /// Optional release condition. `None` means every event is delayed exactly once.
    condition: Option<Condition>,
}
118
119impl Delay {
120    pub fn new(config: &DelayConfig, context: &TransformContext) -> crate::Result<Self> {
121        Ok(Self {
122            delay: config.delay_ms,
123            queue: DelayQueue::with_capacity(config.queue_capacity.get()),
124            queue_capacity: config.queue_capacity,
125            overflow_strategy: config.overflow_strategy,
126            condition: config
127                .condition
128                .as_ref()
129                .map(|c| c.build(&context.enrichment_tables, &context.metrics_storage))
130                .transpose()?,
131        })
132    }
133
134    fn check_condition(&self, event: Event, first: bool) -> (bool, Event) {
135        if let Some(condition) = self.condition.as_ref() {
136            condition.check(event)
137        } else {
138            // If this is the first check, we need to ensure at least one delay is
139            // done if no condition is configured
140            (!first, event)
141        }
142    }
143}
144
impl TaskTransform<Event> for Delay {
    /// Turns the incoming event stream into a stream of delayed events.
    ///
    /// Every incoming event is checked once on arrival (`check_condition(_, true)`). If the
    /// check passes, the event is emitted straight away; otherwise it is placed in the delay
    /// queue for `self.delay`. Whenever a queued event expires it is checked again
    /// (`check_condition(_, false)`) and either emitted or re-queued for another period.
    ///
    /// When a new event has to be queued while the queue already holds `queue_capacity`
    /// events, `overflow_strategy` decides what happens to it (see the match below).
    ///
    /// The returned stream ends once the input stream has ended and the queue has drained.
    fn transform(
        mut self: Box<Self>,
        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        Box::pin(stream! {
            // Set once `input_rx` has yielded `None`; from then on only the queue is drained.
            let mut done = false;
            loop {
                // Input exhausted and nothing left to release: end the output stream. This
                // also guarantees `select!` is never entered with both branches disabled.
                if done && self.queue.is_empty() {
                    break;
                }
                tokio::select! {
                    // Poll branches in declaration order, so expired events are handled
                    // before new input is accepted.
                    biased;

                    // An entry in the delay queue expired. If the queue yields `None`
                    // instead, the pattern does not match and this branch is skipped for
                    // this iteration, leaving only the input branch.
                    Some(res) = self.queue.next() => {
                        let event = res.into_inner();
                        let (result, event) = self.check_condition(event, false);
                        if result {
                            yield event;
                        } else {
                            // Still not allowed through: hold it for another full period.
                            self.queue.insert(event, self.delay);
                        }
                        // Same exit test as at the top of the loop.
                        if done && self.queue.is_empty() {
                            break;
                        }
                    },

                    // New input; this branch is disabled once the input stream has ended.
                    maybe_event = input_rx.next(), if !done => {
                        match maybe_event {
                            None => {
                                done = true;
                            }
                            Some(event) => {
                                let (result, event) = self.check_condition(event, true);
                                if result {
                                    // Only reachable with a configured condition that
                                    // already passes: no delay is applied.
                                    yield event
                                } else {
                                    if self.queue_capacity.get() <= self.queue.len() {
                                        match self.overflow_strategy {
                                            OverflowStrategy::Block => {
                                                // Stop reading input and service the queue
                                                // until there is room. Expired events are
                                                // emitted or re-queued exactly as in the
                                                // queue branch above.
                                                while self.queue_capacity.get() <= self.queue.len() && let Some(next) = self.queue.next().await {
                                                    let event = next.into_inner();
                                                    let (result, event) = self.check_condition(event, false);
                                                    if result {
                                                        yield event;
                                                    } else {
                                                        self.queue.insert(event, self.delay);
                                                    }
                                                }
                                                // Falls through to the insert below.
                                            },
                                            OverflowStrategy::DropNewest => {
                                                emit!(ComponentEventsDropped::<INTENTIONAL> {
                                                    count: 1,
                                                    reason: "Queue is full and overflow strategy is drop_newest",
                                                });
                                                // Skip the insert below; the incoming event
                                                // is discarded.
                                                continue;
                                            }
                                            OverflowStrategy::Forward => {
                                                // Emit without delay and skip the insert
                                                // below.
                                                yield event;
                                                continue;
                                            }
                                        }
                                    }
                                    self.queue.insert(event, self.delay);
                                }
                            }
                        }
                    },
                }
            }
        })
    }
}
221
/// Errors that `DelayConfig::build` reports for an unusable configuration.
#[derive(Debug, Snafu)]
pub enum BuildError {
    // Returned when `delay_ms` amounts to zero whole milliseconds.
    #[snafu(display("The delay duration must not be zero"))]
    ZeroDelayDuration,
}
227
#[cfg(test)]
mod tests {
    use indoc::indoc;
    use std::task::Poll;

    use futures::SinkExt;
    use vector_lib::event::TraceEvent;

    use super::*;
    use crate::event::LogEvent;

    // The timing-based tests below all use a 200 ms delay and then sleep for 300 ms, which
    // leaves a margin for the queued event to expire before the output is polled again.

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DelayConfig>();
    }

    /// A zero delay, which is what `DelayConfig::default()` carries, must be rejected when
    /// the transform is built.
    #[tokio::test]
    async fn zero_delay_is_rejected() {
        let config = DelayConfig::default();

        assert!(config.build(&TransformContext::default()).await.is_err());
    }

    /// Without a condition, an event is held back for one delay period and then released.
    #[tokio::test]
    async fn delay_events() {
        let config = toml::from_str::<DelayConfig>(indoc! {"
            delay_ms = 200
        "})
        .unwrap();

        let delay =
            Transform::event_task(Delay::new(&config, &TransformContext::default()).unwrap());

        let delay = delay.into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = delay.transform_events(Box::pin(rx));

        tx.send(LogEvent::default().into()).await.unwrap();

        // We should be pending, because we are now waiting for the delay
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Wait long enough for delay to end
        tokio::time::sleep(Duration::from_secs_f64(0.3)).await;

        if !matches!(futures::poll!(out_stream.next()), Poll::Ready(Some(_event))) {
            panic!("Unexpectedly received None or Pending in output stream");
        }
    }

    /// With the default `block` strategy, an event that arrives while the queue is full
    /// waits for room, is then queued, and is released after its own delay period.
    #[tokio::test]
    async fn delay_events_at_capacity_block() {
        let config = toml::from_str::<DelayConfig>(indoc! {r#"
            delay_ms = 200
            queue_capacity = 1
            overflow_strategy = "block"
        "#})
        .unwrap();

        let delay =
            Transform::event_task(Delay::new(&config, &TransformContext::default()).unwrap());

        let delay = delay.into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = delay.transform_events(Box::pin(rx));

        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(TraceEvent::default().into()).await.unwrap();

        // We should be pending: the log event is queued and the trace event is blocked
        // behind it
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Wait long enough for the first delay to end
        tokio::time::sleep(Duration::from_secs_f64(0.3)).await;

        let Poll::Ready(Some(event)) = futures::poll!(out_stream.next()) else {
            panic!("Unexpectedly received None or Pending in output stream");
        };
        assert!(event.try_into_log().is_some());

        // The trace event has only now entered the queue, so it is not released yet
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Wait long enough for the second delay to end
        tokio::time::sleep(Duration::from_secs_f64(0.3)).await;

        let Poll::Ready(Some(event)) = futures::poll!(out_stream.next()) else {
            panic!("Unexpectedly received None or Pending in output stream");
        };
        assert!(event.try_into_trace().is_some());
    }

    /// With `drop_newest`, an event that arrives while the queue is full is discarded.
    #[tokio::test]
    async fn delay_events_at_capacity_drop_newest() {
        let config = toml::from_str::<DelayConfig>(indoc! {r#"
            delay_ms = 200
            queue_capacity = 1
            overflow_strategy = "drop_newest"
        "#})
        .unwrap();

        let delay =
            Transform::event_task(Delay::new(&config, &TransformContext::default()).unwrap());

        let delay = delay.into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = delay.transform_events(Box::pin(rx));

        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(TraceEvent::default().into()).await.unwrap();

        // We should be pending, because we are now waiting for the delay
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Wait long enough for delay to end
        tokio::time::sleep(Duration::from_secs_f64(0.3)).await;

        let Poll::Ready(Some(event)) = futures::poll!(out_stream.next()) else {
            panic!("Unexpectedly received None or Pending in output stream");
        };
        assert!(event.try_into_log().is_some());

        // We should be pending, because trace event should have been dropped
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
    }

    /// With `forward`, an event that arrives while the queue is full skips the delay and
    /// overtakes the event that is still queued.
    #[tokio::test]
    async fn delay_events_at_capacity_forward() {
        let config = toml::from_str::<DelayConfig>(indoc! {r#"
            delay_ms = 200
            queue_capacity = 1
            overflow_strategy = "forward"
        "#})
        .unwrap();

        let delay =
            Transform::event_task(Delay::new(&config, &TransformContext::default()).unwrap());

        let delay = delay.into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = delay.transform_events(Box::pin(rx));

        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(TraceEvent::default().into()).await.unwrap();

        // First event should be trace, because it is passed right away before delay
        let Poll::Ready(Some(event)) = futures::poll!(out_stream.next()) else {
            panic!("Unexpectedly received None or Pending in output stream");
        };
        assert!(event.try_into_trace().is_some());

        // We should be pending, because we are now waiting for the delay
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Wait long enough for delay to end
        tokio::time::sleep(Duration::from_secs_f64(0.3)).await;

        let Poll::Ready(Some(event)) = futures::poll!(out_stream.next()) else {
            panic!("Unexpectedly received None or Pending in output stream");
        };
        assert!(event.try_into_log().is_some());
    }
}