1use std::{num::NonZeroUsize, pin::Pin, time::Duration};
2
3use async_stream::stream;
4use futures::{Stream, StreamExt};
5use serde_with::serde_as;
6use snafu::Snafu;
7use tokio_util::time::DelayQueue;
8use vector_lib::configurable::configurable_component;
9use vector_lib::internal_event::INTENTIONAL;
10use vector_lib::{config::clone_input_definitions, internal_event::ComponentEventsDropped};
11
12use crate::{
13 conditions::{AnyCondition, Condition},
14 config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
15 event::Event,
16 schema,
17 transforms::{TaskTransform, Transform},
18};
19
20#[serde_as]
22#[configurable_component(transform("delay", "Slow down events passing through a topology."))]
23#[derive(Clone, Debug)]
24#[serde(deny_unknown_fields)]
25pub struct DelayConfig {
26 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
28 #[configurable(metadata(docs::human_name = "Delay in milliseconds", docs::example = 200))]
29 delay_ms: Duration,
30
31 #[serde(default = "default_queue_capacity")]
33 queue_capacity: NonZeroUsize,
34
35 #[serde(default)]
37 overflow_strategy: OverflowStrategy,
38
39 condition: Option<AnyCondition>,
41}
42
/// Returns the default capacity of the delay queue: 500 events.
const fn default_queue_capacity() -> NonZeroUsize {
    // Resolving the value inside a `const` item moves the non-zero check to compile time, so
    // no panic path is left in the function at runtime.
    const DEFAULT_QUEUE_CAPACITY: NonZeroUsize = match NonZeroUsize::new(500) {
        Some(capacity) => capacity,
        None => panic!("static non-zero number"),
    };

    DEFAULT_QUEUE_CAPACITY
}
46
47impl Default for DelayConfig {
48 fn default() -> Self {
49 Self {
50 delay_ms: Default::default(),
51 queue_capacity: default_queue_capacity(),
52 overflow_strategy: Default::default(),
53 condition: Default::default(),
54 }
55 }
56}
57
58#[configurable_component]
60#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
61#[serde(rename_all = "snake_case")]
62pub enum OverflowStrategy {
63 #[default]
69 Block,
70
71 DropNewest,
77
78 Forward,
80}
81
82impl_generate_config_from_default!(DelayConfig);
83
84#[async_trait::async_trait]
85#[typetag::serde(name = "delay")]
86impl TransformConfig for DelayConfig {
87 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
88 if self.delay_ms.as_millis() == 0 {
89 return Err(Box::new(BuildError::ZeroDelayDuration));
90 }
91 Ok(Transform::event_task(Delay::new(self, context)?))
92 }
93
94 fn input(&self) -> Input {
95 Input::all()
96 }
97
98 fn outputs(
99 &self,
100 _: &TransformContext,
101 input_definitions: &[(OutputId, schema::Definition)],
102 ) -> Vec<TransformOutput> {
103 vec![TransformOutput::new(
105 DataType::all_bits(),
106 clone_input_definitions(input_definitions),
107 )]
108 }
109}
110
111pub struct Delay {
112 delay: Duration,
113 queue: DelayQueue<Event>,
114 queue_capacity: NonZeroUsize,
115 overflow_strategy: OverflowStrategy,
116 condition: Option<Condition>,
117}
118
119impl Delay {
120 pub fn new(config: &DelayConfig, context: &TransformContext) -> crate::Result<Self> {
121 Ok(Self {
122 delay: config.delay_ms,
123 queue: DelayQueue::with_capacity(config.queue_capacity.get()),
124 queue_capacity: config.queue_capacity,
125 overflow_strategy: config.overflow_strategy,
126 condition: config
127 .condition
128 .as_ref()
129 .map(|c| c.build(&context.enrichment_tables, &context.metrics_storage))
130 .transpose()?,
131 })
132 }
133
134 fn check_condition(&self, event: Event, first: bool) -> (bool, Event) {
135 if let Some(condition) = self.condition.as_ref() {
136 condition.check(event)
137 } else {
138 (!first, event)
141 }
142 }
143}
144
145impl TaskTransform<Event> for Delay {
146 fn transform(
147 mut self: Box<Self>,
148 mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
149 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
150 where
151 Self: 'static,
152 {
153 Box::pin(stream! {
154 let mut done = false;
155 loop {
156 if done && self.queue.is_empty() {
157 break;
158 }
159 tokio::select! {
160 biased;
161
162 Some(res) = self.queue.next() => {
163 let event = res.into_inner();
164 let (result, event) = self.check_condition(event, false);
165 if result {
166 yield event;
167 } else {
168 self.queue.insert(event, self.delay);
169 }
170 if done && self.queue.is_empty() {
171 break;
172 }
173 },
174
175 maybe_event = input_rx.next(), if !done => {
176 match maybe_event {
177 None => {
178 done = true;
179 }
180 Some(event) => {
181 let (result, event) = self.check_condition(event, true);
182 if result {
183 yield event
184 } else {
185 if self.queue_capacity.get() <= self.queue.len() {
186 match self.overflow_strategy {
187 OverflowStrategy::Block => {
188 while self.queue_capacity.get() <= self.queue.len() && let Some(next) = self.queue.next().await {
189 let event = next.into_inner();
190 let (result, event) = self.check_condition(event, false);
191 if result {
192 yield event;
193 } else {
194 self.queue.insert(event, self.delay);
195 }
196 }
197 },
198 OverflowStrategy::DropNewest => {
199 emit!(ComponentEventsDropped::<INTENTIONAL> {
200 count: 1,
201 reason: "Queue is full and overflow strategy is drop_newest",
202 });
203 continue;
204 }
205 OverflowStrategy::Forward => {
206 yield event;
207 continue;
208 }
209 }
210 }
211 self.queue.insert(event, self.delay);
212 }
213 }
214 }
215 },
216 }
217 }
218 })
219 }
220}
221
222#[derive(Debug, Snafu)]
223pub enum BuildError {
224 #[snafu(display("The delay duration must not be zero"))]
225 ZeroDelayDuration,
226}
227
#[cfg(test)]
mod tests {
    use indoc::indoc;
    use std::task::Poll;

    use futures::SinkExt;
    use vector_lib::event::TraceEvent;

    use super::*;
    use crate::event::LogEvent;

    /// Builds a `delay` transform from `toml_config` and wires it to a bounded channel.
    ///
    /// Returns the sending half of the channel and the transform's output stream.
    fn delayed_pipeline(
        toml_config: &str,
    ) -> (
        futures::channel::mpsc::Sender<Event>,
        impl Stream<Item = Event> + Unpin,
    ) {
        let config = toml::from_str::<DelayConfig>(toml_config).unwrap();
        let task =
            Transform::event_task(Delay::new(&config, &TransformContext::default()).unwrap())
                .into_task();

        let (tx, rx) = futures::channel::mpsc::channel(10);
        (tx, task.transform_events(Box::pin(rx)))
    }

    /// Sleeps for longer than the 200 ms delay used by the test configurations.
    async fn outlast_delay() {
        tokio::time::sleep(Duration::from_secs_f64(0.3)).await;
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DelayConfig>();
    }

    #[tokio::test]
    async fn delay_events() {
        let (mut tx, mut out_stream) = delayed_pipeline(indoc! {"
            delay_ms = 200
        "});

        tx.send(LogEvent::default().into()).await.unwrap();

        // The event must be held back at first.
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        outlast_delay().await;

        // After the delay has elapsed the event must be available.
        assert!(
            matches!(futures::poll!(out_stream.next()), Poll::Ready(Some(_event))),
            "Unexpectedly received None or Pending in output stream"
        );
    }

    #[tokio::test]
    async fn delay_events_at_capacity_drop_newest() {
        let (mut tx, mut out_stream) = delayed_pipeline(indoc! {r#"
            delay_ms = 200
            queue_capacity = 1
            overflow_strategy = "drop_newest"
        "#});

        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(TraceEvent::default().into()).await.unwrap();

        // The log event fills the queue and the trace event is dropped, so nothing is ready.
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        outlast_delay().await;

        // Only the queued log event comes out.
        let Poll::Ready(Some(event)) = futures::poll!(out_stream.next()) else {
            panic!("Unexpectedly received None or Pending in output stream");
        };
        assert!(event.try_into_log().is_some());

        // The dropped trace event never shows up.
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
    }

    #[tokio::test]
    async fn delay_events_at_capacity_pass() {
        let (mut tx, mut out_stream) = delayed_pipeline(indoc! {r#"
            delay_ms = 200
            queue_capacity = 1
            overflow_strategy = "forward"
        "#});

        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(TraceEvent::default().into()).await.unwrap();

        // The log event fills the queue, so the trace event is forwarded right away.
        let Poll::Ready(Some(event)) = futures::poll!(out_stream.next()) else {
            panic!("Unexpectedly received None or Pending in output stream");
        };
        assert!(event.try_into_trace().is_some());

        // The log event is still being delayed.
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        outlast_delay().await;

        // After the delay the queued log event is released.
        let Poll::Ready(Some(event)) = futures::poll!(out_stream.next()) else {
            panic!("Unexpectedly received None or Pending in output stream");
        };
        assert!(event.try_into_log().is_some());
    }
}