vector/
trace.rs

1#![allow(missing_docs)]
2use std::{
3    collections::{HashMap, HashSet},
4    marker::PhantomData,
5    str::FromStr,
6    sync::{
7        LazyLock, Mutex, MutexGuard, OnceLock,
8        atomic::{AtomicBool, Ordering},
9    },
10};
11
12use futures_util::{Stream, StreamExt, future::ready};
13use metrics_tracing_context::MetricsLayer;
14use tokio::sync::{
15    broadcast::{self, Receiver, Sender},
16    oneshot,
17};
18use tokio_stream::wrappers::BroadcastStream;
19use tracing::{Event, Subscriber};
20use tracing_limit::RateLimitedLayer;
21use tracing_subscriber::{
22    Layer,
23    filter::LevelFilter,
24    layer::{Context, SubscriberExt},
25    registry::LookupSpan,
26    util::SubscriberInitExt,
27};
28pub use tracing_tower::{InstrumentableService, InstrumentedService};
29use vector_lib::SpanField;
30use vector_lib::lookup::event_path;
31use vrl::value::Value;
32
33use crate::event::LogEvent;
34
35/// BUFFER contains all of the internal log events generated by Vector between the initialization of `tracing` and early
36/// buffering being stopped, which occurs once the topology reports as having successfully started.
37///
38/// This means that callers must subscribe during the configuration phase of their components, and not in the core loop
39/// of the component, as the topology can only report when a component has been spawned, but not necessarily always
40/// when it has started doing, or waiting, for input.
41static BUFFER: Mutex<Option<Vec<LogEvent>>> = Mutex::new(Some(Vec::new()));
42
43/// SHOULD_BUFFER controls whether or not internal log events should be buffered or sent directly to the trace broadcast
44/// channel.
45static SHOULD_BUFFER: AtomicBool = AtomicBool::new(true);
46
47/// SUBSCRIBERS contains a list of callers interested in internal log events who will be notified when early buffering
48/// is disabled, by receiving a copy of all buffered internal log events.
49static SUBSCRIBERS: Mutex<Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> =
50    Mutex::new(Some(Vec::new()));
51
52/// SENDER holds the sender/receiver handle that will receive a copy of all the internal log events *after* the topology
53/// has been initialized.
54static SENDER: OnceLock<Sender<LogEvent>> = OnceLock::new();
55
/// Reports whether the metrics/tracing integration layer should be installed.
///
/// The layer is on by default. It is disabled only when the
/// `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment variable is set to exactly `true`
/// (case-sensitive). Any other value, an unset variable, or a non-Unicode value keeps it enabled.
fn metrics_layer_enabled() -> bool {
    let disabled = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION")
        .is_ok_and(|value| value == "true");
    !disabled
}
59
60pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64) {
61    let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
62        "logging filter targets were not formatted correctly or did not specify a valid level",
63    );
64
65    let metrics_layer =
66        metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));
67
68    // BroadcastLayer should NOT be rate limited because it feeds the internal_logs source,
69    // which users rely on to capture ALL internal Vector logs for debugging/monitoring.
70    // Console output (stdout/stderr) has its own separate rate limiting below.
71    let broadcast_layer = BroadcastLayer::new().with_filter(fmt_filter.clone());
72
73    let subscriber = tracing_subscriber::registry()
74        .with(metrics_layer)
75        .with(broadcast_layer);
76
77    #[cfg(feature = "tokio-console")]
78    let subscriber = {
79        let console_layer = console_subscriber::ConsoleLayer::builder()
80            .with_default_env()
81            .spawn();
82
83        subscriber.with(console_layer)
84    };
85
86    #[cfg(feature = "allocation-tracing")]
87    let subscriber = {
88        let allocation_layer = crate::internal_telemetry::allocations::AllocationLayer::new()
89            .with_filter(LevelFilter::ERROR);
90
91        subscriber.with(allocation_layer)
92    };
93
94    if json {
95        let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true);
96
97        #[cfg(test)]
98        let formatter = formatter.with_test_writer();
99
100        let rate_limited =
101            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
102        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
103
104        _ = subscriber.try_init();
105    } else {
106        let formatter = tracing_subscriber::fmt::layer()
107            .with_ansi(color)
108            .with_writer(std::io::stderr);
109
110        #[cfg(test)]
111        let formatter = formatter.with_test_writer();
112
113        let rate_limited =
114            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
115        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
116
117        _ = subscriber.try_init();
118    }
119}
120
/// Test helper: installs a fresh, empty early buffer and returns whatever was there before
/// (`None` if the buffer had already been consumed).
#[cfg(test)]
pub fn reset_early_buffer() -> Option<Vec<LogEvent>> {
    let mut buffer = get_early_buffer();
    std::mem::replace(&mut *buffer, Some(Vec::new()))
}
125
126/// Gets a  mutable reference to the early buffer.
127fn get_early_buffer() -> MutexGuard<'static, Option<Vec<LogEvent>>> {
128    BUFFER
129        .lock()
130        .expect("Couldn't acquire lock on internal logs buffer")
131}
132
133/// Determines whether tracing events should be processed (e.g. converted to log
134/// events) to avoid unnecessary performance overhead.
135///
136/// Checks if [`BUFFER`] is set or if a trace sender exists
137fn should_process_tracing_event() -> bool {
138    get_early_buffer().is_some() || maybe_get_trace_sender().is_some()
139}
140
141/// Attempts to buffer an event into the early buffer.
142fn try_buffer_event(log: &LogEvent) -> bool {
143    if SHOULD_BUFFER.load(Ordering::Acquire)
144        && let Some(buffer) = get_early_buffer().as_mut()
145    {
146        buffer.push(log.clone());
147        return true;
148    }
149
150    false
151}
152
153/// Attempts to broadcast an event to subscribers.
154///
155/// If no subscribers are connected, this does nothing.
156fn try_broadcast_event(log: LogEvent) {
157    if let Some(sender) = maybe_get_trace_sender() {
158        _ = sender.send(log);
159    }
160}
161
162/// Consumes the early buffered events.
163///
164/// # Panics
165///
166/// If the early buffered events have already been consumed, this function will panic.
167fn consume_early_buffer() -> Vec<LogEvent> {
168    get_early_buffer()
169        .take()
170        .expect("early buffer was already consumed")
171}
172
173/// Gets or creates a trace sender for sending internal log events.
174fn get_trace_sender() -> &'static broadcast::Sender<LogEvent> {
175    SENDER.get_or_init(|| broadcast::channel(99).0)
176}
177
178/// Attempts to get the trace sender for sending internal log events.
179///
180/// If the trace sender has not yet been created, `None` is returned.
181fn maybe_get_trace_sender() -> Option<&'static broadcast::Sender<LogEvent>> {
182    SENDER.get()
183}
184
185/// Creates a trace receiver that receives internal log events.
186///
187/// This will create a trace sender if one did not already exist.
188fn get_trace_receiver() -> broadcast::Receiver<LogEvent> {
189    get_trace_sender().subscribe()
190}
191
192/// Gets a mutable reference to the list of waiting subscribers, if it exists.
193fn get_trace_subscriber_list() -> MutexGuard<'static, Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> {
194    SUBSCRIBERS.lock().expect("poisoned locks are dumb")
195}
196
197/// Attempts to register for early buffered events.
198///
199/// If early buffering has not yet been stopped, `Some(receiver)` is returned. The given receiver will resolve to a
200/// vector of all early buffered events once early buffering has been stopped. Otherwise, if early buffering is already
201/// stopped, `None` is returned.
202fn try_register_for_early_events() -> Option<oneshot::Receiver<Vec<LogEvent>>> {
203    if SHOULD_BUFFER.load(Ordering::Acquire) {
204        // We're still in early buffering mode. Attempt to subscribe by adding a oneshot sender
205        // to SUBSCRIBERS. If it's already been consumed, then we've gotten beaten out by a
206        // caller that is disabling early buffering, so we just go with the flow either way.
207        get_trace_subscriber_list().as_mut().map(|subscribers| {
208            let (tx, rx) = oneshot::channel();
209            subscribers.push(tx);
210            rx
211        })
212    } else {
213        // Early buffering is being or has been disabled, so we can no longer register.
214        None
215    }
216}
217
218/// Stops early buffering.
219///
220/// This flushes any buffered log events to waiting subscribers and redirects log events from the buffer to the
221/// broadcast stream.
222pub fn stop_early_buffering() {
223    // Try and disable early buffering.
224    //
225    // If it was already disabled, or we lost the race to disable it, just return.
226    if SHOULD_BUFFER
227        .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
228        .is_err()
229    {
230        return;
231    }
232
233    // We won the right to capture all buffered events and forward them to any waiting subscribers,
234    // so let's grab the subscriber list and see if there's actually anyone waiting.
235    let subscribers = get_trace_subscriber_list().take();
236    if let Some(subscribers_tx) = subscribers {
237        // Consume the early buffer, and send a copy of it to every waiting subscriber.
238        let buffered_events = consume_early_buffer();
239        for subscriber_tx in subscribers_tx {
240            // Ignore any errors sending since the caller may have dropped or something else.
241            _ = subscriber_tx.send(buffered_events.clone());
242        }
243    }
244}
245
246/// A subscription to the log events flowing in via `tracing`, in the Vector native format.
247///
248/// Used to capture tracing events from internal log telemetry, via `tracing`, and convert them to native Vector events,
249/// specifically `LogEvent`, such that they can be shuttled around and treated as normal events.  Currently only powers
250/// the `internal_logs` source, but could be used for other purposes if need be.
251pub struct TraceSubscription {
252    buffered_events_rx: Option<oneshot::Receiver<Vec<LogEvent>>>,
253    trace_rx: Receiver<LogEvent>,
254}
255
256impl TraceSubscription {
257    /// Registers a subscription to the internal log event stream.
258    pub fn subscribe() -> TraceSubscription {
259        let buffered_events_rx = try_register_for_early_events();
260        let trace_rx = get_trace_receiver();
261
262        Self {
263            buffered_events_rx,
264            trace_rx,
265        }
266    }
267
268    /// Gets any early buffered log events.
269    ///
270    /// If this subscription was registered after early buffering was turned off, `None` will be returned immediately.
271    /// Otherwise, waits for early buffering to be stopped and returns `Some(events)` where `events` contains all events
272    /// seen from the moment `tracing` was initialized to the moment early buffering was stopped.
273    pub async fn buffered_events(&mut self) -> Option<Vec<LogEvent>> {
274        // If we have a receiver for buffered events, and it returns them successfully, then pass
275        // them back.  We don't care if the sender drops in the meantime, so just swallow that error.
276        match self.buffered_events_rx.take() {
277            Some(rx) => rx.await.ok(),
278            None => None,
279        }
280    }
281
282    /// Converts this subscription into a raw stream of log events.
283    pub fn into_stream(self) -> impl Stream<Item = LogEvent> + Unpin {
284        // We ignore errors because the only error we get is when the broadcast receiver lags, and there's nothing we
285        // can actually do about that so there's no reason to force callers to even deal with it.
286        BroadcastStream::new(self.trace_rx).filter_map(|event| ready(event.ok()))
287    }
288}
289
/// A `tracing` layer that turns events into `LogEvent`s and hands them either to the early buffer
/// or to the trace broadcast channel.
///
/// It carries no state; the `PhantomData<S>` only ties the layer to its subscriber type.
struct BroadcastLayer<S> {
    _subscriber: PhantomData<S>,
}

impl<S> BroadcastLayer<S> {
    /// Creates the (stateless) layer.
    const fn new() -> Self {
        Self { _subscriber: PhantomData }
    }
}
301
302impl<S> Layer<S> for BroadcastLayer<S>
303where
304    S: Subscriber + 'static + for<'lookup> LookupSpan<'lookup>,
305{
306    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
307        if should_process_tracing_event() {
308            let mut log = LogEvent::from(event);
309            // Add span fields if available
310            if let Some(parent_span) = ctx.event_span(event) {
311                for span in parent_span.scope().from_root() {
312                    if let Some(fields) = span.extensions().get::<SpanFields>() {
313                        for (k, v) in &fields.0 {
314                            log.insert(event_path!("vector", *k), v.clone());
315                        }
316                    }
317                }
318            }
319            // Try buffering the event, and if we're not buffering anymore, try to
320            // send it along via the trace sender if it's been established.
321            if !try_buffer_event(&log) {
322                try_broadcast_event(log);
323            }
324        }
325    }
326
327    fn on_new_span(
328        &self,
329        attrs: &tracing_core::span::Attributes<'_>,
330        id: &tracing_core::span::Id,
331        ctx: Context<'_, S>,
332    ) {
333        let span = ctx.span(id).expect("span must already exist!");
334        let mut fields = SpanFields::default();
335        attrs.values().record(&mut fields);
336        span.extensions_mut().insert(fields);
337    }
338}
339
340#[derive(Default, Debug)]
341struct SpanFields(HashMap<&'static str, Value>);
342
343inventory::submit!(SpanField("component_id"));
344inventory::submit!(SpanField("component_type"));
345inventory::submit!(SpanField("component_kind"));
346
347/// Snapshot of every registered [`SpanField`],
348/// materialized once on first access. `inventory` populates submissions before `main`, so the
349/// snapshot is guaranteed to capture every entry; the read path on every traced span event is
350/// then a single set lookup against this static.
351static SPAN_FIELDS: LazyLock<HashSet<&'static str>> =
352    LazyLock::new(|| inventory::iter::<SpanField>().map(|f| f.0).collect());
353
354impl SpanFields {
355    fn record(&mut self, field: &tracing_core::Field, value: impl Into<Value>) {
356        let name = field.name();
357        // Filter for span fields such as component_id, component_type, etc.
358        //
359        // This captures all the basic component information provided in the
360        // span that each component is spawned with. We don't capture all fields
361        // to avoid adding unintentional noise and to prevent accidental
362        // security/privacy issues (e.g. leaking sensitive data). Downstream
363        // crates can extend the allowlist by name through
364        // `register_extra_span_field!` (see `vector_lib::SpanField`).
365        if SPAN_FIELDS.contains(name) {
366            self.0.insert(name, value.into());
367        }
368    }
369}
370
371impl tracing::field::Visit for SpanFields {
372    fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
373        self.record(field, value);
374    }
375
376    fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
377        self.record(field, value);
378    }
379
380    fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
381        self.record(field, value);
382    }
383
384    fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
385        self.record(field, value);
386    }
387
388    fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
389        self.record(field, format!("{value:?}"));
390    }
391}