vector/transforms/
aggregate.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    pin::Pin,
4    time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{
10    configurable::configurable_component,
11    event::{
12        MetricValue,
13        metric::{Metric, MetricData, MetricKind, MetricSeries},
14    },
15};
16
17use crate::{
18    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
19    event::{Event, EventMetadata},
20    internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
21    schema,
22    transforms::{TaskTransform, Transform},
23};
24
25/// Configuration for the `aggregate` transform.
26#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
27#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
28#[serde(deny_unknown_fields)]
29pub struct AggregateConfig {
30    /// The interval between flushes, in milliseconds.
31    ///
32    /// During this time frame, metrics (beta) with the same series data (name, namespace, tags, and so on) are aggregated.
33    #[serde(default = "default_interval_ms")]
34    #[configurable(metadata(docs::human_name = "Flush Interval"))]
35    pub interval_ms: u64,
36    /// Function to use for aggregation.
37    ///
38    /// Some of the functions may only function on incremental and some only on absolute metrics.
39    #[serde(default = "default_mode")]
40    #[configurable(derived)]
41    pub mode: AggregationMode,
42}
43
// User-facing selector for the transform's `mode` option. `impl From<AggregationMode> for
// InnerMode` converts it into the runtime form that carries any per-mode state.
// NOTE(review): `#[configurable_component]` presumably builds the configuration schema from the
// `///` comments below, so they double as user documentation. Edit them with that in mind.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[configurable(description = "The aggregation mode to use.")]
pub enum AggregationMode {
    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
    #[default]
    Auto,

    /// Sums incremental metrics; absolute metrics pass through unchanged.
    Sum,

    /// Returns the latest value for absolute metrics; incremental metrics pass through unchanged.
    Latest,

    // Each event adds 1 to a per-series counter, regardless of the event's own value.
    /// Counts metrics for incremental and absolute metrics
    Count,

    // At flush, the value recorded for the same series in the previous flush window is
    // subtracted from the latest value (when both have the same kind).
    /// Returns difference between latest value for absolute; incremental metrics pass through unchanged.
    Diff,

    // Only gauge values are compared.
    /// Max value of absolute metric; incremental metrics pass through unchanged.
    Max,

    // Only gauge values are compared.
    /// Min value of absolute metric; incremental metrics pass through unchanged.
    Min,

    // Computed over the gauge samples buffered during one flush window.
    /// Mean value of absolute metric; incremental metrics pass through unchanged.
    Mean,

    // Population standard deviation: the variance divides by the sample count, not count - 1.
    /// Stdev value of absolute metric; incremental metrics pass through unchanged.
    Stdev,
}
76
77#[derive(Clone, Debug, Default, PartialEq)]
78enum InnerMode {
79    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
80    #[default]
81    Auto,
82
83    /// Sums incremental metrics; absolute metrics pass through unchanged.
84    Sum,
85
86    /// Returns the latest value for absolute metrics; incremental metrics pass through unchanged.
87    Latest,
88
89    /// Counts metrics for incremental and absolute metrics
90    Count,
91
92    /// Returns difference between latest value for absolute; incremental metrics pass through unchanged.
93    Diff {
94        prev_map: HashMap<MetricSeries, MetricEntry>,
95    },
96
97    /// Max value of absolute metric; incremental metrics pass through unchanged.
98    Max,
99
100    /// Min value of absolute metric; incremental metrics pass through unchanged.
101    Min,
102
103    /// Mean value of absolute metric; incremental metrics pass through unchanged.
104    Mean {
105        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
106    },
107
108    /// Stdev value of absolute metric; incremental metrics pass through unchanged.
109    Stdev {
110        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
111    },
112}
113
114impl From<AggregationMode> for InnerMode {
115    fn from(value: AggregationMode) -> Self {
116        match value {
117            AggregationMode::Auto => InnerMode::Auto,
118            AggregationMode::Sum => InnerMode::Sum,
119            AggregationMode::Latest => InnerMode::Latest,
120            AggregationMode::Count => InnerMode::Count,
121            AggregationMode::Diff => InnerMode::Diff {
122                prev_map: HashMap::default(),
123            },
124            AggregationMode::Max => InnerMode::Max,
125            AggregationMode::Min => InnerMode::Min,
126            AggregationMode::Mean => InnerMode::Mean {
127                multi_map: HashMap::default(),
128            },
129            AggregationMode::Stdev => InnerMode::Stdev {
130                multi_map: HashMap::default(),
131            },
132        }
133    }
134}
135
136const fn default_mode() -> AggregationMode {
137    AggregationMode::Auto
138}
139
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    10_000
}
143
// Presumably implements config generation for `AggregateConfig` from its `Default` impl
// (exercised by the `generate_config` test below). Confirm against the macro definition.
impl_generate_config_from_default!(AggregateConfig);
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "aggregate")]
148impl TransformConfig for AggregateConfig {
149    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
150        Aggregate::new(self).map(Transform::event_task)
151    }
152
153    fn input(&self) -> Input {
154        Input::metric()
155    }
156
157    fn outputs(
158        &self,
159        _: &TransformContext,
160        _: &[(OutputId, schema::Definition)],
161    ) -> Vec<TransformOutput> {
162        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
163    }
164}
165
/// Value stored per series while aggregating: the metric data plus its event metadata.
type MetricEntry = (MetricData, EventMetadata);
167
/// State of the `aggregate` transform: the flush interval, the current window's entries and
/// the aggregation mode.
#[derive(Debug)]
pub struct Aggregate {
    // Time between flushes; built from `AggregateConfig::interval_ms`.
    interval: Duration,
    // One entry per series for the single-value modes (Auto, Sum, Latest, Count, Diff, Max,
    // Min); taken and emptied by every flush.
    map: HashMap<MetricSeries, MetricEntry>,
    // Selected mode together with any state it keeps across flushes.
    mode: InnerMode,
}
174
175impl Aggregate {
176    pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
177        Ok(Self {
178            interval: Duration::from_millis(config.interval_ms),
179            map: Default::default(),
180            mode: config.mode.into(),
181        })
182    }
183
184    pub fn record(&mut self, event: Event) -> Option<Event> {
185        let (series, data, metadata) = event.into_metric().into_parts();
186
187        match (&mut self.mode, data.kind) {
188            (InnerMode::Sum, MetricKind::Absolute)
189            | (InnerMode::Latest | InnerMode::Diff { .. }, MetricKind::Incremental)
190            | (InnerMode::Max | InnerMode::Min, MetricKind::Incremental)
191            | (InnerMode::Mean { .. } | InnerMode::Stdev { .. }, MetricKind::Incremental) => {
192                return Some(Event::Metric(Metric::from_parts(series, data, metadata)));
193            }
194            (InnerMode::Auto | InnerMode::Sum, MetricKind::Incremental) => {
195                self.record_sum(series, data, metadata);
196            }
197            (InnerMode::Auto, MetricKind::Absolute)
198            | (InnerMode::Latest | InnerMode::Diff { .. }, MetricKind::Absolute) => {
199                self.map.insert(series, (data, metadata));
200            }
201            (InnerMode::Count, _) => {
202                self.record_count(series, data, metadata);
203            }
204            (InnerMode::Max | InnerMode::Min, MetricKind::Absolute) => {
205                self.record_comparison(series, data, metadata);
206            }
207            (
208                InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map },
209                MetricKind::Absolute,
210            ) => {
211                if matches!(data.value, MetricValue::Gauge { value: _ }) {
212                    match multi_map.entry(series) {
213                        Entry::Occupied(mut entry) => entry.get_mut().push((data, metadata)),
214                        Entry::Vacant(entry) => {
215                            entry.insert(vec![(data, metadata)]);
216                        }
217                    }
218                }
219            }
220        }
221        emit!(AggregateEventRecorded);
222        None
223    }
224
225    fn record_count(
226        &mut self,
227        series: MetricSeries,
228        mut data: MetricData,
229        metadata: EventMetadata,
230    ) {
231        let mut count_data = data.clone();
232        let existing = self.map.entry(series).or_insert_with(|| {
233            *data.value_mut() = MetricValue::Counter { value: 0f64 };
234            (data.clone(), metadata.clone())
235        });
236        *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
237        if existing.0.kind == data.kind && existing.0.update(&count_data) {
238            existing.1.merge(metadata);
239        } else {
240            emit!(AggregateUpdateFailed);
241        }
242    }
243
244    fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
245        match self.map.entry(series) {
246            Entry::Occupied(mut entry) => {
247                let existing = entry.get_mut();
248                // In order to update (add) the new and old kind's must match
249                if existing.0.kind == data.kind && existing.0.update(&data) {
250                    existing.1.merge(metadata);
251                } else {
252                    emit!(AggregateUpdateFailed);
253                    *existing = (data, metadata);
254                }
255            }
256            Entry::Vacant(entry) => {
257                entry.insert((data, metadata));
258            }
259        }
260    }
261
262    fn record_comparison(
263        &mut self,
264        series: MetricSeries,
265        data: MetricData,
266        metadata: EventMetadata,
267    ) {
268        match self.map.entry(series) {
269            Entry::Occupied(mut entry) => {
270                let existing = entry.get_mut();
271                // In order to update (add) the new and old kind's must match
272                if existing.0.kind == data.kind {
273                    if let MetricValue::Gauge {
274                        value: existing_value,
275                    } = existing.0.value()
276                        && let MetricValue::Gauge { value: new_value } = data.value()
277                    {
278                        let should_update = match self.mode {
279                            InnerMode::Max => new_value > existing_value,
280                            InnerMode::Min => new_value < existing_value,
281                            _ => false,
282                        };
283                        if should_update {
284                            *existing = (data, metadata);
285                        }
286                    }
287                } else {
288                    emit!(AggregateUpdateFailed);
289                    *existing = (data, metadata);
290                }
291            }
292            Entry::Vacant(entry) => {
293                entry.insert((data, metadata));
294            }
295        }
296    }
297
298    pub fn flush_into(&mut self, output: &mut Vec<Event>) {
299        let map = std::mem::take(&mut self.map);
300        for (series, entry) in map.clone().into_iter() {
301            let mut metric = Metric::from_parts(series, entry.0, entry.1);
302            if let InnerMode::Diff { prev_map } = &self.mode
303                && let Some(prev_entry) = prev_map.get(metric.series())
304                && metric.data().kind == prev_entry.0.kind
305                && !metric.subtract(&prev_entry.0)
306            {
307                emit!(AggregateUpdateFailed);
308            }
309            output.push(Event::Metric(metric));
310        }
311
312        let multi_map = match &mut self.mode {
313            InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => {
314                std::mem::take(multi_map)
315            }
316            _ => HashMap::default(),
317        };
318
319        'outer: for (series, entries) in multi_map.into_iter() {
320            if entries.is_empty() {
321                continue;
322            }
323
324            let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
325            for (data, metadata) in entries.iter().skip(1) {
326                if !final_sum.update(data) {
327                    // Incompatible types, skip this metric
328                    emit!(AggregateUpdateFailed);
329                    continue 'outer;
330                }
331                final_metadata.merge(metadata.clone());
332            }
333
334            let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
335                // Entries are not empty so this is safe.
336                *value /= entries.len() as f64;
337                *value
338            } else {
339                0.0
340            };
341
342            let final_mean = final_sum.clone();
343            match self.mode {
344                InnerMode::Mean { .. } => {
345                    let metric = Metric::from_parts(series, final_mean, final_metadata);
346                    output.push(Event::Metric(metric));
347                }
348                InnerMode::Stdev { .. } => {
349                    let variance = entries
350                        .iter()
351                        .filter_map(|(data, _)| {
352                            if let MetricValue::Gauge { value } = data.value() {
353                                let diff = final_mean_value - value;
354                                Some(diff * diff)
355                            } else {
356                                None
357                            }
358                        })
359                        .sum::<f64>()
360                        / entries.len() as f64;
361                    let mut final_stdev = final_mean;
362                    if let MetricValue::Gauge { value } = final_stdev.value_mut() {
363                        *value = variance.sqrt()
364                    }
365                    let metric = Metric::from_parts(series, final_stdev, final_metadata);
366                    output.push(Event::Metric(metric));
367                }
368                _ => (),
369            }
370        }
371
372        if let InnerMode::Diff { prev_map } = &mut self.mode {
373            *prev_map = map;
374        }
375        emit!(AggregateFlushed);
376    }
377}
378
379impl TaskTransform<Event> for Aggregate {
380    fn transform(
381        mut self: Box<Self>,
382        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
383    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
384    where
385        Self: 'static,
386    {
387        let mut flush_stream = tokio::time::interval(self.interval);
388
389        Box::pin(stream! {
390            let mut output = Vec::new();
391            let mut done = false;
392            while !done {
393                tokio::select! {
394                    _ = flush_stream.tick() => {
395                        self.flush_into(&mut output);
396                    },
397                    maybe_event = input_rx.next() => {
398                        match maybe_event {
399                            None => {
400                                self.flush_into(&mut output);
401                                done = true;
402                            }
403                            Some(event) => {
404                                if let Some(passthrough) = self.record(event) {
405                                    output.push(passthrough);
406                                }
407                            }
408                        }
409                    }
410                };
411                for event in output.drain(..) {
412                    yield event;
413                }
414            }
415        })
416    }
417}
418
419#[cfg(test)]
420mod tests {
421    use std::{collections::BTreeSet, sync::Arc, task::Poll};
422
423    use futures::stream;
424    use tokio::sync::mpsc;
425    use tokio_stream::wrappers::ReceiverStream;
426    use vector_lib::config::{ComponentKey, LogNamespace};
427    use vrl::value::Kind;
428
429    use super::*;
430    use crate::{
431        event::{
432            Event, Metric,
433            metric::{MetricKind, MetricValue},
434        },
435        schema::Definition,
436        test_util::components::assert_transform_compliance,
437        transforms::test::create_topology,
438    };
439
440    #[test]
441    fn generate_config() {
442        crate::test_util::test_generate_config::<AggregateConfig>();
443    }
444
445    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
446        let mut event = Event::Metric(Metric::new(name, kind, value))
447            .with_source_id(Arc::new(ComponentKey::from("in")))
448            .with_upstream_id(Arc::new(OutputId::from("transform")));
449        event.metadata_mut().set_schema_definition(&Arc::new(
450            Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
451        ));
452
453        event.metadata_mut().set_source_type("unit_test_stream");
454
455        event
456    }
457
458    #[test]
459    fn incremental_auto() {
460        let mut agg = Aggregate::new(&AggregateConfig {
461            interval_ms: 1000_u64,
462            mode: AggregationMode::Auto,
463        })
464        .unwrap();
465
466        let counter_a_1 = make_metric(
467            "counter_a",
468            MetricKind::Incremental,
469            MetricValue::Counter { value: 42.0 },
470        );
471        let counter_a_2 = make_metric(
472            "counter_a",
473            MetricKind::Incremental,
474            MetricValue::Counter { value: 43.0 },
475        );
476        let counter_a_summed = make_metric(
477            "counter_a",
478            MetricKind::Incremental,
479            MetricValue::Counter { value: 85.0 },
480        );
481
482        // Single item, just stored regardless of kind
483        assert_eq!(agg.record(counter_a_1.clone()), None);
484        let mut out = vec![];
485        // We should flush 1 item counter_a_1
486        agg.flush_into(&mut out);
487        assert_eq!(1, out.len());
488        assert_eq!(&counter_a_1, &out[0]);
489
490        // A subsequent flush doesn't send out anything
491        out.clear();
492        agg.flush_into(&mut out);
493        assert_eq!(0, out.len());
494
495        // One more just to make sure that we don't re-see from the other buffer
496        out.clear();
497        agg.flush_into(&mut out);
498        assert_eq!(0, out.len());
499
500        // Two increments with the same series, should sum into 1
501        assert_eq!(agg.record(counter_a_1.clone()), None);
502        assert_eq!(agg.record(counter_a_2), None);
503        out.clear();
504        agg.flush_into(&mut out);
505        assert_eq!(1, out.len());
506        assert_eq!(&counter_a_summed, &out[0]);
507
508        let counter_b_1 = make_metric(
509            "counter_b",
510            MetricKind::Incremental,
511            MetricValue::Counter { value: 44.0 },
512        );
513        // Two increments with the different series, should get each back as-is
514        assert_eq!(agg.record(counter_a_1.clone()), None);
515        assert_eq!(agg.record(counter_b_1.clone()), None);
516        out.clear();
517        agg.flush_into(&mut out);
518        assert_eq!(2, out.len());
519        // B/c we don't know the order they'll come back
520        for event in out {
521            match event.as_metric().series().name.name.as_str() {
522                "counter_a" => assert_eq!(counter_a_1, event),
523                "counter_b" => assert_eq!(counter_b_1, event),
524                _ => panic!("Unexpected metric name in aggregate output"),
525            }
526        }
527    }
528
529    #[test]
530    fn absolute_auto() {
531        let mut agg = Aggregate::new(&AggregateConfig {
532            interval_ms: 1000_u64,
533            mode: AggregationMode::Auto,
534        })
535        .unwrap();
536
537        let gauge_a_1 = make_metric(
538            "gauge_a",
539            MetricKind::Absolute,
540            MetricValue::Gauge { value: 42.0 },
541        );
542        let gauge_a_2 = make_metric(
543            "gauge_a",
544            MetricKind::Absolute,
545            MetricValue::Gauge { value: 43.0 },
546        );
547
548        // Single item, just stored regardless of kind
549        assert_eq!(agg.record(gauge_a_1.clone()), None);
550        let mut out = vec![];
551        // We should flush 1 item gauge_a_1
552        agg.flush_into(&mut out);
553        assert_eq!(1, out.len());
554        assert_eq!(&gauge_a_1, &out[0]);
555
556        // A subsequent flush doesn't send out anything
557        out.clear();
558        agg.flush_into(&mut out);
559        assert_eq!(0, out.len());
560
561        // One more just to make sure that we don't re-see from the other buffer
562        out.clear();
563        agg.flush_into(&mut out);
564        assert_eq!(0, out.len());
565
566        // Two absolutes with the same series, should get the 2nd (last) back.
567        assert_eq!(agg.record(gauge_a_1.clone()), None);
568        assert_eq!(agg.record(gauge_a_2.clone()), None);
569        out.clear();
570        agg.flush_into(&mut out);
571        assert_eq!(1, out.len());
572        assert_eq!(&gauge_a_2, &out[0]);
573
574        let gauge_b_1 = make_metric(
575            "gauge_b",
576            MetricKind::Absolute,
577            MetricValue::Gauge { value: 44.0 },
578        );
579        // Two increments with the different series, should get each back as-is
580        assert_eq!(agg.record(gauge_a_1.clone()), None);
581        assert_eq!(agg.record(gauge_b_1.clone()), None);
582        out.clear();
583        agg.flush_into(&mut out);
584        assert_eq!(2, out.len());
585        // B/c we don't know the order they'll come back
586        for event in out {
587            match event.as_metric().series().name.name.as_str() {
588                "gauge_a" => assert_eq!(gauge_a_1, event),
589                "gauge_b" => assert_eq!(gauge_b_1, event),
590                _ => panic!("Unexpected metric name in aggregate output"),
591            }
592        }
593    }
594
595    #[test]
596    fn count_agg() {
597        let mut agg = Aggregate::new(&AggregateConfig {
598            interval_ms: 1000_u64,
599            mode: AggregationMode::Count,
600        })
601        .unwrap();
602
603        let gauge_a_1 = make_metric(
604            "gauge_a",
605            MetricKind::Absolute,
606            MetricValue::Gauge { value: 42.0 },
607        );
608        let gauge_a_2 = make_metric(
609            "gauge_a",
610            MetricKind::Absolute,
611            MetricValue::Gauge { value: 43.0 },
612        );
613        let result_count = make_metric(
614            "gauge_a",
615            MetricKind::Absolute,
616            MetricValue::Counter { value: 1.0 },
617        );
618        let result_count_2 = make_metric(
619            "gauge_a",
620            MetricKind::Absolute,
621            MetricValue::Counter { value: 2.0 },
622        );
623
624        // Single item, counter should be 1
625        assert_eq!(agg.record(gauge_a_1.clone()), None);
626        let mut out = vec![];
627        // We should flush 1 item gauge_a_1
628        agg.flush_into(&mut out);
629        assert_eq!(1, out.len());
630        assert_eq!(&result_count, &out[0]);
631
632        // A subsequent flush doesn't send out anything
633        out.clear();
634        agg.flush_into(&mut out);
635        assert_eq!(0, out.len());
636
637        // One more just to make sure that we don't re-see from the other buffer
638        out.clear();
639        agg.flush_into(&mut out);
640        assert_eq!(0, out.len());
641
642        // Two absolutes with the same series, counter should be 2
643        assert_eq!(agg.record(gauge_a_1.clone()), None);
644        assert_eq!(agg.record(gauge_a_2.clone()), None);
645        out.clear();
646        agg.flush_into(&mut out);
647        assert_eq!(1, out.len());
648        assert_eq!(&result_count_2, &out[0]);
649    }
650
651    #[test]
652    fn absolute_max() {
653        let mut agg = Aggregate::new(&AggregateConfig {
654            interval_ms: 1000_u64,
655            mode: AggregationMode::Max,
656        })
657        .unwrap();
658
659        let gauge_a_1 = make_metric(
660            "gauge_a",
661            MetricKind::Absolute,
662            MetricValue::Gauge { value: 112.0 },
663        );
664        let gauge_a_2 = make_metric(
665            "gauge_a",
666            MetricKind::Absolute,
667            MetricValue::Gauge { value: 89.0 },
668        );
669
670        // Single item, it should be returned as is
671        assert_eq!(agg.record(gauge_a_2.clone()), None);
672        let mut out = vec![];
673        // We should flush 1 item gauge_a_2
674        agg.flush_into(&mut out);
675        assert_eq!(1, out.len());
676        assert_eq!(&gauge_a_2, &out[0]);
677
678        // A subsequent flush doesn't send out anything
679        out.clear();
680        agg.flush_into(&mut out);
681        assert_eq!(0, out.len());
682
683        // One more just to make sure that we don't re-see from the other buffer
684        out.clear();
685        agg.flush_into(&mut out);
686        assert_eq!(0, out.len());
687
688        // Two absolutes, result should be higher of the 2
689        assert_eq!(agg.record(gauge_a_1.clone()), None);
690        assert_eq!(agg.record(gauge_a_2.clone()), None);
691        out.clear();
692        agg.flush_into(&mut out);
693        assert_eq!(1, out.len());
694        assert_eq!(&gauge_a_1, &out[0]);
695    }
696
697    #[test]
698    fn absolute_min() {
699        let mut agg = Aggregate::new(&AggregateConfig {
700            interval_ms: 1000_u64,
701            mode: AggregationMode::Min,
702        })
703        .unwrap();
704
705        let gauge_a_1 = make_metric(
706            "gauge_a",
707            MetricKind::Absolute,
708            MetricValue::Gauge { value: 32.0 },
709        );
710        let gauge_a_2 = make_metric(
711            "gauge_a",
712            MetricKind::Absolute,
713            MetricValue::Gauge { value: 89.0 },
714        );
715
716        // Single item, it should be returned as is
717        assert_eq!(agg.record(gauge_a_2.clone()), None);
718        let mut out = vec![];
719        // We should flush 1 item gauge_a_2
720        agg.flush_into(&mut out);
721        assert_eq!(1, out.len());
722        assert_eq!(&gauge_a_2, &out[0]);
723
724        // A subsequent flush doesn't send out anything
725        out.clear();
726        agg.flush_into(&mut out);
727        assert_eq!(0, out.len());
728
729        // One more just to make sure that we don't re-see from the other buffer
730        out.clear();
731        agg.flush_into(&mut out);
732        assert_eq!(0, out.len());
733
734        // Two absolutes, result should be lower of the 2
735        assert_eq!(agg.record(gauge_a_1.clone()), None);
736        assert_eq!(agg.record(gauge_a_2.clone()), None);
737        out.clear();
738        agg.flush_into(&mut out);
739        assert_eq!(1, out.len());
740        assert_eq!(&gauge_a_1, &out[0]);
741    }
742
743    #[test]
744    fn absolute_diff() {
745        let mut agg = Aggregate::new(&AggregateConfig {
746            interval_ms: 1000_u64,
747            mode: AggregationMode::Diff,
748        })
749        .unwrap();
750
751        let gauge_a_1 = make_metric(
752            "gauge_a",
753            MetricKind::Absolute,
754            MetricValue::Gauge { value: 32.0 },
755        );
756        let gauge_a_2 = make_metric(
757            "gauge_a",
758            MetricKind::Absolute,
759            MetricValue::Gauge { value: 82.0 },
760        );
761        let result = make_metric(
762            "gauge_a",
763            MetricKind::Absolute,
764            MetricValue::Gauge { value: 50.0 },
765        );
766
767        // Single item, it should be returned as is
768        assert_eq!(agg.record(gauge_a_2.clone()), None);
769        let mut out = vec![];
770        // We should flush 1 item gauge_a_2
771        agg.flush_into(&mut out);
772        assert_eq!(1, out.len());
773        assert_eq!(&gauge_a_2, &out[0]);
774
775        // A subsequent flush doesn't send out anything
776        out.clear();
777        agg.flush_into(&mut out);
778        assert_eq!(0, out.len());
779
780        // One more just to make sure that we don't re-see from the other buffer
781        out.clear();
782        agg.flush_into(&mut out);
783        assert_eq!(0, out.len());
784
785        // Two absolutes in 2 separate flushes, result should be diff between the 2
786        assert_eq!(agg.record(gauge_a_1.clone()), None);
787        out.clear();
788        agg.flush_into(&mut out);
789        assert_eq!(1, out.len());
790        assert_eq!(&gauge_a_1, &out[0]);
791
792        assert_eq!(agg.record(gauge_a_2.clone()), None);
793        out.clear();
794        agg.flush_into(&mut out);
795        assert_eq!(1, out.len());
796        assert_eq!(&result, &out[0]);
797    }
798
799    #[test]
800    fn absolute_diff_conflicting_type() {
801        let mut agg = Aggregate::new(&AggregateConfig {
802            interval_ms: 1000_u64,
803            mode: AggregationMode::Diff,
804        })
805        .unwrap();
806
807        let gauge_a_1 = make_metric(
808            "gauge_a",
809            MetricKind::Absolute,
810            MetricValue::Gauge { value: 32.0 },
811        );
812        let gauge_a_2 = make_metric(
813            "gauge_a",
814            MetricKind::Absolute,
815            MetricValue::Counter { value: 1.0 },
816        );
817
818        let mut out = vec![];
819        // Two absolutes in 2 separate flushes, result should be second one due to different types
820        assert_eq!(agg.record(gauge_a_1.clone()), None);
821        out.clear();
822        agg.flush_into(&mut out);
823        assert_eq!(1, out.len());
824        assert_eq!(&gauge_a_1, &out[0]);
825
826        assert_eq!(agg.record(gauge_a_2.clone()), None);
827        out.clear();
828        agg.flush_into(&mut out);
829        assert_eq!(1, out.len());
830        // Due to incompatible results, the new value just overwrites the old one
831        assert_eq!(&gauge_a_2, &out[0]);
832    }
833
834    #[test]
835    fn absolute_mean() {
836        let mut agg = Aggregate::new(&AggregateConfig {
837            interval_ms: 1000_u64,
838            mode: AggregationMode::Mean,
839        })
840        .unwrap();
841
842        let gauge_a_1 = make_metric(
843            "gauge_a",
844            MetricKind::Absolute,
845            MetricValue::Gauge { value: 32.0 },
846        );
847        let gauge_a_2 = make_metric(
848            "gauge_a",
849            MetricKind::Absolute,
850            MetricValue::Gauge { value: 82.0 },
851        );
852        let gauge_a_3 = make_metric(
853            "gauge_a",
854            MetricKind::Absolute,
855            MetricValue::Gauge { value: 51.0 },
856        );
857        let mean_result = make_metric(
858            "gauge_a",
859            MetricKind::Absolute,
860            MetricValue::Gauge { value: 55.0 },
861        );
862
863        // Single item, it should be returned as is
864        assert_eq!(agg.record(gauge_a_2.clone()), None);
865        let mut out = vec![];
866        // We should flush 1 item gauge_a_2
867        agg.flush_into(&mut out);
868        assert_eq!(1, out.len());
869        assert_eq!(&gauge_a_2, &out[0]);
870
871        // A subsequent flush doesn't send out anything
872        out.clear();
873        agg.flush_into(&mut out);
874        assert_eq!(0, out.len());
875
876        // One more just to make sure that we don't re-see from the other buffer
877        out.clear();
878        agg.flush_into(&mut out);
879        assert_eq!(0, out.len());
880
881        // Three absolutes, result should be mean
882        assert_eq!(agg.record(gauge_a_1.clone()), None);
883        assert_eq!(agg.record(gauge_a_2.clone()), None);
884        assert_eq!(agg.record(gauge_a_3.clone()), None);
885        out.clear();
886        agg.flush_into(&mut out);
887        assert_eq!(1, out.len());
888        assert_eq!(&mean_result, &out[0]);
889    }
890
891    #[test]
892    fn absolute_stdev() {
893        let mut agg = Aggregate::new(&AggregateConfig {
894            interval_ms: 1000_u64,
895            mode: AggregationMode::Stdev,
896        })
897        .unwrap();
898
899        let gauges = vec![
900            make_metric(
901                "gauge_a",
902                MetricKind::Absolute,
903                MetricValue::Gauge { value: 25.0 },
904            ),
905            make_metric(
906                "gauge_a",
907                MetricKind::Absolute,
908                MetricValue::Gauge { value: 30.0 },
909            ),
910            make_metric(
911                "gauge_a",
912                MetricKind::Absolute,
913                MetricValue::Gauge { value: 35.0 },
914            ),
915            make_metric(
916                "gauge_a",
917                MetricKind::Absolute,
918                MetricValue::Gauge { value: 40.0 },
919            ),
920            make_metric(
921                "gauge_a",
922                MetricKind::Absolute,
923                MetricValue::Gauge { value: 45.0 },
924            ),
925            make_metric(
926                "gauge_a",
927                MetricKind::Absolute,
928                MetricValue::Gauge { value: 50.0 },
929            ),
930            make_metric(
931                "gauge_a",
932                MetricKind::Absolute,
933                MetricValue::Gauge { value: 55.0 },
934            ),
935        ];
936        let stdev_result = make_metric(
937            "gauge_a",
938            MetricKind::Absolute,
939            MetricValue::Gauge { value: 10.0 },
940        );
941
942        for gauge in gauges {
943            assert_eq!(agg.record(gauge), None);
944        }
945        let mut out = vec![];
946        agg.flush_into(&mut out);
947        assert_eq!(1, out.len());
948        assert_eq!(&stdev_result, &out[0]);
949    }
950
951    #[test]
952    fn passes_through_ignored_kind() {
953        // Sum mode aggregates incremental, passes through absolute without collapsing.
954        let mut agg = Aggregate::new(&AggregateConfig {
955            interval_ms: 1000_u64,
956            mode: AggregationMode::Sum,
957        })
958        .unwrap();
959
960        let counter_1 = make_metric(
961            "counter_a",
962            MetricKind::Incremental,
963            MetricValue::Counter { value: 10.0 },
964        );
965        let counter_2 = make_metric(
966            "counter_a",
967            MetricKind::Incremental,
968            MetricValue::Counter { value: 5.0 },
969        );
970        let counter_summed = make_metric(
971            "counter_a",
972            MetricKind::Incremental,
973            MetricValue::Counter { value: 15.0 },
974        );
975        let gauge_1 = make_metric(
976            "gauge_a",
977            MetricKind::Absolute,
978            MetricValue::Gauge { value: 42.0 },
979        );
980        let gauge_2 = make_metric(
981            "gauge_a",
982            MetricKind::Absolute,
983            MetricValue::Gauge { value: 99.0 },
984        );
985
986        // Absolute metrics pass through immediately (not held until flush).
987        assert_eq!(agg.record(gauge_1.clone()), Some(gauge_1));
988        assert_eq!(agg.record(gauge_2.clone()), Some(gauge_2));
989
990        // Each is returned individually — no collapsing to latest.
991        assert_eq!(agg.record(counter_1), None);
992        assert_eq!(agg.record(counter_2), None);
993
994        let mut out = vec![];
995        agg.flush_into(&mut out);
996        // Only the summed incremental counter appears at flush; the gauges already passed through.
997        assert_eq!(1, out.len());
998        assert_eq!(&counter_summed, &out[0]);
999    }
1000
1001    #[test]
1002    fn conflicting_value_type() {
1003        let mut agg = Aggregate::new(&AggregateConfig {
1004            interval_ms: 1000_u64,
1005            mode: AggregationMode::Auto,
1006        })
1007        .unwrap();
1008
1009        let counter = make_metric(
1010            "the-thing",
1011            MetricKind::Incremental,
1012            MetricValue::Counter { value: 42.0 },
1013        );
1014        let mut values = BTreeSet::<String>::new();
1015        values.insert("a".into());
1016        values.insert("b".into());
1017        let set = make_metric(
1018            "the-thing",
1019            MetricKind::Incremental,
1020            MetricValue::Set { values },
1021        );
1022        let summed = make_metric(
1023            "the-thing",
1024            MetricKind::Incremental,
1025            MetricValue::Counter { value: 84.0 },
1026        );
1027
1028        // when types conflict the new values replaces whatever is there
1029
1030        // Start with an counter
1031        assert_eq!(agg.record(counter.clone()), None);
1032        // Another will "add" to it
1033        assert_eq!(agg.record(counter.clone()), None);
1034        // Then an set will replace it due to a failed update
1035        assert_eq!(agg.record(set.clone()), None);
1036        // Then a set union would be a noop
1037        assert_eq!(agg.record(set.clone()), None);
1038        let mut out = vec![];
1039        // We should flush 1 item counter
1040        agg.flush_into(&mut out);
1041        assert_eq!(1, out.len());
1042        assert_eq!(&set, &out[0]);
1043
1044        // Start out with an set
1045        assert_eq!(agg.record(set.clone()), None);
1046        // Union with itself, a noop
1047        assert_eq!(agg.record(set), None);
1048        // Send an counter with the same name, will replace due to a failed update
1049        assert_eq!(agg.record(counter.clone()), None);
1050        // Send another counter will "add"
1051        assert_eq!(agg.record(counter), None);
1052        let mut out = vec![];
1053        // We should flush 1 item counter
1054        agg.flush_into(&mut out);
1055        assert_eq!(1, out.len());
1056        assert_eq!(&summed, &out[0]);
1057    }
1058
1059    #[test]
1060    fn conflicting_kinds() {
1061        let mut agg = Aggregate::new(&AggregateConfig {
1062            interval_ms: 1000_u64,
1063            mode: AggregationMode::Auto,
1064        })
1065        .unwrap();
1066
1067        let incremental = make_metric(
1068            "the-thing",
1069            MetricKind::Incremental,
1070            MetricValue::Counter { value: 42.0 },
1071        );
1072        let absolute = make_metric(
1073            "the-thing",
1074            MetricKind::Absolute,
1075            MetricValue::Counter { value: 43.0 },
1076        );
1077        let summed = make_metric(
1078            "the-thing",
1079            MetricKind::Incremental,
1080            MetricValue::Counter { value: 84.0 },
1081        );
1082
1083        // when types conflict the new values replaces whatever is there
1084
1085        // Start with an incremental
1086        assert_eq!(agg.record(incremental.clone()), None);
1087        // Another will "add" to it
1088        assert_eq!(agg.record(incremental.clone()), None);
1089        // Then an absolute will replace it with a failed update
1090        assert_eq!(agg.record(absolute.clone()), None);
1091        // Then another absolute will replace it normally
1092        assert_eq!(agg.record(absolute.clone()), None);
1093        let mut out = vec![];
1094        // We should flush 1 item incremental
1095        agg.flush_into(&mut out);
1096        assert_eq!(1, out.len());
1097        assert_eq!(&absolute, &out[0]);
1098
1099        // Start out with an absolute
1100        assert_eq!(agg.record(absolute.clone()), None);
1101        // Replace it normally
1102        assert_eq!(agg.record(absolute), None);
1103        // Send an incremental with the same name, will replace due to a failed update
1104        assert_eq!(agg.record(incremental.clone()), None);
1105        // Send another incremental will "add"
1106        assert_eq!(agg.record(incremental), None);
1107        let mut out = vec![];
1108        // We should flush 1 item incremental
1109        agg.flush_into(&mut out);
1110        assert_eq!(1, out.len());
1111        assert_eq!(&summed, &out[0]);
1112    }
1113
1114    #[tokio::test]
1115    async fn transform_shutdown() {
1116        let agg = toml::from_str::<AggregateConfig>(
1117            r"
1118interval_ms = 999999
1119",
1120        )
1121        .unwrap()
1122        .build(&TransformContext::default())
1123        .await
1124        .unwrap();
1125
1126        let agg = agg.into_task();
1127
1128        let counter_a_1 = make_metric(
1129            "counter_a",
1130            MetricKind::Incremental,
1131            MetricValue::Counter { value: 42.0 },
1132        );
1133        let counter_a_2 = make_metric(
1134            "counter_a",
1135            MetricKind::Incremental,
1136            MetricValue::Counter { value: 43.0 },
1137        );
1138        let counter_a_summed = make_metric(
1139            "counter_a",
1140            MetricKind::Incremental,
1141            MetricValue::Counter { value: 85.0 },
1142        );
1143        let gauge_a_1 = make_metric(
1144            "gauge_a",
1145            MetricKind::Absolute,
1146            MetricValue::Gauge { value: 42.0 },
1147        );
1148        let gauge_a_2 = make_metric(
1149            "gauge_a",
1150            MetricKind::Absolute,
1151            MetricValue::Gauge { value: 43.0 },
1152        );
1153        let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
1154
1155        // Queue up some events to be consumed & recorded
1156        let in_stream = Box::pin(stream::iter(inputs));
1157        // Kick off the transform process which should consume & record them
1158        let mut out_stream = agg.transform_events(in_stream);
1159
1160        // B/c the input stream has ended we will have gone through the `input_rx.next() => None`
1161        // part of the loop and do the shutting down final flush immediately. We'll already be able
1162        // to read our expected bits on the output.
1163        let mut count = 0_u8;
1164        while let Some(event) = out_stream.next().await {
1165            count += 1;
1166            match event.as_metric().series().name.name.as_str() {
1167                "counter_a" => assert_eq!(counter_a_summed, event),
1168                "gauge_a" => assert_eq!(gauge_a_2, event),
1169                _ => panic!("Unexpected metric name in aggregate output"),
1170            };
1171        }
1172        // There were only 2
1173        assert_eq!(2, count);
1174    }
1175
1176    #[tokio::test]
1177    async fn transform_interval() {
1178        let transform_config = toml::from_str::<AggregateConfig>("").unwrap();
1179
1180        let counter_a_1 = make_metric(
1181            "counter_a",
1182            MetricKind::Incremental,
1183            MetricValue::Counter { value: 42.0 },
1184        );
1185        let counter_a_2 = make_metric(
1186            "counter_a",
1187            MetricKind::Incremental,
1188            MetricValue::Counter { value: 43.0 },
1189        );
1190        let counter_a_summed = make_metric(
1191            "counter_a",
1192            MetricKind::Incremental,
1193            MetricValue::Counter { value: 85.0 },
1194        );
1195        let gauge_a_1 = make_metric(
1196            "gauge_a",
1197            MetricKind::Absolute,
1198            MetricValue::Gauge { value: 42.0 },
1199        );
1200        let gauge_a_2 = make_metric(
1201            "gauge_a",
1202            MetricKind::Absolute,
1203            MetricValue::Gauge { value: 43.0 },
1204        );
1205
1206        assert_transform_compliance(async {
1207            let (tx, rx) = mpsc::channel(10);
1208            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
1209            let mut out = ReceiverStream::new(out);
1210
1211            tokio::time::pause();
1212
1213            // tokio interval is always immediately ready, so we poll once to make sure
1214            // we trip it/set the interval in the future
1215            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1216
1217            // Now send our events
1218            tx.send(counter_a_1).await.unwrap();
1219            tx.send(counter_a_2).await.unwrap();
1220            tx.send(gauge_a_1).await.unwrap();
1221            tx.send(gauge_a_2.clone()).await.unwrap();
1222            // We won't have flushed yet b/c the interval hasn't elapsed, so no outputs
1223            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1224            // Now fast forward time enough that our flush should trigger.
1225            tokio::time::advance(Duration::from_secs(11)).await;
1226            // We should have had an interval fire now and our output aggregate events should be
1227            // available.
1228            let mut count = 0_u8;
1229            while count < 2 {
1230                match out.next().await {
1231                    Some(event) => {
1232                        match event.as_metric().series().name.name.as_str() {
1233                            "counter_a" => assert_eq!(counter_a_summed, event),
1234                            "gauge_a" => assert_eq!(gauge_a_2, event),
1235                            _ => panic!("Unexpected metric name in aggregate output"),
1236                        };
1237                        count += 1;
1238                    }
1239                    _ => {
1240                        panic!("Unexpectedly received None in output stream");
1241                    }
1242                }
1243            }
1244            // We should be back to pending, having nothing waiting for us
1245            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1246
1247            drop(tx);
1248            topology.stop().await;
1249            assert_eq!(out.next().await, None);
1250        })
1251        .await;
1252    }
1253}