vector/sinks/util/buffer/metrics/
normalize.rs

1use std::{
2    marker::PhantomData,
3    time::{Duration, Instant},
4};
5
6use indexmap::IndexMap;
7use lru::LruCache;
8use serde_with::serde_as;
9use snafu::Snafu;
10use vector_config_macros::configurable_component;
11use vector_lib::{
12    ByteSizeOf,
13    event::{
14        EventMetadata, Metric, MetricKind,
15        metric::{MetricData, MetricSeries},
16    },
17};
18
/// Errors produced when validating a [`NormalizerConfig`].
///
/// Each variant corresponds to one limit that resolved to `Some(0)` during
/// [`NormalizerConfig::validate`].
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum NormalizerError {
    /// The `max_bytes` limit resolved to zero.
    #[snafu(display("`max_bytes` must be greater than zero"))]
    InvalidMaxBytes,
    /// The `max_events` limit resolved to zero.
    #[snafu(display("`max_events` must be greater than zero"))]
    InvalidMaxEvents,
    /// The `time_to_live` limit resolved to zero.
    #[snafu(display("`time_to_live` must be greater than zero"))]
    InvalidTimeToLive,
}
28
/// Defines behavior for creating the MetricNormalizer
// NOTE(review): the `///` comments on this type and its fields are presumably consumed by
// `configurable_component` to generate user-facing configuration documentation, so they are
// left untouched; reviewer-facing notes use plain `//` comments instead.
//
// `D` supplies fallback values (see `NormalizerSettings`) for every limit the user leaves unset.
// A limit that is still `None` after `validate` results in no corresponding policy being created
// by `MetricSet::new`.
#[serde_as]
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default)]
pub struct NormalizerConfig<D: NormalizerSettings + Clone> {
    /// The maximum size in bytes of the events in the metrics normalizer cache, excluding cache overhead.
    #[serde(default = "default_max_bytes::<D>")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,

    /// The maximum number of events of the metrics normalizer cache
    #[serde(default = "default_max_events::<D>")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub max_events: Option<usize>,

    /// The maximum age of a metric not being updated before it is evicted from the metrics normalizer cache.
    // Expressed in whole seconds: `MetricSet::new` converts it with `Duration::from_secs`.
    #[serde(default = "default_time_to_live::<D>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Time To Live"))]
    pub time_to_live: Option<u64>,

    // Zero-sized marker tying the config to its defaults provider `D`; never (de)serialized.
    #[serde(skip)]
    pub _d: PhantomData<D>,
}
54
55const fn default_max_bytes<D: NormalizerSettings>() -> Option<usize> {
56    D::MAX_BYTES
57}
58
59const fn default_max_events<D: NormalizerSettings>() -> Option<usize> {
60    D::MAX_EVENTS
61}
62
63const fn default_time_to_live<D: NormalizerSettings>() -> Option<u64> {
64    D::TIME_TO_LIVE
65}
66
67impl<D: NormalizerSettings + Clone> NormalizerConfig<D> {
68    pub fn validate(&self) -> Result<NormalizerConfig<D>, NormalizerError> {
69        let config = NormalizerConfig::<D> {
70            max_bytes: self.max_bytes.or(D::MAX_BYTES),
71            max_events: self.max_events.or(D::MAX_EVENTS),
72            time_to_live: self.time_to_live.or(D::TIME_TO_LIVE),
73            _d: Default::default(),
74        };
75        match (config.max_bytes, config.max_events, config.time_to_live) {
76            (Some(0), _, _) => Err(NormalizerError::InvalidMaxBytes),
77            (_, Some(0), _) => Err(NormalizerError::InvalidMaxEvents),
78            (_, _, Some(0)) => Err(NormalizerError::InvalidTimeToLive),
79            _ => Ok(config),
80        }
81    }
82
83    pub const fn into_settings(self) -> MetricSetSettings {
84        MetricSetSettings {
85            max_bytes: self.max_bytes,
86            max_events: self.max_events,
87            time_to_live: self.time_to_live,
88        }
89    }
90}
91
/// Compile-time defaults for the normalizer cache limits.
///
/// [`NormalizerConfig`] falls back to these constants for any limit that is not set explicitly.
/// A value of `None` means the corresponding limit has no default.
pub trait NormalizerSettings {
    /// Default maximum number of cached entries.
    const MAX_EVENTS: Option<usize>;
    /// Default maximum tracked size of cached entries, in bytes.
    const MAX_BYTES: Option<usize>;
    /// Default time-to-live for cached entries, in seconds.
    const TIME_TO_LIVE: Option<u64>;
}
97
/// Settings provider that declares no default limits at all.
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultNormalizerSettings;
100
// Every limit is `None`: unless the user configures one explicitly, no limit is applied.
impl NormalizerSettings for DefaultNormalizerSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = None;
    const TIME_TO_LIVE: Option<u64> = None;
}
106
/// Normalizes metrics according to a set of rules.
///
/// Depending on the system to which they are being sent, metrics may have to be modified in order to fit the data
/// model or constraints placed on that system.  Typically, this boils down to whether or not the system can accept
/// absolute metrics or incremental metrics: the latest value of a metric, or the delta between the last time the
/// metric was observed and now, respectively. Other rules may need to be applied, such as dropping metrics of a
/// specific type that the system does not support.
///
/// The trait provides a simple interface to apply this logic uniformly, given a reference to a simple state container
/// that allows tracking the necessary information of a given metric over time. Additionally, because `normalize`
/// returns an `Option`, it composes nicely with iterators (i.e. using `filter_map`) in order to filter metrics within
/// existing iterator/stream-based approaches.
pub trait MetricNormalize {
    /// Normalizes the metric against the given state.
    ///
    /// If the metric was normalized successfully, `Some(metric)` will be returned. Otherwise, `None` is returned.
    ///
    /// In some cases, a metric may be successfully added/tracked within the given state, but due to the normalization
    /// logic, it cannot yet be emitted. An example of this is normalizing all metrics to be incremental.
    ///
    /// In this example, if an incoming metric is already incremental, it can be passed through unchanged.  If the
    /// incoming metric is absolute, however, we need to see it at least twice in order to calculate the incremental
    /// delta necessary to emit an incremental version. This means that the first time an absolute metric is seen,
    /// `normalize` would return `None`, and the subsequent calls would return `Some(metric)`.
    ///
    /// However, a metric may simply not be supported by a normalization implementation, and so `None` may or may not be
    /// a common return value. This behavior is, thus, implementation-defined.
    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric>;
}
136
/// A self-contained metric normalizer.
///
/// The normalization state is stored internally, and it can only be created from a normalizer implementation that is
/// either `Default` or is constructed ahead of time, so it is primarily useful for constructing a usable normalizer
/// via implicit conversion methods or when no special parameters are required for configuring the underlying normalizer.
pub struct MetricNormalizer<N> {
    /// Per-series state handed to the normalizer on every call.
    state: MetricSet,
    /// The normalization rules applied to incoming metrics.
    normalizer: N,
}
146
147impl<N> MetricNormalizer<N> {
148    /// Creates a new normalizer with the given configuration.
149    pub fn with_config<D: NormalizerSettings + Clone>(
150        normalizer: N,
151        config: NormalizerConfig<D>,
152    ) -> Self {
153        let settings = config
154            .validate()
155            .unwrap_or_else(|e| panic!("Invalid cache settings: {e:?}"))
156            .into_settings();
157        Self {
158            state: MetricSet::new(settings),
159            normalizer,
160        }
161    }
162
163    /// Gets a mutable reference to the current metric state for this normalizer.
164    pub const fn get_state_mut(&mut self) -> &mut MetricSet {
165        &mut self.state
166    }
167}
168
169impl<N: MetricNormalize> MetricNormalizer<N> {
170    /// Normalizes the metric against the internal normalization state.
171    ///
172    /// For more information about normalization, see the documentation for [`MetricNormalize::normalize`].
173    pub fn normalize(&mut self, metric: Metric) -> Option<Metric> {
174        self.normalizer.normalize(&mut self.state, metric)
175    }
176}
177
178impl<N: Default> Default for MetricNormalizer<N> {
179    fn default() -> Self {
180        Self {
181            state: MetricSet::default(),
182            normalizer: N::default(),
183        }
184    }
185}
186
187impl<N> From<N> for MetricNormalizer<N> {
188    fn from(normalizer: N) -> Self {
189        Self {
190            state: MetricSet::default(),
191            normalizer,
192        }
193    }
194}
195
/// Represents a stored metric entry with its data, metadata, and timestamp.
///
/// The series (name and tags) is not part of the entry; it is the key under which the entry is
/// stored in a [`MetricSet`].
#[derive(Clone, Debug)]
pub struct MetricEntry {
    /// The metric data containing the value and kind
    pub data: MetricData,
    /// Event metadata associated with this metric
    pub metadata: EventMetadata,
    /// Optional timestamp for TTL tracking; an entry with `None` never expires
    pub timestamp: Option<Instant>,
}
206
207impl ByteSizeOf for MetricEntry {
208    fn allocated_bytes(&self) -> usize {
209        self.data.allocated_bytes() + self.metadata.allocated_bytes()
210    }
211}
212
213impl MetricEntry {
214    /// Creates a new MetricEntry with the given data, metadata, and timestamp.
215    pub const fn new(
216        data: MetricData,
217        metadata: EventMetadata,
218        timestamp: Option<Instant>,
219    ) -> Self {
220        Self {
221            data,
222            metadata,
223            timestamp,
224        }
225    }
226
227    /// Creates a new MetricEntry from a Metric.
228    pub fn from_metric(metric: Metric, timestamp: Option<Instant>) -> (MetricSeries, Self) {
229        let (series, data, metadata) = metric.into_parts();
230        let entry = Self::new(data, metadata, timestamp);
231        (series, entry)
232    }
233
234    /// Converts this entry back to a Metric with the given series.
235    pub fn into_metric(self, series: MetricSeries) -> Metric {
236        Metric::from_parts(series, self.data, self.metadata)
237    }
238
239    /// Updates this entry's timestamp.
240    pub const fn update_timestamp(&mut self, timestamp: Option<Instant>) {
241        self.timestamp = timestamp;
242    }
243
244    /// Checks if this entry has expired based on the given TTL and reference time.
245    ///
246    /// Using a provided reference time ensures consistency across multiple expiration checks.
247    pub fn is_expired(&self, ttl: Duration, reference_time: Instant) -> bool {
248        match self.timestamp {
249            Some(ts) => reference_time.duration_since(ts) >= ttl,
250            None => false,
251        }
252    }
253}
254
/// Configuration for capacity-based eviction (memory and/or entry count limits).
///
/// Also carries the running byte total used to decide when the memory limit is exceeded.
#[derive(Clone, Debug)]
pub struct CapacityPolicy {
    /// Maximum memory usage in bytes
    pub max_bytes: Option<usize>,
    /// Maximum number of entries
    pub max_events: Option<usize>,
    /// Current memory usage tracking; only maintained while `max_bytes` is set
    current_memory: usize,
}
265
266impl CapacityPolicy {
267    /// Creates a new capacity policy with both memory and entry limits.
268    pub const fn new(max_bytes: Option<usize>, max_events: Option<usize>) -> Self {
269        Self {
270            max_bytes,
271            max_events,
272            current_memory: 0,
273        }
274    }
275
276    /// Gets the current memory usage.
277    pub const fn current_memory(&self) -> usize {
278        self.current_memory
279    }
280
281    /// Updates memory tracking when an entry is removed.
282    const fn remove_memory(&mut self, bytes: usize) {
283        self.current_memory = self.current_memory.saturating_sub(bytes);
284    }
285
286    /// Frees the memory for an item if max_bytes is set.
287    /// Only calculates and tracks memory when max_bytes is specified.
288    pub fn free_item(&mut self, series: &MetricSeries, entry: &MetricEntry) {
289        if self.max_bytes.is_some() {
290            let freed_memory = self.item_size(series, entry);
291            self.remove_memory(freed_memory);
292        }
293    }
294
295    /// Updates memory tracking.
296    const fn replace_memory(&mut self, old_bytes: usize, new_bytes: usize) {
297        self.current_memory = self
298            .current_memory
299            .saturating_sub(old_bytes)
300            .saturating_add(new_bytes);
301    }
302
303    /// Checks if the current state exceeds memory limits.
304    const fn exceeds_memory_limit(&self) -> bool {
305        if let Some(max_bytes) = self.max_bytes {
306            self.current_memory > max_bytes
307        } else {
308            false
309        }
310    }
311
312    /// Checks if the given entry count exceeds entry limits.
313    const fn exceeds_entry_limit(&self, entry_count: usize) -> bool {
314        if let Some(max_events) = self.max_events {
315            entry_count > max_events
316        } else {
317            false
318        }
319    }
320
321    /// Returns true if any limits are currently exceeded.
322    const fn needs_eviction(&self, entry_count: usize) -> bool {
323        self.exceeds_memory_limit() || self.exceeds_entry_limit(entry_count)
324    }
325
326    /// Gets the total memory size of entry/series, excluding LRU cache overhead.
327    pub fn item_size(&self, series: &MetricSeries, entry: &MetricEntry) -> usize {
328        entry.allocated_bytes() + series.allocated_bytes()
329    }
330}
331
/// Configuration for automatic cleanup of expired entries.
#[derive(Clone, Debug)]
pub struct TtlPolicy {
    /// Time-to-live for entries
    pub ttl: Duration,
    /// How often to run cleanup
    pub cleanup_interval: Duration,
    /// Last time cleanup was performed
    pub(crate) last_cleanup: Instant,
}

impl TtlPolicy {
    /// Builds a TTL policy for the given time-to-live.
    ///
    /// The cleanup interval is one tenth of the TTL, but never shorter than 10 seconds. The
    /// "last cleanup" marker starts at the moment of construction.
    pub fn new(ttl: Duration) -> Self {
        let floor = Duration::from_secs(10);
        let tenth = ttl.div_f32(10.0);
        let cleanup_interval = if tenth < floor { floor } else { tenth };
        Self {
            ttl,
            cleanup_interval,
            last_cleanup: Instant::now(),
        }
    }

    /// Decides whether a cleanup pass is due.
    ///
    /// Returns `Some(now)` once at least `cleanup_interval` has passed since the last cleanup,
    /// so the caller can reuse the same instant for the pass itself; returns `None` otherwise.
    pub fn should_cleanup(&self) -> Option<Instant> {
        let now = Instant::now();
        let since_last = now.duration_since(self.last_cleanup);
        (since_last >= self.cleanup_interval).then_some(now)
    }

    /// Records `now` as the time of the most recent cleanup.
    pub const fn mark_cleanup_done(&mut self, now: Instant) {
        self.last_cleanup = now;
    }
}
371
/// Plain limits used to construct a [`MetricSet`].
///
/// Any field left as `None` disables the corresponding limit.
#[derive(Debug, Clone, Copy, Default)]
pub struct MetricSetSettings {
    /// Maximum tracked size of stored entries, in bytes.
    pub max_bytes: Option<usize>,
    /// Maximum number of stored entries.
    pub max_events: Option<usize>,
    /// Time-to-live for entries, in seconds.
    pub time_to_live: Option<u64>,
}
378
/// Inner storage for `MetricSet`.
///
/// Uses `IndexMap` when no capacity eviction policy is configured — avoiding the
/// per-access LRU bookkeeping (pointer chasing in a doubly-linked list) that
/// `LruCache::get_mut` performs unconditionally.  `LruCache` is used only when a
/// capacity policy is set, so that LRU eviction order is maintained correctly.
///
/// The variant is chosen once, in `MetricSet::with_policies`.
#[derive(Clone, Debug)]
enum MetricSetInner {
    /// Unbounded storage with no eviction.  Hash-map lookup only, no LRU overhead.
    Unbounded(IndexMap<MetricSeries, MetricEntry>),
    /// Bounded storage with LRU eviction semantics.
    ///
    /// The cache itself is created without a built-in capacity; limits are enforced by
    /// `MetricSet` through its `CapacityPolicy`.
    Bounded(LruCache<MetricSeries, MetricEntry>),
}
392
393impl MetricSetInner {
394    fn len(&self) -> usize {
395        match self {
396            Self::Unbounded(m) => m.len(),
397            Self::Bounded(m) => m.len(),
398        }
399    }
400
401    fn is_empty(&self) -> bool {
402        match self {
403            Self::Unbounded(m) => m.is_empty(),
404            Self::Bounded(m) => m.is_empty(),
405        }
406    }
407
408    /// Returns a mutable reference to the entry.
409    ///
410    /// For `Unbounded` this is a plain hash-map lookup.
411    /// For `Bounded` this also promotes the entry to the MRU end of the LRU list.
412    fn get_mut(&mut self, key: &MetricSeries) -> Option<&mut MetricEntry> {
413        match self {
414            Self::Unbounded(m) => m.get_mut(key),
415            Self::Bounded(m) => m.get_mut(key),
416        }
417    }
418
419    /// Inserts or replaces an entry, returning the previous value if any.
420    fn put(&mut self, key: MetricSeries, value: MetricEntry) -> Option<MetricEntry> {
421        match self {
422            Self::Unbounded(m) => m.insert(key, value),
423            Self::Bounded(m) => m.put(key, value),
424        }
425    }
426
427    /// Removes an entry by key, returning it if present.
428    fn pop(&mut self, key: &MetricSeries) -> Option<MetricEntry> {
429        match self {
430            // swap_remove is O(1) vs shift_remove's O(n); insertion order is not required here.
431            Self::Unbounded(m) => m.swap_remove(key),
432            Self::Bounded(m) => m.pop(key),
433        }
434    }
435
436    fn iter(&self) -> MetricSetIter<'_> {
437        match self {
438            Self::Unbounded(m) => MetricSetIter::Unbounded(m.iter()),
439            Self::Bounded(m) => MetricSetIter::Bounded(m.iter()),
440        }
441    }
442}
443
/// Borrowing iterator over a `MetricSetInner`, wrapping the iterator of whichever storage
/// variant is in use so callers see a single concrete type.
enum MetricSetIter<'a> {
    /// Iterator over the `IndexMap`-backed storage.
    Unbounded(indexmap::map::Iter<'a, MetricSeries, MetricEntry>),
    /// Iterator over the `LruCache`-backed storage.
    Bounded(lru::Iter<'a, MetricSeries, MetricEntry>),
}
448
449impl<'a> Iterator for MetricSetIter<'a> {
450    type Item = (&'a MetricSeries, &'a MetricEntry);
451
452    fn next(&mut self) -> Option<Self::Item> {
453        match self {
454            Self::Unbounded(it) => it.next(),
455            Self::Bounded(it) => it.next(),
456        }
457    }
458}
459
/// Dual-limit cache for metric normalization with optional capacity and TTL policies.
///
/// Uses `IndexMap` internally when no capacity eviction policy is configured, avoiding
/// the per-access LRU pointer-manipulation overhead of `LruCache`. Switches to
/// `LruCache` only when a `max_bytes` or `max_events` capacity policy is set, so that
/// LRU eviction ordering is preserved for those cases.
#[derive(Clone, Debug)]
pub struct MetricSet {
    /// Series-to-entry storage; the variant is fixed at construction time
    inner: MetricSetInner,
    /// Optional capacity policy for memory and/or entry count limits
    capacity_policy: Option<CapacityPolicy>,
    /// Optional TTL policy for time-based expiration
    ttl_policy: Option<TtlPolicy>,
}
474
475impl MetricSet {
476    /// Creates a new MetricSet with the given settings.
477    pub fn new(settings: MetricSetSettings) -> Self {
478        // Create capacity policy if any capacity limit is set
479        let capacity_policy = match (settings.max_bytes, settings.max_events) {
480            (None, None) => None,
481            (max_bytes, max_events) => Some(CapacityPolicy::new(max_bytes, max_events)),
482        };
483
484        // Create TTL policy if time-to-live is set
485        let ttl_policy = settings
486            .time_to_live
487            .map(|ttl| TtlPolicy::new(Duration::from_secs(ttl)));
488
489        Self::with_policies(capacity_policy, ttl_policy)
490    }
491
492    /// Creates a new MetricSet with the given policies.
493    pub fn with_policies(
494        capacity_policy: Option<CapacityPolicy>,
495        ttl_policy: Option<TtlPolicy>,
496    ) -> Self {
497        // Use LruCache only when a capacity policy requires LRU eviction ordering.
498        // Without a capacity policy, IndexMap avoids the per-access LRU overhead.
499        let inner = if capacity_policy.is_some() {
500            MetricSetInner::Bounded(LruCache::unbounded())
501        } else {
502            MetricSetInner::Unbounded(IndexMap::default())
503        };
504        Self {
505            inner,
506            capacity_policy,
507            ttl_policy,
508        }
509    }
510
511    /// Gets the current capacity policy.
512    pub const fn capacity_policy(&self) -> Option<&CapacityPolicy> {
513        self.capacity_policy.as_ref()
514    }
515
516    /// Gets the current TTL policy.
517    pub const fn ttl_policy(&self) -> Option<&TtlPolicy> {
518        self.ttl_policy.as_ref()
519    }
520
521    /// Gets a mutable reference to the TTL policy configuration.
522    pub const fn ttl_policy_mut(&mut self) -> Option<&mut TtlPolicy> {
523        self.ttl_policy.as_mut()
524    }
525
526    /// Gets the current number of entries in the cache.
527    pub fn len(&self) -> usize {
528        self.inner.len()
529    }
530
531    /// Returns true if the cache contains no entries.
532    pub fn is_empty(&self) -> bool {
533        self.inner.is_empty()
534    }
535
536    /// Gets the current memory usage in bytes.
537    pub fn weighted_size(&self) -> u64 {
538        self.capacity_policy
539            .as_ref()
540            .map_or(0, |cp| cp.current_memory() as u64)
541    }
542
543    /// Creates a timestamp if TTL is enabled.
544    fn create_timestamp(&self) -> Option<Instant> {
545        self.ttl_policy.as_ref().map(|_| Instant::now())
546    }
547
548    /// Enforce memory and entry limits by evicting LRU entries.
549    fn enforce_capacity_policy(&mut self) {
550        let Some(ref mut capacity_policy) = self.capacity_policy else {
551            return; // No capacity limits configured
552        };
553
554        // A capacity policy is only set when inner is Bounded; this should always be true.
555        let MetricSetInner::Bounded(ref mut lru) = self.inner else {
556            debug_assert!(false, "capacity policy set but inner is not Bounded");
557            return;
558        };
559
560        // Keep evicting until we're within limits
561        while capacity_policy.needs_eviction(lru.len()) {
562            if let Some((series, entry)) = lru.pop_lru() {
563                capacity_policy.free_item(&series, &entry);
564            } else {
565                break; // No more entries to evict
566            }
567        }
568    }
569
570    /// Perform TTL cleanup if configured and needed.
571    fn maybe_cleanup(&mut self) {
572        // Check if cleanup is needed and get the current timestamp in one operation
573        let now = match self.ttl_policy().and_then(|config| config.should_cleanup()) {
574            Some(timestamp) => timestamp,
575            None => return, // No cleanup needed
576        };
577
578        // Perform the cleanup using the same timestamp
579        self.cleanup_expired(now);
580
581        // Mark cleanup as done with the same timestamp
582        if let Some(config) = self.ttl_policy_mut() {
583            config.mark_cleanup_done(now);
584        }
585    }
586
587    /// Remove expired entries based on TTL using the provided timestamp.
588    fn cleanup_expired(&mut self, now: Instant) {
589        // Get the TTL from the policy
590        let Some(ttl) = self.ttl_policy().map(|policy| policy.ttl) else {
591            return; // No TTL policy, nothing to do
592        };
593
594        // Collect expired keys using the provided timestamp
595        let expired_keys: Vec<MetricSeries> = self
596            .inner
597            .iter()
598            .filter(|(_, e)| e.is_expired(ttl, now))
599            .map(|(s, _)| s.clone())
600            .collect();
601
602        // Remove expired entries and update memory tracking (if max_bytes is set)
603        for series in expired_keys {
604            if let Some(entry) = self.inner.pop(&series)
605                && let Some(ref mut capacity_policy) = self.capacity_policy
606            {
607                capacity_policy.free_item(&series, &entry);
608            }
609        }
610    }
611
612    /// Internal insert that updates memory tracking and enforces limits.
613    fn insert_with_tracking(&mut self, series: MetricSeries, entry: MetricEntry) {
614        let Some(ref mut capacity_policy) = self.capacity_policy else {
615            self.inner.put(series, entry);
616            return; // No capacity limits configured, return immediately
617        };
618
619        // Handle differently based on whether we need to track memory
620        if capacity_policy.max_bytes.is_some() {
621            // When tracking memory, we need to calculate sizes before and after
622            let entry_size = capacity_policy.item_size(&series, &entry);
623
624            if let Some(existing_entry) = self.inner.put(series.clone(), entry) {
625                // If we had an existing entry, calculate its size and adjust memory tracking
626                let existing_size = capacity_policy.item_size(&series, &existing_entry);
627                capacity_policy.replace_memory(existing_size, entry_size);
628            } else {
629                // No existing entry, just add the new entry's size
630                capacity_policy.replace_memory(0, entry_size);
631            }
632        } else {
633            // When not tracking memory (only entry count limits), just put directly
634            self.inner.put(series, entry);
635        }
636
637        // Enforce limits after insertion
638        self.enforce_capacity_policy();
639    }
640
641    /// Consumes this MetricSet and returns a vector of Metric.
642    pub fn into_metrics(mut self) -> Vec<Metric> {
643        // Clean up expired entries first (using current time)
644        self.cleanup_expired(Instant::now());
645        match self.inner {
646            MetricSetInner::Unbounded(m) => m
647                .into_iter()
648                .map(|(series, entry)| entry.into_metric(series))
649                .collect(),
650            MetricSetInner::Bounded(mut m) => {
651                let mut metrics = Vec::with_capacity(m.len());
652                while let Some((series, entry)) = m.pop_lru() {
653                    metrics.push(entry.into_metric(series));
654                }
655                metrics
656            }
657        }
658    }
659
660    /// Either pass the metric through as-is if absolute, or convert it
661    /// to absolute if incremental.
662    pub fn make_absolute(&mut self, metric: Metric) -> Option<Metric> {
663        self.maybe_cleanup();
664        match metric.kind() {
665            MetricKind::Absolute => Some(metric),
666            MetricKind::Incremental => Some(self.incremental_to_absolute(metric)),
667        }
668    }
669
670    /// Either convert the metric to incremental if absolute, or
671    /// aggregate it with any previous value if already incremental.
672    pub fn make_incremental(&mut self, metric: Metric) -> Option<Metric> {
673        self.maybe_cleanup();
674        match metric.kind() {
675            MetricKind::Absolute => self.absolute_to_incremental(metric),
676            MetricKind::Incremental => Some(metric),
677        }
678    }
679
680    /// Convert the incremental metric into an absolute one, using the
681    /// state buffer to keep track of the value throughout the entire
682    /// application uptime.
683    fn incremental_to_absolute(&mut self, mut metric: Metric) -> Metric {
684        let timestamp = self.create_timestamp();
685        // We always call insert() to track memory usage
686        match self.inner.get_mut(metric.series()) {
687            Some(existing) => {
688                let mut new_value = existing.data.value().clone();
689                if new_value.add(metric.value()) {
690                    // Update the stored value
691                    metric = metric.with_value(new_value);
692                }
693                // Insert the updated stored value, or as store a new reference value (if the Metric changed type)
694                self.insert(metric.clone(), timestamp);
695            }
696            None => {
697                self.insert(metric.clone(), timestamp);
698            }
699        }
700        metric.into_absolute()
701    }
702
703    /// Convert the absolute metric into an incremental by calculating
704    /// the increment from the last saved absolute state.
705    fn absolute_to_incremental(&mut self, mut metric: Metric) -> Option<Metric> {
706        // NOTE: Crucially, like I did, you may wonder: why do we not always return a metric? Could
707        // this lead to issues where a metric isn't seen again and we, in effect, never emit it?
708        //
709        // You're not wrong, and that does happen based on the logic below.  However, the main
710        // problem this logic solves is avoiding massive counter updates when Vector restarts.
711        //
712        // If we emitted a metric for a newly-seen absolute metric in this method, we would
713        // naturally have to emit an incremental version where the value was the absolute value,
714        // with subsequent updates being only delta updates.  If we restarted Vector, however, we
715        // would be back to not having yet seen the metric before, so the first emission of the
716        // metric after converting it here would be... its absolute value.  Even if the value only
717        // changed by 1 between Vector stopping and restarting, we could be incrementing the counter
718        // by some outrageous amount.
719        //
720        // Thus, we only emit a metric when we've calculated an actual delta for it, which means
721        // that, yes, we're risking never seeing a metric if it's not re-emitted, and we're
722        // introducing a small amount of lag before a metric is emitted by having to wait to see it
723        // again, but this is a behavior we have to observe for sinks that can only handle
724        // incremental updates.
725        let timestamp = self.create_timestamp();
726        // We always call insert() to track memory usage
727        match self.inner.get_mut(metric.series()) {
728            Some(reference) => {
729                let new_value = metric.value().clone();
730                // Create a copy of the reference so we can insert and
731                // replace the existing entry, tracking memory usage
732                let mut new_reference = reference.clone();
733                // From the stored reference value, emit an increment
734                if metric.subtract(&reference.data) {
735                    new_reference.data.value = new_value;
736                    new_reference.timestamp = timestamp;
737                    self.insert_with_tracking(metric.series().clone(), new_reference);
738                    Some(metric.into_incremental())
739                } else {
740                    // Metric changed type, store this and emit nothing
741                    self.insert(metric, timestamp);
742                    None
743                }
744            }
745            None => {
746                // No reference so store this and emit nothing
747                self.insert(metric, timestamp);
748                None
749            }
750        }
751    }
752
753    fn insert(&mut self, metric: Metric, timestamp: Option<Instant>) {
754        let (series, entry) = MetricEntry::from_metric(metric, timestamp);
755        self.insert_with_tracking(series, entry);
756    }
757
758    pub fn insert_update(&mut self, metric: Metric) {
759        self.maybe_cleanup();
760        let timestamp = self.create_timestamp();
761        let update = match metric.kind() {
762            MetricKind::Absolute => Some(metric),
763            MetricKind::Incremental => {
764                // Incremental metrics update existing entries, if present
765                match self.inner.get_mut(metric.series()) {
766                    Some(existing) => {
767                        // Create a copy of the reference so we can insert and
768                        // replace the existing entry, tracking memory usage
769                        let mut new_existing = existing.clone();
770                        let (series, data, metadata) = metric.into_parts();
771                        if new_existing.data.update(&data) {
772                            new_existing.metadata.merge(metadata);
773                            new_existing.update_timestamp(timestamp);
774                            self.insert_with_tracking(series, new_existing);
775                            None
776                        } else {
777                            warn!(message = "Metric changed type, dropping old value.", %series);
778                            Some(Metric::from_parts(series, data, metadata))
779                        }
780                    }
781                    None => Some(metric),
782                }
783            }
784        };
785        if let Some(metric) = update {
786            self.insert(metric, timestamp);
787        }
788    }
789
790    /// Removes a series from the cache.
791    ///
792    /// If the series existed and was removed, returns true.  Otherwise, false.
793    pub fn remove(&mut self, series: &MetricSeries) -> bool {
794        if let Some(entry) = self.inner.pop(series) {
795            if let Some(ref mut capacity_policy) = self.capacity_policy {
796                capacity_policy.free_item(series, &entry);
797            }
798            return true;
799        }
800        false
801    }
802}
803
804impl Default for MetricSet {
805    fn default() -> Self {
806        Self::new(MetricSetSettings::default())
807    }
808}
809
#[cfg(test)]
mod tests {
    use vector_lib::event::metric::{MetricKind, MetricValue};

    use super::*;

    /// Builds a counter metric of the requested kind.
    fn counter(name: &str, value: f64, kind: MetricKind) -> Metric {
        let value = MetricValue::Counter { value };
        Metric::new(name, kind, value)
    }

    // The default set (no capacity policy) must use the IndexMap-backed storage, and
    // make_absolute / into_metrics must behave correctly across several updates.
    #[test]
    fn unbounded_incremental_to_absolute_accumulates() {
        let mut set = MetricSet::default();
        assert!(matches!(set.inner, MetricSetInner::Unbounded(_)));

        // Each step is (increment fed in, running total expected back as an absolute value):
        // the first sample is stored as the reference, the second accumulates onto it.
        for (increment, expected_total) in [(1.0, 1.0), (2.0, 3.0)] {
            let emitted = set
                .make_absolute(counter("hits", increment, MetricKind::Incremental))
                .unwrap();
            assert_eq!(
                emitted.value(),
                &MetricValue::Counter {
                    value: expected_total
                }
            );
        }

        // Consuming the set yields every tracked series.
        let metrics = set.into_metrics();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "hits");
    }

    #[test]
    fn unbounded_absolute_passes_through() {
        let mut set = MetricSet::default();

        let emitted = set
            .make_absolute(counter("rps", 42.0, MetricKind::Absolute))
            .unwrap();
        assert_eq!(emitted.value(), &MetricValue::Counter { value: 42.0 });

        // Absolute metrics are passed through without being stored.
        assert!(set.is_empty());
    }

    // Configuring any capacity limit must select the LruCache-backed (Bounded) storage.
    #[test]
    fn bounded_path_selected_when_capacity_policy_set() {
        let settings = MetricSetSettings {
            max_events: Some(10),
            ..Default::default()
        };
        let set = MetricSet::new(settings);
        assert!(matches!(set.inner, MetricSetInner::Bounded(_)));
    }
}
861}