1use std::{
2 marker::PhantomData,
3 time::{Duration, Instant},
4};
5
6use indexmap::IndexMap;
7use lru::LruCache;
8use serde_with::serde_as;
9use snafu::Snafu;
10use vector_config_macros::configurable_component;
11use vector_lib::{
12 ByteSizeOf,
13 event::{
14 EventMetadata, Metric, MetricKind,
15 metric::{MetricData, MetricSeries},
16 },
17};
18
/// Validation failures reported by [`NormalizerConfig::validate`].
///
/// Each variant corresponds to one limit whose effective value was zero.
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum NormalizerError {
    /// The effective `max_bytes` limit was `Some(0)`.
    #[snafu(display("`max_bytes` must be greater than zero"))]
    InvalidMaxBytes,
    /// The effective `max_events` limit was `Some(0)`.
    #[snafu(display("`max_events` must be greater than zero"))]
    InvalidMaxEvents,
    /// The effective `time_to_live` was `Some(0)`.
    #[snafu(display("`time_to_live` must be greater than zero"))]
    InvalidTimeToLive,
}
28
/// Limits applied to the state kept by a metric normalizer.
///
/// The type parameter `D` supplies the fallback value used for any limit that is not set.
#[serde_as]
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default)]
pub struct NormalizerConfig<D: NormalizerSettings + Clone> {
    /// The maximum size, in bytes, of the entries held in the normalizer state.
    ///
    /// When unset, the value of `D::MAX_BYTES` is used; if that is also unset, no byte limit
    /// is enforced.
    #[serde(default = "default_max_bytes::<D>")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,

    /// The maximum number of entries held in the normalizer state.
    ///
    /// When unset, the value of `D::MAX_EVENTS` is used; if that is also unset, no entry-count
    /// limit is enforced.
    #[serde(default = "default_max_events::<D>")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub max_events: Option<usize>,

    /// The maximum age, in seconds, of an entry before it is treated as expired.
    ///
    /// When unset, the value of `D::TIME_TO_LIVE` is used; if that is also unset, entries never
    /// expire.
    #[serde(default = "default_time_to_live::<D>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Time To Live"))]
    pub time_to_live: Option<u64>,

    /// Zero-sized marker tying the configuration to its defaults provider `D`; skipped by serde.
    #[serde(skip)]
    pub _d: PhantomData<D>,
}
54
55const fn default_max_bytes<D: NormalizerSettings>() -> Option<usize> {
56 D::MAX_BYTES
57}
58
59const fn default_max_events<D: NormalizerSettings>() -> Option<usize> {
60 D::MAX_EVENTS
61}
62
63const fn default_time_to_live<D: NormalizerSettings>() -> Option<u64> {
64 D::TIME_TO_LIVE
65}
66
67impl<D: NormalizerSettings + Clone> NormalizerConfig<D> {
68 pub fn validate(&self) -> Result<NormalizerConfig<D>, NormalizerError> {
69 let config = NormalizerConfig::<D> {
70 max_bytes: self.max_bytes.or(D::MAX_BYTES),
71 max_events: self.max_events.or(D::MAX_EVENTS),
72 time_to_live: self.time_to_live.or(D::TIME_TO_LIVE),
73 _d: Default::default(),
74 };
75 match (config.max_bytes, config.max_events, config.time_to_live) {
76 (Some(0), _, _) => Err(NormalizerError::InvalidMaxBytes),
77 (_, Some(0), _) => Err(NormalizerError::InvalidMaxEvents),
78 (_, _, Some(0)) => Err(NormalizerError::InvalidTimeToLive),
79 _ => Ok(config),
80 }
81 }
82
83 pub const fn into_settings(self) -> MetricSetSettings {
84 MetricSetSettings {
85 max_bytes: self.max_bytes,
86 max_events: self.max_events,
87 time_to_live: self.time_to_live,
88 }
89 }
90}
91
/// Compile-time defaults for a [`NormalizerConfig`].
///
/// Each constant is used when the corresponding configuration field is not set; `None` means
/// that no default limit is supplied for that field.
pub trait NormalizerSettings {
    /// Default for `max_events`.
    const MAX_EVENTS: Option<usize>;
    /// Default for `max_bytes`.
    const MAX_BYTES: Option<usize>;
    /// Default for `time_to_live`, in seconds.
    const TIME_TO_LIVE: Option<u64>;
}
97
/// [`NormalizerSettings`] implementation that declares no default limits at all.
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultNormalizerSettings;

impl NormalizerSettings for DefaultNormalizerSettings {
    // `None` everywhere: unless configured explicitly, no entry, byte, or TTL limit applies.
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = None;
    const TIME_TO_LIVE: Option<u64> = None;
}
106
/// Strategy for rewriting a metric against the state recorded in a [`MetricSet`].
pub trait MetricNormalize {
    /// Normalizes `metric`, reading and updating `state` as needed.
    ///
    /// Returns the metric to pass on, or `None`.
    // NOTE(review): presumably `None` means "nothing to emit for this input" (as with
    // `MetricSet::make_incremental`) — confirm against the implementors.
    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric>;
}
136
/// Pairs a normalization strategy `N` with the [`MetricSet`] state it operates on.
pub struct MetricNormalizer<N> {
    /// State handed to the strategy on every `normalize` call.
    state: MetricSet,
    /// The normalization strategy.
    normalizer: N,
}
146
147impl<N> MetricNormalizer<N> {
148 pub fn with_config<D: NormalizerSettings + Clone>(
150 normalizer: N,
151 config: NormalizerConfig<D>,
152 ) -> Self {
153 let settings = config
154 .validate()
155 .unwrap_or_else(|e| panic!("Invalid cache settings: {e:?}"))
156 .into_settings();
157 Self {
158 state: MetricSet::new(settings),
159 normalizer,
160 }
161 }
162
163 pub const fn get_state_mut(&mut self) -> &mut MetricSet {
165 &mut self.state
166 }
167}
168
169impl<N: MetricNormalize> MetricNormalizer<N> {
170 pub fn normalize(&mut self, metric: Metric) -> Option<Metric> {
174 self.normalizer.normalize(&mut self.state, metric)
175 }
176}
177
178impl<N: Default> Default for MetricNormalizer<N> {
179 fn default() -> Self {
180 Self {
181 state: MetricSet::default(),
182 normalizer: N::default(),
183 }
184 }
185}
186
187impl<N> From<N> for MetricNormalizer<N> {
188 fn from(normalizer: N) -> Self {
189 Self {
190 state: MetricSet::default(),
191 normalizer,
192 }
193 }
194}
195
/// The value stored for one metric series inside a [`MetricSet`].
#[derive(Clone, Debug)]
pub struct MetricEntry {
    /// The metric's data (everything except its series).
    pub data: MetricData,
    /// The event metadata carried alongside the data.
    pub metadata: EventMetadata,
    /// When the entry was last written; `None` when no TTL tracking is in effect.
    pub timestamp: Option<Instant>,
}
206
207impl ByteSizeOf for MetricEntry {
208 fn allocated_bytes(&self) -> usize {
209 self.data.allocated_bytes() + self.metadata.allocated_bytes()
210 }
211}
212
213impl MetricEntry {
214 pub const fn new(
216 data: MetricData,
217 metadata: EventMetadata,
218 timestamp: Option<Instant>,
219 ) -> Self {
220 Self {
221 data,
222 metadata,
223 timestamp,
224 }
225 }
226
227 pub fn from_metric(metric: Metric, timestamp: Option<Instant>) -> (MetricSeries, Self) {
229 let (series, data, metadata) = metric.into_parts();
230 let entry = Self::new(data, metadata, timestamp);
231 (series, entry)
232 }
233
234 pub fn into_metric(self, series: MetricSeries) -> Metric {
236 Metric::from_parts(series, self.data, self.metadata)
237 }
238
239 pub const fn update_timestamp(&mut self, timestamp: Option<Instant>) {
241 self.timestamp = timestamp;
242 }
243
244 pub fn is_expired(&self, ttl: Duration, reference_time: Instant) -> bool {
248 match self.timestamp {
249 Some(ts) => reference_time.duration_since(ts) >= ttl,
250 None => false,
251 }
252 }
253}
254
/// Size limits for a [`MetricSet`], plus the running total used to enforce the byte limit.
#[derive(Clone, Debug)]
pub struct CapacityPolicy {
    /// Maximum tracked bytes; `None` disables the byte limit (and byte tracking).
    pub max_bytes: Option<usize>,
    /// Maximum number of entries; `None` disables the entry-count limit.
    pub max_events: Option<usize>,
    /// Bytes currently attributed to stored entries.
    current_memory: usize,
}
265
266impl CapacityPolicy {
267 pub const fn new(max_bytes: Option<usize>, max_events: Option<usize>) -> Self {
269 Self {
270 max_bytes,
271 max_events,
272 current_memory: 0,
273 }
274 }
275
276 pub const fn current_memory(&self) -> usize {
278 self.current_memory
279 }
280
281 const fn remove_memory(&mut self, bytes: usize) {
283 self.current_memory = self.current_memory.saturating_sub(bytes);
284 }
285
286 pub fn free_item(&mut self, series: &MetricSeries, entry: &MetricEntry) {
289 if self.max_bytes.is_some() {
290 let freed_memory = self.item_size(series, entry);
291 self.remove_memory(freed_memory);
292 }
293 }
294
295 const fn replace_memory(&mut self, old_bytes: usize, new_bytes: usize) {
297 self.current_memory = self
298 .current_memory
299 .saturating_sub(old_bytes)
300 .saturating_add(new_bytes);
301 }
302
303 const fn exceeds_memory_limit(&self) -> bool {
305 if let Some(max_bytes) = self.max_bytes {
306 self.current_memory > max_bytes
307 } else {
308 false
309 }
310 }
311
312 const fn exceeds_entry_limit(&self, entry_count: usize) -> bool {
314 if let Some(max_events) = self.max_events {
315 entry_count > max_events
316 } else {
317 false
318 }
319 }
320
321 const fn needs_eviction(&self, entry_count: usize) -> bool {
323 self.exceeds_memory_limit() || self.exceeds_entry_limit(entry_count)
324 }
325
326 pub fn item_size(&self, series: &MetricSeries, entry: &MetricEntry) -> usize {
328 entry.allocated_bytes() + series.allocated_bytes()
329 }
330}
331
/// Time-to-live settings for stored entries, plus the bookkeeping for periodic cleanup.
#[derive(Clone, Debug)]
pub struct TtlPolicy {
    /// Age at which an entry is considered expired.
    pub ttl: Duration,
    /// Minimum time between two cleanup passes.
    pub cleanup_interval: Duration,
    /// When the last cleanup pass was recorded (initially the creation time).
    pub(crate) last_cleanup: Instant,
}

impl TtlPolicy {
    /// The cleanup interval is the TTL divided by this value...
    const CLEANUP_DIVISOR: u32 = 10;
    /// ...but never shorter than this.
    const MIN_CLEANUP_INTERVAL: Duration = Duration::from_secs(10);

    /// Creates a policy for the given `ttl`.
    ///
    /// The cleanup interval is one tenth of `ttl`, with a floor of ten seconds. The division is
    /// done on the `Duration` itself so the result is exact (no floating-point rounding).
    pub fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            cleanup_interval: (ttl / Self::CLEANUP_DIVISOR).max(Self::MIN_CLEANUP_INTERVAL),
            last_cleanup: Instant::now(),
        }
    }

    /// Returns the current instant when a cleanup pass is due, `None` otherwise.
    ///
    /// A pass is due once at least `cleanup_interval` has elapsed since the last recorded one.
    /// The returned instant is meant to be reused for the pass and for `mark_cleanup_done`.
    pub fn should_cleanup(&self) -> Option<Instant> {
        let now = Instant::now();
        let is_due = now.duration_since(self.last_cleanup) >= self.cleanup_interval;
        is_due.then_some(now)
    }

    /// Records that a cleanup pass ran at `now`.
    pub const fn mark_cleanup_done(&mut self, now: Instant) {
        self.last_cleanup = now;
    }
}
371
/// Plain limits used to construct a [`MetricSet`]; `None` disables the respective limit.
#[derive(Debug, Clone, Copy, Default)]
pub struct MetricSetSettings {
    /// Maximum tracked bytes of stored entries.
    pub max_bytes: Option<usize>,
    /// Maximum number of stored entries.
    pub max_events: Option<usize>,
    /// Entry time-to-live, in seconds.
    pub time_to_live: Option<u64>,
}
378
/// Backing storage of a [`MetricSet`].
#[derive(Clone, Debug)]
enum MetricSetInner {
    /// Plain map, used when no capacity policy is configured.
    Unbounded(IndexMap<MetricSeries, MetricEntry>),
    /// LRU cache, used when a capacity policy is configured so the least recently used entry
    /// can be evicted.
    Bounded(LruCache<MetricSeries, MetricEntry>),
}
392
393impl MetricSetInner {
394 fn len(&self) -> usize {
395 match self {
396 Self::Unbounded(m) => m.len(),
397 Self::Bounded(m) => m.len(),
398 }
399 }
400
401 fn is_empty(&self) -> bool {
402 match self {
403 Self::Unbounded(m) => m.is_empty(),
404 Self::Bounded(m) => m.is_empty(),
405 }
406 }
407
408 fn get_mut(&mut self, key: &MetricSeries) -> Option<&mut MetricEntry> {
413 match self {
414 Self::Unbounded(m) => m.get_mut(key),
415 Self::Bounded(m) => m.get_mut(key),
416 }
417 }
418
419 fn put(&mut self, key: MetricSeries, value: MetricEntry) -> Option<MetricEntry> {
421 match self {
422 Self::Unbounded(m) => m.insert(key, value),
423 Self::Bounded(m) => m.put(key, value),
424 }
425 }
426
427 fn pop(&mut self, key: &MetricSeries) -> Option<MetricEntry> {
429 match self {
430 Self::Unbounded(m) => m.swap_remove(key),
432 Self::Bounded(m) => m.pop(key),
433 }
434 }
435
436 fn iter(&self) -> MetricSetIter<'_> {
437 match self {
438 Self::Unbounded(m) => MetricSetIter::Unbounded(m.iter()),
439 Self::Bounded(m) => MetricSetIter::Bounded(m.iter()),
440 }
441 }
442}
443
/// Borrowing iterator over a [`MetricSetInner`], wrapping the iterator of whichever variant
/// is in use.
enum MetricSetIter<'a> {
    /// Iterator over the `IndexMap` backing.
    Unbounded(indexmap::map::Iter<'a, MetricSeries, MetricEntry>),
    /// Iterator over the `LruCache` backing.
    Bounded(lru::Iter<'a, MetricSeries, MetricEntry>),
}
448
449impl<'a> Iterator for MetricSetIter<'a> {
450 type Item = (&'a MetricSeries, &'a MetricEntry);
451
452 fn next(&mut self) -> Option<Self::Item> {
453 match self {
454 Self::Unbounded(it) => it.next(),
455 Self::Bounded(it) => it.next(),
456 }
457 }
458}
459
/// Per-series metric state with optional capacity and time-to-live limits.
#[derive(Clone, Debug)]
pub struct MetricSet {
    /// Backing storage; `Bounded` exactly when a capacity policy is present (see
    /// `with_policies`).
    inner: MetricSetInner,
    /// Entry-count / byte limits and memory accounting, if any limit is configured.
    capacity_policy: Option<CapacityPolicy>,
    /// Expiry settings, if a time-to-live is configured.
    ttl_policy: Option<TtlPolicy>,
}
474
475impl MetricSet {
476 pub fn new(settings: MetricSetSettings) -> Self {
478 let capacity_policy = match (settings.max_bytes, settings.max_events) {
480 (None, None) => None,
481 (max_bytes, max_events) => Some(CapacityPolicy::new(max_bytes, max_events)),
482 };
483
484 let ttl_policy = settings
486 .time_to_live
487 .map(|ttl| TtlPolicy::new(Duration::from_secs(ttl)));
488
489 Self::with_policies(capacity_policy, ttl_policy)
490 }
491
492 pub fn with_policies(
494 capacity_policy: Option<CapacityPolicy>,
495 ttl_policy: Option<TtlPolicy>,
496 ) -> Self {
497 let inner = if capacity_policy.is_some() {
500 MetricSetInner::Bounded(LruCache::unbounded())
501 } else {
502 MetricSetInner::Unbounded(IndexMap::default())
503 };
504 Self {
505 inner,
506 capacity_policy,
507 ttl_policy,
508 }
509 }
510
511 pub const fn capacity_policy(&self) -> Option<&CapacityPolicy> {
513 self.capacity_policy.as_ref()
514 }
515
516 pub const fn ttl_policy(&self) -> Option<&TtlPolicy> {
518 self.ttl_policy.as_ref()
519 }
520
521 pub const fn ttl_policy_mut(&mut self) -> Option<&mut TtlPolicy> {
523 self.ttl_policy.as_mut()
524 }
525
526 pub fn len(&self) -> usize {
528 self.inner.len()
529 }
530
531 pub fn is_empty(&self) -> bool {
533 self.inner.is_empty()
534 }
535
536 pub fn weighted_size(&self) -> u64 {
538 self.capacity_policy
539 .as_ref()
540 .map_or(0, |cp| cp.current_memory() as u64)
541 }
542
543 fn create_timestamp(&self) -> Option<Instant> {
545 self.ttl_policy.as_ref().map(|_| Instant::now())
546 }
547
548 fn enforce_capacity_policy(&mut self) {
550 let Some(ref mut capacity_policy) = self.capacity_policy else {
551 return; };
553
554 let MetricSetInner::Bounded(ref mut lru) = self.inner else {
556 debug_assert!(false, "capacity policy set but inner is not Bounded");
557 return;
558 };
559
560 while capacity_policy.needs_eviction(lru.len()) {
562 if let Some((series, entry)) = lru.pop_lru() {
563 capacity_policy.free_item(&series, &entry);
564 } else {
565 break; }
567 }
568 }
569
570 fn maybe_cleanup(&mut self) {
572 let now = match self.ttl_policy().and_then(|config| config.should_cleanup()) {
574 Some(timestamp) => timestamp,
575 None => return, };
577
578 self.cleanup_expired(now);
580
581 if let Some(config) = self.ttl_policy_mut() {
583 config.mark_cleanup_done(now);
584 }
585 }
586
587 fn cleanup_expired(&mut self, now: Instant) {
589 let Some(ttl) = self.ttl_policy().map(|policy| policy.ttl) else {
591 return; };
593
594 let expired_keys: Vec<MetricSeries> = self
596 .inner
597 .iter()
598 .filter(|(_, e)| e.is_expired(ttl, now))
599 .map(|(s, _)| s.clone())
600 .collect();
601
602 for series in expired_keys {
604 if let Some(entry) = self.inner.pop(&series)
605 && let Some(ref mut capacity_policy) = self.capacity_policy
606 {
607 capacity_policy.free_item(&series, &entry);
608 }
609 }
610 }
611
612 fn insert_with_tracking(&mut self, series: MetricSeries, entry: MetricEntry) {
614 let Some(ref mut capacity_policy) = self.capacity_policy else {
615 self.inner.put(series, entry);
616 return; };
618
619 if capacity_policy.max_bytes.is_some() {
621 let entry_size = capacity_policy.item_size(&series, &entry);
623
624 if let Some(existing_entry) = self.inner.put(series.clone(), entry) {
625 let existing_size = capacity_policy.item_size(&series, &existing_entry);
627 capacity_policy.replace_memory(existing_size, entry_size);
628 } else {
629 capacity_policy.replace_memory(0, entry_size);
631 }
632 } else {
633 self.inner.put(series, entry);
635 }
636
637 self.enforce_capacity_policy();
639 }
640
641 pub fn into_metrics(mut self) -> Vec<Metric> {
643 self.cleanup_expired(Instant::now());
645 match self.inner {
646 MetricSetInner::Unbounded(m) => m
647 .into_iter()
648 .map(|(series, entry)| entry.into_metric(series))
649 .collect(),
650 MetricSetInner::Bounded(mut m) => {
651 let mut metrics = Vec::with_capacity(m.len());
652 while let Some((series, entry)) = m.pop_lru() {
653 metrics.push(entry.into_metric(series));
654 }
655 metrics
656 }
657 }
658 }
659
660 pub fn make_absolute(&mut self, metric: Metric) -> Option<Metric> {
663 self.maybe_cleanup();
664 match metric.kind() {
665 MetricKind::Absolute => Some(metric),
666 MetricKind::Incremental => Some(self.incremental_to_absolute(metric)),
667 }
668 }
669
670 pub fn make_incremental(&mut self, metric: Metric) -> Option<Metric> {
673 self.maybe_cleanup();
674 match metric.kind() {
675 MetricKind::Absolute => self.absolute_to_incremental(metric),
676 MetricKind::Incremental => Some(metric),
677 }
678 }
679
680 fn incremental_to_absolute(&mut self, mut metric: Metric) -> Metric {
684 let timestamp = self.create_timestamp();
685 match self.inner.get_mut(metric.series()) {
687 Some(existing) => {
688 let mut new_value = existing.data.value().clone();
689 if new_value.add(metric.value()) {
690 metric = metric.with_value(new_value);
692 }
693 self.insert(metric.clone(), timestamp);
695 }
696 None => {
697 self.insert(metric.clone(), timestamp);
698 }
699 }
700 metric.into_absolute()
701 }
702
703 fn absolute_to_incremental(&mut self, mut metric: Metric) -> Option<Metric> {
706 let timestamp = self.create_timestamp();
726 match self.inner.get_mut(metric.series()) {
728 Some(reference) => {
729 let new_value = metric.value().clone();
730 let mut new_reference = reference.clone();
733 if metric.subtract(&reference.data) {
735 new_reference.data.value = new_value;
736 new_reference.timestamp = timestamp;
737 self.insert_with_tracking(metric.series().clone(), new_reference);
738 Some(metric.into_incremental())
739 } else {
740 self.insert(metric, timestamp);
742 None
743 }
744 }
745 None => {
746 self.insert(metric, timestamp);
748 None
749 }
750 }
751 }
752
753 fn insert(&mut self, metric: Metric, timestamp: Option<Instant>) {
754 let (series, entry) = MetricEntry::from_metric(metric, timestamp);
755 self.insert_with_tracking(series, entry);
756 }
757
758 pub fn insert_update(&mut self, metric: Metric) {
759 self.maybe_cleanup();
760 let timestamp = self.create_timestamp();
761 let update = match metric.kind() {
762 MetricKind::Absolute => Some(metric),
763 MetricKind::Incremental => {
764 match self.inner.get_mut(metric.series()) {
766 Some(existing) => {
767 let mut new_existing = existing.clone();
770 let (series, data, metadata) = metric.into_parts();
771 if new_existing.data.update(&data) {
772 new_existing.metadata.merge(metadata);
773 new_existing.update_timestamp(timestamp);
774 self.insert_with_tracking(series, new_existing);
775 None
776 } else {
777 warn!(message = "Metric changed type, dropping old value.", %series);
778 Some(Metric::from_parts(series, data, metadata))
779 }
780 }
781 None => Some(metric),
782 }
783 }
784 };
785 if let Some(metric) = update {
786 self.insert(metric, timestamp);
787 }
788 }
789
790 pub fn remove(&mut self, series: &MetricSeries) -> bool {
794 if let Some(entry) = self.inner.pop(series) {
795 if let Some(ref mut capacity_policy) = self.capacity_policy {
796 capacity_policy.free_item(series, &entry);
797 }
798 return true;
799 }
800 false
801 }
802}
803
804impl Default for MetricSet {
805 fn default() -> Self {
806 Self::new(MetricSetSettings::default())
807 }
808}
809
#[cfg(test)]
mod tests {
    use vector_lib::event::metric::{MetricKind, MetricValue};

    use super::*;

    /// Builds a counter metric with the given name, value and kind.
    fn counter(name: &str, value: f64, kind: MetricKind) -> Metric {
        let counter_value = MetricValue::Counter { value };
        Metric::new(name, kind, counter_value)
    }

    /// Incremental counters fed through an unlimited set accumulate into one stored series.
    #[test]
    fn unbounded_incremental_to_absolute_accumulates() {
        let mut set = MetricSet::default();
        assert!(matches!(set.inner, MetricSetInner::Unbounded(_)));

        // (increment fed in, running total expected back)
        for (increment, running_total) in [(1.0, 1.0), (2.0, 3.0)] {
            let emitted = set
                .make_absolute(counter("hits", increment, MetricKind::Incremental))
                .unwrap();
            assert_eq!(
                emitted.value(),
                &MetricValue::Counter {
                    value: running_total
                }
            );
        }

        let metrics = set.into_metrics();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "hits");
    }

    /// Absolute metrics pass through `make_absolute` untouched and leave no state behind.
    #[test]
    fn unbounded_absolute_passes_through() {
        let mut set = MetricSet::default();

        let emitted = set
            .make_absolute(counter("rps", 42.0, MetricKind::Absolute))
            .unwrap();
        assert_eq!(emitted.value(), &MetricValue::Counter { value: 42.0 });

        assert!(set.is_empty());
    }

    /// Configuring any capacity limit switches the backing store to the LRU variant.
    #[test]
    fn bounded_path_selected_when_capacity_policy_set() {
        let settings = MetricSetSettings {
            max_events: Some(10),
            ..Default::default()
        };
        let set = MetricSet::new(settings);
        assert!(matches!(set.inner, MetricSetInner::Bounded(_)));
    }
}