1use chrono::Utc;
2use futures::{StreamExt, stream};
3use vector_lib::{
4 codecs::BytesDeserializerConfig,
5 config::{LegacyKey, LogNamespace, log_schema},
6 configurable::configurable_component,
7 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
8 schema::Definition,
9};
10use vrl::value::Kind;
11
12use crate::{
13 SourceSender,
14 config::{DataType, SourceConfig, SourceContext, SourceOutput},
15 event::{EstimatedJsonEncodedSizeOf, Event},
16 internal_events::{InternalLogsBytesReceived, InternalLogsEventsReceived, StreamClosedError},
17 shutdown::ShutdownSignal,
18 trace::TraceSubscription,
19};
20
/// Configuration for the `internal_logs` source.
#[configurable_component(source(
    "internal_logs",
    "Expose internal log messages emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct InternalLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// When unset, the global `log_schema.host_key` option is used instead. If the hostname
    /// cannot be determined at startup, no host field is added.
    host_key: Option<OptionalValuePath>,

    /// Overrides the name of the log field used to add the current process ID to each event.
    ///
    /// Defaults to `pid`.
    #[serde(default = "default_pid_key")]
    pid_key: OptionalValuePath,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
51
52fn default_pid_key() -> OptionalValuePath {
53 OptionalValuePath::from(owned_value_path!("pid"))
54}
55
56impl_generate_config_from_default!(InternalLogsConfig);
57
58impl Default for InternalLogsConfig {
59 fn default() -> InternalLogsConfig {
60 InternalLogsConfig {
61 host_key: None,
62 pid_key: default_pid_key(),
63 log_namespace: None,
64 }
65 }
66}
67
68impl InternalLogsConfig {
69 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
71 let host_key = self
72 .host_key
73 .clone()
74 .unwrap_or(log_schema().host_key().cloned().into())
75 .path
76 .map(LegacyKey::Overwrite);
77 let pid_key = self.pid_key.clone().path.map(LegacyKey::Overwrite);
78
79 BytesDeserializerConfig
82 .schema_definition(log_namespace)
83 .with_standard_vector_source_metadata()
84 .with_source_metadata(
85 InternalLogsConfig::NAME,
86 host_key,
87 &owned_value_path!("host"),
88 Kind::bytes().or_undefined(),
89 Some("host"),
90 )
91 .with_source_metadata(
92 InternalLogsConfig::NAME,
93 pid_key,
94 &owned_value_path!("pid"),
95 Kind::integer(),
96 None,
97 )
98 }
99}
100
101#[async_trait::async_trait]
102#[typetag::serde(name = "internal_logs")]
103impl SourceConfig for InternalLogsConfig {
104 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
105 let host_key = self
106 .host_key
107 .clone()
108 .unwrap_or(log_schema().host_key().cloned().into())
109 .path;
110 let pid_key = self.pid_key.clone().path;
111
112 let subscription = TraceSubscription::subscribe();
113
114 let log_namespace = cx.log_namespace(self.log_namespace);
115
116 Ok(Box::pin(run(
117 host_key,
118 pid_key,
119 subscription,
120 cx.out,
121 cx.shutdown,
122 log_namespace,
123 )))
124 }
125
126 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
127 let schema_definition =
128 self.schema_definition(global_log_namespace.merge(self.log_namespace));
129
130 vec![SourceOutput::new_maybe_logs(
131 DataType::Log,
132 schema_definition,
133 )]
134 }
135
136 fn can_acknowledge(&self) -> bool {
137 false
138 }
139}
140
141async fn run(
142 host_key: Option<OwnedValuePath>,
143 pid_key: Option<OwnedValuePath>,
144 mut subscription: TraceSubscription,
145 mut out: SourceSender,
146 shutdown: ShutdownSignal,
147 log_namespace: LogNamespace,
148) -> Result<(), ()> {
149 let hostname = crate::get_hostname();
150 let pid = std::process::id();
151
152 let buffered_events = subscription.buffered_events().await;
155 let mut rx = stream::iter(buffered_events.into_iter().flatten())
156 .chain(subscription.into_stream())
157 .take_until(shutdown);
158
159 while let Some(mut log) = rx.next().await {
163 let byte_size = log.estimated_json_encoded_size_of().get();
165 let json_byte_size = log.estimated_json_encoded_size_of();
166 emit!(InternalLogsBytesReceived { byte_size });
168 emit!(InternalLogsEventsReceived {
169 count: 1,
170 byte_size: json_byte_size,
171 });
172
173 if let Ok(hostname) = &hostname {
174 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
175 log_namespace.insert_source_metadata(
176 InternalLogsConfig::NAME,
177 &mut log,
178 legacy_host_key,
179 path!("host"),
180 hostname.to_owned(),
181 );
182 }
183
184 let legacy_pid_key = pid_key.as_ref().map(LegacyKey::Overwrite);
185 log_namespace.insert_source_metadata(
186 InternalLogsConfig::NAME,
187 &mut log,
188 legacy_pid_key,
189 path!("pid"),
190 pid,
191 );
192
193 log_namespace.insert_standard_vector_source_metadata(
194 &mut log,
195 InternalLogsConfig::NAME,
196 Utc::now(),
197 );
198
199 if (out.send_event(Event::from(log)).await).is_err() {
200 emit!(StreamClosedError { count: 1 });
202 return Err(());
203 }
204 }
205
206 Ok(())
207}
208
#[cfg(test)]
mod tests {
    use futures::Stream;
    use tokio::time::{Duration, sleep};
    use vector_lib::{SpanField, event::Value, lookup::OwnedTargetPath};
    use vrl::value::kind::Collection;

    use serial_test::serial;

    use super::*;
    use crate::{
        event::Event,
        test_util::{
            collect_ready,
            components::{SOURCE_TAGS, assert_source_compliance},
        },
        trace,
    };

    #[test]
    fn generates_config() {
        crate::test_util::test_generate_config::<InternalLogsConfig>();
    }

    // The async tests below (re)initialise the shared tracing setup and its early buffer, so
    // they are run serially to keep them from observing each other's events.
    #[tokio::test]
    #[serial]
    async fn receives_logs() {
        trace::init(false, false, "debug", 10);
        trace::reset_early_buffer();

        assert_source_compliance(&SOURCE_TAGS, run_test()).await;
    }

    // Register extra span field names; `run_test` asserts these show up under `vector.*`
    // while the unregistered `ignored_field` does not.
    inventory::submit!(SpanField("component_new_field"));
    inventory::submit!(SpanField("component_numerical_field"));

    /// Emits four error logs tagged with a random `test_id` (outside any span, inside a
    /// component span before and after the source starts, and inside a nested span), then
    /// checks that the source delivered all four, in order, with the expected metadata.
    async fn run_test() {
        let test_id: u8 = rand::random();
        let start = chrono::Utc::now();

        // Emitted before the source exists; it must still arrive via the early buffer.
        error!(message = "Before source started without span.", %test_id);

        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = "foo",
            component_type = "internal_logs",
        );
        let enter = span.enter();

        error!(message = "Before source started.", %test_id);

        // Span `Entered` guards should not be held across an `.await`, so leave the span
        // before starting the source and re-enter it afterwards.
        drop(enter);
        let rx = start_source().await;

        let enter = span.enter();

        error!(message = "After source started.", %test_id);

        {
            // Overrides `component_kind`, adds two registered extra fields, and one
            // unregistered field that must not be captured.
            let nested_span = error_span!(
                "nested span",
                component_kind = "bar",
                component_new_field = "baz",
                component_numerical_field = 1,
                ignored_field = "foobarbaz",
            );
            let _enter = nested_span.enter();
            error!(message = "In a nested span.", %test_id);
        }

        drop(enter);

        sleep(Duration::from_millis(1)).await;
        let mut events = collect_ready(rx).await;
        // Keep only this run's events; other internal logs may be captured as well.
        let test_id = Value::from(test_id.to_string());
        events.retain(|event| event.as_log().get("test_id") == Some(&test_id));

        let end = chrono::Utc::now();

        assert_eq!(events.len(), 4);

        assert_eq!(
            events[0].as_log()["message"],
            "Before source started without span.".into()
        );
        assert_eq!(
            events[1].as_log()["message"],
            "Before source started.".into()
        );
        assert_eq!(
            events[2].as_log()["message"],
            "After source started.".into()
        );
        assert_eq!(events[3].as_log()["message"], "In a nested span.".into());

        for (i, event) in events.iter().enumerate() {
            let log = event.as_log();
            let timestamp = *log["timestamp"]
                .as_timestamp()
                .expect("timestamp isn't a timestamp");
            assert!(timestamp >= start);
            assert!(timestamp <= end);
            assert_eq!(log["metadata.kind"], "event".into());
            assert_eq!(log["metadata.level"], "ERROR".into());
            if i == 0 {
                // Emitted outside any span: no component fields.
                assert!(log.get("vector.component_id").is_none());
                assert!(log.get("vector.component_kind").is_none());
                assert!(log.get("vector.component_type").is_none());
            } else if i < 3 {
                // Emitted inside the outer component span.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "source".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
            } else {
                // Nested span: inherits id/type, overrides kind, adds registered extras only.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "bar".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
                assert_eq!(log["vector.component_new_field"], "baz".into());
                assert_eq!(log["vector.component_numerical_field"], 1.into());
                assert!(log.get("vector.ignored_field").is_none());
            }
        }
    }

    /// Builds and spawns a default-configured source, then stops early buffering and returns
    /// the receiving end of its output.
    async fn start_source() -> impl Stream<Item = Event> + Unpin {
        let (tx, rx) = SourceSender::new_test();

        let source = InternalLogsConfig::default()
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(source);
        // NOTE(review): presumably gives the spawned source time to drain the early buffer
        // before buffering is switched off — confirm the timing assumption.
        sleep(Duration::from_millis(1)).await;
        trace::stop_early_buffering();
        rx
    }

    vector_lib::register_extra_span_field!("internal_logs_test_extra_field");

    /// A field registered via `register_extra_span_field!` is copied into `vector.*`, while
    /// an unregistered span field is not.
    #[tokio::test]
    #[serial]
    async fn registered_extra_span_field_is_captured() {
        trace::init(false, false, "info", 10);
        trace::reset_early_buffer();

        let test_id: u8 = rand::random();
        let rx = start_source().await;

        {
            let span = error_span!(
                "extras",
                component_id = "foo",
                internal_logs_test_extra_field = "captured",
                some_other_field = "dropped",
            );
            let _enter = span.enter();
            error!(message = "With extra field.", %test_id);
        }

        sleep(Duration::from_millis(1)).await;
        let mut events = collect_ready(rx).await;
        let test_id_value = Value::from(test_id.to_string());
        events.retain(|event| event.as_log().get("test_id") == Some(&test_id_value));

        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(
            log["vector.internal_logs_test_extra_field"],
            "captured".into()
        );
        assert!(log.get("vector.some_other_field").is_none());
    }

    /// Emitting the same message from the same call site 20 times must yield 20 events.
    #[tokio::test]
    #[serial]
    async fn repeated_logs_are_not_rate_limited() {
        trace::init(false, false, "info", 10);
        trace::reset_early_buffer();

        let rx = start_source().await;

        for _ in 0..20 {
            info!(component_id = "test", "Repeated test message.");
        }

        sleep(Duration::from_millis(50)).await;
        let events = collect_ready(rx).await;

        let test_events: Vec<_> = events
            .iter()
            .filter(|e| {
                e.as_log()
                    .get("message")
                    .map(|m| m.to_string_lossy() == "Repeated test message.")
                    .unwrap_or(false)
            })
            .collect();

        assert_eq!(
            test_events.len(),
            20,
            "internal_logs source should capture all repeated messages without rate limiting"
        );
    }

    /// With the Vector namespace, host/pid live in source metadata and the event root is the
    /// message.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = InternalLogsConfig::default();

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "pid"),
                    Kind::integer(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    /// With the Legacy namespace, host/pid are event fields, and a custom `pid_key` is honoured.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let mut config = InternalLogsConfig::default();

        let pid_key = "pid_a_pid_a_pid_pid_pid";

        config.pid_key = OptionalValuePath::from(owned_value_path!(pid_key));

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(pid_key), Kind::integer(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );

        assert_eq!(definitions, Some(expected_definition))
    }
}