1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 codecs::{BytesDeserializer, BytesDeserializerConfig},
14 config::{LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 file_source::{
17 file_server::{FileServer, Line, calculate_ignore_before},
18 paths_provider::{Glob, MatchOptions},
19 },
20 file_source_common::{
21 Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22 },
23 finalizer::OrderedFinalizer,
24 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30 SourceSender,
31 config::{
32 DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33 log_schema,
34 },
35 encoding_transcode::{Decoder, Encoder},
36 event::{BatchNotifier, BatchStatus, LogEvent},
37 internal_events::{
38 FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39 FileSourceInternalEventsEmitter, StreamClosedError,
40 },
41 line_agg::{self, LineAgg},
42 serde::bool_or_struct,
43 shutdown::ShutdownSignal,
44};
45
/// Errors raised while building the `file` source from its configuration.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The deprecated `message_start_indicator` option is not a valid regular expression.
    #[snafu(display(
        "message_start_indicator {:?} is not a valid regex: {}",
        indicator,
        source
    ))]
    InvalidMessageStartIndicator {
        /// The pattern exactly as given in the configuration.
        indicator: String,
        /// The error reported while compiling the pattern.
        source: regex::Error,
    },
}
58
/// Configuration for the `file` source, which reads lines from files matching glob
/// patterns and emits each line (or aggregated multi-line message) as a log event.
#[serde_as]
#[configurable_component(source("file", "Collect logs from files."))]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileConfig {
    /// Array of file patterns to include.
    ///
    /// Patterns are resolved by the glob paths provider. At least one pattern is required;
    /// with an empty list the source logs an error and stops immediately.
    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
    pub include: Vec<PathBuf>,

    /// Array of file patterns to exclude.
    ///
    /// These are matched by the glob paths provider together with `include`.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
    pub exclude: Vec<PathBuf>,

    /// Overrides the name of the log field used to add the file path to each event.
    ///
    /// In the legacy log namespace the path of the file a line was read from is written to
    /// this field. Defaults to `file`.
    #[serde(default = "default_file_key")]
    #[configurable(metadata(docs::examples = "path"))]
    pub file_key: OptionalValuePath,

    /// Deprecated. When `true`, `ignore_checkpoints` falls back to `true` and `read_from`
    /// to `beginning` if they are not set explicitly.
    ///
    /// Setting this option at all logs a deprecation warning.
    #[configurable(
        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
    )]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub start_at_beginning: Option<bool>,

    /// Whether to ignore existing checkpoints.
    ///
    /// When unset this is `false`, unless the deprecated `start_at_beginning` is `true`.
    #[serde(default)]
    pub ignore_checkpoints: Option<bool>,

    /// Position from which to start reading files (`beginning` or `end`).
    ///
    /// Defaults to `beginning`.
    #[serde(default = "default_read_from")]
    #[configurable(derived)]
    pub read_from: ReadFromConfig,

    /// Ignore files older than this many seconds.
    ///
    /// The value is converted into an absolute cutoff for the file server when the source
    /// is constructed.
    #[serde(alias = "ignore_older", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
    pub ignore_older_secs: Option<u64>,

    /// Maximum size, in bytes, of a single line.
    ///
    /// Passed to both the file server and the fingerprinter. Defaults to 100 KiB.
    #[serde(default = "default_max_line_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_line_bytes: usize,

    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// When unset, the global log schema's `host_key` is used. The hostname is only added
    /// when it can be determined.
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,

    /// Directory used to persist file checkpoint positions.
    ///
    /// When unset, the global `data_dir` option is used. A subdirectory for this component
    /// is created inside the resolved directory.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// Enables adding the file offset to each event and sets the name of the log field used.
    ///
    /// The value is the start offset of the line (or of the first line of an aggregated
    /// message). When unset, no offset field is added to legacy-namespace events.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "offset"))]
    pub offset_key: Option<OptionalValuePath>,

    /// Minimum cooldown between file discovery (glob) runs, in milliseconds.
    ///
    /// Defaults to 1000 milliseconds.
    #[serde(
        alias = "glob_minimum_cooldown",
        default = "default_glob_minimum_cooldown_ms"
    )]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    pub glob_minimum_cooldown_ms: Duration,

    /// Configuration for how files are identified.
    ///
    /// This is important for checkpointing when file rotation is used.
    #[configurable(derived)]
    #[serde(alias = "fingerprinting", default)]
    fingerprint: FingerprintConfig,

    /// Whether the fingerprinter should ignore files that are not found.
    #[serde(default)]
    pub ignore_not_found: bool,

    /// Deprecated. Regular expression marking the start of a new multi-line message.
    ///
    /// Validated when the source is built. Ignored when `multiline` is set.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub message_start_indicator: Option<String>,

    /// Deprecated. Timeout used together with `message_start_indicator` for legacy
    /// multi-line aggregation.
    ///
    /// Defaults to 1000 (presumably milliseconds; confirm against `line_agg::Config::for_legacy`).
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default = "default_multi_line_timeout")]
    pub multi_line_timeout: u64,

    /// Multi-line aggregation configuration.
    ///
    /// Takes precedence over the deprecated `message_start_indicator`. Validated when the
    /// source is built.
    #[configurable(derived)]
    #[serde(default)]
    pub multiline: Option<MultilineConfig>,

    /// Read size limit, in bytes, passed to the file server.
    ///
    /// The exact read scheduling is defined by the file server. Defaults to 2048 bytes.
    #[serde(default = "default_max_read_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_read_bytes: usize,

    /// Whether the file server should prioritize the oldest files.
    #[serde(default)]
    pub oldest_first: bool,

    /// Delay, in seconds, before the file server removes a file.
    ///
    /// When unset, no removal delay is passed to the file server. See the file server for
    /// when the delay starts.
    #[serde(alias = "remove_after", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 0))]
    #[configurable(metadata(docs::examples = 5))]
    #[configurable(metadata(docs::examples = 60))]
    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
    pub remove_after_secs: Option<u64>,

    /// String sequence used to separate one file line from another.
    ///
    /// If `encoding` is set, the delimiter is encoded into that charset before use.
    /// Defaults to `\n`.
    #[serde(default = "default_line_delimiter")]
    #[configurable(metadata(docs::examples = "\r\n"))]
    pub line_delimiter: String,

    /// Character set encoding of the files.
    ///
    /// When set, each line read is decoded into UTF-8 before further processing.
    #[configurable(derived)]
    #[serde(default)]
    pub encoding: Option<EncodingConfig>,

    /// End-to-end acknowledgement settings.
    ///
    /// When enabled, a file's checkpoint is only advanced after the events read from it are
    /// reported as delivered.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Internal metrics configuration.
    ///
    /// `include_file_tag` controls whether the file path is attached to per-file internal
    /// metrics.
    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// Rotation wait duration passed to the file server, configured as `rotate_wait_secs`.
    ///
    /// Defaults to `u64::MAX / 2` seconds, which is effectively unbounded.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    pub rotate_wait: Duration,
}
247
248fn default_max_line_bytes() -> usize {
249 bytesize::kib(100u64) as usize
250}
251
252fn default_file_key() -> OptionalValuePath {
253 OptionalValuePath::from(owned_value_path!("file"))
254}
255
256const fn default_read_from() -> ReadFromConfig {
257 ReadFromConfig::Beginning
258}
259
/// Default `glob_minimum_cooldown_ms`: one second.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(1)
}
263
/// Default for the deprecated `multi_line_timeout` option.
const fn default_multi_line_timeout() -> u64 {
    1_000
}

/// Default `max_read_bytes`: 2 KiB.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
271
/// Default `line_delimiter`: a single newline character.
fn default_line_delimiter() -> String {
    String::from("\n")
}
275
/// Default `rotate_wait`: half of the largest representable second count, which is
/// effectively unbounded (presumably halved to leave headroom for arithmetic on it).
const fn default_rotate_wait() -> Duration {
    Duration::from_secs(u64::MAX >> 1)
}
279
/// Configuration for how files are fingerprinted (uniquely identified).
///
/// Converted into a `FingerprintStrategy` for the fingerprinter.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(tag = "strategy", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
))]
pub enum FingerprintConfig {
    /// Identify files by a checksum of their first lines.
    Checksum {
        /// Number of bytes at the start of the file to ignore when computing the checksum.
        ///
        /// Defaults to 0.
        #[serde(default = "default_ignored_header_bytes")]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        ignored_header_bytes: usize,

        /// Number of lines to read when computing the checksum.
        ///
        /// Defaults to 1.
        #[serde(default = "default_lines")]
        #[configurable(metadata(docs::type_unit = "lines"))]
        lines: usize,
    },

    /// Identify files by their device and inode numbers.
    #[serde(rename = "device_and_inode")]
    DevInode,
}
318
319impl Default for FingerprintConfig {
320 fn default() -> Self {
321 Self::Checksum {
322 ignored_header_bytes: 0,
323 lines: default_lines(),
324 }
325 }
326}
327
/// Default `ignored_header_bytes`: nothing is skipped.
const fn default_ignored_header_bytes() -> usize {
    0usize
}
331
/// Default number of lines used for checksum fingerprinting: one.
const fn default_lines() -> usize {
    1usize
}
335
336impl From<FingerprintConfig> for FingerprintStrategy {
337 fn from(config: FingerprintConfig) -> FingerprintStrategy {
338 match config {
339 FingerprintConfig::Checksum {
340 ignored_header_bytes,
341 lines,
342 } => FingerprintStrategy::FirstLinesChecksum {
343 ignored_header_bytes,
344 lines,
345 },
346 FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
347 }
348 }
349}
350
/// Checkpoint update recorded for an event when acknowledgements are enabled.
///
/// It is applied to the checkpointer only if the event's batch is reported as delivered.
#[derive(Debug)]
pub(crate) struct FinalizerEntry {
    /// Fingerprint of the file the event was read from.
    pub(crate) file_id: FileFingerprint,
    /// Offset to checkpoint for that file: the end offset of the event's (last) line.
    pub(crate) offset: u64,
}
356
357impl Default for FileConfig {
358 fn default() -> Self {
359 Self {
360 include: vec![PathBuf::from("/var/log/**/*.log")],
361 exclude: vec![],
362 file_key: default_file_key(),
363 start_at_beginning: None,
364 ignore_checkpoints: None,
365 read_from: default_read_from(),
366 ignore_older_secs: None,
367 max_line_bytes: default_max_line_bytes(),
368 fingerprint: FingerprintConfig::default(),
369 ignore_not_found: false,
370 host_key: None,
371 offset_key: None,
372 data_dir: None,
373 glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
374 message_start_indicator: None,
375 multi_line_timeout: default_multi_line_timeout(), multiline: None,
377 max_read_bytes: default_max_read_bytes(),
378 oldest_first: false,
379 remove_after_secs: None,
380 line_delimiter: default_line_delimiter(),
381 encoding: None,
382 acknowledgements: Default::default(),
383 log_namespace: None,
384 internal_metrics: Default::default(),
385 rotate_wait: default_rotate_wait(),
386 }
387 }
388}
389
390impl_generate_config_from_default!(FileConfig);
391
392#[async_trait::async_trait]
393#[typetag::serde(name = "file")]
394impl SourceConfig for FileConfig {
395 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
396 let data_dir = cx
401 .globals
402 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
404
405 #[allow(clippy::suspicious_else_formatting)]
407 {
408 if let Some(ref config) = self.multiline {
409 let _: line_agg::Config = config.try_into()?;
410 }
411
412 if let Some(ref indicator) = self.message_start_indicator {
413 Regex::new(indicator)
414 .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
415 }
416 }
417
418 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
419
420 let log_namespace = cx.log_namespace(self.log_namespace);
421
422 Ok(file_source(
423 self,
424 data_dir,
425 cx.shutdown,
426 cx.out,
427 acknowledgements,
428 log_namespace,
429 ))
430 }
431
432 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
433 let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
434 let host_key = self
435 .host_key
436 .clone()
437 .unwrap_or(log_schema().host_key().cloned().into())
438 .path
439 .map(LegacyKey::Overwrite);
440
441 let offset_key = self
442 .offset_key
443 .clone()
444 .and_then(|k| k.path)
445 .map(LegacyKey::Overwrite);
446
447 let schema_definition = BytesDeserializerConfig
448 .schema_definition(global_log_namespace.merge(self.log_namespace))
449 .with_standard_vector_source_metadata()
450 .with_source_metadata(
451 Self::NAME,
452 host_key,
453 &owned_value_path!("host"),
454 Kind::bytes().or_undefined(),
455 Some("host"),
456 )
457 .with_source_metadata(
458 Self::NAME,
459 offset_key,
460 &owned_value_path!("offset"),
461 Kind::integer(),
462 None,
463 )
464 .with_source_metadata(
465 Self::NAME,
466 file_key,
467 &owned_value_path!("path"),
468 Kind::bytes(),
469 None,
470 );
471
472 vec![SourceOutput::new_maybe_logs(
473 DataType::Log,
474 schema_definition,
475 )]
476 }
477
478 fn can_acknowledge(&self) -> bool {
479 true
480 }
481}
482
483pub fn file_source(
484 config: &FileConfig,
485 data_dir: PathBuf,
486 shutdown: ShutdownSignal,
487 mut out: SourceSender,
488 acknowledgements: bool,
489 log_namespace: LogNamespace,
490) -> super::Source {
491 if config.include.is_empty() {
493 error!(
494 message = "`include` configuration option must contain at least one file pattern.",
495 internal_log_rate_limit = false
496 );
497 return Box::pin(future::ready(Err(())));
498 }
499
500 let exclude_patterns = config
501 .exclude
502 .iter()
503 .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
504 .collect::<Vec<PathBuf>>();
505 let ignore_before = calculate_ignore_before(config.ignore_older_secs);
506 let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
507 let (ignore_checkpoints, read_from) = reconcile_position_options(
508 config.start_at_beginning,
509 config.ignore_checkpoints,
510 Some(config.read_from),
511 );
512
513 let emitter = FileSourceInternalEventsEmitter {
514 include_file_metric_tag: config.internal_metrics.include_file_tag,
515 };
516
517 let paths_provider = Glob::new(
518 &config.include,
519 &exclude_patterns,
520 MatchOptions::default(),
521 emitter.clone(),
522 )
523 .expect("invalid glob patterns");
524
525 let encoding_charset = config.encoding.clone().map(|e| e.charset);
526
527 let line_delimiter_as_bytes = match encoding_charset {
530 Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
531 None => Bytes::from(config.line_delimiter.clone()),
532 };
533
534 let checkpointer = Checkpointer::new(&data_dir);
535 let strategy = config.fingerprint.clone().into();
536
537 let file_server = FileServer {
538 paths_provider,
539 max_read_bytes: config.max_read_bytes,
540 ignore_checkpoints,
541 read_from,
542 ignore_before,
543 max_line_bytes: config.max_line_bytes,
544 line_delimiter: line_delimiter_as_bytes,
545 data_dir,
546 glob_minimum_cooldown,
547 fingerprinter: Fingerprinter::new(strategy, config.max_line_bytes, config.ignore_not_found),
548 oldest_first: config.oldest_first,
549 remove_after: config.remove_after_secs.map(Duration::from_secs),
550 emitter,
551 rotate_wait: config.rotate_wait,
552 };
553
554 let event_metadata = EventMetadata {
555 host_key: config
556 .host_key
557 .clone()
558 .unwrap_or(log_schema().host_key().cloned().into())
559 .path,
560 hostname: crate::get_hostname().ok(),
561 file_key: config.file_key.clone().path,
562 offset_key: config.offset_key.clone().and_then(|k| k.path),
563 };
564
565 let include = config.include.clone();
566 let exclude = config.exclude.clone();
567 let multiline_config = config.multiline.clone();
568 let message_start_indicator = config.message_start_indicator.clone();
569 let multi_line_timeout = config.multi_line_timeout;
570
571 let (finalizer, shutdown_checkpointer) = if acknowledgements {
572 let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
576
577 let (send_shutdown, shutdown2) = oneshot::channel::<()>();
582 let checkpoints = checkpointer.view();
583 tokio::spawn(async move {
584 while let Some((status, entry)) = ack_stream.next().await {
585 if status == BatchStatus::Delivered {
586 checkpoints.update(entry.file_id, entry.offset);
587 }
588 }
589 send_shutdown.send(())
590 });
591 (Some(finalizer), shutdown2.map(|_| ()).boxed())
592 } else {
593 (None, shutdown.clone().map(|_| ()).boxed())
596 };
597
598 let checkpoints = checkpointer.view();
599 let include_file_metric_tag = config.internal_metrics.include_file_tag;
600 Box::pin(async move {
601 info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
602
603 let mut encoding_decoder = encoding_charset.map(Decoder::new);
604
605 let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
607 let rx = rx
608 .map(futures::stream::iter)
609 .flatten()
610 .map(move |mut line| {
611 emit!(FileBytesReceived {
612 byte_size: line.text.len(),
613 file: &line.filename,
614 include_file_metric_tag,
615 });
616 line.text = match encoding_decoder.as_mut() {
618 Some(d) => d.decode_to_utf8(line.text),
619 None => line.text,
620 };
621 line
622 });
623
624 let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
625 if let Some(ref multiline_config) = multiline_config {
626 wrap_with_line_agg(
627 rx,
628 multiline_config.try_into().unwrap(), )
630 } else if let Some(msi) = message_start_indicator {
631 wrap_with_line_agg(
632 rx,
633 line_agg::Config::for_legacy(
634 Regex::new(&msi).unwrap(), multi_line_timeout,
636 ),
637 )
638 } else {
639 Box::new(rx)
640 };
641
642 let span = Span::current();
645 let mut messages = messages.map(move |line| {
646 let mut event = create_event(
647 line.text,
648 line.start_offset,
649 &line.filename,
650 &event_metadata,
651 log_namespace,
652 include_file_metric_tag,
653 );
654
655 if let Some(finalizer) = &finalizer {
656 let (batch, receiver) = BatchNotifier::new_with_receiver();
657 event = event.with_batch_notifier(&batch);
658 let entry = FinalizerEntry {
659 file_id: line.file_id,
660 offset: line.end_offset,
661 };
662 finalizer.add(entry, receiver);
664 } else {
665 checkpoints.update(line.file_id, line.end_offset);
666 }
667 event
668 });
669 tokio::spawn(async move {
670 match out
671 .send_event_stream(&mut messages)
672 .instrument(span.or_current())
673 .await
674 {
675 Ok(()) => {
676 debug!("Finished sending.");
677 }
678 Err(_) => {
679 let (count, _) = messages.size_hint();
680 emit!(StreamClosedError { count });
681 }
682 }
683 });
684
685 let span = info_span!("file_server");
686 tokio::task::spawn_blocking(move || {
687 let _enter = span.enter();
688 let rt = tokio::runtime::Handle::current();
689 let result =
690 rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
691 emit!(FileOpen { count: 0 });
692 result.expect("file server exited with an error");
696 })
697 .map_err(|error| error!(message="File server unexpectedly stopped.", %error, internal_log_rate_limit = false))
698 .await
699 })
700}
701
702fn reconcile_position_options(
705 start_at_beginning: Option<bool>,
706 ignore_checkpoints: Option<bool>,
707 read_from: Option<ReadFromConfig>,
708) -> (bool, ReadFrom) {
709 if start_at_beginning.is_some() {
710 warn!(
711 message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
712 )
713 }
714
715 match start_at_beginning {
716 Some(true) => (
717 ignore_checkpoints.unwrap_or(true),
718 read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
719 ),
720 _ => (
721 ignore_checkpoints.unwrap_or(false),
722 read_from.map(Into::into).unwrap_or_default(),
723 ),
724 }
725}
726
727fn wrap_with_line_agg(
728 rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
729 config: line_agg::Config,
730) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
731 let logic = line_agg::Logic::new(config);
732 Box::new(
733 LineAgg::new(
734 rx.map(|line| {
735 (
736 line.filename,
737 line.text,
738 (line.file_id, line.start_offset, line.end_offset),
739 )
740 }),
741 logic,
742 )
743 .map(
744 |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
745 text,
746 filename,
747 file_id,
748 start_offset,
749 end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
750 lastline_end_offset
751 }),
752 },
753 ),
754 )
755}
756
/// Per-source values that `create_event` attaches to every event.
struct EventMetadata {
    /// Legacy-namespace field for the hostname; `None` means no legacy field is written.
    host_key: Option<OwnedValuePath>,
    /// Hostname from `crate::get_hostname()`, if it could be determined.
    hostname: Option<String>,
    /// Legacy-namespace field for the file path.
    file_key: Option<OwnedValuePath>,
    /// Legacy-namespace field for the line's start offset.
    offset_key: Option<OwnedValuePath>,
}
763
764fn create_event(
765 line: Bytes,
766 offset: u64,
767 file: &str,
768 meta: &EventMetadata,
769 log_namespace: LogNamespace,
770 include_file_metric_tag: bool,
771) -> LogEvent {
772 let deserializer = BytesDeserializer;
773 let mut event = deserializer.parse_single(line, log_namespace);
774
775 log_namespace.insert_vector_metadata(
776 &mut event,
777 log_schema().source_type_key(),
778 path!("source_type"),
779 Bytes::from_static(FileConfig::NAME.as_bytes()),
780 );
781 log_namespace.insert_vector_metadata(
782 &mut event,
783 log_schema().timestamp_key(),
784 path!("ingest_timestamp"),
785 Utc::now(),
786 );
787
788 let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
789 if let Some(hostname) = &meta.hostname {
791 log_namespace.insert_source_metadata(
792 FileConfig::NAME,
793 &mut event,
794 legacy_host_key,
795 path!("host"),
796 hostname.clone(),
797 );
798 }
799
800 let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
801 log_namespace.insert_source_metadata(
802 FileConfig::NAME,
803 &mut event,
804 legacy_offset_key,
805 path!("offset"),
806 offset,
807 );
808
809 let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
810 log_namespace.insert_source_metadata(
811 FileConfig::NAME,
812 &mut event,
813 legacy_file_key,
814 path!("path"),
815 file,
816 );
817
818 emit!(FileEventsReceived {
819 count: 1,
820 file,
821 byte_size: event.estimated_json_encoded_size_of(),
822 include_file_metric_tag,
823 });
824
825 event
826}
827
828#[cfg(test)]
829mod tests {
830 use std::{
831 collections::HashSet,
832 fs::{self, File},
833 future::Future,
834 io::{Seek, Write},
835 sync::{
836 Arc,
837 atomic::{AtomicUsize, Ordering},
838 },
839 };
840
841 use encoding_rs::UTF_16LE;
842 use similar_asserts::assert_eq;
843 use tempfile::tempdir;
844 use tokio::time::{Duration, sleep, timeout};
845 use vector_lib::schema::Definition;
846 use vrl::{value, value::kind::Collection};
847
848 use super::*;
849 use crate::{
850 config::Config,
851 event::{Event, EventStatus, Value},
852 shutdown::ShutdownSignal,
853 sources::file,
854 test_util::{
855 components::{FILE_SOURCE_TAGS, assert_source_compliance},
856 wait_for_atomic_usize_timeout_ms,
857 },
858 };
859
860 #[test]
861 fn generate_config() {
862 crate::test_util::test_generate_config::<FileConfig>();
863 }
864
865 fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
866 let data_dir = dir.path().join(".data");
869 fs::create_dir_all(&data_dir).unwrap();
870 file::FileConfig {
871 fingerprint: FingerprintConfig::Checksum {
872 ignored_header_bytes: 0,
873 lines: 1,
874 },
875 data_dir: Some(data_dir),
876 glob_minimum_cooldown_ms: Duration::from_millis(100),
877 internal_metrics: FileInternalMetricsConfig {
878 include_file_tag: true,
879 },
880 ..Default::default()
881 }
882 }
883
884 async fn sleep_500_millis() {
885 sleep(Duration::from_millis(500)).await;
886 }
887
888 #[test]
889 fn parse_config() {
890 let config: FileConfig = toml::from_str(
891 r#"
892 include = [ "/var/log/**/*.log" ]
893 file_key = "file"
894 glob_minimum_cooldown_ms = 1000
895 multi_line_timeout = 1000
896 max_read_bytes = 2048
897 line_delimiter = "\n"
898 "#,
899 )
900 .unwrap();
901 assert_eq!(config, FileConfig::default());
902 assert_eq!(
903 config.fingerprint,
904 FingerprintConfig::Checksum {
905 ignored_header_bytes: 0,
906 lines: 1
907 }
908 );
909
910 let config: FileConfig = toml::from_str(
911 r#"
912 include = [ "/var/log/**/*.log" ]
913 [fingerprint]
914 strategy = "device_and_inode"
915 "#,
916 )
917 .unwrap();
918 assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
919
920 let config: FileConfig = toml::from_str(
921 r#"
922 include = [ "/var/log/**/*.log" ]
923 [fingerprint]
924 strategy = "checksum"
925 bytes = 128
926 ignored_header_bytes = 512
927 "#,
928 )
929 .unwrap();
930 assert_eq!(
931 config.fingerprint,
932 FingerprintConfig::Checksum {
933 ignored_header_bytes: 512,
934 lines: 1
935 }
936 );
937
938 let config: FileConfig = toml::from_str(
939 r#"
940 include = [ "/var/log/**/*.log" ]
941 [encoding]
942 charset = "utf-16le"
943 "#,
944 )
945 .unwrap();
946 assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
947
948 let config: FileConfig = toml::from_str(
949 r#"
950 include = [ "/var/log/**/*.log" ]
951 read_from = "beginning"
952 "#,
953 )
954 .unwrap();
955 assert_eq!(config.read_from, ReadFromConfig::Beginning);
956
957 let config: FileConfig = toml::from_str(
958 r#"
959 include = [ "/var/log/**/*.log" ]
960 read_from = "end"
961 "#,
962 )
963 .unwrap();
964 assert_eq!(config.read_from, ReadFromConfig::End);
965 }
966
967 #[test]
968 fn resolve_data_dir() {
969 let global_dir = tempdir().unwrap();
970 let local_dir = tempdir().unwrap();
971
972 let mut config = Config::default();
973 config.global.data_dir = global_dir.keep().into();
974
975 let local_data_dir = Some(local_dir.path().to_path_buf());
977 let res = config
978 .global
979 .resolve_and_validate_data_dir(local_data_dir.as_ref())
980 .unwrap();
981 assert_eq!(res, local_dir.path());
982
983 let res = config.global.resolve_and_validate_data_dir(None).unwrap();
985 assert_eq!(res, config.global.data_dir.unwrap());
986 }
987
988 #[test]
989 fn output_schema_definition_vector_namespace() {
990 let definitions = FileConfig::default()
991 .outputs(LogNamespace::Vector)
992 .remove(0)
993 .schema_definition(true);
994
995 assert_eq!(
996 definitions,
997 Some(
998 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
999 .with_meaning(OwnedTargetPath::event_root(), "message")
1000 .with_metadata_field(
1001 &owned_value_path!("vector", "source_type"),
1002 Kind::bytes(),
1003 None
1004 )
1005 .with_metadata_field(
1006 &owned_value_path!("vector", "ingest_timestamp"),
1007 Kind::timestamp(),
1008 None
1009 )
1010 .with_metadata_field(
1011 &owned_value_path!("file", "host"),
1012 Kind::bytes().or_undefined(),
1013 Some("host")
1014 )
1015 .with_metadata_field(
1016 &owned_value_path!("file", "offset"),
1017 Kind::integer(),
1018 None
1019 )
1020 .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
1021 )
1022 )
1023 }
1024
1025 #[test]
1026 fn output_schema_definition_legacy_namespace() {
1027 let definitions = FileConfig::default()
1028 .outputs(LogNamespace::Legacy)
1029 .remove(0)
1030 .schema_definition(true);
1031
1032 assert_eq!(
1033 definitions,
1034 Some(
1035 Definition::new_with_default_metadata(
1036 Kind::object(Collection::empty()),
1037 [LogNamespace::Legacy]
1038 )
1039 .with_event_field(
1040 &owned_value_path!("message"),
1041 Kind::bytes(),
1042 Some("message")
1043 )
1044 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1045 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1046 .with_event_field(
1047 &owned_value_path!("host"),
1048 Kind::bytes().or_undefined(),
1049 Some("host")
1050 )
1051 .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
1052 .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1053 )
1054 )
1055 }
1056
1057 #[test]
1058 fn create_event_legacy_namespace() {
1059 let line = Bytes::from("hello world");
1060 let file = "some_file.rs";
1061 let offset: u64 = 0;
1062
1063 let meta = EventMetadata {
1064 host_key: Some(owned_value_path!("host")),
1065 hostname: Some("Some.Machine".to_string()),
1066 file_key: Some(owned_value_path!("file")),
1067 offset_key: Some(owned_value_path!("offset")),
1068 };
1069 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1070
1071 assert_eq!(log["file"], "some_file.rs".into());
1072 assert_eq!(log["host"], "Some.Machine".into());
1073 assert_eq!(log["offset"], 0.into());
1074 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1075 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1076 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1077 }
1078
1079 #[test]
1080 fn create_event_custom_fields_legacy_namespace() {
1081 let line = Bytes::from("hello world");
1082 let file = "some_file.rs";
1083 let offset: u64 = 0;
1084
1085 let meta = EventMetadata {
1086 host_key: Some(owned_value_path!("hostname")),
1087 hostname: Some("Some.Machine".to_string()),
1088 file_key: Some(owned_value_path!("file_path")),
1089 offset_key: Some(owned_value_path!("off")),
1090 };
1091 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1092
1093 assert_eq!(log["file_path"], "some_file.rs".into());
1094 assert_eq!(log["hostname"], "Some.Machine".into());
1095 assert_eq!(log["off"], 0.into());
1096 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1097 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1098 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1099 }
1100
1101 #[test]
1102 fn create_event_vector_namespace() {
1103 let line = Bytes::from("hello world");
1104 let file = "some_file.rs";
1105 let offset: u64 = 0;
1106
1107 let meta = EventMetadata {
1108 host_key: Some(owned_value_path!("ignored")),
1109 hostname: Some("Some.Machine".to_string()),
1110 file_key: Some(owned_value_path!("ignored")),
1111 offset_key: Some(owned_value_path!("ignored")),
1112 };
1113 let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1114
1115 assert_eq!(log.value(), &value!("hello world"));
1116
1117 assert_eq!(
1118 log.metadata()
1119 .value()
1120 .get(path!("vector", "source_type"))
1121 .unwrap(),
1122 &value!("file")
1123 );
1124 assert!(
1125 log.metadata()
1126 .value()
1127 .get(path!("vector", "ingest_timestamp"))
1128 .unwrap()
1129 .is_timestamp()
1130 );
1131
1132 assert_eq!(
1133 log.metadata()
1134 .value()
1135 .get(path!(FileConfig::NAME, "host"))
1136 .unwrap(),
1137 &value!("Some.Machine")
1138 );
1139 assert_eq!(
1140 log.metadata()
1141 .value()
1142 .get(path!(FileConfig::NAME, "offset"))
1143 .unwrap(),
1144 &value!(0)
1145 );
1146 assert_eq!(
1147 log.metadata()
1148 .value()
1149 .get(path!(FileConfig::NAME, "path"))
1150 .unwrap(),
1151 &value!("some_file.rs")
1152 );
1153 }
1154
1155 #[tokio::test]
1156 async fn file_happy_path() {
1157 let n = 5;
1158
1159 let dir = tempdir().unwrap();
1160 let config = file::FileConfig {
1161 include: vec![dir.path().join("*")],
1162 ..test_default_file_config(&dir)
1163 };
1164
1165 let path1 = dir.path().join("file1");
1166 let path2 = dir.path().join("file2");
1167
1168 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1169 let mut file1 = File::create(&path1).unwrap();
1170 let mut file2 = File::create(&path2).unwrap();
1171
1172 for i in 0..n {
1173 writeln!(&mut file1, "hello {i}").unwrap();
1174 writeln!(&mut file2, "goodbye {i}").unwrap();
1175 }
1176
1177 file1.flush().unwrap();
1178 file2.flush().unwrap();
1179
1180 sleep_500_millis().await;
1181 })
1182 .await;
1183
1184 let mut hello_i = 0;
1185 let mut goodbye_i = 0;
1186
1187 for event in received {
1188 let line =
1189 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1190 if line.starts_with("hello") {
1191 assert_eq!(line, format!("hello {}", hello_i));
1192 assert_eq!(
1193 event.as_log()["file"].to_string_lossy(),
1194 path1.to_str().unwrap()
1195 );
1196 hello_i += 1;
1197 } else {
1198 assert_eq!(line, format!("goodbye {}", goodbye_i));
1199 assert_eq!(
1200 event.as_log()["file"].to_string_lossy(),
1201 path2.to_str().unwrap()
1202 );
1203 goodbye_i += 1;
1204 }
1205 }
1206 assert_eq!(hello_i, n);
1207 assert_eq!(goodbye_i, n);
1208 }
1209
1210 #[tokio::test]
1212 async fn file_read_empty_lines() {
1213 let n = 5;
1214
1215 let dir = tempdir().unwrap();
1216 let config = file::FileConfig {
1217 include: vec![dir.path().join("*")],
1218 ..test_default_file_config(&dir)
1219 };
1220
1221 let path = dir.path().join("file");
1222
1223 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1224 let mut file = File::create(&path).unwrap();
1225
1226 writeln!(&mut file, "line for checkpointing").unwrap();
1227 for _i in 0..n {
1228 writeln!(&mut file).unwrap();
1229 }
1230 file.flush().unwrap();
1231
1232 sleep_500_millis().await;
1233 })
1234 .await;
1235
1236 assert_eq!(received.len(), n + 1);
1237 }
1238
1239 #[tokio::test]
1240 async fn file_truncate() {
1241 let n = 5;
1242
1243 let dir = tempdir().unwrap();
1244 let config = file::FileConfig {
1245 include: vec![dir.path().join("*")],
1246 ..test_default_file_config(&dir)
1247 };
1248 let path = dir.path().join("file");
1249 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1250 let mut file = File::create(&path).unwrap();
1251
1252 for i in 0..n {
1253 writeln!(&mut file, "pretrunc {i}").unwrap();
1254 }
1255
1256 file.flush().unwrap();
1257 sleep_500_millis().await; file.set_len(0).unwrap();
1260 file.seek(std::io::SeekFrom::Start(0)).unwrap();
1261
1262 file.sync_all().unwrap();
1263 sleep_500_millis().await; for i in 0..n {
1266 writeln!(&mut file, "posttrunc {i}").unwrap();
1267 }
1268
1269 file.flush().unwrap();
1270 sleep_500_millis().await;
1271 })
1272 .await;
1273
1274 let mut i = 0;
1275 let mut pre_trunc = true;
1276
1277 for event in received {
1278 assert_eq!(
1279 event.as_log()["file"].to_string_lossy(),
1280 path.to_str().unwrap()
1281 );
1282
1283 let line =
1284 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1285
1286 if pre_trunc {
1287 assert_eq!(line, format!("pretrunc {}", i));
1288 } else {
1289 assert_eq!(line, format!("posttrunc {}", i));
1290 }
1291
1292 i += 1;
1293 if i == n {
1294 i = 0;
1295 pre_trunc = false;
1296 }
1297 }
1298 }
1299
1300 #[tokio::test]
1301 async fn file_rotate() {
1302 let n = 5;
1303
1304 let dir = tempdir().unwrap();
1305 let config = file::FileConfig {
1306 include: vec![dir.path().join("*")],
1307 ..test_default_file_config(&dir)
1308 };
1309
1310 let path = dir.path().join("file");
1311 let archive_path = dir.path().join("file");
1312 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1313 let mut file = File::create(&path).unwrap();
1314
1315 for i in 0..n {
1316 writeln!(&mut file, "prerot {i}").unwrap();
1317 }
1318
1319 file.flush().unwrap();
1320 sleep_500_millis().await; fs::rename(&path, archive_path).expect("could not rename");
1323 file.sync_all().unwrap();
1324
1325 let mut file = File::create(&path).unwrap();
1326
1327 file.sync_all().unwrap();
1328 sleep_500_millis().await; for i in 0..n {
1331 writeln!(&mut file, "postrot {i}").unwrap();
1332 }
1333
1334 file.flush().unwrap();
1335 sleep_500_millis().await;
1336 })
1337 .await;
1338
1339 let mut i = 0;
1340 let mut pre_rot = true;
1341
1342 for event in received {
1343 assert_eq!(
1344 event.as_log()["file"].to_string_lossy(),
1345 path.to_str().unwrap()
1346 );
1347
1348 let line =
1349 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1350
1351 if pre_rot {
1352 assert_eq!(line, format!("prerot {}", i));
1353 } else {
1354 assert_eq!(line, format!("postrot {}", i));
1355 }
1356
1357 i += 1;
1358 if i == n {
1359 i = 0;
1360 pre_rot = false;
1361 }
1362 }
1363 }
1364
1365 #[tokio::test]
1366 async fn file_multiple_paths() {
1367 let n = 5;
1368
1369 let dir = tempdir().unwrap();
1370 let config = file::FileConfig {
1371 include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1372 exclude: vec![dir.path().join("a.*.txt")],
1373 ..test_default_file_config(&dir)
1374 };
1375
1376 let path1 = dir.path().join("a.txt");
1377 let path2 = dir.path().join("b.txt");
1378 let path3 = dir.path().join("a.log");
1379 let path4 = dir.path().join("a.ignore.txt");
1380 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1381 let mut file1 = File::create(&path1).unwrap();
1382 let mut file2 = File::create(&path2).unwrap();
1383 let mut file3 = File::create(&path3).unwrap();
1384 let mut file4 = File::create(&path4).unwrap();
1385
1386 for i in 0..n {
1387 writeln!(&mut file1, "1 {i}").unwrap();
1388 writeln!(&mut file2, "2 {i}").unwrap();
1389 writeln!(&mut file3, "3 {i}").unwrap();
1390 writeln!(&mut file4, "4 {i}").unwrap();
1391 }
1392 file1.flush().unwrap();
1393 file2.flush().unwrap();
1394 file3.flush().unwrap();
1395 file4.flush().unwrap();
1396
1397 sleep_500_millis().await;
1398 })
1399 .await;
1400
1401 let mut is = [0; 3];
1402
1403 for event in received {
1404 let line =
1405 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1406 let mut split = line.split(' ');
1407 let file = split.next().unwrap().parse::<usize>().unwrap();
1408 assert_ne!(file, 4);
1409 let i = split.next().unwrap().parse::<usize>().unwrap();
1410
1411 assert_eq!(is[file - 1], i);
1412 is[file - 1] += 1;
1413 }
1414
1415 assert_eq!(is, [n as usize; 3]);
1416 }
1417
1418 #[tokio::test]
1419 async fn file_exclude_paths() {
1420 let n = 5;
1421
1422 let dir = tempdir().unwrap();
1423 let config = file::FileConfig {
1424 include: vec![dir.path().join("a//b/*.log.*")],
1425 exclude: vec![dir.path().join("a//b/test.log.*")],
1426 ..test_default_file_config(&dir)
1427 };
1428
1429 let path1 = dir.path().join("a//b/a.log.1");
1430 let path2 = dir.path().join("a//b/test.log.1");
1431 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1432 std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1433 let mut file1 = File::create(&path1).unwrap();
1434 let mut file2 = File::create(&path2).unwrap();
1435
1436 for i in 0..n {
1437 writeln!(&mut file1, "1 {i}").unwrap();
1438 writeln!(&mut file2, "2 {i}").unwrap();
1439 }
1440
1441 file1.flush().unwrap();
1442 file2.flush().unwrap();
1443 sleep_500_millis().await;
1444 })
1445 .await;
1446
1447 let mut is = [0; 1];
1448
1449 for event in received {
1450 let line =
1451 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1452 let mut split = line.split(' ');
1453 let file = split.next().unwrap().parse::<usize>().unwrap();
1454 assert_ne!(file, 4);
1455 let i = split.next().unwrap().parse::<usize>().unwrap();
1456
1457 assert_eq!(is[file - 1], i);
1458 is[file - 1] += 1;
1459 }
1460
1461 assert_eq!(is, [n as usize; 1]);
1462 }
1463
1464 #[tokio::test]
1465 async fn file_key_acknowledged() {
1466 file_key(Acks).await
1467 }
1468
1469 #[tokio::test]
1470 async fn file_key_no_acknowledge() {
1471 file_key(NoAcks).await
1472 }
1473
1474 async fn file_key(acks: AckingMode) {
1475 {
1477 let dir = tempdir().unwrap();
1478 let config = file::FileConfig {
1479 include: vec![dir.path().join("*")],
1480 ..test_default_file_config(&dir)
1481 };
1482
1483 let path = dir.path().join("file");
1484 let received =
1485 run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
1486 let mut file = File::create(&path).unwrap();
1487
1488 writeln!(&mut file, "hello there").unwrap();
1489 file.flush().unwrap();
1490
1491 sleep_500_millis().await;
1492 })
1493 .await;
1494
1495 assert_eq!(received.len(), 1);
1496 assert_eq!(
1497 received[0].as_log()["file"].to_string_lossy(),
1498 path.to_str().unwrap()
1499 );
1500 }
1501
1502 {
1504 let dir = tempdir().unwrap();
1505 let config = file::FileConfig {
1506 include: vec![dir.path().join("*")],
1507 file_key: OptionalValuePath::from(owned_value_path!("source")),
1508 ..test_default_file_config(&dir)
1509 };
1510
1511 let path = dir.path().join("file");
1512 let received =
1513 run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
1514 let mut file = File::create(&path).unwrap();
1515
1516 writeln!(&mut file, "hello there").unwrap();
1517 file.flush().unwrap();
1518
1519 sleep_500_millis().await;
1520 })
1521 .await;
1522
1523 assert_eq!(received.len(), 1);
1524 assert_eq!(
1525 received[0].as_log()["source"].to_string_lossy(),
1526 path.to_str().unwrap()
1527 );
1528 }
1529
1530 {
1532 let dir = tempdir().unwrap();
1533 let config = file::FileConfig {
1534 include: vec![dir.path().join("*")],
1535 ..test_default_file_config(&dir)
1536 };
1537
1538 let path = dir.path().join("file");
1539 let received =
1540 run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
1541 let mut file = File::create(&path).unwrap();
1542
1543 writeln!(&mut file, "hello there").unwrap();
1544
1545 file.flush().unwrap();
1546 sleep_500_millis().await;
1547 })
1548 .await;
1549
1550 assert_eq!(received.len(), 1);
1551 assert_eq!(
1552 received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
1553 vec![
1554 default_file_key()
1555 .path
1556 .expect("file key to exist")
1557 .to_string()
1558 .into(),
1559 log_schema().host_key().unwrap().to_string().into(),
1560 log_schema().message_key().unwrap().to_string().into(),
1561 log_schema().timestamp_key().unwrap().to_string().into(),
1562 log_schema().source_type_key().unwrap().to_string().into()
1563 ]
1564 .into_iter()
1565 .collect::<HashSet<_>>()
1566 );
1567 }
1568 }
1569
1570 #[tokio::test]
1571 async fn file_start_position_server_restart_acknowledged() {
1572 file_start_position_server_restart(Acks).await
1573 }
1574
1575 #[tokio::test]
1576 async fn file_start_position_server_restart_no_acknowledge() {
1577 file_start_position_server_restart(NoAcks).await
1578 }
1579
1580 async fn file_start_position_server_restart(acking: AckingMode) {
1581 let dir = tempdir().unwrap();
1582 let config = file::FileConfig {
1583 include: vec![dir.path().join("*")],
1584 ..test_default_file_config(&dir)
1585 };
1586
1587 let path = dir.path().join("file");
1588 let mut file = File::create(&path).unwrap();
1589 writeln!(&mut file, "zeroth line").unwrap();
1590 file.flush().unwrap();
1591
1592 {
1594 let received =
1595 run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
1596 sleep_500_millis().await;
1597 writeln!(&mut file, "first line").unwrap();
1598 file.flush().unwrap();
1599 sleep_500_millis().await;
1600 })
1601 .await;
1602
1603 let lines = extract_messages_string(received);
1604 assert_eq!(lines, vec!["zeroth line", "first line"]);
1605 }
1606 {
1608 let received =
1609 run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
1610 sleep_500_millis().await;
1611 writeln!(&mut file, "second line").unwrap();
1612 file.flush().unwrap();
1613 sleep_500_millis().await;
1614 })
1615 .await;
1616
1617 let lines = extract_messages_string(received);
1618 assert_eq!(lines, vec!["second line"]);
1619 }
1620 {
1622 let config = file::FileConfig {
1623 include: vec![dir.path().join("*")],
1624 ignore_checkpoints: Some(true),
1625 read_from: ReadFromConfig::Beginning,
1626 ..test_default_file_config(&dir)
1627 };
1628 let received =
1629 run_file_source(&config, false, acking, LogNamespace::Legacy, None, async {
1630 sleep_500_millis().await;
1631 writeln!(&mut file, "third line").unwrap();
1632 file.flush().unwrap();
1633 sleep_500_millis().await;
1634 })
1635 .await;
1636
1637 let lines = extract_messages_string(received);
1638 assert_eq!(
1639 lines,
1640 vec!["zeroth line", "first line", "second line", "third line"]
1641 );
1642 }
1643 }
1644
1645 #[tokio::test]
1646 async fn file_start_position_server_restart_unfinalized() {
1647 let dir = tempdir().unwrap();
1648 let config = file::FileConfig {
1649 include: vec![dir.path().join("*")],
1650 ..test_default_file_config(&dir)
1651 };
1652
1653 let path = dir.path().join("file");
1654 let mut file = File::create(&path).unwrap();
1655 writeln!(&mut file, "the line").unwrap();
1656 file.flush().unwrap();
1657
1658 let received = run_file_source(
1660 &config,
1661 false,
1662 Unfinalized,
1663 LogNamespace::Legacy,
1664 None,
1665 sleep(Duration::from_secs(5)),
1666 )
1667 .await;
1668 let lines = extract_messages_string(received);
1669 assert_eq!(lines, vec!["the line"]);
1670
1671 let received = run_file_source(
1673 &config,
1674 false,
1675 Unfinalized,
1676 LogNamespace::Legacy,
1677 None,
1678 sleep(Duration::from_secs(5)),
1679 )
1680 .await;
1681 let lines = extract_messages_string(received);
1682 assert_eq!(lines, vec!["the line"]);
1683 }
1684
1685 #[tokio::test]
1686 async fn file_duplicate_processing_after_restart() {
1687 let dir = tempdir().unwrap();
1688 let config = file::FileConfig {
1689 include: vec![dir.path().join("*")],
1690 ..test_default_file_config(&dir)
1691 };
1692
1693 let path = dir.path().join("file");
1694 let mut file = File::create(&path).unwrap();
1695
1696 let line_count = 4000;
1697 for i in 0..line_count {
1698 writeln!(&mut file, "Here's a line for you: {i}").unwrap();
1699 }
1700 file.flush().unwrap();
1701
1702 let received = run_file_source(
1704 &config,
1705 true,
1706 Acks,
1707 LogNamespace::Legacy,
1708 None,
1709 sleep_500_millis(),
1711 )
1712 .await;
1713 let lines = extract_messages_string(received);
1714
1715 assert!(lines.len() < line_count);
1718
1719 let remaining = line_count - lines.len();
1724 let event_count = Arc::new(AtomicUsize::new(0));
1725 let received = run_file_source(
1726 &config,
1727 true,
1728 Acks,
1729 LogNamespace::Legacy,
1730 Some(Arc::clone(&event_count)),
1731 async {
1732 wait_for_atomic_usize_timeout_ms(
1733 Arc::clone(&event_count),
1734 |n| n >= remaining,
1735 5_000,
1736 )
1737 .await;
1738 },
1739 )
1740 .await;
1741 let lines2 = extract_messages_string(received);
1742
1743 assert_eq!(lines.len() + lines2.len(), line_count);
1745 }
1746
1747 #[tokio::test]
1748 async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1749 file_start_position_server_restart_with_file_rotation(Acks).await
1750 }
1751
1752 #[tokio::test]
1753 async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1754 file_start_position_server_restart_with_file_rotation(NoAcks).await
1755 }
1756
1757 async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1758 let dir = tempdir().unwrap();
1759 let config = file::FileConfig {
1760 include: vec![dir.path().join("*")],
1761 ..test_default_file_config(&dir)
1762 };
1763
1764 let path = dir.path().join("file");
1765 let path_for_old_file = dir.path().join("file.old");
1766 {
1768 let received =
1769 run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
1770 let mut file = File::create(&path).unwrap();
1771 writeln!(&mut file, "first line").unwrap();
1772 file.flush().unwrap();
1773 sleep_500_millis().await;
1774 })
1775 .await;
1776
1777 let lines = extract_messages_string(received);
1778 assert_eq!(lines, vec!["first line"]);
1779 }
1780 fs::rename(&path, &path_for_old_file).expect("could not rename");
1782 {
1785 let received =
1786 run_file_source(&config, false, acking, LogNamespace::Legacy, None, async {
1787 let mut file = File::create(&path).unwrap();
1788 writeln!(&mut file, "second line").unwrap();
1789 file.flush().unwrap();
1790 sleep_500_millis().await;
1791 })
1792 .await;
1793
1794 let lines = extract_messages_string(received);
1795 assert_eq!(lines, vec!["second line"]);
1796 }
1797 }
1798
1799 #[cfg(unix)] #[tokio::test]
1801 async fn file_start_position_ignore_old_files() {
1802 use std::{
1803 os::unix::io::AsRawFd,
1804 time::{Duration, SystemTime},
1805 };
1806
1807 let dir = tempdir().unwrap();
1808 let config = file::FileConfig {
1809 include: vec![dir.path().join("*")],
1810 ignore_older_secs: Some(5),
1811 ..test_default_file_config(&dir)
1812 };
1813
1814 let before_path = dir.path().join("before");
1815 let mut before_file = File::create(&before_path).unwrap();
1816 let after_path = dir.path().join("after");
1817 let mut after_file = File::create(&after_path).unwrap();
1818
1819 writeln!(&mut before_file, "first line").unwrap(); writeln!(&mut after_file, "_first line").unwrap(); {
1823 let before = SystemTime::now() - Duration::from_secs(8);
1825 let after = SystemTime::now() - Duration::from_secs(2);
1826
1827 let before_time = libc::timeval {
1828 tv_sec: before
1829 .duration_since(SystemTime::UNIX_EPOCH)
1830 .unwrap()
1831 .as_secs() as _,
1832 tv_usec: 0,
1833 };
1834 let before_times = [before_time, before_time];
1835
1836 let after_time = libc::timeval {
1837 tv_sec: after
1838 .duration_since(SystemTime::UNIX_EPOCH)
1839 .unwrap()
1840 .as_secs() as _,
1841 tv_usec: 0,
1842 };
1843 let after_times = [after_time, after_time];
1844
1845 unsafe {
1846 libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1847 libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1848 }
1849 }
1850
1851 before_file.sync_all().unwrap();
1852 after_file.sync_all().unwrap();
1853
1854 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1855 sleep_500_millis().await;
1856 writeln!(&mut before_file, "second line").unwrap();
1857 writeln!(&mut after_file, "_second line").unwrap();
1858
1859 before_file.flush().unwrap();
1860 after_file.flush().unwrap();
1861 sleep_500_millis().await;
1862 })
1863 .await;
1864
1865 let before_lines = received
1866 .iter()
1867 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1868 .map(|event| {
1869 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1870 })
1871 .collect::<Vec<_>>();
1872 let after_lines = received
1873 .iter()
1874 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1875 .map(|event| {
1876 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1877 })
1878 .collect::<Vec<_>>();
1879 assert_eq!(before_lines, vec!["second line"]);
1880 assert_eq!(after_lines, vec!["_first line", "_second line"]);
1881 }
1882
1883 #[tokio::test]
1884 async fn file_max_line_bytes() {
1885 let dir = tempdir().unwrap();
1886 let config = file::FileConfig {
1887 include: vec![dir.path().join("*")],
1888 max_line_bytes: 10,
1889 ..test_default_file_config(&dir)
1890 };
1891
1892 let path = dir.path().join("file");
1893 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1894 let mut file = File::create(&path).unwrap();
1895
1896 writeln!(&mut file, "short").unwrap();
1897 writeln!(&mut file, "this is too long").unwrap();
1898 writeln!(&mut file, "11 eleven11").unwrap();
1899 let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1900 writeln!(&mut file, "{super_long}").unwrap();
1901 writeln!(&mut file, "exactly 10").unwrap();
1902 writeln!(&mut file, "it can end on a line that's too long").unwrap();
1903
1904 file.flush().unwrap();
1905 sleep_500_millis().await;
1906 sleep_500_millis().await;
1907
1908 writeln!(&mut file, "and then continue").unwrap();
1909 writeln!(&mut file, "last short").unwrap();
1910 file.flush().unwrap();
1911
1912 sleep_500_millis().await;
1913 sleep_500_millis().await;
1914 }).await;
1915
1916 let received = extract_messages_value(received);
1917
1918 assert_eq!(
1919 received,
1920 vec!["short".into(), "exactly 10".into(), "last short".into()]
1921 );
1922 }
1923
1924 #[tokio::test]
1925 async fn test_multi_line_aggregation_legacy() {
1926 let dir = tempdir().unwrap();
1927 let config = file::FileConfig {
1928 include: vec![dir.path().join("*")],
1929 message_start_indicator: Some("INFO".into()),
1930 multi_line_timeout: 25,
1931 ..test_default_file_config(&dir)
1932 };
1933
1934 let path = dir.path().join("file");
1935 let event_count = Arc::new(AtomicUsize::new(0));
1936 let received = run_file_source(
1937 &config,
1938 false,
1939 NoAcks,
1940 LogNamespace::Legacy,
1941 Some(Arc::clone(&event_count)),
1942 async {
1943 let mut file = File::create(&path).unwrap();
1944
1945 writeln!(&mut file, "leftover foo").unwrap();
1949 writeln!(&mut file, "INFO hello").unwrap();
1950 writeln!(&mut file, "INFO goodbye").unwrap();
1951 writeln!(&mut file, "part of goodbye").unwrap();
1952 writeln!(&mut file, "INFO hi again").unwrap();
1953 writeln!(&mut file, "and some more").unwrap();
1954 writeln!(&mut file, "INFO hello").unwrap();
1955 file.flush().unwrap();
1956
1957 wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 5, 500).await;
1960
1961 writeln!(&mut file, "too slow").unwrap();
1962 writeln!(&mut file, "INFO doesn't have").unwrap();
1963 writeln!(&mut file, "to be INFO in").unwrap();
1964 writeln!(&mut file, "the middle").unwrap();
1965 file.flush().unwrap();
1966
1967 wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 7, 500).await;
1970 },
1971 )
1972 .await;
1973
1974 let received = extract_messages_value(received);
1975
1976 assert_eq!(
1977 received,
1978 vec![
1979 "leftover foo".into(),
1980 "INFO hello".into(),
1981 "INFO goodbye\npart of goodbye".into(),
1982 "INFO hi again\nand some more".into(),
1983 "INFO hello".into(),
1984 "too slow".into(),
1985 "INFO doesn't have".into(),
1986 "to be INFO in\nthe middle".into(),
1987 ]
1988 );
1989 }
1990
1991 #[tokio::test]
1992 async fn test_multi_line_aggregation() {
1993 let dir = tempdir().unwrap();
1994 let config = file::FileConfig {
1995 include: vec![dir.path().join("*")],
1996 multiline: Some(MultilineConfig {
1997 start_pattern: "INFO".to_owned(),
1998 condition_pattern: "INFO".to_owned(),
1999 mode: line_agg::Mode::HaltBefore,
2000 timeout_ms: Duration::from_millis(25),
2001 }),
2002 ..test_default_file_config(&dir)
2003 };
2004
2005 let path = dir.path().join("file");
2006 let event_count = Arc::new(AtomicUsize::new(0));
2007 let received = run_file_source(
2008 &config,
2009 false,
2010 NoAcks,
2011 LogNamespace::Legacy,
2012 Some(Arc::clone(&event_count)),
2013 async {
2014 let mut file = File::create(&path).unwrap();
2015
2016 writeln!(&mut file, "leftover foo").unwrap();
2020 writeln!(&mut file, "INFO hello").unwrap();
2021 writeln!(&mut file, "INFO goodbye").unwrap();
2022 writeln!(&mut file, "part of goodbye").unwrap();
2023 writeln!(&mut file, "INFO hi again").unwrap();
2024 writeln!(&mut file, "and some more").unwrap();
2025 writeln!(&mut file, "INFO hello").unwrap();
2026 file.flush().unwrap();
2027
2028 wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 5, 500).await;
2031
2032 writeln!(&mut file, "too slow").unwrap();
2033 writeln!(&mut file, "INFO doesn't have").unwrap();
2034 writeln!(&mut file, "to be INFO in").unwrap();
2035 writeln!(&mut file, "the middle").unwrap();
2036 file.flush().unwrap();
2037
2038 wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 7, 500).await;
2041 },
2042 )
2043 .await;
2044
2045 let received = extract_messages_value(received);
2046
2047 assert_eq!(
2048 received,
2049 vec![
2050 "leftover foo".into(),
2051 "INFO hello".into(),
2052 "INFO goodbye\npart of goodbye".into(),
2053 "INFO hi again\nand some more".into(),
2054 "INFO hello".into(),
2055 "too slow".into(),
2056 "INFO doesn't have".into(),
2057 "to be INFO in\nthe middle".into(),
2058 ]
2059 );
2060 }
2061
2062 #[tokio::test]
2063 async fn test_multi_line_checkpointing() {
2064 let dir = tempdir().unwrap();
2065 let config = file::FileConfig {
2066 include: vec![dir.path().join("*")],
2067 offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2068 multiline: Some(MultilineConfig {
2069 start_pattern: "INFO".to_owned(),
2070 condition_pattern: "INFO".to_owned(),
2071 mode: line_agg::Mode::HaltBefore,
2072 timeout_ms: Duration::from_millis(25), }),
2074 ..test_default_file_config(&dir)
2075 };
2076
2077 let path = dir.path().join("file");
2078 let mut file = File::create(&path).unwrap();
2079
2080 writeln!(&mut file, "INFO hello").unwrap();
2081 writeln!(&mut file, "part of hello").unwrap();
2082
2083 file.sync_all().unwrap();
2084
2085 let received = run_file_source(
2088 &config,
2089 true,
2090 Acks,
2091 LogNamespace::Legacy,
2092 None,
2093 sleep_500_millis(),
2094 )
2095 .await;
2096
2097 assert_eq!(received[0].as_log()["offset"], 0.into());
2098
2099 let lines = extract_messages_string(received);
2100 assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2101
2102 let received_after_restart =
2104 run_file_source(&config, false, Acks, LogNamespace::Legacy, None, async {
2105 writeln!(&mut file, "INFO goodbye").unwrap();
2106 file.flush().unwrap();
2107 sleep_500_millis().await;
2108 })
2109 .await;
2110 assert_eq!(
2111 received_after_restart[0].as_log()["offset"],
2112 (lines[0].len() + 1).into()
2113 );
2114 let lines = extract_messages_string(received_after_restart);
2115 assert_eq!(lines, vec!["INFO goodbye"]);
2116 }
2117
2118 #[tokio::test]
2119 async fn test_fair_reads() {
2120 let dir = tempdir().unwrap();
2121 let config = file::FileConfig {
2122 include: vec![dir.path().join("*")],
2123 max_read_bytes: 1,
2124 oldest_first: false,
2125 ..test_default_file_config(&dir)
2126 };
2127
2128 let older_path = dir.path().join("z_older_file");
2129 let mut older = File::create(&older_path).unwrap();
2130
2131 writeln!(&mut older, "hello i am the old file").unwrap();
2132 writeln!(&mut older, "i have been around a while").unwrap();
2133 writeln!(&mut older, "you can read newer files at the same time").unwrap();
2134 older.sync_all().unwrap();
2135
2136 let newer_path = dir.path().join("a_newer_file");
2137 let mut newer = File::create(&newer_path).unwrap();
2138
2139 writeln!(&mut newer, "and i am the new file").unwrap();
2140 writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2141 writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2142 newer.sync_all().unwrap();
2143
2144 let received = run_file_source(
2145 &config,
2146 false,
2147 NoAcks,
2148 LogNamespace::Legacy,
2149 None,
2150 sleep_500_millis(),
2151 )
2152 .await;
2153
2154 let received = extract_messages_value(received);
2155
2156 let old_first = vec![
2157 "hello i am the old file".into(),
2158 "and i am the new file".into(),
2159 "i have been around a while".into(),
2160 "this should be interleaved with the old one".into(),
2161 "you can read newer files at the same time".into(),
2162 "which is fine because we want fairness".into(),
2163 ];
2164 let new_first: Vec<_> = old_first
2165 .chunks(2)
2166 .flat_map(|chunk| chunk.iter().rev().cloned().collect::<Vec<_>>())
2167 .collect();
2168
2169 if received[0] == old_first[0] {
2170 assert_eq!(received, old_first);
2171 } else {
2172 assert_eq!(received, new_first);
2173 }
2174 }
2175
2176 #[tokio::test]
2177 async fn test_oldest_first() {
2178 let dir = tempdir().unwrap();
2179 let config = file::FileConfig {
2180 include: vec![dir.path().join("*")],
2181 max_read_bytes: 1,
2182 oldest_first: true,
2183 ..test_default_file_config(&dir)
2184 };
2185
2186 let older_path = dir.path().join("z_older_file");
2187 let mut older = File::create(&older_path).unwrap();
2188 older.sync_all().unwrap();
2189
2190 sleep_500_millis().await;
2192
2193 let newer_path = dir.path().join("a_newer_file");
2194 let mut newer = File::create(&newer_path).unwrap();
2195 newer.sync_all().unwrap();
2196
2197 writeln!(&mut older, "hello i am the old file").unwrap();
2198 writeln!(&mut older, "i have been around a while").unwrap();
2199 writeln!(&mut older, "you should definitely read all of me first").unwrap();
2200 older.flush().unwrap();
2201
2202 writeln!(&mut newer, "i'm new").unwrap();
2203 writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2204 writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2205 newer.flush().unwrap();
2206
2207 let received = run_file_source(
2208 &config,
2209 false,
2210 NoAcks,
2211 LogNamespace::Legacy,
2212 None,
2213 sleep_500_millis(),
2214 )
2215 .await;
2216
2217 let received = extract_messages_value(received);
2218
2219 assert_eq!(
2220 received,
2221 vec![
2222 "hello i am the old file".into(),
2223 "i have been around a while".into(),
2224 "you should definitely read all of me first".into(),
2225 "i'm new".into(),
2226 "hopefully you read all the old stuff first".into(),
2227 "because otherwise i'm not going to make sense".into(),
2228 ]
2229 );
2230 }
2231
2232 #[tokio::test]
2233 async fn test_split_reads() {
2234 let dir = tempdir().unwrap();
2235 let config = file::FileConfig {
2236 include: vec![dir.path().join("*")],
2237 max_read_bytes: 1,
2238 ..test_default_file_config(&dir)
2239 };
2240
2241 let path = dir.path().join("file");
2242 let mut file = File::create(&path).unwrap();
2243
2244 writeln!(&mut file, "hello i am a normal line").unwrap();
2245 file.sync_all().unwrap();
2246
2247 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2248 sleep_500_millis().await;
2249
2250 write!(&mut file, "i am not a full line").unwrap();
2251
2252 file.flush().unwrap();
2253 sleep_500_millis().await;
2255
2256 writeln!(&mut file, " until now").unwrap();
2257
2258 file.flush().unwrap();
2259 sleep_500_millis().await;
2260 })
2261 .await;
2262
2263 let received = extract_messages_value(received);
2264
2265 assert_eq!(
2266 received,
2267 vec![
2268 "hello i am a normal line".into(),
2269 "i am not a full line until now".into(),
2270 ]
2271 );
2272 }
2273
2274 #[tokio::test]
2275 async fn test_gzipped_file() {
2276 let dir = tempdir().unwrap();
2277 let config = file::FileConfig {
2278 include: vec![PathBuf::from("tests/data/gzipped.log")],
2279 max_line_bytes: 100,
2286 ..test_default_file_config(&dir)
2287 };
2288
2289 let received = run_file_source(
2290 &config,
2291 false,
2292 NoAcks,
2293 LogNamespace::Legacy,
2294 None,
2295 sleep_500_millis(),
2296 )
2297 .await;
2298
2299 let received = extract_messages_value(received);
2300
2301 assert_eq!(
2302 received,
2303 vec![
2304 "this is a simple file".into(),
2305 "i have been compressed".into(),
2306 "in order to make me smaller".into(),
2307 "but you can still read me".into(),
2308 "hooray".into(),
2309 ]
2310 );
2311 }
2312
2313 #[tokio::test]
2314 async fn test_non_utf8_encoded_file() {
2315 let dir = tempdir().unwrap();
2316 let config = file::FileConfig {
2317 include: vec![PathBuf::from("tests/data/utf-16le.log")],
2318 encoding: Some(EncodingConfig { charset: UTF_16LE }),
2319 ..test_default_file_config(&dir)
2320 };
2321
2322 let received = run_file_source(
2323 &config,
2324 false,
2325 NoAcks,
2326 LogNamespace::Legacy,
2327 None,
2328 sleep_500_millis(),
2329 )
2330 .await;
2331
2332 let received = extract_messages_value(received);
2333
2334 assert_eq!(
2335 received,
2336 vec![
2337 "hello i am a file".into(),
2338 "i can unicode".into(),
2339 "but i do so in 16 bits".into(),
2340 "and when i byte".into(),
2341 "i become little-endian".into(),
2342 ]
2343 );
2344 }
2345
2346 #[tokio::test]
2347 async fn test_non_default_line_delimiter() {
2348 let dir = tempdir().unwrap();
2349 let config = file::FileConfig {
2350 include: vec![dir.path().join("*")],
2351 line_delimiter: "\r\n".to_string(),
2352 ..test_default_file_config(&dir)
2353 };
2354
2355 let path = dir.path().join("file");
2356 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2357 let mut file = File::create(&path).unwrap();
2358
2359 write!(&mut file, "hello i am a line\r\n").unwrap();
2360 write!(&mut file, "and i am too\r\n").unwrap();
2361 write!(&mut file, "CRLF is how we end\r\n").unwrap();
2362 write!(&mut file, "please treat us well\r\n").unwrap();
2363
2364 file.flush().unwrap();
2365 sleep_500_millis().await;
2366 })
2367 .await;
2368
2369 let received = extract_messages_value(received);
2370
2371 assert_eq!(
2372 received,
2373 vec![
2374 "hello i am a line".into(),
2375 "and i am too".into(),
2376 "CRLF is how we end".into(),
2377 "please treat us well".into()
2378 ]
2379 );
2380 }
2381
2382 #[tokio::test]
2386 async fn test_multi_char_delimiter_split_across_buffer_boundary() {
2387 let dir = tempdir().unwrap();
2388 let config = file::FileConfig {
2389 include: vec![dir.path().join("*")],
2390 line_delimiter: "\r\n".to_string(),
2391 ..test_default_file_config(&dir)
2392 };
2393
2394 let path = dir.path().join("file");
2395 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2396 let mut file = File::create(&path).unwrap();
2397
2398 sleep_500_millis().await;
2399
2400 let buffer_size = 8192;
2407
2408 let event1_prefix = "Event 1: ";
2410 let padding1_len = buffer_size - event1_prefix.len() - 1; write!(&mut file, "{}", event1_prefix).unwrap();
2412 file.write_all(&vec![b'X'; padding1_len]).unwrap();
2413 write!(&mut file, "\r\n").unwrap(); let event2_prefix = "Event 2: ";
2417 let padding2_len = buffer_size - event2_prefix.len() - 1;
2418 write!(&mut file, "{}", event2_prefix).unwrap();
2419 file.write_all(&vec![b'Y'; padding2_len]).unwrap();
2420 write!(&mut file, "\r\n").unwrap(); write!(&mut file, "Event 3: Final\r\n").unwrap();
2424
2425 sleep_500_millis().await;
2426 })
2427 .await;
2428
2429 let messages = extract_messages_value(received);
2430
2431 assert_eq!(
2433 messages.len(),
2434 3,
2435 "Should receive exactly 3 separate events (bug would merge them)"
2436 );
2437
2438 let msg0 = messages[0].to_string_lossy();
2440 let msg1 = messages[1].to_string_lossy();
2441 let msg2 = messages[2].to_string_lossy();
2442
2443 assert!(
2444 msg0.starts_with("Event 1: "),
2445 "First event should start with 'Event 1: ', got: {}",
2446 msg0
2447 );
2448 assert!(
2449 msg1.starts_with("Event 2: "),
2450 "Second event should start with 'Event 2: ', got: {}",
2451 msg1
2452 );
2453 assert_eq!(msg2, "Event 3: Final");
2454
2455 for (i, msg) in messages.iter().enumerate() {
2457 let msg_str = msg.to_string_lossy();
2458 assert!(
2459 !msg_str.contains('\r'),
2460 "Event {} should not contain embedded \\r",
2461 i
2462 );
2463 assert!(
2464 !msg_str.contains('\n'),
2465 "Event {} should not contain embedded \\n",
2466 i
2467 );
2468 }
2469 }
2470
2471 #[tokio::test]
2472 async fn remove_file() {
2473 let n = 5;
2474 let remove_after_secs = 1;
2475
2476 let dir = tempdir().unwrap();
2477 let config = file::FileConfig {
2478 include: vec![dir.path().join("*")],
2479 remove_after_secs: Some(remove_after_secs),
2480 ..test_default_file_config(&dir)
2481 };
2482
2483 let path = dir.path().join("file");
2484 let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, None, async {
2485 let mut file = File::create(&path).unwrap();
2486
2487 for i in 0..n {
2488 writeln!(&mut file, "{i}").unwrap();
2489 }
2490 file.flush().unwrap();
2491 drop(file);
2492
2493 for _ in 0..10 {
2494 sleep(Duration::from_secs(remove_after_secs + 1)).await;
2496
2497 if File::open(&path).is_err() {
2498 break;
2499 }
2500 }
2501 })
2502 .await;
2503
2504 assert_eq!(received.len(), n);
2505
2506 match File::open(&path) {
2507 Ok(_) => panic!("File wasn't removed"),
2508 Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2509 }
2510 }
2511
/// Selects how `run_file_source` builds its test sender and whether acknowledgements
/// are enabled for the source under test.
#[derive(PartialEq, Eq, Copy, Clone)]
enum AckingMode {
    /// Acknowledgements disabled; events come from a plain, non-finalizing test sender.
    NoAcks,
    /// Acknowledgements enabled, but every event is finalized as `Rejected`.
    Unfinalized,
    /// Acknowledgements enabled and every event is finalized as `Delivered`.
    Acks,
}
2518 use AckingMode::*;
2519 use vector_lib::lookup::OwnedTargetPath;
2520
2521 async fn run_file_source(
2522 config: &FileConfig,
2523 wait_shutdown: bool,
2524 acking_mode: AckingMode,
2525 log_namespace: LogNamespace,
2526 event_counter: Option<Arc<AtomicUsize>>,
2531 inner: impl Future<Output = ()>,
2532 ) -> Vec<Event> {
2533 assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2534 let (tx, rx) = match acking_mode {
2535 Acks => {
2536 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2537 (tx, rx.boxed())
2538 }
2539 Unfinalized => {
2540 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Rejected);
2545 (tx, rx.boxed())
2546 }
2547 NoAcks => {
2548 let (tx, rx) = SourceSender::new_test();
2549 (tx, rx.boxed())
2550 }
2551 };
2552
2553 let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2554 let data_dir = config.data_dir.clone().unwrap();
2555 let acks = !matches!(acking_mode, NoAcks);
2556
2557 tokio::spawn(file::file_source(
2558 config,
2559 data_dir,
2560 shutdown,
2561 tx,
2562 acks,
2563 log_namespace,
2564 ));
2565
2566 let result = if let Some(counter) = event_counter {
2567 let (relay_tx, mut relay_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
2570 tokio::spawn(async move {
2571 let mut rx = rx;
2572 while let Some(event) = rx.next().await {
2573 counter.fetch_add(1, Ordering::SeqCst);
2574 let _ = relay_tx.send(event);
2575 }
2576 });
2577
2578 inner.await;
2579 drop(trigger_shutdown);
2580
2581 timeout(Duration::from_secs(5), async move {
2582 let mut events = Vec::new();
2583 while let Some(event) = relay_rx.recv().await {
2584 events.push(event);
2585 }
2586 events
2587 })
2588 .await
2589 .expect("Unclosed channel: may indicate file-server could not shutdown gracefully.")
2590 } else {
2591 inner.await;
2592 drop(trigger_shutdown);
2593
2594 if acking_mode == Unfinalized {
2595 rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2596 .collect::<Vec<_>>()
2597 .await
2598 } else {
2599 timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2600 .await
2601 .expect(
2602 "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2603 )
2604 }
2605 };
2606
2607 if wait_shutdown {
2608 shutdown_done.await;
2609 }
2610
2611 result
2612 })
2613 .await
2614 }
2615
2616 fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2617 received
2618 .into_iter()
2619 .map(Event::into_log)
2620 .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2621 .collect()
2622 }
2623
2624 fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2625 received
2626 .into_iter()
2627 .map(Event::into_log)
2628 .map(|log| log.get_message().unwrap().clone())
2629 .collect()
2630 }
2631}