1use std::{
2 collections::{HashMap, hash_map::Entry},
3 pin::Pin,
4 time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{
10 configurable::configurable_component,
11 event::{
12 MetricValue,
13 metric::{Metric, MetricData, MetricKind, MetricSeries},
14 },
15};
16
17use crate::{
18 config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
19 event::{Event, EventMetadata},
20 internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
21 schema,
22 transforms::{TaskTransform, Transform},
23};
24
25#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
27#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
28#[serde(deny_unknown_fields)]
29pub struct AggregateConfig {
30 #[serde(default = "default_interval_ms")]
34 #[configurable(metadata(docs::human_name = "Flush Interval"))]
35 pub interval_ms: u64,
36 #[serde(default = "default_mode")]
40 #[configurable(derived)]
41 pub mode: AggregationMode,
42}
43
/// User-facing selection of how metrics within one flush window are combined.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[configurable(description = "The aggregation mode to use.")]
pub enum AggregationMode {
    /// Default mode. Sums incremental metrics and keeps the latest value for absolute metrics.
    #[default]
    Auto,

    /// Sums incremental metrics; absolute metrics are passed through unaggregated.
    Sum,

    /// Keeps the latest value for absolute metrics; incremental metrics are passed through unaggregated.
    Latest,

    /// Counts the metrics received per series in the window, regardless of kind.
    Count,

    /// Emits the latest absolute value minus the value flushed in the previous window; incremental
    /// metrics are passed through unaggregated.
    Diff,

    /// Keeps the largest gauge value for absolute metrics; incremental metrics are passed through
    /// unaggregated.
    Max,

    /// Keeps the smallest gauge value for absolute metrics; incremental metrics are passed through
    /// unaggregated.
    Min,

    /// Emits the mean of absolute gauge values; incremental metrics are passed through unaggregated.
    Mean,

    /// Emits the (population) standard deviation of absolute gauge values; incremental metrics are
    /// passed through unaggregated.
    Stdev,
}
76
/// Runtime counterpart of [`AggregationMode`] that also owns whatever extra
/// per-mode state the aggregation needs between calls.
#[derive(Clone, Debug, Default, PartialEq)]
enum InnerMode {
    /// Sum incremental metrics, keep the latest absolute metric.
    #[default]
    Auto,

    /// Sum incremental metrics.
    Sum,

    /// Keep the latest absolute metric.
    Latest,

    /// Count metrics per series.
    Count,

    /// Subtract the previous window's value from the current one.
    Diff {
        /// Entries flushed by the previous window, keyed by series.
        prev_map: HashMap<MetricSeries, MetricEntry>,
    },

    /// Keep the largest absolute gauge.
    Max,

    /// Keep the smallest absolute gauge.
    Min,

    /// Average the absolute gauges of the window.
    Mean {
        /// Every gauge sample recorded in the current window, keyed by series.
        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
    },

    /// Standard deviation of the absolute gauges of the window.
    Stdev {
        /// Every gauge sample recorded in the current window, keyed by series.
        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
    },
}
113
114impl From<AggregationMode> for InnerMode {
115 fn from(value: AggregationMode) -> Self {
116 match value {
117 AggregationMode::Auto => InnerMode::Auto,
118 AggregationMode::Sum => InnerMode::Sum,
119 AggregationMode::Latest => InnerMode::Latest,
120 AggregationMode::Count => InnerMode::Count,
121 AggregationMode::Diff => InnerMode::Diff {
122 prev_map: HashMap::default(),
123 },
124 AggregationMode::Max => InnerMode::Max,
125 AggregationMode::Min => InnerMode::Min,
126 AggregationMode::Mean => InnerMode::Mean {
127 multi_map: HashMap::default(),
128 },
129 AggregationMode::Stdev => InnerMode::Stdev {
130 multi_map: HashMap::default(),
131 },
132 }
133 }
134}
135
/// Serde default for `AggregateConfig::mode`.
const fn default_mode() -> AggregationMode {
    AggregationMode::Auto
}
139
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    const DEFAULT_INTERVAL_SECS: u64 = 10;
    const MILLIS_PER_SEC: u64 = 1_000;

    DEFAULT_INTERVAL_SECS * MILLIS_PER_SEC
}
143
// NOTE(review): presumably generates the example configuration for this transform from
// `AggregateConfig::default()` — confirm against the macro definition.
impl_generate_config_from_default!(AggregateConfig);
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "aggregate")]
148impl TransformConfig for AggregateConfig {
149 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
150 Aggregate::new(self).map(Transform::event_task)
151 }
152
153 fn input(&self) -> Input {
154 Input::metric()
155 }
156
157 fn outputs(
158 &self,
159 _: &TransformContext,
160 _: &[(OutputId, schema::Definition)],
161 ) -> Vec<TransformOutput> {
162 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
163 }
164}
165
/// The aggregated payload stored per series: the metric data plus the event metadata that
/// accompanies it.
type MetricEntry = (MetricData, EventMetadata);
167
/// State of the `aggregate` transform between flushes.
#[derive(Debug)]
pub struct Aggregate {
    /// Period of the flush timer driving `flush_into` when run as a task.
    interval: Duration,
    /// One running entry per series for the current window (used by every mode except Mean/Stdev).
    map: HashMap<MetricSeries, MetricEntry>,
    /// Aggregation strategy together with any mode-specific state.
    mode: InnerMode,
}
174
175impl Aggregate {
176 pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
177 Ok(Self {
178 interval: Duration::from_millis(config.interval_ms),
179 map: Default::default(),
180 mode: config.mode.into(),
181 })
182 }
183
184 pub fn record(&mut self, event: Event) -> Option<Event> {
185 let (series, data, metadata) = event.into_metric().into_parts();
186
187 match (&mut self.mode, data.kind) {
188 (InnerMode::Sum, MetricKind::Absolute)
189 | (InnerMode::Latest | InnerMode::Diff { .. }, MetricKind::Incremental)
190 | (InnerMode::Max | InnerMode::Min, MetricKind::Incremental)
191 | (InnerMode::Mean { .. } | InnerMode::Stdev { .. }, MetricKind::Incremental) => {
192 return Some(Event::Metric(Metric::from_parts(series, data, metadata)));
193 }
194 (InnerMode::Auto | InnerMode::Sum, MetricKind::Incremental) => {
195 self.record_sum(series, data, metadata);
196 }
197 (InnerMode::Auto, MetricKind::Absolute)
198 | (InnerMode::Latest | InnerMode::Diff { .. }, MetricKind::Absolute) => {
199 self.map.insert(series, (data, metadata));
200 }
201 (InnerMode::Count, _) => {
202 self.record_count(series, data, metadata);
203 }
204 (InnerMode::Max | InnerMode::Min, MetricKind::Absolute) => {
205 self.record_comparison(series, data, metadata);
206 }
207 (
208 InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map },
209 MetricKind::Absolute,
210 ) => {
211 if matches!(data.value, MetricValue::Gauge { value: _ }) {
212 match multi_map.entry(series) {
213 Entry::Occupied(mut entry) => entry.get_mut().push((data, metadata)),
214 Entry::Vacant(entry) => {
215 entry.insert(vec![(data, metadata)]);
216 }
217 }
218 }
219 }
220 }
221 emit!(AggregateEventRecorded);
222 None
223 }
224
225 fn record_count(
226 &mut self,
227 series: MetricSeries,
228 mut data: MetricData,
229 metadata: EventMetadata,
230 ) {
231 let mut count_data = data.clone();
232 let existing = self.map.entry(series).or_insert_with(|| {
233 *data.value_mut() = MetricValue::Counter { value: 0f64 };
234 (data.clone(), metadata.clone())
235 });
236 *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
237 if existing.0.kind == data.kind && existing.0.update(&count_data) {
238 existing.1.merge(metadata);
239 } else {
240 emit!(AggregateUpdateFailed);
241 }
242 }
243
244 fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
245 match self.map.entry(series) {
246 Entry::Occupied(mut entry) => {
247 let existing = entry.get_mut();
248 if existing.0.kind == data.kind && existing.0.update(&data) {
250 existing.1.merge(metadata);
251 } else {
252 emit!(AggregateUpdateFailed);
253 *existing = (data, metadata);
254 }
255 }
256 Entry::Vacant(entry) => {
257 entry.insert((data, metadata));
258 }
259 }
260 }
261
262 fn record_comparison(
263 &mut self,
264 series: MetricSeries,
265 data: MetricData,
266 metadata: EventMetadata,
267 ) {
268 match self.map.entry(series) {
269 Entry::Occupied(mut entry) => {
270 let existing = entry.get_mut();
271 if existing.0.kind == data.kind {
273 if let MetricValue::Gauge {
274 value: existing_value,
275 } = existing.0.value()
276 && let MetricValue::Gauge { value: new_value } = data.value()
277 {
278 let should_update = match self.mode {
279 InnerMode::Max => new_value > existing_value,
280 InnerMode::Min => new_value < existing_value,
281 _ => false,
282 };
283 if should_update {
284 *existing = (data, metadata);
285 }
286 }
287 } else {
288 emit!(AggregateUpdateFailed);
289 *existing = (data, metadata);
290 }
291 }
292 Entry::Vacant(entry) => {
293 entry.insert((data, metadata));
294 }
295 }
296 }
297
298 pub fn flush_into(&mut self, output: &mut Vec<Event>) {
299 let map = std::mem::take(&mut self.map);
300 for (series, entry) in map.clone().into_iter() {
301 let mut metric = Metric::from_parts(series, entry.0, entry.1);
302 if let InnerMode::Diff { prev_map } = &self.mode
303 && let Some(prev_entry) = prev_map.get(metric.series())
304 && metric.data().kind == prev_entry.0.kind
305 && !metric.subtract(&prev_entry.0)
306 {
307 emit!(AggregateUpdateFailed);
308 }
309 output.push(Event::Metric(metric));
310 }
311
312 let multi_map = match &mut self.mode {
313 InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => {
314 std::mem::take(multi_map)
315 }
316 _ => HashMap::default(),
317 };
318
319 'outer: for (series, entries) in multi_map.into_iter() {
320 if entries.is_empty() {
321 continue;
322 }
323
324 let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
325 for (data, metadata) in entries.iter().skip(1) {
326 if !final_sum.update(data) {
327 emit!(AggregateUpdateFailed);
329 continue 'outer;
330 }
331 final_metadata.merge(metadata.clone());
332 }
333
334 let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
335 *value /= entries.len() as f64;
337 *value
338 } else {
339 0.0
340 };
341
342 let final_mean = final_sum.clone();
343 match self.mode {
344 InnerMode::Mean { .. } => {
345 let metric = Metric::from_parts(series, final_mean, final_metadata);
346 output.push(Event::Metric(metric));
347 }
348 InnerMode::Stdev { .. } => {
349 let variance = entries
350 .iter()
351 .filter_map(|(data, _)| {
352 if let MetricValue::Gauge { value } = data.value() {
353 let diff = final_mean_value - value;
354 Some(diff * diff)
355 } else {
356 None
357 }
358 })
359 .sum::<f64>()
360 / entries.len() as f64;
361 let mut final_stdev = final_mean;
362 if let MetricValue::Gauge { value } = final_stdev.value_mut() {
363 *value = variance.sqrt()
364 }
365 let metric = Metric::from_parts(series, final_stdev, final_metadata);
366 output.push(Event::Metric(metric));
367 }
368 _ => (),
369 }
370 }
371
372 if let InnerMode::Diff { prev_map } = &mut self.mode {
373 *prev_map = map;
374 }
375 emit!(AggregateFlushed);
376 }
377}
378
379impl TaskTransform<Event> for Aggregate {
380 fn transform(
381 mut self: Box<Self>,
382 mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
383 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
384 where
385 Self: 'static,
386 {
387 let mut flush_stream = tokio::time::interval(self.interval);
388
389 Box::pin(stream! {
390 let mut output = Vec::new();
391 let mut done = false;
392 while !done {
393 tokio::select! {
394 _ = flush_stream.tick() => {
395 self.flush_into(&mut output);
396 },
397 maybe_event = input_rx.next() => {
398 match maybe_event {
399 None => {
400 self.flush_into(&mut output);
401 done = true;
402 }
403 Some(event) => {
404 if let Some(passthrough) = self.record(event) {
405 output.push(passthrough);
406 }
407 }
408 }
409 }
410 };
411 for event in output.drain(..) {
412 yield event;
413 }
414 }
415 })
416 }
417}
418
419#[cfg(test)]
420mod tests {
421 use std::{collections::BTreeSet, sync::Arc, task::Poll};
422
423 use futures::stream;
424 use tokio::sync::mpsc;
425 use tokio_stream::wrappers::ReceiverStream;
426 use vector_lib::config::{ComponentKey, LogNamespace};
427 use vrl::value::Kind;
428
429 use super::*;
430 use crate::{
431 event::{
432 Event, Metric,
433 metric::{MetricKind, MetricValue},
434 },
435 schema::Definition,
436 test_util::components::assert_transform_compliance,
437 transforms::test::create_topology,
438 };
439
    /// Runs the shared generated-config check against `AggregateConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AggregateConfig>();
    }
444
445 fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
446 let mut event = Event::Metric(Metric::new(name, kind, value))
447 .with_source_id(Arc::new(ComponentKey::from("in")))
448 .with_upstream_id(Arc::new(OutputId::from("transform")));
449 event.metadata_mut().set_schema_definition(&Arc::new(
450 Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
451 ));
452
453 event.metadata_mut().set_source_type("unit_test_stream");
454
455 event
456 }
457
458 #[test]
459 fn incremental_auto() {
460 let mut agg = Aggregate::new(&AggregateConfig {
461 interval_ms: 1000_u64,
462 mode: AggregationMode::Auto,
463 })
464 .unwrap();
465
466 let counter_a_1 = make_metric(
467 "counter_a",
468 MetricKind::Incremental,
469 MetricValue::Counter { value: 42.0 },
470 );
471 let counter_a_2 = make_metric(
472 "counter_a",
473 MetricKind::Incremental,
474 MetricValue::Counter { value: 43.0 },
475 );
476 let counter_a_summed = make_metric(
477 "counter_a",
478 MetricKind::Incremental,
479 MetricValue::Counter { value: 85.0 },
480 );
481
482 assert_eq!(agg.record(counter_a_1.clone()), None);
484 let mut out = vec![];
485 agg.flush_into(&mut out);
487 assert_eq!(1, out.len());
488 assert_eq!(&counter_a_1, &out[0]);
489
490 out.clear();
492 agg.flush_into(&mut out);
493 assert_eq!(0, out.len());
494
495 out.clear();
497 agg.flush_into(&mut out);
498 assert_eq!(0, out.len());
499
500 assert_eq!(agg.record(counter_a_1.clone()), None);
502 assert_eq!(agg.record(counter_a_2), None);
503 out.clear();
504 agg.flush_into(&mut out);
505 assert_eq!(1, out.len());
506 assert_eq!(&counter_a_summed, &out[0]);
507
508 let counter_b_1 = make_metric(
509 "counter_b",
510 MetricKind::Incremental,
511 MetricValue::Counter { value: 44.0 },
512 );
513 assert_eq!(agg.record(counter_a_1.clone()), None);
515 assert_eq!(agg.record(counter_b_1.clone()), None);
516 out.clear();
517 agg.flush_into(&mut out);
518 assert_eq!(2, out.len());
519 for event in out {
521 match event.as_metric().series().name.name.as_str() {
522 "counter_a" => assert_eq!(counter_a_1, event),
523 "counter_b" => assert_eq!(counter_b_1, event),
524 _ => panic!("Unexpected metric name in aggregate output"),
525 }
526 }
527 }
528
529 #[test]
530 fn absolute_auto() {
531 let mut agg = Aggregate::new(&AggregateConfig {
532 interval_ms: 1000_u64,
533 mode: AggregationMode::Auto,
534 })
535 .unwrap();
536
537 let gauge_a_1 = make_metric(
538 "gauge_a",
539 MetricKind::Absolute,
540 MetricValue::Gauge { value: 42.0 },
541 );
542 let gauge_a_2 = make_metric(
543 "gauge_a",
544 MetricKind::Absolute,
545 MetricValue::Gauge { value: 43.0 },
546 );
547
548 assert_eq!(agg.record(gauge_a_1.clone()), None);
550 let mut out = vec![];
551 agg.flush_into(&mut out);
553 assert_eq!(1, out.len());
554 assert_eq!(&gauge_a_1, &out[0]);
555
556 out.clear();
558 agg.flush_into(&mut out);
559 assert_eq!(0, out.len());
560
561 out.clear();
563 agg.flush_into(&mut out);
564 assert_eq!(0, out.len());
565
566 assert_eq!(agg.record(gauge_a_1.clone()), None);
568 assert_eq!(agg.record(gauge_a_2.clone()), None);
569 out.clear();
570 agg.flush_into(&mut out);
571 assert_eq!(1, out.len());
572 assert_eq!(&gauge_a_2, &out[0]);
573
574 let gauge_b_1 = make_metric(
575 "gauge_b",
576 MetricKind::Absolute,
577 MetricValue::Gauge { value: 44.0 },
578 );
579 assert_eq!(agg.record(gauge_a_1.clone()), None);
581 assert_eq!(agg.record(gauge_b_1.clone()), None);
582 out.clear();
583 agg.flush_into(&mut out);
584 assert_eq!(2, out.len());
585 for event in out {
587 match event.as_metric().series().name.name.as_str() {
588 "gauge_a" => assert_eq!(gauge_a_1, event),
589 "gauge_b" => assert_eq!(gauge_b_1, event),
590 _ => panic!("Unexpected metric name in aggregate output"),
591 }
592 }
593 }
594
595 #[test]
596 fn count_agg() {
597 let mut agg = Aggregate::new(&AggregateConfig {
598 interval_ms: 1000_u64,
599 mode: AggregationMode::Count,
600 })
601 .unwrap();
602
603 let gauge_a_1 = make_metric(
604 "gauge_a",
605 MetricKind::Absolute,
606 MetricValue::Gauge { value: 42.0 },
607 );
608 let gauge_a_2 = make_metric(
609 "gauge_a",
610 MetricKind::Absolute,
611 MetricValue::Gauge { value: 43.0 },
612 );
613 let result_count = make_metric(
614 "gauge_a",
615 MetricKind::Absolute,
616 MetricValue::Counter { value: 1.0 },
617 );
618 let result_count_2 = make_metric(
619 "gauge_a",
620 MetricKind::Absolute,
621 MetricValue::Counter { value: 2.0 },
622 );
623
624 assert_eq!(agg.record(gauge_a_1.clone()), None);
626 let mut out = vec![];
627 agg.flush_into(&mut out);
629 assert_eq!(1, out.len());
630 assert_eq!(&result_count, &out[0]);
631
632 out.clear();
634 agg.flush_into(&mut out);
635 assert_eq!(0, out.len());
636
637 out.clear();
639 agg.flush_into(&mut out);
640 assert_eq!(0, out.len());
641
642 assert_eq!(agg.record(gauge_a_1.clone()), None);
644 assert_eq!(agg.record(gauge_a_2.clone()), None);
645 out.clear();
646 agg.flush_into(&mut out);
647 assert_eq!(1, out.len());
648 assert_eq!(&result_count_2, &out[0]);
649 }
650
651 #[test]
652 fn absolute_max() {
653 let mut agg = Aggregate::new(&AggregateConfig {
654 interval_ms: 1000_u64,
655 mode: AggregationMode::Max,
656 })
657 .unwrap();
658
659 let gauge_a_1 = make_metric(
660 "gauge_a",
661 MetricKind::Absolute,
662 MetricValue::Gauge { value: 112.0 },
663 );
664 let gauge_a_2 = make_metric(
665 "gauge_a",
666 MetricKind::Absolute,
667 MetricValue::Gauge { value: 89.0 },
668 );
669
670 assert_eq!(agg.record(gauge_a_2.clone()), None);
672 let mut out = vec![];
673 agg.flush_into(&mut out);
675 assert_eq!(1, out.len());
676 assert_eq!(&gauge_a_2, &out[0]);
677
678 out.clear();
680 agg.flush_into(&mut out);
681 assert_eq!(0, out.len());
682
683 out.clear();
685 agg.flush_into(&mut out);
686 assert_eq!(0, out.len());
687
688 assert_eq!(agg.record(gauge_a_1.clone()), None);
690 assert_eq!(agg.record(gauge_a_2.clone()), None);
691 out.clear();
692 agg.flush_into(&mut out);
693 assert_eq!(1, out.len());
694 assert_eq!(&gauge_a_1, &out[0]);
695 }
696
697 #[test]
698 fn absolute_min() {
699 let mut agg = Aggregate::new(&AggregateConfig {
700 interval_ms: 1000_u64,
701 mode: AggregationMode::Min,
702 })
703 .unwrap();
704
705 let gauge_a_1 = make_metric(
706 "gauge_a",
707 MetricKind::Absolute,
708 MetricValue::Gauge { value: 32.0 },
709 );
710 let gauge_a_2 = make_metric(
711 "gauge_a",
712 MetricKind::Absolute,
713 MetricValue::Gauge { value: 89.0 },
714 );
715
716 assert_eq!(agg.record(gauge_a_2.clone()), None);
718 let mut out = vec![];
719 agg.flush_into(&mut out);
721 assert_eq!(1, out.len());
722 assert_eq!(&gauge_a_2, &out[0]);
723
724 out.clear();
726 agg.flush_into(&mut out);
727 assert_eq!(0, out.len());
728
729 out.clear();
731 agg.flush_into(&mut out);
732 assert_eq!(0, out.len());
733
734 assert_eq!(agg.record(gauge_a_1.clone()), None);
736 assert_eq!(agg.record(gauge_a_2.clone()), None);
737 out.clear();
738 agg.flush_into(&mut out);
739 assert_eq!(1, out.len());
740 assert_eq!(&gauge_a_1, &out[0]);
741 }
742
743 #[test]
744 fn absolute_diff() {
745 let mut agg = Aggregate::new(&AggregateConfig {
746 interval_ms: 1000_u64,
747 mode: AggregationMode::Diff,
748 })
749 .unwrap();
750
751 let gauge_a_1 = make_metric(
752 "gauge_a",
753 MetricKind::Absolute,
754 MetricValue::Gauge { value: 32.0 },
755 );
756 let gauge_a_2 = make_metric(
757 "gauge_a",
758 MetricKind::Absolute,
759 MetricValue::Gauge { value: 82.0 },
760 );
761 let result = make_metric(
762 "gauge_a",
763 MetricKind::Absolute,
764 MetricValue::Gauge { value: 50.0 },
765 );
766
767 assert_eq!(agg.record(gauge_a_2.clone()), None);
769 let mut out = vec![];
770 agg.flush_into(&mut out);
772 assert_eq!(1, out.len());
773 assert_eq!(&gauge_a_2, &out[0]);
774
775 out.clear();
777 agg.flush_into(&mut out);
778 assert_eq!(0, out.len());
779
780 out.clear();
782 agg.flush_into(&mut out);
783 assert_eq!(0, out.len());
784
785 assert_eq!(agg.record(gauge_a_1.clone()), None);
787 out.clear();
788 agg.flush_into(&mut out);
789 assert_eq!(1, out.len());
790 assert_eq!(&gauge_a_1, &out[0]);
791
792 assert_eq!(agg.record(gauge_a_2.clone()), None);
793 out.clear();
794 agg.flush_into(&mut out);
795 assert_eq!(1, out.len());
796 assert_eq!(&result, &out[0]);
797 }
798
799 #[test]
800 fn absolute_diff_conflicting_type() {
801 let mut agg = Aggregate::new(&AggregateConfig {
802 interval_ms: 1000_u64,
803 mode: AggregationMode::Diff,
804 })
805 .unwrap();
806
807 let gauge_a_1 = make_metric(
808 "gauge_a",
809 MetricKind::Absolute,
810 MetricValue::Gauge { value: 32.0 },
811 );
812 let gauge_a_2 = make_metric(
813 "gauge_a",
814 MetricKind::Absolute,
815 MetricValue::Counter { value: 1.0 },
816 );
817
818 let mut out = vec![];
819 assert_eq!(agg.record(gauge_a_1.clone()), None);
821 out.clear();
822 agg.flush_into(&mut out);
823 assert_eq!(1, out.len());
824 assert_eq!(&gauge_a_1, &out[0]);
825
826 assert_eq!(agg.record(gauge_a_2.clone()), None);
827 out.clear();
828 agg.flush_into(&mut out);
829 assert_eq!(1, out.len());
830 assert_eq!(&gauge_a_2, &out[0]);
832 }
833
834 #[test]
835 fn absolute_mean() {
836 let mut agg = Aggregate::new(&AggregateConfig {
837 interval_ms: 1000_u64,
838 mode: AggregationMode::Mean,
839 })
840 .unwrap();
841
842 let gauge_a_1 = make_metric(
843 "gauge_a",
844 MetricKind::Absolute,
845 MetricValue::Gauge { value: 32.0 },
846 );
847 let gauge_a_2 = make_metric(
848 "gauge_a",
849 MetricKind::Absolute,
850 MetricValue::Gauge { value: 82.0 },
851 );
852 let gauge_a_3 = make_metric(
853 "gauge_a",
854 MetricKind::Absolute,
855 MetricValue::Gauge { value: 51.0 },
856 );
857 let mean_result = make_metric(
858 "gauge_a",
859 MetricKind::Absolute,
860 MetricValue::Gauge { value: 55.0 },
861 );
862
863 assert_eq!(agg.record(gauge_a_2.clone()), None);
865 let mut out = vec![];
866 agg.flush_into(&mut out);
868 assert_eq!(1, out.len());
869 assert_eq!(&gauge_a_2, &out[0]);
870
871 out.clear();
873 agg.flush_into(&mut out);
874 assert_eq!(0, out.len());
875
876 out.clear();
878 agg.flush_into(&mut out);
879 assert_eq!(0, out.len());
880
881 assert_eq!(agg.record(gauge_a_1.clone()), None);
883 assert_eq!(agg.record(gauge_a_2.clone()), None);
884 assert_eq!(agg.record(gauge_a_3.clone()), None);
885 out.clear();
886 agg.flush_into(&mut out);
887 assert_eq!(1, out.len());
888 assert_eq!(&mean_result, &out[0]);
889 }
890
891 #[test]
892 fn absolute_stdev() {
893 let mut agg = Aggregate::new(&AggregateConfig {
894 interval_ms: 1000_u64,
895 mode: AggregationMode::Stdev,
896 })
897 .unwrap();
898
899 let gauges = vec![
900 make_metric(
901 "gauge_a",
902 MetricKind::Absolute,
903 MetricValue::Gauge { value: 25.0 },
904 ),
905 make_metric(
906 "gauge_a",
907 MetricKind::Absolute,
908 MetricValue::Gauge { value: 30.0 },
909 ),
910 make_metric(
911 "gauge_a",
912 MetricKind::Absolute,
913 MetricValue::Gauge { value: 35.0 },
914 ),
915 make_metric(
916 "gauge_a",
917 MetricKind::Absolute,
918 MetricValue::Gauge { value: 40.0 },
919 ),
920 make_metric(
921 "gauge_a",
922 MetricKind::Absolute,
923 MetricValue::Gauge { value: 45.0 },
924 ),
925 make_metric(
926 "gauge_a",
927 MetricKind::Absolute,
928 MetricValue::Gauge { value: 50.0 },
929 ),
930 make_metric(
931 "gauge_a",
932 MetricKind::Absolute,
933 MetricValue::Gauge { value: 55.0 },
934 ),
935 ];
936 let stdev_result = make_metric(
937 "gauge_a",
938 MetricKind::Absolute,
939 MetricValue::Gauge { value: 10.0 },
940 );
941
942 for gauge in gauges {
943 assert_eq!(agg.record(gauge), None);
944 }
945 let mut out = vec![];
946 agg.flush_into(&mut out);
947 assert_eq!(1, out.len());
948 assert_eq!(&stdev_result, &out[0]);
949 }
950
951 #[test]
952 fn passes_through_ignored_kind() {
953 let mut agg = Aggregate::new(&AggregateConfig {
955 interval_ms: 1000_u64,
956 mode: AggregationMode::Sum,
957 })
958 .unwrap();
959
960 let counter_1 = make_metric(
961 "counter_a",
962 MetricKind::Incremental,
963 MetricValue::Counter { value: 10.0 },
964 );
965 let counter_2 = make_metric(
966 "counter_a",
967 MetricKind::Incremental,
968 MetricValue::Counter { value: 5.0 },
969 );
970 let counter_summed = make_metric(
971 "counter_a",
972 MetricKind::Incremental,
973 MetricValue::Counter { value: 15.0 },
974 );
975 let gauge_1 = make_metric(
976 "gauge_a",
977 MetricKind::Absolute,
978 MetricValue::Gauge { value: 42.0 },
979 );
980 let gauge_2 = make_metric(
981 "gauge_a",
982 MetricKind::Absolute,
983 MetricValue::Gauge { value: 99.0 },
984 );
985
986 assert_eq!(agg.record(gauge_1.clone()), Some(gauge_1));
988 assert_eq!(agg.record(gauge_2.clone()), Some(gauge_2));
989
990 assert_eq!(agg.record(counter_1), None);
992 assert_eq!(agg.record(counter_2), None);
993
994 let mut out = vec![];
995 agg.flush_into(&mut out);
996 assert_eq!(1, out.len());
998 assert_eq!(&counter_summed, &out[0]);
999 }
1000
1001 #[test]
1002 fn conflicting_value_type() {
1003 let mut agg = Aggregate::new(&AggregateConfig {
1004 interval_ms: 1000_u64,
1005 mode: AggregationMode::Auto,
1006 })
1007 .unwrap();
1008
1009 let counter = make_metric(
1010 "the-thing",
1011 MetricKind::Incremental,
1012 MetricValue::Counter { value: 42.0 },
1013 );
1014 let mut values = BTreeSet::<String>::new();
1015 values.insert("a".into());
1016 values.insert("b".into());
1017 let set = make_metric(
1018 "the-thing",
1019 MetricKind::Incremental,
1020 MetricValue::Set { values },
1021 );
1022 let summed = make_metric(
1023 "the-thing",
1024 MetricKind::Incremental,
1025 MetricValue::Counter { value: 84.0 },
1026 );
1027
1028 assert_eq!(agg.record(counter.clone()), None);
1032 assert_eq!(agg.record(counter.clone()), None);
1034 assert_eq!(agg.record(set.clone()), None);
1036 assert_eq!(agg.record(set.clone()), None);
1038 let mut out = vec![];
1039 agg.flush_into(&mut out);
1041 assert_eq!(1, out.len());
1042 assert_eq!(&set, &out[0]);
1043
1044 assert_eq!(agg.record(set.clone()), None);
1046 assert_eq!(agg.record(set), None);
1048 assert_eq!(agg.record(counter.clone()), None);
1050 assert_eq!(agg.record(counter), None);
1052 let mut out = vec![];
1053 agg.flush_into(&mut out);
1055 assert_eq!(1, out.len());
1056 assert_eq!(&summed, &out[0]);
1057 }
1058
1059 #[test]
1060 fn conflicting_kinds() {
1061 let mut agg = Aggregate::new(&AggregateConfig {
1062 interval_ms: 1000_u64,
1063 mode: AggregationMode::Auto,
1064 })
1065 .unwrap();
1066
1067 let incremental = make_metric(
1068 "the-thing",
1069 MetricKind::Incremental,
1070 MetricValue::Counter { value: 42.0 },
1071 );
1072 let absolute = make_metric(
1073 "the-thing",
1074 MetricKind::Absolute,
1075 MetricValue::Counter { value: 43.0 },
1076 );
1077 let summed = make_metric(
1078 "the-thing",
1079 MetricKind::Incremental,
1080 MetricValue::Counter { value: 84.0 },
1081 );
1082
1083 assert_eq!(agg.record(incremental.clone()), None);
1087 assert_eq!(agg.record(incremental.clone()), None);
1089 assert_eq!(agg.record(absolute.clone()), None);
1091 assert_eq!(agg.record(absolute.clone()), None);
1093 let mut out = vec![];
1094 agg.flush_into(&mut out);
1096 assert_eq!(1, out.len());
1097 assert_eq!(&absolute, &out[0]);
1098
1099 assert_eq!(agg.record(absolute.clone()), None);
1101 assert_eq!(agg.record(absolute), None);
1103 assert_eq!(agg.record(incremental.clone()), None);
1105 assert_eq!(agg.record(incremental), None);
1107 let mut out = vec![];
1108 agg.flush_into(&mut out);
1110 assert_eq!(1, out.len());
1111 assert_eq!(&summed, &out[0]);
1112 }
1113
1114 #[tokio::test]
1115 async fn transform_shutdown() {
1116 let agg = toml::from_str::<AggregateConfig>(
1117 r"
1118interval_ms = 999999
1119",
1120 )
1121 .unwrap()
1122 .build(&TransformContext::default())
1123 .await
1124 .unwrap();
1125
1126 let agg = agg.into_task();
1127
1128 let counter_a_1 = make_metric(
1129 "counter_a",
1130 MetricKind::Incremental,
1131 MetricValue::Counter { value: 42.0 },
1132 );
1133 let counter_a_2 = make_metric(
1134 "counter_a",
1135 MetricKind::Incremental,
1136 MetricValue::Counter { value: 43.0 },
1137 );
1138 let counter_a_summed = make_metric(
1139 "counter_a",
1140 MetricKind::Incremental,
1141 MetricValue::Counter { value: 85.0 },
1142 );
1143 let gauge_a_1 = make_metric(
1144 "gauge_a",
1145 MetricKind::Absolute,
1146 MetricValue::Gauge { value: 42.0 },
1147 );
1148 let gauge_a_2 = make_metric(
1149 "gauge_a",
1150 MetricKind::Absolute,
1151 MetricValue::Gauge { value: 43.0 },
1152 );
1153 let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
1154
1155 let in_stream = Box::pin(stream::iter(inputs));
1157 let mut out_stream = agg.transform_events(in_stream);
1159
1160 let mut count = 0_u8;
1164 while let Some(event) = out_stream.next().await {
1165 count += 1;
1166 match event.as_metric().series().name.name.as_str() {
1167 "counter_a" => assert_eq!(counter_a_summed, event),
1168 "gauge_a" => assert_eq!(gauge_a_2, event),
1169 _ => panic!("Unexpected metric name in aggregate output"),
1170 };
1171 }
1172 assert_eq!(2, count);
1174 }
1175
    /// Runs the transform inside a test topology with the default configuration and checks that
    /// aggregated events only appear once the clock has been advanced.
    #[tokio::test]
    async fn transform_interval() {
        // An empty document exercises the serde defaults.
        let transform_config = toml::from_str::<AggregateConfig>("").unwrap();

        let counter_a_1 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let counter_a_2 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 43.0 },
        );
        let counter_a_summed = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 85.0 },
        );
        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 43.0 },
        );

        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(10);
            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
            let mut out = ReceiverStream::new(out);

            // Freeze the clock so the test controls when time passes.
            tokio::time::pause();

            // Nothing has been sent yet, so no output may be ready.
            assert_eq!(Poll::Pending, futures::poll!(out.next()));

            // Send events; they must be held back while the clock stands still.
            tx.send(counter_a_1).await.unwrap();
            tx.send(counter_a_2).await.unwrap();
            tx.send(gauge_a_1).await.unwrap();
            tx.send(gauge_a_2.clone()).await.unwrap();
            assert_eq!(Poll::Pending, futures::poll!(out.next()));
            // Move time past the default 10 second interval.
            tokio::time::advance(Duration::from_secs(11)).await;
            // Exactly two aggregated events are expected, in unspecified order.
            let mut count = 0_u8;
            while count < 2 {
                match out.next().await {
                    Some(event) => {
                        match event.as_metric().series().name.name.as_str() {
                            "counter_a" => assert_eq!(counter_a_summed, event),
                            "gauge_a" => assert_eq!(gauge_a_2, event),
                            _ => panic!("Unexpected metric name in aggregate output"),
                        };
                        count += 1;
                    }
                    _ => {
                        panic!("Unexpectedly received None in output stream");
                    }
                }
            }
            // Nothing further is pending after the two aggregated events.
            assert_eq!(Poll::Pending, futures::poll!(out.next()));

            // Closing the input and stopping the topology ends the output stream.
            drop(tx);
            topology.stop().await;
            assert_eq!(out.next().await, None);
        })
        .await;
    }
1253}