1mod ddsketch;
2mod label_filter;
3mod metric_matcher;
4mod recency;
5mod recorder;
6mod storage;
7
8use std::{sync::OnceLock, time::Duration};
9
10use chrono::Utc;
11use metric_matcher::MetricKeyMatcher;
12use metrics::Key;
13use metrics_tracing_context::TracingContextLayer;
14use metrics_util::layers::Layer;
15use snafu::Snafu;
16
17pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
18pub use self::label_filter::{LABELS, MetricLabel};
19use self::{
20 label_filter::VectorLabelFilter,
21 recorder::{Registry, VectorRecorder},
22};
23use crate::{
24 config::metrics_expiration::PerMetricSetExpiration,
25 event::{Metric, MetricValue},
26};
27
/// Module-local result alias whose error type is this module's [`Error`].
type Result<T> = std::result::Result<T, Error>;
29
/// Errors reported by the internal metrics controller.
#[derive(Clone, Debug, PartialEq, Snafu)]
pub enum Error {
    /// A global recorder or the global controller was already installed.
    #[snafu(display("Recorder already initialized."))]
    AlreadyInitialized,
    /// The global controller was requested before it had been set.
    #[snafu(display("Metrics system was not initialized."))]
    NotInitialized,
    /// The global expiration timeout was rejected; `timeout` is the offending value.
    #[snafu(display("Timeout value of {} must be positive.", timeout))]
    TimeoutMustBePositive { timeout: f64 },
    /// A regex pattern could not be used; `pattern` is the offending pattern text.
    // NOTE(review): not constructed in this file -- presumably produced while converting
    // per-metric-set expiration configs into matchers; confirm against `metric_matcher`.
    #[snafu(display("Invalid regex pattern: {}.", pattern))]
    InvalidRegexPattern { pattern: String },
}
41
/// Process-wide controller slot; written at most once by `init` and read by `Controller::get`.
static CONTROLLER: OnceLock<Controller> = OnceLock::new();

/// Name of the gauge that `Controller::capture_metrics` appends to report the metric count.
const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
/// Metric key built from [`CARDINALITY_KEY_NAME`].
static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);

/// Name of the counter that `Controller::capture_metrics` appends with the same count value.
const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
/// Metric key built from [`CARDINALITY_COUNTER_KEY_NAME`].
static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
53
/// Handle used to reset, configure and capture the internal metrics.
pub struct Controller {
    /// Recorder whose registry every controller operation goes through.
    recorder: VectorRecorder,
}
58
/// Reports whether the internal metrics core should be set up.
///
/// The core is enabled unless the `DISABLE_INTERNAL_METRICS_CORE` environment variable is
/// readable and exactly equal to `"true"`; any other value, or an unset/unreadable variable,
/// leaves it enabled.
fn metrics_enabled() -> bool {
    std::env::var("DISABLE_INTERNAL_METRICS_CORE")
        .ok()
        .as_deref()
        != Some("true")
}
62
/// Reports whether the tracing-context layer should wrap the recorder.
///
/// The layer is enabled unless the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment
/// variable is readable and exactly equal to `"true"`.
fn tracing_context_layer_enabled() -> bool {
    std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION")
        .ok()
        .as_deref()
        != Some("true")
}
66
67fn init(recorder: VectorRecorder) -> Result<()> {
68 if !metrics_enabled() {
71 metrics::set_global_recorder(metrics::NoopRecorder)
72 .map_err(|_| Error::AlreadyInitialized)?;
73 info!(message = "Internal metrics core is disabled.");
74 return Ok(());
75 }
76
77 if tracing_context_layer_enabled() {
85 metrics::set_global_recorder(
87 TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
88 )
89 .map_err(|_| Error::AlreadyInitialized)?;
90 } else {
91 metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
92 }
93
94 let controller = Controller { recorder };
104 CONTROLLER
105 .set(controller)
106 .map_err(|_| Error::AlreadyInitialized)?;
107
108 Ok(())
109}
110
111pub fn init_global() -> Result<()> {
117 init(VectorRecorder::new_global())
118}
119
120pub fn init_test() {
123 if init(VectorRecorder::new_test()).is_err() {
124 while CONTROLLER.get().is_none() {}
131 }
132}
133
134impl Controller {
135 pub fn reset(&self) {
137 self.recorder.with_registry(Registry::clear);
138 }
139
140 pub fn get() -> Result<&'static Self> {
147 CONTROLLER.get().ok_or(Error::NotInitialized)
148 }
149
150 pub fn set_expiry(
157 &self,
158 global_timeout: Option<f64>,
159 expire_metrics_per_metric_set: Vec<PerMetricSetExpiration>,
160 ) -> Result<()> {
161 if let Some(timeout) = global_timeout
162 && timeout <= 0.0
163 {
164 return Err(Error::TimeoutMustBePositive { timeout });
165 }
166 let per_metric_expiration = expire_metrics_per_metric_set
167 .into_iter()
168 .map(TryInto::try_into)
169 .collect::<Result<Vec<(MetricKeyMatcher, Duration)>>>()?;
170
171 self.recorder.with_registry(|registry| {
172 registry.set_expiry(
173 global_timeout.map(Duration::from_secs_f64),
174 per_metric_expiration,
175 );
176 });
177 Ok(())
178 }
179
180 pub fn capture_metrics(&self) -> Vec<Metric> {
183 let timestamp = Utc::now();
184
185 let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
186
187 #[allow(clippy::cast_precision_loss)]
188 let value = (metrics.len() + 2) as f64;
189 metrics.push(Metric::from_metric_kv(
190 &CARDINALITY_KEY,
191 MetricValue::Gauge { value },
192 timestamp,
193 ));
194 metrics.push(Metric::from_metric_kv(
195 &CARDINALITY_COUNTER_KEY,
196 MetricValue::Counter { value },
197 timestamp,
198 ));
199
200 metrics
201 }
202}
203
// Tests for the metrics controller. Every expected length below includes the two cardinality
// metrics that `Controller::capture_metrics` always appends.
#[cfg(test)]
mod tests {
    use strum::IntoEnumIterator;
    use vector_common::{
        counter, gauge,
        internal_event::{CounterName, GaugeName},
    };

    use super::*;
    use crate::{
        config::metrics_expiration::{
            MetricLabelMatcher, MetricLabelMatcherConfig, MetricNameMatcherConfig,
        },
        event::MetricKind,
    };

    /// Expiration timeout, in seconds, used by the expiry tests.
    const IDLE_TIMEOUT: f64 = 0.5;

    /// Initializes the test metrics system and returns the global controller.
    fn init_metrics() -> &'static Controller {
        init_test();
        Controller::get().expect("Could not get global metrics controller")
    }

    /// Registers `cardinality` counters that differ only in their `idx` label and checks that
    /// the capture has `cardinality + 2` entries and that both cardinality metrics report it.
    #[test]
    fn cardinality_matches() {
        for cardinality in [0, 1, 10, 100, 1000, 10000] {
            init_test();
            let controller = Controller::get().unwrap();
            // Start each round from an empty registry.
            controller.reset();

            let name = CounterName::iter().next().unwrap();
            for idx in 0..cardinality {
                counter!(name, "idx" => idx.to_string()).increment(1);
            }

            let metrics = controller.capture_metrics();
            assert_eq!(metrics.len(), cardinality + 2);

            // Both cardinality metrics are expected to carry the full captured length.
            #[allow(clippy::cast_precision_loss)]
            let value = metrics.len() as f64;
            for metric in metrics {
                match metric.name() {
                    CARDINALITY_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Gauge { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    CARDINALITY_COUNTER_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Counter { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    _ => {}
                }
            }
        }
    }

    /// Checks that a metric is captured as soon as its handle is created, and that updating
    /// it afterwards does not change the number of captured metrics.
    // NOTE(review): no `reset()` here, so this presumably relies on the test recorder giving
    // each test its own empty registry -- confirm against `VectorRecorder::new_test`.
    #[test]
    fn handles_registered_metrics() {
        let controller = init_metrics();

        let counter = counter!(CounterName::iter().next().unwrap());
        // One counter + two cardinality metrics.
        assert_eq!(controller.capture_metrics().len(), 3);
        counter.increment(1);
        assert_eq!(controller.capture_metrics().len(), 3);
        let gauge = gauge!(GaugeName::iter().next().unwrap());
        assert_eq!(controller.capture_metrics().len(), 4);
        gauge.set(1.0);
        assert_eq!(controller.capture_metrics().len(), 4);
    }

    /// With a global timeout set, a counter that is not updated for twice the timeout is
    /// expected to drop out of the capture while a freshly updated one remains.
    #[test]
    fn expires_metrics() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        // Only `name_a` is touched after the sleep, so only it is expected to remain.
        counter!(name_a).increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    /// Same as `expires_metrics`, but the two series share a name and differ only in the
    /// value of their `tag` label.
    #[test]
    fn expires_metrics_tags() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let name = CounterName::iter().next().unwrap();
        counter!(name, "tag" => "value1").increment(1);
        counter!(name, "tag" => "value2").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        counter!(name, "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    /// Checks that a counter whose handle `a` is kept alive is still captured after the
    /// timeout has elapsed and keeps accumulating (final value 2.0 after two increments),
    /// while the capture shrinks by one entry.
    // NOTE(review): the entry that disappears is presumably `name_b`, which has no live
    // handle -- the test only asserts the count and the value of `name_a`.
    #[test]
    fn skips_expiring_registered() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();

        let a = counter!(name_a);
        counter!(name_b).increment(5);
        assert_eq!(controller.capture_metrics().len(), 4);
        a.increment(1);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        assert_eq!(controller.capture_metrics().len(), 3);

        a.increment(1);
        let metrics = controller.capture_metrics();
        assert_eq!(metrics.len(), 3);
        let metric = metrics
            .into_iter()
            .find(|metric| metric.name() == name_a.as_str())
            .expect("Test metric is not present");
        match metric.value() {
            MetricValue::Counter { value } => assert_eq!(*value, 2.0),
            value => panic!("Invalid metric value {value:?}"),
        }
    }

    /// With no global timeout and a per-set expiration that matches only `name_b` by exact
    /// name, one entry is expected to disappear after the sleep.
    #[test]
    fn expires_metrics_per_set() {
        let controller = init_metrics();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();

        controller
            .set_expiry(
                None,
                vec![PerMetricSetExpiration {
                    name: Some(MetricNameMatcherConfig::Exact {
                        value: name_b.as_str().to_string(),
                    }),
                    labels: None,
                    expire_secs: IDLE_TIMEOUT,
                }],
            )
            .unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        counter!(name_a).increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    /// Combines a global timeout (3x `IDLE_TIMEOUT`) with two per-set timeouts: 1x for
    /// `name_c` by exact name and 2x for anything labelled `tag=value1`. The capture is
    /// expected to shrink by one entry after each sleep, while `name_b` is updated after
    /// every sleep and is expected to remain.
    #[test]
    fn expires_metrics_multiple_different_sets() {
        let controller = init_metrics();

        let mut names = CounterName::iter();
        let name_a = names.next().unwrap();
        let name_b = names.next().unwrap();
        let name_c = names.next().unwrap();
        let name_d = names.next().unwrap();

        controller
            .set_expiry(
                Some(IDLE_TIMEOUT * 3.0),
                vec![
                    PerMetricSetExpiration {
                        name: Some(MetricNameMatcherConfig::Exact {
                            value: name_c.as_str().to_string(),
                        }),
                        labels: None,
                        expire_secs: IDLE_TIMEOUT,
                    },
                    PerMetricSetExpiration {
                        name: None,
                        labels: Some(MetricLabelMatcherConfig::All {
                            matchers: vec![MetricLabelMatcher::Exact {
                                key: "tag".to_string(),
                                value: "value1".to_string(),
                            }],
                        }),
                        expire_secs: IDLE_TIMEOUT * 2.0,
                    },
                ],
            )
            .unwrap();

        counter!(name_a).increment(1);
        counter!(name_b).increment(1);
        counter!(name_c).increment(2);
        counter!(name_d, "tag" => "value1").increment(3);
        // Four counters + two cardinality metrics.
        assert_eq!(controller.capture_metrics().len(), 6);

        // Elapsed: 1.5x the timeout -- past the 1x limit configured for `name_c`.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 1.5));
        counter!(name_b).increment(3);
        assert_eq!(controller.capture_metrics().len(), 5);

        // Elapsed: 2.5x the timeout -- past the 2x limit configured for `tag=value1`.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        counter!(name_b).increment(3);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Elapsed: 3.5x the timeout -- past the 3x global limit for the untouched `name_a`.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        counter!(name_b).increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }
}