vector_core/metrics/
mod.rs

1mod ddsketch;
2mod label_filter;
3mod metric_matcher;
4mod recency;
5mod recorder;
6mod storage;
7
8use std::{sync::OnceLock, time::Duration};
9
10use chrono::Utc;
11use metric_matcher::MetricKeyMatcher;
12use metrics::Key;
13use metrics_tracing_context::TracingContextLayer;
14use metrics_util::layers::Layer;
15use snafu::Snafu;
16
17pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
18pub use self::label_filter::{LABELS, MetricLabel};
19use self::{
20    label_filter::VectorLabelFilter,
21    recorder::{Registry, VectorRecorder},
22};
23use crate::{
24    config::metrics_expiration::PerMetricSetExpiration,
25    event::{Metric, MetricValue},
26};
27
28type Result<T> = std::result::Result<T, Error>;
29
30#[derive(Clone, Debug, PartialEq, Snafu)]
31pub enum Error {
32    #[snafu(display("Recorder already initialized."))]
33    AlreadyInitialized,
34    #[snafu(display("Metrics system was not initialized."))]
35    NotInitialized,
36    #[snafu(display("Timeout value of {} must be positive.", timeout))]
37    TimeoutMustBePositive { timeout: f64 },
38    #[snafu(display("Invalid regex pattern: {}.", pattern))]
39    InvalidRegexPattern { pattern: String },
40}
41
42static CONTROLLER: OnceLock<Controller> = OnceLock::new();
43
44// Cardinality counter parameters, expose the internal metrics registry
45// cardinality. Useful for the end users to help understand the characteristics
46// of their environment and how vectors acts in it.
47const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
48static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);
49
50// Older deprecated counter key name
51const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
52static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
53
54/// Controller allows capturing metric snapshots.
55pub struct Controller {
56    recorder: VectorRecorder,
57}
58
/// Reports whether the internal metrics core should be installed.
///
/// Only an exact `DISABLE_INTERNAL_METRICS_CORE=true` disables it. An unset variable, any other
/// value, or a non-UTF-8 value leaves metrics enabled.
fn metrics_enabled() -> bool {
    let disabled =
        std::env::var("DISABLE_INTERNAL_METRICS_CORE").is_ok_and(|value| value == "true");
    !disabled
}
62
/// Reports whether the recorder should be wrapped in the tracing-context layer.
///
/// Only an exact `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION=true` disables the layer. An
/// unset variable, any other value, or a non-UTF-8 value leaves it enabled.
fn tracing_context_layer_enabled() -> bool {
    let disabled = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION")
        .is_ok_and(|value| value == "true");
    !disabled
}
66
67fn init(recorder: VectorRecorder) -> Result<()> {
68    // An escape hatch to allow disabling internal metrics core. May be used for
69    // performance reasons. This is a hidden and undocumented functionality.
70    if !metrics_enabled() {
71        metrics::set_global_recorder(metrics::NoopRecorder)
72            .map_err(|_| Error::AlreadyInitialized)?;
73        info!(message = "Internal metrics core is disabled.");
74        return Ok(());
75    }
76
77    ////
78    //// Initialize the recorder.
79    ////
80
81    // The recorder is the interface between metrics-rs and our registry. In our
82    // case it doesn't _do_ much other than shepherd into the registry and
83    // update the cardinality counter, see above, as needed.
84    if tracing_context_layer_enabled() {
85        // Apply a layer to capture tracing span fields as labels.
86        metrics::set_global_recorder(
87            TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
88        )
89        .map_err(|_| Error::AlreadyInitialized)?;
90    } else {
91        metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
92    }
93
94    ////
95    //// Prepare the controller
96    ////
97
98    // The `Controller` is a safe spot in memory for us to stash a clone of the registry -- where
99    // metrics are actually kept -- so that our sub-systems interested in these metrics can grab
100    // copies. See `capture_metrics` and its callers for an example. Note that this is done last to
101    // allow `init_test` below to use the initialization state of `CONTROLLER` to wait for the above
102    // steps to complete in another thread.
103    let controller = Controller { recorder };
104    CONTROLLER
105        .set(controller)
106        .map_err(|_| Error::AlreadyInitialized)?;
107
108    Ok(())
109}
110
111/// Initialize the default metrics sub-system
112///
113/// # Errors
114///
115/// This function will error if it is called multiple times.
116pub fn init_global() -> Result<()> {
117    init(VectorRecorder::new_global())
118}
119
120/// Initialize the thread-local metrics sub-system. This function will loop until a recorder is
121/// actually set.
122pub fn init_test() {
123    if init(VectorRecorder::new_test()).is_err() {
124        // The only error case returned by `init` is `AlreadyInitialized`. A race condition is
125        // possible here: if metrics are being initialized by two (or more) test threads
126        // simultaneously, the ones that fail to set return immediately, possibly allowing
127        // subsequent code to execute before the static recorder value is actually set within the
128        // `metrics` crate. To prevent subsequent code from running with an unset recorder, loop
129        // here until a recorder is available.
130        while CONTROLLER.get().is_none() {}
131    }
132}
133
134impl Controller {
135    /// Clear all metrics from the registry.
136    pub fn reset(&self) {
137        self.recorder.with_registry(Registry::clear);
138    }
139
140    /// Get a handle to the globally registered controller, if it's initialized.
141    ///
142    /// # Errors
143    ///
144    /// This function will fail if the metrics subsystem has not been correctly
145    /// initialized.
146    pub fn get() -> Result<&'static Self> {
147        CONTROLLER.get().ok_or(Error::NotInitialized)
148    }
149
150    /// Set or clear the expiry time after which idle metrics are dropped from the set of captured
151    /// metrics. Invalid timeouts (zero or negative values) are silently remapped to no expiry.
152    ///
153    /// # Errors
154    ///
155    /// The contained timeout value must be positive.
156    pub fn set_expiry(
157        &self,
158        global_timeout: Option<f64>,
159        expire_metrics_per_metric_set: Vec<PerMetricSetExpiration>,
160    ) -> Result<()> {
161        if let Some(timeout) = global_timeout
162            && timeout <= 0.0
163        {
164            return Err(Error::TimeoutMustBePositive { timeout });
165        }
166        let per_metric_expiration = expire_metrics_per_metric_set
167            .into_iter()
168            .map(TryInto::try_into)
169            .collect::<Result<Vec<(MetricKeyMatcher, Duration)>>>()?;
170
171        self.recorder.with_registry(|registry| {
172            registry.set_expiry(
173                global_timeout.map(Duration::from_secs_f64),
174                per_metric_expiration,
175            );
176        });
177        Ok(())
178    }
179
180    /// Take a snapshot of all gathered metrics and expose them as metric
181    /// [`Event`](crate::event::Event)s.
182    pub fn capture_metrics(&self) -> Vec<Metric> {
183        let timestamp = Utc::now();
184
185        let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
186
187        #[allow(clippy::cast_precision_loss)]
188        let value = (metrics.len() + 2) as f64;
189        metrics.push(Metric::from_metric_kv(
190            &CARDINALITY_KEY,
191            MetricValue::Gauge { value },
192            timestamp,
193        ));
194        metrics.push(Metric::from_metric_kv(
195            &CARDINALITY_COUNTER_KEY,
196            MetricValue::Counter { value },
197            timestamp,
198        ));
199
200        metrics
201    }
202}
203
#[cfg(test)]
mod tests {
    use strum::IntoEnumIterator;
    use vector_common::{
        counter, gauge,
        internal_event::{CounterName, GaugeName},
    };

    use super::*;
    use crate::{
        config::metrics_expiration::{
            MetricLabelMatcher, MetricLabelMatcherConfig, MetricNameMatcherConfig,
        },
        event::MetricKind,
    };

    /// Idle timeout, in seconds, used by the expiry tests; they sleep for multiples of it.
    const IDLE_TIMEOUT: f64 = 0.5;

    /// Initialize the test metrics sub-system and return the global controller.
    fn init_metrics() -> &'static Controller {
        init_test();
        Controller::get().expect("Could not get global metrics controller")
    }

    /// Both cardinality metrics report the size of the snapshot, which includes the two
    /// cardinality metrics themselves.
    #[test]
    fn cardinality_matches() {
        for cardinality in [0, 1, 10, 100, 1000, 10000] {
            let controller = init_metrics();
            controller.reset();

            let name = CounterName::iter().next().unwrap();
            for idx in 0..cardinality {
                counter!(name, "idx" => idx.to_string()).increment(1);
            }

            let metrics = controller.capture_metrics();
            assert_eq!(metrics.len(), cardinality + 2);

            #[allow(clippy::cast_precision_loss)]
            let value = metrics.len() as f64;
            // Without these flags, the catch-all arm below would let the test pass even if a
            // cardinality metric were missing from the snapshot.
            let mut saw_gauge = false;
            let mut saw_counter = false;
            for metric in metrics {
                match metric.name() {
                    CARDINALITY_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Gauge { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                        saw_gauge = true;
                    }
                    CARDINALITY_COUNTER_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Counter { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                        saw_counter = true;
                    }
                    _ => {}
                }
            }
            assert!(saw_gauge, "missing {CARDINALITY_KEY_NAME} metric");
            assert!(saw_counter, "missing {CARDINALITY_COUNTER_KEY_NAME} metric");
        }
    }

    /// Registering a handle makes the metric visible in captures before any value is recorded.
    #[test]
    fn handles_registered_metrics() {
        let controller = init_metrics();

        let counter = counter!(CounterName::iter().next().unwrap());
        assert_eq!(controller.capture_metrics().len(), 3);
        counter.increment(1);
        assert_eq!(controller.capture_metrics().len(), 3);
        let gauge = gauge!(GaugeName::iter().next().unwrap());
        assert_eq!(controller.capture_metrics().len(), 4);
        gauge.set(1.0);
        assert_eq!(controller.capture_metrics().len(), 4);
    }

    /// With a global timeout, a metric left idle past it is dropped while an updated one stays.
    #[test]
    fn expires_metrics() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        counter!(name_a).increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    /// Expiry applies per label set: series of one metric with different tags expire
    /// independently.
    #[test]
    fn expires_metrics_tags() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let name = CounterName::iter().next().unwrap();
        counter!(name, "tag" => "value1").increment(1);
        counter!(name, "tag" => "value2").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        counter!(name, "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    /// A metric whose handle is still held is not expired, even when idle, and keeps its value.
    #[test]
    fn skips_expiring_registered() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();

        let a = counter!(name_a);
        counter!(name_b).increment(5);
        assert_eq!(controller.capture_metrics().len(), 4);
        a.increment(1);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Both are idle past the timeout, but only `name_b` (no live handle) is dropped.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        assert_eq!(controller.capture_metrics().len(), 3);

        a.increment(1);
        let metrics = controller.capture_metrics();
        assert_eq!(metrics.len(), 3);
        let metric = metrics
            .into_iter()
            .find(|metric| metric.name() == name_a.as_str())
            .expect("Test metric is not present");
        match metric.value() {
            MetricValue::Counter { value } => assert_eq!(*value, 2.0),
            value => panic!("Invalid metric value {value:?}"),
        }
    }

    /// A per-set timeout expires only the matched metric; with no global timeout the rest stay.
    #[test]
    fn expires_metrics_per_set() {
        let controller = init_metrics();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();

        controller
            .set_expiry(
                None,
                vec![PerMetricSetExpiration {
                    name: Some(MetricNameMatcherConfig::Exact {
                        value: name_b.as_str().to_string(),
                    }),
                    labels: None,
                    expire_secs: IDLE_TIMEOUT,
                }],
            )
            .unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        counter!(name_a).increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    /// Name-based, label-based and global timeouts each apply to their own metrics.
    ///
    /// The timeouts are: global 1.5s, `name_c` 0.5s, and `tag=value1` (only `name_d`) 1.0s.
    /// `name_b` is touched at every step so it never expires.
    #[test]
    fn expires_metrics_multiple_different_sets() {
        let controller = init_metrics();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();
        let name_c = names.next().unwrap();
        let name_d = names.next().unwrap();

        controller
            .set_expiry(
                Some(IDLE_TIMEOUT * 3.0),
                vec![
                    PerMetricSetExpiration {
                        name: Some(MetricNameMatcherConfig::Exact {
                            value: name_c.as_str().to_string(),
                        }),
                        labels: None,
                        expire_secs: IDLE_TIMEOUT,
                    },
                    PerMetricSetExpiration {
                        name: None,
                        labels: Some(MetricLabelMatcherConfig::All {
                            matchers: vec![MetricLabelMatcher::Exact {
                                key: "tag".to_string(),
                                value: "value1".to_string(),
                            }],
                        }),
                        expire_secs: IDLE_TIMEOUT * 2.0,
                    },
                ],
            )
            .unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(1);
        counter!(name_c).increment(2);
        counter!(name_d, "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 6);

        // t ~ 0.75s: `name_c` (0.5s) has expired.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 1.5));
        counter!(name_b).increment(3);
        assert_eq!(controller.capture_metrics().len(), 5);

        // t ~ 1.25s: `name_d` (tag=value1, 1.0s) has expired.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        counter!(name_b).increment(3);
        assert_eq!(controller.capture_metrics().len(), 4);

        // t ~ 1.75s: `name_a` (global 1.5s) has expired.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        counter!(name_b).increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }
}