1#![allow(missing_docs)]
2use std::{
3 collections::{HashMap, HashSet},
4 marker::PhantomData,
5 str::FromStr,
6 sync::{
7 LazyLock, Mutex, MutexGuard, OnceLock,
8 atomic::{AtomicBool, Ordering},
9 },
10};
11
12use futures_util::{Stream, StreamExt, future::ready};
13use metrics_tracing_context::MetricsLayer;
14use tokio::sync::{
15 broadcast::{self, Receiver, Sender},
16 oneshot,
17};
18use tokio_stream::wrappers::BroadcastStream;
19use tracing::{Event, Subscriber};
20use tracing_limit::RateLimitedLayer;
21use tracing_subscriber::{
22 Layer,
23 filter::LevelFilter,
24 layer::{Context, SubscriberExt},
25 registry::LookupSpan,
26 util::SubscriberInitExt,
27};
28pub use tracing_tower::{InstrumentableService, InstrumentedService};
29use vector_lib::SpanField;
30use vector_lib::lookup::event_path;
31use vrl::value::Value;
32
33use crate::event::LogEvent;
34
35static BUFFER: Mutex<Option<Vec<LogEvent>>> = Mutex::new(Some(Vec::new()));
42
43static SHOULD_BUFFER: AtomicBool = AtomicBool::new(true);
46
47static SUBSCRIBERS: Mutex<Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> =
50 Mutex::new(Some(Vec::new()));
51
52static SENDER: OnceLock<Sender<LogEvent>> = OnceLock::new();
55
56fn metrics_layer_enabled() -> bool {
57 !matches!(std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION"), Ok(x) if x == "true")
58}
59
60pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64) {
61 let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
62 "logging filter targets were not formatted correctly or did not specify a valid level",
63 );
64
65 let metrics_layer =
66 metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));
67
68 let broadcast_layer = BroadcastLayer::new().with_filter(fmt_filter.clone());
72
73 let subscriber = tracing_subscriber::registry()
74 .with(metrics_layer)
75 .with(broadcast_layer);
76
77 #[cfg(feature = "tokio-console")]
78 let subscriber = {
79 let console_layer = console_subscriber::ConsoleLayer::builder()
80 .with_default_env()
81 .spawn();
82
83 subscriber.with(console_layer)
84 };
85
86 #[cfg(feature = "allocation-tracing")]
87 let subscriber = {
88 let allocation_layer = crate::internal_telemetry::allocations::AllocationLayer::new()
89 .with_filter(LevelFilter::ERROR);
90
91 subscriber.with(allocation_layer)
92 };
93
94 if json {
95 let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true);
96
97 #[cfg(test)]
98 let formatter = formatter.with_test_writer();
99
100 let rate_limited =
101 RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
102 let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
103
104 _ = subscriber.try_init();
105 } else {
106 let formatter = tracing_subscriber::fmt::layer()
107 .with_ansi(color)
108 .with_writer(std::io::stderr);
109
110 #[cfg(test)]
111 let formatter = formatter.with_test_writer();
112
113 let rate_limited =
114 RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
115 let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
116
117 _ = subscriber.try_init();
118 }
119}
120
121#[cfg(test)]
122pub fn reset_early_buffer() -> Option<Vec<LogEvent>> {
123 get_early_buffer().replace(Vec::new())
124}
125
126fn get_early_buffer() -> MutexGuard<'static, Option<Vec<LogEvent>>> {
128 BUFFER
129 .lock()
130 .expect("Couldn't acquire lock on internal logs buffer")
131}
132
133fn should_process_tracing_event() -> bool {
138 get_early_buffer().is_some() || maybe_get_trace_sender().is_some()
139}
140
141fn try_buffer_event(log: &LogEvent) -> bool {
143 if SHOULD_BUFFER.load(Ordering::Acquire)
144 && let Some(buffer) = get_early_buffer().as_mut()
145 {
146 buffer.push(log.clone());
147 return true;
148 }
149
150 false
151}
152
153fn try_broadcast_event(log: LogEvent) {
157 if let Some(sender) = maybe_get_trace_sender() {
158 _ = sender.send(log);
159 }
160}
161
162fn consume_early_buffer() -> Vec<LogEvent> {
168 get_early_buffer()
169 .take()
170 .expect("early buffer was already consumed")
171}
172
173fn get_trace_sender() -> &'static broadcast::Sender<LogEvent> {
175 SENDER.get_or_init(|| broadcast::channel(99).0)
176}
177
178fn maybe_get_trace_sender() -> Option<&'static broadcast::Sender<LogEvent>> {
182 SENDER.get()
183}
184
185fn get_trace_receiver() -> broadcast::Receiver<LogEvent> {
189 get_trace_sender().subscribe()
190}
191
192fn get_trace_subscriber_list() -> MutexGuard<'static, Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> {
194 SUBSCRIBERS.lock().expect("poisoned locks are dumb")
195}
196
197fn try_register_for_early_events() -> Option<oneshot::Receiver<Vec<LogEvent>>> {
203 if SHOULD_BUFFER.load(Ordering::Acquire) {
204 get_trace_subscriber_list().as_mut().map(|subscribers| {
208 let (tx, rx) = oneshot::channel();
209 subscribers.push(tx);
210 rx
211 })
212 } else {
213 None
215 }
216}
217
218pub fn stop_early_buffering() {
223 if SHOULD_BUFFER
227 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
228 .is_err()
229 {
230 return;
231 }
232
233 let subscribers = get_trace_subscriber_list().take();
236 if let Some(subscribers_tx) = subscribers {
237 let buffered_events = consume_early_buffer();
239 for subscriber_tx in subscribers_tx {
240 _ = subscriber_tx.send(buffered_events.clone());
242 }
243 }
244}
245
246pub struct TraceSubscription {
252 buffered_events_rx: Option<oneshot::Receiver<Vec<LogEvent>>>,
253 trace_rx: Receiver<LogEvent>,
254}
255
256impl TraceSubscription {
257 pub fn subscribe() -> TraceSubscription {
259 let buffered_events_rx = try_register_for_early_events();
260 let trace_rx = get_trace_receiver();
261
262 Self {
263 buffered_events_rx,
264 trace_rx,
265 }
266 }
267
268 pub async fn buffered_events(&mut self) -> Option<Vec<LogEvent>> {
274 match self.buffered_events_rx.take() {
277 Some(rx) => rx.await.ok(),
278 None => None,
279 }
280 }
281
282 pub fn into_stream(self) -> impl Stream<Item = LogEvent> + Unpin {
284 BroadcastStream::new(self.trace_rx).filter_map(|event| ready(event.ok()))
287 }
288}
289
290struct BroadcastLayer<S> {
291 _subscriber: PhantomData<S>,
292}
293
294impl<S> BroadcastLayer<S> {
295 const fn new() -> Self {
296 BroadcastLayer {
297 _subscriber: PhantomData,
298 }
299 }
300}
301
302impl<S> Layer<S> for BroadcastLayer<S>
303where
304 S: Subscriber + 'static + for<'lookup> LookupSpan<'lookup>,
305{
306 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
307 if should_process_tracing_event() {
308 let mut log = LogEvent::from(event);
309 if let Some(parent_span) = ctx.event_span(event) {
311 for span in parent_span.scope().from_root() {
312 if let Some(fields) = span.extensions().get::<SpanFields>() {
313 for (k, v) in &fields.0 {
314 log.insert(event_path!("vector", *k), v.clone());
315 }
316 }
317 }
318 }
319 if !try_buffer_event(&log) {
322 try_broadcast_event(log);
323 }
324 }
325 }
326
327 fn on_new_span(
328 &self,
329 attrs: &tracing_core::span::Attributes<'_>,
330 id: &tracing_core::span::Id,
331 ctx: Context<'_, S>,
332 ) {
333 let span = ctx.span(id).expect("span must already exist!");
334 let mut fields = SpanFields::default();
335 attrs.values().record(&mut fields);
336 span.extensions_mut().insert(fields);
337 }
338}
339
340#[derive(Default, Debug)]
341struct SpanFields(HashMap<&'static str, Value>);
342
343inventory::submit!(SpanField("component_id"));
344inventory::submit!(SpanField("component_type"));
345inventory::submit!(SpanField("component_kind"));
346
347static SPAN_FIELDS: LazyLock<HashSet<&'static str>> =
352 LazyLock::new(|| inventory::iter::<SpanField>().map(|f| f.0).collect());
353
354impl SpanFields {
355 fn record(&mut self, field: &tracing_core::Field, value: impl Into<Value>) {
356 let name = field.name();
357 if SPAN_FIELDS.contains(name) {
366 self.0.insert(name, value.into());
367 }
368 }
369}
370
371impl tracing::field::Visit for SpanFields {
372 fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
373 self.record(field, value);
374 }
375
376 fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
377 self.record(field, value);
378 }
379
380 fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
381 self.record(field, value);
382 }
383
384 fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
385 self.record(field, value);
386 }
387
388 fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
389 self.record(field, format!("{value:?}"));
390 }
391}