vector/sources/file.rs

1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    codecs::{BytesDeserializer, BytesDeserializerConfig},
14    config::{LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    file_source::{
17        file_server::{FileServer, Line, calculate_ignore_before},
18        paths_provider::{Glob, MatchOptions},
19    },
20    file_source_common::{
21        Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22    },
23    finalizer::OrderedFinalizer,
24    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30    SourceSender,
31    config::{
32        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33        log_schema,
34    },
35    encoding_transcode::{Decoder, Encoder},
36    event::{BatchNotifier, BatchStatus, LogEvent},
37    internal_events::{
38        FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39        FileSourceInternalEventsEmitter, StreamClosedError,
40    },
41    line_agg::{self, LineAgg},
42    serde::bool_or_struct,
43    shutdown::ShutdownSignal,
44};
45
46#[derive(Debug, Snafu)]
47enum BuildError {
48    #[snafu(display(
49        "message_start_indicator {:?} is not a valid regex: {}",
50        indicator,
51        source
52    ))]
53    InvalidMessageStartIndicator {
54        indicator: String,
55        source: regex::Error,
56    },
57}
58
59/// Configuration for the `file` source.
60#[serde_as]
61#[configurable_component(source("file", "Collect logs from files."))]
62#[derive(Clone, Debug, PartialEq, Eq)]
63#[serde(deny_unknown_fields)]
64pub struct FileConfig {
65    /// Array of file patterns to include. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
66    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
67    pub include: Vec<PathBuf>,
68
69    /// Array of file patterns to exclude. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
70    ///
71    /// Takes precedence over the `include` option. Note: The `exclude` patterns are applied _after_ the attempt to glob everything
72    /// in `include`. This means that all files are first matched by `include` and then filtered by the `exclude`
73    /// patterns. This can be impactful if `include` contains directories with contents that are not accessible.
74    #[serde(default)]
75    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
76    pub exclude: Vec<PathBuf>,
77
78    /// Overrides the name of the log field used to add the file path to each event.
79    ///
80    /// The value is the full path to the file where the event was read message.
81    ///
82    /// Set to `""` to suppress this key.
83    #[serde(default = "default_file_key")]
84    #[configurable(metadata(docs::examples = "path"))]
85    pub file_key: OptionalValuePath,
86
87    /// Whether or not to start reading from the beginning of a new file.
88    #[configurable(
89        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
90    )]
91    #[configurable(metadata(docs::hidden))]
92    #[serde(default)]
93    pub start_at_beginning: Option<bool>,
94
95    /// Whether or not to ignore existing checkpoints when determining where to start reading a file.
96    ///
97    /// Checkpoints are still written normally.
98    #[serde(default)]
99    pub ignore_checkpoints: Option<bool>,
100
101    #[serde(default = "default_read_from")]
102    #[configurable(derived)]
103    pub read_from: ReadFromConfig,
104
105    /// Ignore files with a data modification date older than the specified number of seconds.
106    #[serde(alias = "ignore_older", default)]
107    #[configurable(metadata(docs::type_unit = "seconds"))]
108    #[configurable(metadata(docs::examples = 600))]
109    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
110    pub ignore_older_secs: Option<u64>,
111
112    /// The maximum size of a line before it is discarded.
113    ///
114    /// This protects against malformed lines or tailing incorrect files.
115    #[serde(default = "default_max_line_bytes")]
116    #[configurable(metadata(docs::type_unit = "bytes"))]
117    pub max_line_bytes: usize,
118
119    /// Overrides the name of the log field used to add the current hostname to each event.
120    ///
121    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
122    ///
123    /// Set to `""` to suppress this key.
124    ///
125    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
126    #[configurable(metadata(docs::examples = "hostname"))]
127    pub host_key: Option<OptionalValuePath>,
128
129    /// The directory used to persist file checkpoint positions.
130    ///
131    /// By default, the [global `data_dir` option][global_data_dir] is used.
132    /// Make sure the running user has write permissions to this directory.
133    ///
134    /// If this directory is specified, then Vector will attempt to create it.
135    ///
136    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
137    #[serde(default)]
138    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
139    #[configurable(metadata(docs::human_name = "Data Directory"))]
140    pub data_dir: Option<PathBuf>,
141
142    /// Enables adding the file offset to each event and sets the name of the log field used.
143    ///
144    /// The value is the byte offset of the start of the line within the file.
145    ///
146    /// Off by default, the offset is only added to the event if this is set.
147    #[serde(default)]
148    #[configurable(metadata(docs::examples = "offset"))]
149    pub offset_key: Option<OptionalValuePath>,
150
151    /// The delay between file discovery calls.
152    ///
153    /// This controls the interval at which files are searched. A higher value results in greater
154    /// chances of some short-lived files being missed between searches, but a lower value increases
155    /// the performance impact of file discovery.
156    #[serde(
157        alias = "glob_minimum_cooldown",
158        default = "default_glob_minimum_cooldown_ms"
159    )]
160    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
161    #[configurable(metadata(docs::type_unit = "milliseconds"))]
162    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
163    pub glob_minimum_cooldown_ms: Duration,
164
165    #[configurable(derived)]
166    #[serde(alias = "fingerprinting", default)]
167    fingerprint: FingerprintConfig,
168
169    /// Ignore missing files when fingerprinting.
170    ///
171    /// This may be useful when used with source directories containing dangling symlinks.
172    #[serde(default)]
173    pub ignore_not_found: bool,
174
175    /// String value used to identify the start of a multi-line message.
176    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
177    #[configurable(metadata(docs::hidden))]
178    #[serde(default)]
179    pub message_start_indicator: Option<String>,
180
181    /// How long to wait for more data when aggregating a multi-line message, in milliseconds.
182    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
183    #[configurable(metadata(docs::hidden))]
184    #[serde(default = "default_multi_line_timeout")]
185    pub multi_line_timeout: u64,
186
187    /// Multiline aggregation configuration.
188    ///
189    /// If not specified, multiline aggregation is disabled.
190    #[configurable(derived)]
191    #[serde(default)]
192    pub multiline: Option<MultilineConfig>,
193
194    /// Max amount of bytes to read from a single file before switching over to the next file.
195    /// **Note:** This does not apply when `oldest_first` is `true`.
196    ///
197    /// This allows distributing the reads more or less evenly across
198    /// the files.
199    #[serde(default = "default_max_read_bytes")]
200    #[configurable(metadata(docs::type_unit = "bytes"))]
201    pub max_read_bytes: usize,
202
203    /// Instead of balancing read capacity fairly across all watched files, prioritize draining the oldest files before moving on to read data from more recent files.
204    #[serde(default)]
205    pub oldest_first: bool,
206
207    /// After reaching EOF, the number of seconds to wait before removing the file, unless new data is written.
208    ///
209    /// If not specified, files are not removed.
210    #[serde(alias = "remove_after", default)]
211    #[configurable(metadata(docs::type_unit = "seconds"))]
212    #[configurable(metadata(docs::examples = 0))]
213    #[configurable(metadata(docs::examples = 5))]
214    #[configurable(metadata(docs::examples = 60))]
215    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
216    pub remove_after_secs: Option<u64>,
217
218    /// String sequence used to separate one file line from another.
219    #[serde(default = "default_line_delimiter")]
220    #[configurable(metadata(docs::examples = "\r\n"))]
221    pub line_delimiter: String,
222
223    #[configurable(derived)]
224    #[serde(default)]
225    pub encoding: Option<EncodingConfig>,
226
227    #[configurable(derived)]
228    #[serde(default, deserialize_with = "bool_or_struct")]
229    acknowledgements: SourceAcknowledgementsConfig,
230
231    /// The namespace to use for logs. This overrides the global setting.
232    #[configurable(metadata(docs::hidden))]
233    #[serde(default)]
234    log_namespace: Option<bool>,
235
236    #[configurable(derived)]
237    #[serde(default)]
238    internal_metrics: FileInternalMetricsConfig,
239
240    /// How long to keep an open handle to a rotated log file.
241    /// The default value represents "no limit"
242    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
243    #[configurable(metadata(docs::type_unit = "seconds"))]
244    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
245    pub rotate_wait: Duration,
246}
247
/// Serde default for `max_line_bytes`: 100 KiB (100 * 1024 bytes).
fn default_max_line_bytes() -> usize {
    100 * 1024
}
251
252fn default_file_key() -> OptionalValuePath {
253    OptionalValuePath::from(owned_value_path!("file"))
254}
255
256const fn default_read_from() -> ReadFromConfig {
257    ReadFromConfig::Beginning
258}
259
/// Serde default for `glob_minimum_cooldown_ms`: search for files once per second.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(1)
}
263
/// Serde default for the deprecated `multi_line_timeout` option, in milliseconds.
const fn default_multi_line_timeout() -> u64 {
    1_000
}
267
/// Serde default for `max_read_bytes`: 2 KiB per file before moving on to the next one.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
271
/// Serde default for `line_delimiter`: a single line feed.
fn default_line_delimiter() -> String {
    String::from("\n")
}
275
/// Serde default for `rotate_wait_secs`, standing in for "no limit".
///
/// NOTE(review): half of `u64::MAX` rather than the maximum, presumably to leave headroom
/// for arithmetic on the duration downstream — confirm against the file server.
const fn default_rotate_wait() -> Duration {
    Duration::from_secs(u64::MAX >> 1)
}
279
280/// Configuration for how files should be identified.
281///
282/// This is important for `checkpointing` when file rotation is used.
283#[configurable_component]
284#[derive(Clone, Debug, PartialEq, Eq)]
285#[serde(tag = "strategy", rename_all = "snake_case")]
286#[configurable(metadata(
287    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
288))]
289pub enum FingerprintConfig {
290    /// Read lines from the beginning of the file and compute a checksum over them.
291    Checksum {
292        /// The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
293        /// If the file is compressed, the number of bytes refer to the header in the uncompressed content. Only
294        /// gzip is supported at this time.
295        ///
296        /// This can be helpful if all files share a common header that should be skipped.
297        #[serde(default = "default_ignored_header_bytes")]
298        #[configurable(metadata(docs::type_unit = "bytes"))]
299        ignored_header_bytes: usize,
300
301        /// The number of lines to read for generating the checksum.
302        ///
303        /// The number of lines are determined from the uncompressed content if the file is compressed. Only
304        /// gzip is supported at this time.
305        ///
306        /// If the file has less than this amount of lines, it won’t be read at all.
307        #[serde(default = "default_lines")]
308        #[configurable(metadata(docs::type_unit = "lines"))]
309        lines: usize,
310    },
311
312    /// Use the [device and inode][inode] as the identifier.
313    ///
314    /// [inode]: https://en.wikipedia.org/wiki/Inode
315    #[serde(rename = "device_and_inode")]
316    DevInode,
317}
318
319impl Default for FingerprintConfig {
320    fn default() -> Self {
321        Self::Checksum {
322            ignored_header_bytes: 0,
323            lines: default_lines(),
324        }
325    }
326}
327
/// Serde default for `ignored_header_bytes`: no header is skipped.
const fn default_ignored_header_bytes() -> usize {
    0_usize
}
331
/// Serde default for `lines`: checksum only the first line.
const fn default_lines() -> usize {
    1_usize
}
335
336impl From<FingerprintConfig> for FingerprintStrategy {
337    fn from(config: FingerprintConfig) -> FingerprintStrategy {
338        match config {
339            FingerprintConfig::Checksum {
340                ignored_header_bytes,
341                lines,
342            } => FingerprintStrategy::FirstLinesChecksum {
343                ignored_header_bytes,
344                lines,
345            },
346            FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
347        }
348    }
349}
350
351#[derive(Debug)]
352pub(crate) struct FinalizerEntry {
353    pub(crate) file_id: FileFingerprint,
354    pub(crate) offset: u64,
355}
356
357impl Default for FileConfig {
358    fn default() -> Self {
359        Self {
360            include: vec![PathBuf::from("/var/log/**/*.log")],
361            exclude: vec![],
362            file_key: default_file_key(),
363            start_at_beginning: None,
364            ignore_checkpoints: None,
365            read_from: default_read_from(),
366            ignore_older_secs: None,
367            max_line_bytes: default_max_line_bytes(),
368            fingerprint: FingerprintConfig::default(),
369            ignore_not_found: false,
370            host_key: None,
371            offset_key: None,
372            data_dir: None,
373            glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
374            message_start_indicator: None,
375            multi_line_timeout: default_multi_line_timeout(), // millis
376            multiline: None,
377            max_read_bytes: default_max_read_bytes(),
378            oldest_first: false,
379            remove_after_secs: None,
380            line_delimiter: default_line_delimiter(),
381            encoding: None,
382            acknowledgements: Default::default(),
383            log_namespace: None,
384            internal_metrics: Default::default(),
385            rotate_wait: default_rotate_wait(),
386        }
387    }
388}
389
390impl_generate_config_from_default!(FileConfig);
391
392#[async_trait::async_trait]
393#[typetag::serde(name = "file")]
394impl SourceConfig for FileConfig {
395    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
396        // add the source name as a subdir, so that multiple sources can
397        // operate within the same given data_dir (e.g. the global one)
398        // without the file servers' checkpointers interfering with each
399        // other
400        let data_dir = cx
401            .globals
402            // source are only global, name can be used for subdir
403            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
404
405        // Clippy rule, because async_trait?
406        #[allow(clippy::suspicious_else_formatting)]
407        {
408            if let Some(ref config) = self.multiline {
409                let _: line_agg::Config = config.try_into()?;
410            }
411
412            if let Some(ref indicator) = self.message_start_indicator {
413                Regex::new(indicator)
414                    .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
415            }
416        }
417
418        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
419
420        let log_namespace = cx.log_namespace(self.log_namespace);
421
422        Ok(file_source(
423            self,
424            data_dir,
425            cx.shutdown,
426            cx.out,
427            acknowledgements,
428            log_namespace,
429        ))
430    }
431
432    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
433        let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
434        let host_key = self
435            .host_key
436            .clone()
437            .unwrap_or(log_schema().host_key().cloned().into())
438            .path
439            .map(LegacyKey::Overwrite);
440
441        let offset_key = self
442            .offset_key
443            .clone()
444            .and_then(|k| k.path)
445            .map(LegacyKey::Overwrite);
446
447        let schema_definition = BytesDeserializerConfig
448            .schema_definition(global_log_namespace.merge(self.log_namespace))
449            .with_standard_vector_source_metadata()
450            .with_source_metadata(
451                Self::NAME,
452                host_key,
453                &owned_value_path!("host"),
454                Kind::bytes().or_undefined(),
455                Some("host"),
456            )
457            .with_source_metadata(
458                Self::NAME,
459                offset_key,
460                &owned_value_path!("offset"),
461                Kind::integer(),
462                None,
463            )
464            .with_source_metadata(
465                Self::NAME,
466                file_key,
467                &owned_value_path!("path"),
468                Kind::bytes(),
469                None,
470            );
471
472        vec![SourceOutput::new_maybe_logs(
473            DataType::Log,
474            schema_definition,
475        )]
476    }
477
478    fn can_acknowledge(&self) -> bool {
479        true
480    }
481}
482
483pub fn file_source(
484    config: &FileConfig,
485    data_dir: PathBuf,
486    shutdown: ShutdownSignal,
487    mut out: SourceSender,
488    acknowledgements: bool,
489    log_namespace: LogNamespace,
490) -> super::Source {
491    // the include option must be specified but also must contain at least one entry.
492    if config.include.is_empty() {
493        error!(
494            message = "`include` configuration option must contain at least one file pattern.",
495            internal_log_rate_limit = false
496        );
497        return Box::pin(future::ready(Err(())));
498    }
499
500    let exclude_patterns = config
501        .exclude
502        .iter()
503        .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
504        .collect::<Vec<PathBuf>>();
505    let ignore_before = calculate_ignore_before(config.ignore_older_secs);
506    let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
507    let (ignore_checkpoints, read_from) = reconcile_position_options(
508        config.start_at_beginning,
509        config.ignore_checkpoints,
510        Some(config.read_from),
511    );
512
513    let emitter = FileSourceInternalEventsEmitter {
514        include_file_metric_tag: config.internal_metrics.include_file_tag,
515    };
516
517    let paths_provider = Glob::new(
518        &config.include,
519        &exclude_patterns,
520        MatchOptions::default(),
521        emitter.clone(),
522    )
523    .expect("invalid glob patterns");
524
525    let encoding_charset = config.encoding.clone().map(|e| e.charset);
526
527    // if file encoding is specified, need to convert the line delimiter (present as utf8)
528    // to the specified encoding, so that delimiter-based line splitting can work properly
529    let line_delimiter_as_bytes = match encoding_charset {
530        Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
531        None => Bytes::from(config.line_delimiter.clone()),
532    };
533
534    let checkpointer = Checkpointer::new(&data_dir);
535    let strategy = config.fingerprint.clone().into();
536
537    let file_server = FileServer {
538        paths_provider,
539        max_read_bytes: config.max_read_bytes,
540        ignore_checkpoints,
541        read_from,
542        ignore_before,
543        max_line_bytes: config.max_line_bytes,
544        line_delimiter: line_delimiter_as_bytes,
545        data_dir,
546        glob_minimum_cooldown,
547        fingerprinter: Fingerprinter::new(strategy, config.max_line_bytes, config.ignore_not_found),
548        oldest_first: config.oldest_first,
549        remove_after: config.remove_after_secs.map(Duration::from_secs),
550        emitter,
551        rotate_wait: config.rotate_wait,
552    };
553
554    let event_metadata = EventMetadata {
555        host_key: config
556            .host_key
557            .clone()
558            .unwrap_or(log_schema().host_key().cloned().into())
559            .path,
560        hostname: crate::get_hostname().ok(),
561        file_key: config.file_key.clone().path,
562        offset_key: config.offset_key.clone().and_then(|k| k.path),
563    };
564
565    let include = config.include.clone();
566    let exclude = config.exclude.clone();
567    let multiline_config = config.multiline.clone();
568    let message_start_indicator = config.message_start_indicator.clone();
569    let multi_line_timeout = config.multi_line_timeout;
570
571    let (finalizer, shutdown_checkpointer) = if acknowledgements {
572        // The shutdown sent in to the finalizer is the global
573        // shutdown handle used to tell it to stop accepting new batch
574        // statuses and just wait for the remaining acks to come in.
575        let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
576
577        // We set up a separate shutdown signal to tie together the
578        // finalizer and the checkpoint writer task in the file
579        // server, to make it continue to write out updated
580        // checkpoints until all the acks have come in.
581        let (send_shutdown, shutdown2) = oneshot::channel::<()>();
582        let checkpoints = checkpointer.view();
583        tokio::spawn(async move {
584            while let Some((status, entry)) = ack_stream.next().await {
585                if status == BatchStatus::Delivered {
586                    checkpoints.update(entry.file_id, entry.offset);
587                }
588            }
589            send_shutdown.send(())
590        });
591        (Some(finalizer), shutdown2.map(|_| ()).boxed())
592    } else {
593        // When not dealing with end-to-end acknowledgements, just
594        // clone the global shutdown to stop the checkpoint writer.
595        (None, shutdown.clone().map(|_| ()).boxed())
596    };
597
598    let checkpoints = checkpointer.view();
599    let include_file_metric_tag = config.internal_metrics.include_file_tag;
600    Box::pin(async move {
601        info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
602
603        let mut encoding_decoder = encoding_charset.map(Decoder::new);
604
605        // sizing here is just a guess
606        let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
607        let rx = rx
608            .map(futures::stream::iter)
609            .flatten()
610            .map(move |mut line| {
611                emit!(FileBytesReceived {
612                    byte_size: line.text.len(),
613                    file: &line.filename,
614                    include_file_metric_tag,
615                });
616                // transcode each line from the file's encoding charset to utf8
617                line.text = match encoding_decoder.as_mut() {
618                    Some(d) => d.decode_to_utf8(line.text),
619                    None => line.text,
620                };
621                line
622            });
623
624        let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
625            if let Some(ref multiline_config) = multiline_config {
626                wrap_with_line_agg(
627                    rx,
628                    multiline_config.try_into().unwrap(), // validated in build
629                )
630            } else if let Some(msi) = message_start_indicator {
631                wrap_with_line_agg(
632                    rx,
633                    line_agg::Config::for_legacy(
634                        Regex::new(&msi).unwrap(), // validated in build
635                        multi_line_timeout,
636                    ),
637                )
638            } else {
639                Box::new(rx)
640            };
641
642        // Once file server ends this will run until it has finished processing remaining
643        // logs in the queue.
644        let span = Span::current();
645        let mut messages = messages.map(move |line| {
646            let mut event = create_event(
647                line.text,
648                line.start_offset,
649                &line.filename,
650                &event_metadata,
651                log_namespace,
652                include_file_metric_tag,
653            );
654
655            if let Some(finalizer) = &finalizer {
656                let (batch, receiver) = BatchNotifier::new_with_receiver();
657                event = event.with_batch_notifier(&batch);
658                let entry = FinalizerEntry {
659                    file_id: line.file_id,
660                    offset: line.end_offset,
661                };
662                // checkpoints.update will be called from ack_stream's thread
663                finalizer.add(entry, receiver);
664            } else {
665                checkpoints.update(line.file_id, line.end_offset);
666            }
667            event
668        });
669        tokio::spawn(async move {
670            match out
671                .send_event_stream(&mut messages)
672                .instrument(span.or_current())
673                .await
674            {
675                Ok(()) => {
676                    debug!("Finished sending.");
677                }
678                Err(_) => {
679                    let (count, _) = messages.size_hint();
680                    emit!(StreamClosedError { count });
681                }
682            }
683        });
684
685        let span = info_span!("file_server");
686        tokio::task::spawn_blocking(move || {
687            let _enter = span.enter();
688            let rt = tokio::runtime::Handle::current();
689            let result =
690                rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
691            emit!(FileOpen { count: 0 });
692            // Panic if we encounter any error originating from the file server.
693            // We're at the `spawn_blocking` call, the panic will be caught and
694            // passed to the `JoinHandle` error, similar to the usual threads.
695            result.expect("file server exited with an error");
696        })
697        .map_err(|error| error!(message="File server unexpectedly stopped.", %error, internal_log_rate_limit = false))
698        .await
699    })
700}
701
702/// Emit deprecation warning if the old option is used, and take it into account when determining
703/// defaults. Any of the newer options will override it when set directly.
704fn reconcile_position_options(
705    start_at_beginning: Option<bool>,
706    ignore_checkpoints: Option<bool>,
707    read_from: Option<ReadFromConfig>,
708) -> (bool, ReadFrom) {
709    if start_at_beginning.is_some() {
710        warn!(
711            message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
712        )
713    }
714
715    match start_at_beginning {
716        Some(true) => (
717            ignore_checkpoints.unwrap_or(true),
718            read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
719        ),
720        _ => (
721            ignore_checkpoints.unwrap_or(false),
722            read_from.map(Into::into).unwrap_or_default(),
723        ),
724    }
725}
726
727fn wrap_with_line_agg(
728    rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
729    config: line_agg::Config,
730) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
731    let logic = line_agg::Logic::new(config);
732    Box::new(
733        LineAgg::new(
734            rx.map(|line| {
735                (
736                    line.filename,
737                    line.text,
738                    (line.file_id, line.start_offset, line.end_offset),
739                )
740            }),
741            logic,
742        )
743        .map(
744            |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
745                text,
746                filename,
747                file_id,
748                start_offset,
749                end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
750                    lastline_end_offset
751                }),
752            },
753        ),
754    )
755}
756
757struct EventMetadata {
758    host_key: Option<OwnedValuePath>,
759    hostname: Option<String>,
760    file_key: Option<OwnedValuePath>,
761    offset_key: Option<OwnedValuePath>,
762}
763
764fn create_event(
765    line: Bytes,
766    offset: u64,
767    file: &str,
768    meta: &EventMetadata,
769    log_namespace: LogNamespace,
770    include_file_metric_tag: bool,
771) -> LogEvent {
772    let deserializer = BytesDeserializer;
773    let mut event = deserializer.parse_single(line, log_namespace);
774
775    log_namespace.insert_vector_metadata(
776        &mut event,
777        log_schema().source_type_key(),
778        path!("source_type"),
779        Bytes::from_static(FileConfig::NAME.as_bytes()),
780    );
781    log_namespace.insert_vector_metadata(
782        &mut event,
783        log_schema().timestamp_key(),
784        path!("ingest_timestamp"),
785        Utc::now(),
786    );
787
788    let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
789    // `meta.host_key` is already `unwrap_or_else`ed so we can just pass it in.
790    if let Some(hostname) = &meta.hostname {
791        log_namespace.insert_source_metadata(
792            FileConfig::NAME,
793            &mut event,
794            legacy_host_key,
795            path!("host"),
796            hostname.clone(),
797        );
798    }
799
800    let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
801    log_namespace.insert_source_metadata(
802        FileConfig::NAME,
803        &mut event,
804        legacy_offset_key,
805        path!("offset"),
806        offset,
807    );
808
809    let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
810    log_namespace.insert_source_metadata(
811        FileConfig::NAME,
812        &mut event,
813        legacy_file_key,
814        path!("path"),
815        file,
816    );
817
818    emit!(FileEventsReceived {
819        count: 1,
820        file,
821        byte_size: event.estimated_json_encoded_size_of(),
822        include_file_metric_tag,
823    });
824
825    event
826}
827
828#[cfg(test)]
829mod tests {
830    use std::{
831        collections::HashSet,
832        fs::{self, File},
833        future::Future,
834        io::{Seek, Write},
835        sync::{
836            Arc,
837            atomic::{AtomicUsize, Ordering},
838        },
839    };
840
841    use encoding_rs::UTF_16LE;
842    use similar_asserts::assert_eq;
843    use tempfile::tempdir;
844    use tokio::time::{Duration, sleep, timeout};
845    use vector_lib::schema::Definition;
846    use vrl::{value, value::kind::Collection};
847
848    use super::*;
849    use crate::{
850        config::Config,
851        event::{Event, EventStatus, Value},
852        shutdown::ShutdownSignal,
853        sources::file,
854        test_util::{
855            components::{FILE_SOURCE_TAGS, assert_source_compliance},
856            wait_for_atomic_usize_timeout_ms,
857        },
858    };
859
860    #[test]
861    fn generate_config() {
862        crate::test_util::test_generate_config::<FileConfig>();
863    }
864
865    fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
866        // Store checkpoints in a subdirectory so they don't appear in the
867        // glob-watched directory (which covers dir.path()/*).
868        let data_dir = dir.path().join(".data");
869        fs::create_dir_all(&data_dir).unwrap();
870        file::FileConfig {
871            fingerprint: FingerprintConfig::Checksum {
872                ignored_header_bytes: 0,
873                lines: 1,
874            },
875            data_dir: Some(data_dir),
876            glob_minimum_cooldown_ms: Duration::from_millis(100),
877            internal_metrics: FileInternalMetricsConfig {
878                include_file_tag: true,
879            },
880            ..Default::default()
881        }
882    }
883
884    async fn sleep_500_millis() {
885        sleep(Duration::from_millis(500)).await;
886    }
887
888    #[test]
889    fn parse_config() {
890        let config: FileConfig = toml::from_str(
891            r#"
892            include = [ "/var/log/**/*.log" ]
893            file_key = "file"
894            glob_minimum_cooldown_ms = 1000
895            multi_line_timeout = 1000
896            max_read_bytes = 2048
897            line_delimiter = "\n"
898        "#,
899        )
900        .unwrap();
901        assert_eq!(config, FileConfig::default());
902        assert_eq!(
903            config.fingerprint,
904            FingerprintConfig::Checksum {
905                ignored_header_bytes: 0,
906                lines: 1
907            }
908        );
909
910        let config: FileConfig = toml::from_str(
911            r#"
912        include = [ "/var/log/**/*.log" ]
913        [fingerprint]
914        strategy = "device_and_inode"
915        "#,
916        )
917        .unwrap();
918        assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
919
920        let config: FileConfig = toml::from_str(
921            r#"
922        include = [ "/var/log/**/*.log" ]
923        [fingerprint]
924        strategy = "checksum"
925        bytes = 128
926        ignored_header_bytes = 512
927        "#,
928        )
929        .unwrap();
930        assert_eq!(
931            config.fingerprint,
932            FingerprintConfig::Checksum {
933                ignored_header_bytes: 512,
934                lines: 1
935            }
936        );
937
938        let config: FileConfig = toml::from_str(
939            r#"
940        include = [ "/var/log/**/*.log" ]
941        [encoding]
942        charset = "utf-16le"
943        "#,
944        )
945        .unwrap();
946        assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
947
948        let config: FileConfig = toml::from_str(
949            r#"
950        include = [ "/var/log/**/*.log" ]
951        read_from = "beginning"
952        "#,
953        )
954        .unwrap();
955        assert_eq!(config.read_from, ReadFromConfig::Beginning);
956
957        let config: FileConfig = toml::from_str(
958            r#"
959        include = [ "/var/log/**/*.log" ]
960        read_from = "end"
961        "#,
962        )
963        .unwrap();
964        assert_eq!(config.read_from, ReadFromConfig::End);
965    }
966
967    #[test]
968    fn resolve_data_dir() {
969        let global_dir = tempdir().unwrap();
970        let local_dir = tempdir().unwrap();
971
972        let mut config = Config::default();
973        config.global.data_dir = global_dir.keep().into();
974
975        // local path given -- local should win
976        let local_data_dir = Some(local_dir.path().to_path_buf());
977        let res = config
978            .global
979            .resolve_and_validate_data_dir(local_data_dir.as_ref())
980            .unwrap();
981        assert_eq!(res, local_dir.path());
982
983        // no local path given -- global fallback should be in effect
984        let res = config.global.resolve_and_validate_data_dir(None).unwrap();
985        assert_eq!(res, config.global.data_dir.unwrap());
986    }
987
988    #[test]
989    fn output_schema_definition_vector_namespace() {
990        let definitions = FileConfig::default()
991            .outputs(LogNamespace::Vector)
992            .remove(0)
993            .schema_definition(true);
994
995        assert_eq!(
996            definitions,
997            Some(
998                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
999                    .with_meaning(OwnedTargetPath::event_root(), "message")
1000                    .with_metadata_field(
1001                        &owned_value_path!("vector", "source_type"),
1002                        Kind::bytes(),
1003                        None
1004                    )
1005                    .with_metadata_field(
1006                        &owned_value_path!("vector", "ingest_timestamp"),
1007                        Kind::timestamp(),
1008                        None
1009                    )
1010                    .with_metadata_field(
1011                        &owned_value_path!("file", "host"),
1012                        Kind::bytes().or_undefined(),
1013                        Some("host")
1014                    )
1015                    .with_metadata_field(
1016                        &owned_value_path!("file", "offset"),
1017                        Kind::integer(),
1018                        None
1019                    )
1020                    .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
1021            )
1022        )
1023    }
1024
1025    #[test]
1026    fn output_schema_definition_legacy_namespace() {
1027        let definitions = FileConfig::default()
1028            .outputs(LogNamespace::Legacy)
1029            .remove(0)
1030            .schema_definition(true);
1031
1032        assert_eq!(
1033            definitions,
1034            Some(
1035                Definition::new_with_default_metadata(
1036                    Kind::object(Collection::empty()),
1037                    [LogNamespace::Legacy]
1038                )
1039                .with_event_field(
1040                    &owned_value_path!("message"),
1041                    Kind::bytes(),
1042                    Some("message")
1043                )
1044                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1045                .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1046                .with_event_field(
1047                    &owned_value_path!("host"),
1048                    Kind::bytes().or_undefined(),
1049                    Some("host")
1050                )
1051                .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
1052                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1053            )
1054        )
1055    }
1056
1057    #[test]
1058    fn create_event_legacy_namespace() {
1059        let line = Bytes::from("hello world");
1060        let file = "some_file.rs";
1061        let offset: u64 = 0;
1062
1063        let meta = EventMetadata {
1064            host_key: Some(owned_value_path!("host")),
1065            hostname: Some("Some.Machine".to_string()),
1066            file_key: Some(owned_value_path!("file")),
1067            offset_key: Some(owned_value_path!("offset")),
1068        };
1069        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1070
1071        assert_eq!(log["file"], "some_file.rs".into());
1072        assert_eq!(log["host"], "Some.Machine".into());
1073        assert_eq!(log["offset"], 0.into());
1074        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1075        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1076        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1077    }
1078
1079    #[test]
1080    fn create_event_custom_fields_legacy_namespace() {
1081        let line = Bytes::from("hello world");
1082        let file = "some_file.rs";
1083        let offset: u64 = 0;
1084
1085        let meta = EventMetadata {
1086            host_key: Some(owned_value_path!("hostname")),
1087            hostname: Some("Some.Machine".to_string()),
1088            file_key: Some(owned_value_path!("file_path")),
1089            offset_key: Some(owned_value_path!("off")),
1090        };
1091        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1092
1093        assert_eq!(log["file_path"], "some_file.rs".into());
1094        assert_eq!(log["hostname"], "Some.Machine".into());
1095        assert_eq!(log["off"], 0.into());
1096        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1097        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1098        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1099    }
1100
1101    #[test]
1102    fn create_event_vector_namespace() {
1103        let line = Bytes::from("hello world");
1104        let file = "some_file.rs";
1105        let offset: u64 = 0;
1106
1107        let meta = EventMetadata {
1108            host_key: Some(owned_value_path!("ignored")),
1109            hostname: Some("Some.Machine".to_string()),
1110            file_key: Some(owned_value_path!("ignored")),
1111            offset_key: Some(owned_value_path!("ignored")),
1112        };
1113        let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1114
1115        assert_eq!(log.value(), &value!("hello world"));
1116
1117        assert_eq!(
1118            log.metadata()
1119                .value()
1120                .get(path!("vector", "source_type"))
1121                .unwrap(),
1122            &value!("file")
1123        );
1124        assert!(
1125            log.metadata()
1126                .value()
1127                .get(path!("vector", "ingest_timestamp"))
1128                .unwrap()
1129                .is_timestamp()
1130        );
1131
1132        assert_eq!(
1133            log.metadata()
1134                .value()
1135                .get(path!(FileConfig::NAME, "host"))
1136                .unwrap(),
1137            &value!("Some.Machine")
1138        );
1139        assert_eq!(
1140            log.metadata()
1141                .value()
1142                .get(path!(FileConfig::NAME, "offset"))
1143                .unwrap(),
1144            &value!(0)
1145        );
1146        assert_eq!(
1147            log.metadata()
1148                .value()
1149                .get(path!(FileConfig::NAME, "path"))
1150                .unwrap(),
1151            &value!("some_file.rs")
1152        );
1153    }
1154
    /// Two files are written while the source runs. Every line must be read, each
    /// file's lines must arrive in write order, and each event's `file` field must be
    /// the path of the file the line came from.
    #[tokio::test]
    async fn file_happy_path() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path1 = dir.path().join("file1");
        let path2 = dir.path().join("file2");

        // The trailing sleep gives the source time to read the lines. This assumes
        // `run_file_source` stops the source once this future completes — TODO confirm.
        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
            let mut file1 = File::create(&path1).unwrap();
            let mut file2 = File::create(&path2).unwrap();

            for i in 0..n {
                writeln!(&mut file1, "hello {i}").unwrap();
                writeln!(&mut file2, "goodbye {i}").unwrap();
            }

            file1.flush().unwrap();
            file2.flush().unwrap();

            sleep_500_millis().await;
        })
        .await;

        // Lines from the two files may interleave, so track the next expected index
        // for each file separately.
        let mut hello_i = 0;
        let mut goodbye_i = 0;

        for event in received {
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
            if line.starts_with("hello") {
                assert_eq!(line, format!("hello {}", hello_i));
                assert_eq!(
                    event.as_log()["file"].to_string_lossy(),
                    path1.to_str().unwrap()
                );
                hello_i += 1;
            } else {
                assert_eq!(line, format!("goodbye {}", goodbye_i));
                assert_eq!(
                    event.as_log()["file"].to_string_lossy(),
                    path2.to_str().unwrap()
                );
                goodbye_i += 1;
            }
        }
        // Every line from both files was received.
        assert_eq!(hello_i, n);
        assert_eq!(goodbye_i, n);
    }
1209
    // https://github.com/vectordotdev/vector/issues/8363
    /// Regression test for the issue linked above. A file with one non-empty line
    /// followed by `n` empty lines must yield `n + 1` events, i.e. empty lines are not
    /// dropped.
    #[tokio::test]
    async fn file_read_empty_lines() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
            let mut file = File::create(&path).unwrap();

            // Presumably the leading non-empty line lets the one-line checksum
            // fingerprint (see `test_default_file_config`) be computed — TODO confirm.
            writeln!(&mut file, "line for checkpointing").unwrap();
            for _i in 0..n {
                writeln!(&mut file).unwrap();
            }
            file.flush().unwrap();

            sleep_500_millis().await;
        })
        .await;

        assert_eq!(received.len(), n + 1);
    }
1238
    /// The file is truncated in place while the source runs. The `n` lines written
    /// before and the `n` lines written after the truncation must all be read, in
    /// order, with the same `file` path.
    #[tokio::test]
    async fn file_truncate() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let path = dir.path().join("file");
        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
            let mut file = File::create(&path).unwrap();

            for i in 0..n {
                writeln!(&mut file, "pretrunc {i}").unwrap();
            }

            file.flush().unwrap();
            sleep_500_millis().await; // The writes must be observed before truncating

            // Truncate and rewind the same handle so the next writes start at offset 0.
            file.set_len(0).unwrap();
            file.seek(std::io::SeekFrom::Start(0)).unwrap();

            file.sync_all().unwrap();
            sleep_500_millis().await; // The truncate must be observed before writing again

            for i in 0..n {
                writeln!(&mut file, "posttrunc {i}").unwrap();
            }

            file.flush().unwrap();
            sleep_500_millis().await;
        })
        .await;

        // Expect `pretrunc 0..n` followed by `posttrunc 0..n`.
        let mut i = 0;
        let mut pre_trunc = true;

        for event in received {
            assert_eq!(
                event.as_log()["file"].to_string_lossy(),
                path.to_str().unwrap()
            );

            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();

            if pre_trunc {
                assert_eq!(line, format!("pretrunc {}", i));
            } else {
                assert_eq!(line, format!("posttrunc {}", i));
            }

            i += 1;
            if i == n {
                // After the n-th line, switch to expecting the post-truncation lines.
                i = 0;
                pre_trunc = false;
            }
        }
    }
1299
1300    #[tokio::test]
1301    async fn file_rotate() {
1302        let n = 5;
1303
1304        let dir = tempdir().unwrap();
1305        let config = file::FileConfig {
1306            include: vec![dir.path().join("*")],
1307            ..test_default_file_config(&dir)
1308        };
1309
1310        let path = dir.path().join("file");
1311        let archive_path = dir.path().join("file");
1312        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1313            let mut file = File::create(&path).unwrap();
1314
1315            for i in 0..n {
1316                writeln!(&mut file, "prerot {i}").unwrap();
1317            }
1318
1319            file.flush().unwrap();
1320            sleep_500_millis().await; // The writes must be observed before rotating
1321
1322            fs::rename(&path, archive_path).expect("could not rename");
1323            file.sync_all().unwrap();
1324
1325            let mut file = File::create(&path).unwrap();
1326
1327            file.sync_all().unwrap();
1328            sleep_500_millis().await; // The rotation must be observed before writing again
1329
1330            for i in 0..n {
1331                writeln!(&mut file, "postrot {i}").unwrap();
1332            }
1333
1334            file.flush().unwrap();
1335            sleep_500_millis().await;
1336        })
1337        .await;
1338
1339        let mut i = 0;
1340        let mut pre_rot = true;
1341
1342        for event in received {
1343            assert_eq!(
1344                event.as_log()["file"].to_string_lossy(),
1345                path.to_str().unwrap()
1346            );
1347
1348            let line =
1349                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1350
1351            if pre_rot {
1352                assert_eq!(line, format!("prerot {}", i));
1353            } else {
1354                assert_eq!(line, format!("postrot {}", i));
1355            }
1356
1357            i += 1;
1358            if i == n {
1359                i = 0;
1360                pre_rot = false;
1361            }
1362        }
1363    }
1364
    /// Several include patterns plus an exclude pattern. `a.txt` (1), `b.txt` (2) and
    /// `a.log` (3) match an include pattern and must be read in order. `a.ignore.txt`
    /// (4) also matches the exclude pattern and must not be read.
    #[tokio::test]
    async fn file_multiple_paths() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
            exclude: vec![dir.path().join("a.*.txt")],
            ..test_default_file_config(&dir)
        };

        let path1 = dir.path().join("a.txt");
        let path2 = dir.path().join("b.txt");
        let path3 = dir.path().join("a.log");
        let path4 = dir.path().join("a.ignore.txt");
        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
            let mut file1 = File::create(&path1).unwrap();
            let mut file2 = File::create(&path2).unwrap();
            let mut file3 = File::create(&path3).unwrap();
            let mut file4 = File::create(&path4).unwrap();

            // Each line is "<file id> <index>" so the checks below can attribute it.
            for i in 0..n {
                writeln!(&mut file1, "1 {i}").unwrap();
                writeln!(&mut file2, "2 {i}").unwrap();
                writeln!(&mut file3, "3 {i}").unwrap();
                writeln!(&mut file4, "4 {i}").unwrap();
            }
            file1.flush().unwrap();
            file2.flush().unwrap();
            file3.flush().unwrap();
            file4.flush().unwrap();

            sleep_500_millis().await;
        })
        .await;

        // Next expected index for files 1..=3 (slot `id - 1`).
        let mut is = [0; 3];

        for event in received {
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
            let mut split = line.split(' ');
            let file = split.next().unwrap().parse::<usize>().unwrap();
            // The excluded file must never be read.
            assert_ne!(file, 4);
            let i = split.next().unwrap().parse::<usize>().unwrap();

            assert_eq!(is[file - 1], i);
            is[file - 1] += 1;
        }

        assert_eq!(is, [n as usize; 3]);
    }
1417
1418    #[tokio::test]
1419    async fn file_exclude_paths() {
1420        let n = 5;
1421
1422        let dir = tempdir().unwrap();
1423        let config = file::FileConfig {
1424            include: vec![dir.path().join("a//b/*.log.*")],
1425            exclude: vec![dir.path().join("a//b/test.log.*")],
1426            ..test_default_file_config(&dir)
1427        };
1428
1429        let path1 = dir.path().join("a//b/a.log.1");
1430        let path2 = dir.path().join("a//b/test.log.1");
1431        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1432            std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1433            let mut file1 = File::create(&path1).unwrap();
1434            let mut file2 = File::create(&path2).unwrap();
1435
1436            for i in 0..n {
1437                writeln!(&mut file1, "1 {i}").unwrap();
1438                writeln!(&mut file2, "2 {i}").unwrap();
1439            }
1440
1441            file1.flush().unwrap();
1442            file2.flush().unwrap();
1443            sleep_500_millis().await;
1444        })
1445        .await;
1446
1447        let mut is = [0; 1];
1448
1449        for event in received {
1450            let line =
1451                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1452            let mut split = line.split(' ');
1453            let file = split.next().unwrap().parse::<usize>().unwrap();
1454            assert_ne!(file, 4);
1455            let i = split.next().unwrap().parse::<usize>().unwrap();
1456
1457            assert_eq!(is[file - 1], i);
1458            is[file - 1] += 1;
1459        }
1460
1461        assert_eq!(is, [n as usize; 1]);
1462    }
1463
1464    #[tokio::test]
1465    async fn file_key_acknowledged() {
1466        file_key(Acks).await
1467    }
1468
1469    #[tokio::test]
1470    async fn file_key_no_acknowledge() {
1471        file_key(NoAcks).await
1472    }
1473
    /// Checks where the originating file path ends up on legacy-namespace events, in
    /// three scenarios:
    /// - Default: it is written to the `file` field.
    /// - Custom: with `file_key = "source"`, it is written to `source`.
    /// - Hidden: the event's top-level keys are exactly file, host, message,
    ///   timestamp and source_type (per `log_schema()` / `default_file_key()`).
    ///
    /// Each scenario writes one line and expects exactly one event.
    async fn file_key(acks: AckingMode) {
        // Default
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ..test_default_file_config(&dir)
            };

            let path = dir.path().join("file");
            let received =
                run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
                    let mut file = File::create(&path).unwrap();

                    writeln!(&mut file, "hello there").unwrap();
                    file.flush().unwrap();

                    sleep_500_millis().await;
                })
                .await;

            assert_eq!(received.len(), 1);
            assert_eq!(
                received[0].as_log()["file"].to_string_lossy(),
                path.to_str().unwrap()
            );
        }

        // Custom
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                file_key: OptionalValuePath::from(owned_value_path!("source")),
                ..test_default_file_config(&dir)
            };

            let path = dir.path().join("file");
            let received =
                run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
                    let mut file = File::create(&path).unwrap();

                    writeln!(&mut file, "hello there").unwrap();
                    file.flush().unwrap();

                    sleep_500_millis().await;
                })
                .await;

            assert_eq!(received.len(), 1);
            assert_eq!(
                received[0].as_log()["source"].to_string_lossy(),
                path.to_str().unwrap()
            );
        }

        // Hidden
        // NOTE(review): this config is identical to the "Default" case. The
        // distinguishing check is the exact top-level key set asserted below — confirm
        // the "Hidden" label still reflects the intent.
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ..test_default_file_config(&dir)
            };

            let path = dir.path().join("file");
            let received =
                run_file_source(&config, true, acks, LogNamespace::Legacy, None, async {
                    let mut file = File::create(&path).unwrap();

                    writeln!(&mut file, "hello there").unwrap();

                    file.flush().unwrap();
                    sleep_500_millis().await;
                })
                .await;

            assert_eq!(received.len(), 1);
            assert_eq!(
                received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
                vec![
                    default_file_key()
                        .path
                        .expect("file key to exist")
                        .to_string()
                        .into(),
                    log_schema().host_key().unwrap().to_string().into(),
                    log_schema().message_key().unwrap().to_string().into(),
                    log_schema().timestamp_key().unwrap().to_string().into(),
                    log_schema().source_type_key().unwrap().to_string().into()
                ]
                .into_iter()
                .collect::<HashSet<_>>()
            );
        }
    }
1569
1570    #[tokio::test]
1571    async fn file_start_position_server_restart_acknowledged() {
1572        file_start_position_server_restart(Acks).await
1573    }
1574
1575    #[tokio::test]
1576    async fn file_start_position_server_restart_no_acknowledge() {
1577        file_start_position_server_restart(NoAcks).await
1578    }
1579
    /// Runs the source three times against the same file and data directory:
    /// 1. The first run reads the pre-existing line plus one written during the run.
    /// 2. The second run resumes from the checkpoint and reads only the new line.
    /// 3. The third run, with `ignore_checkpoints` and `read_from = beginning`,
    ///    re-reads the whole file.
    async fn file_start_position_server_restart(acking: AckingMode) {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        // The file already holds one line before the source is ever started.
        let path = dir.path().join("file");
        let mut file = File::create(&path).unwrap();
        writeln!(&mut file, "zeroth line").unwrap();
        file.flush().unwrap();

        // First time server runs it picks up existing lines.
        {
            let received =
                run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
                    sleep_500_millis().await;
                    writeln!(&mut file, "first line").unwrap();
                    file.flush().unwrap();
                    sleep_500_millis().await;
                })
                .await;

            let lines = extract_messages_string(received);
            assert_eq!(lines, vec!["zeroth line", "first line"]);
        }
        // Restart server, read file from checkpoint.
        {
            let received =
                run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
                    sleep_500_millis().await;
                    writeln!(&mut file, "second line").unwrap();
                    file.flush().unwrap();
                    sleep_500_millis().await;
                })
                .await;

            let lines = extract_messages_string(received);
            assert_eq!(lines, vec!["second line"]);
        }
        // Restart server, read files from beginning.
        // NOTE(review): the second `run_file_source` argument is `false` here, but
        // `true` in the two runs above. Its meaning is defined by `run_file_source`,
        // which is not visible here — confirm this is intended.
        {
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ignore_checkpoints: Some(true),
                read_from: ReadFromConfig::Beginning,
                ..test_default_file_config(&dir)
            };
            let received =
                run_file_source(&config, false, acking, LogNamespace::Legacy, None, async {
                    sleep_500_millis().await;
                    writeln!(&mut file, "third line").unwrap();
                    file.flush().unwrap();
                    sleep_500_millis().await;
                })
                .await;

            let lines = extract_messages_string(received);
            assert_eq!(
                lines,
                vec!["zeroth line", "first line", "second line", "third line"]
            );
        }
    }
1644
    /// In `Unfinalized` acking mode, a restart must re-read a line whose event was
    /// never acknowledged. Both runs are therefore expected to yield the same line.
    #[tokio::test]
    async fn file_start_position_server_restart_unfinalized() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");
        let mut file = File::create(&path).unwrap();
        writeln!(&mut file, "the line").unwrap();
        file.flush().unwrap();

        // First time server runs it picks up existing lines.
        let received = run_file_source(
            &config,
            false,
            Unfinalized,
            LogNamespace::Legacy,
            None,
            sleep(Duration::from_secs(5)),
        )
        .await;
        let lines = extract_messages_string(received);
        assert_eq!(lines, vec!["the line"]);

        // Restart server, it re-reads file since the events were not acknowledged before shutdown
        let received = run_file_source(
            &config,
            false,
            Unfinalized,
            LogNamespace::Legacy,
            None,
            sleep(Duration::from_secs(5)),
        )
        .await;
        let lines = extract_messages_string(received);
        assert_eq!(lines, vec!["the line"]);
    }
1684
    /// With acknowledgements on, stopping the source part-way through a large file
    /// and restarting it must deliver every line exactly once across the two runs
    /// (no duplicates, no gaps).
    #[tokio::test]
    async fn file_duplicate_processing_after_restart() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");
        let mut file = File::create(&path).unwrap();

        // Enough lines that the first, half-second run should not finish the file.
        let line_count = 4000;
        for i in 0..line_count {
            writeln!(&mut file, "Here's a line for you: {i}").unwrap();
        }
        file.flush().unwrap();

        // First time server runs it should pick up a bunch of lines
        let received = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            None,
            // shutdown signal is sent after this duration
            sleep_500_millis(),
        )
        .await;
        let lines = extract_messages_string(received);

        // ...but not all the lines; if the first run processed the entire file, we may not hit the
        // bug we're testing for, which happens if the finalizer stream exits on shutdown with pending acks
        assert!(lines.len() < line_count);

        // Restart the server, and it should read the rest without duplicating any.
        // Use the event counter to drain rx continuously (removing backpressure so
        // the file server can read all remaining lines without being stalled), then
        // trigger shutdown once all expected events have been received.
        let remaining = line_count - lines.len();
        let event_count = Arc::new(AtomicUsize::new(0));
        let received = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            Some(Arc::clone(&event_count)),
            async {
                // Waits until `remaining` events have been counted, with a 5 000 ms limit.
                wait_for_atomic_usize_timeout_ms(
                    Arc::clone(&event_count),
                    |n| n >= remaining,
                    5_000,
                )
                .await;
            },
        )
        .await;
        let lines2 = extract_messages_string(received);

        // Between both runs, we should have the expected number of lines
        assert_eq!(lines.len() + lines2.len(), line_count);
    }
1746
1747    #[tokio::test]
1748    async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1749        file_start_position_server_restart_with_file_rotation(Acks).await
1750    }
1751
1752    #[tokio::test]
1753    async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1754        file_start_position_server_restart_with_file_rotation(NoAcks).await
1755    }
1756
1757    async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1758        let dir = tempdir().unwrap();
1759        let config = file::FileConfig {
1760            include: vec![dir.path().join("*")],
1761            ..test_default_file_config(&dir)
1762        };
1763
1764        let path = dir.path().join("file");
1765        let path_for_old_file = dir.path().join("file.old");
1766        // Run server first time, collect some lines.
1767        {
1768            let received =
1769                run_file_source(&config, true, acking, LogNamespace::Legacy, None, async {
1770                    let mut file = File::create(&path).unwrap();
1771                    writeln!(&mut file, "first line").unwrap();
1772                    file.flush().unwrap();
1773                    sleep_500_millis().await;
1774                })
1775                .await;
1776
1777            let lines = extract_messages_string(received);
1778            assert_eq!(lines, vec!["first line"]);
1779        }
1780        // Perform 'file rotation' to archive old lines.
1781        fs::rename(&path, &path_for_old_file).expect("could not rename");
1782        // Restart the server and make sure it does not re-read the old file
1783        // even though it has a new name.
1784        {
1785            let received =
1786                run_file_source(&config, false, acking, LogNamespace::Legacy, None, async {
1787                    let mut file = File::create(&path).unwrap();
1788                    writeln!(&mut file, "second line").unwrap();
1789                    file.flush().unwrap();
1790                    sleep_500_millis().await;
1791                })
1792                .await;
1793
1794            let lines = extract_messages_string(received);
1795            assert_eq!(lines, vec!["second line"]);
1796        }
1797    }
1798
1799    #[cfg(unix)] // this test uses unix-specific function `futimes` during test time
1800    #[tokio::test]
1801    async fn file_start_position_ignore_old_files() {
1802        use std::{
1803            os::unix::io::AsRawFd,
1804            time::{Duration, SystemTime},
1805        };
1806
1807        let dir = tempdir().unwrap();
1808        let config = file::FileConfig {
1809            include: vec![dir.path().join("*")],
1810            ignore_older_secs: Some(5),
1811            ..test_default_file_config(&dir)
1812        };
1813
1814        let before_path = dir.path().join("before");
1815        let mut before_file = File::create(&before_path).unwrap();
1816        let after_path = dir.path().join("after");
1817        let mut after_file = File::create(&after_path).unwrap();
1818
1819        writeln!(&mut before_file, "first line").unwrap(); // first few bytes make up unique file fingerprint
1820        writeln!(&mut after_file, "_first line").unwrap(); //   and therefore need to be non-identical
1821
1822        {
1823            // Set the modified times
1824            let before = SystemTime::now() - Duration::from_secs(8);
1825            let after = SystemTime::now() - Duration::from_secs(2);
1826
1827            let before_time = libc::timeval {
1828                tv_sec: before
1829                    .duration_since(SystemTime::UNIX_EPOCH)
1830                    .unwrap()
1831                    .as_secs() as _,
1832                tv_usec: 0,
1833            };
1834            let before_times = [before_time, before_time];
1835
1836            let after_time = libc::timeval {
1837                tv_sec: after
1838                    .duration_since(SystemTime::UNIX_EPOCH)
1839                    .unwrap()
1840                    .as_secs() as _,
1841                tv_usec: 0,
1842            };
1843            let after_times = [after_time, after_time];
1844
1845            unsafe {
1846                libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1847                libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1848            }
1849        }
1850
1851        before_file.sync_all().unwrap();
1852        after_file.sync_all().unwrap();
1853
1854        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1855            sleep_500_millis().await;
1856            writeln!(&mut before_file, "second line").unwrap();
1857            writeln!(&mut after_file, "_second line").unwrap();
1858
1859            before_file.flush().unwrap();
1860            after_file.flush().unwrap();
1861            sleep_500_millis().await;
1862        })
1863        .await;
1864
1865        let before_lines = received
1866            .iter()
1867            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1868            .map(|event| {
1869                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1870            })
1871            .collect::<Vec<_>>();
1872        let after_lines = received
1873            .iter()
1874            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1875            .map(|event| {
1876                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1877            })
1878            .collect::<Vec<_>>();
1879        assert_eq!(before_lines, vec!["second line"]);
1880        assert_eq!(after_lines, vec!["_first line", "_second line"]);
1881    }
1882
1883    #[tokio::test]
1884    async fn file_max_line_bytes() {
1885        let dir = tempdir().unwrap();
1886        let config = file::FileConfig {
1887            include: vec![dir.path().join("*")],
1888            max_line_bytes: 10,
1889            ..test_default_file_config(&dir)
1890        };
1891
1892        let path = dir.path().join("file");
1893        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
1894            let mut file = File::create(&path).unwrap();
1895
1896            writeln!(&mut file, "short").unwrap();
1897            writeln!(&mut file, "this is too long").unwrap();
1898            writeln!(&mut file, "11 eleven11").unwrap();
1899            let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1900            writeln!(&mut file, "{super_long}").unwrap();
1901            writeln!(&mut file, "exactly 10").unwrap();
1902            writeln!(&mut file, "it can end on a line that's too long").unwrap();
1903
1904            file.flush().unwrap();
1905            sleep_500_millis().await;
1906            sleep_500_millis().await;
1907
1908            writeln!(&mut file, "and then continue").unwrap();
1909            writeln!(&mut file, "last short").unwrap();
1910            file.flush().unwrap();
1911
1912            sleep_500_millis().await;
1913            sleep_500_millis().await;
1914        }).await;
1915
1916        let received = extract_messages_value(received);
1917
1918        assert_eq!(
1919            received,
1920            vec!["short".into(), "exactly 10".into(), "last short".into()]
1921        );
1922    }
1923
1924    #[tokio::test]
1925    async fn test_multi_line_aggregation_legacy() {
1926        let dir = tempdir().unwrap();
1927        let config = file::FileConfig {
1928            include: vec![dir.path().join("*")],
1929            message_start_indicator: Some("INFO".into()),
1930            multi_line_timeout: 25,
1931            ..test_default_file_config(&dir)
1932        };
1933
1934        let path = dir.path().join("file");
1935        let event_count = Arc::new(AtomicUsize::new(0));
1936        let received = run_file_source(
1937            &config,
1938            false,
1939            NoAcks,
1940            LogNamespace::Legacy,
1941            Some(Arc::clone(&event_count)),
1942            async {
1943                let mut file = File::create(&path).unwrap();
1944
1945                // Write all lines through the second "INFO hello". Events 1-4
1946                // are emitted immediately by EndExclude; event 5 ("INFO hello"
1947                // standalone) requires the 25ms timeout to fire.
1948                writeln!(&mut file, "leftover foo").unwrap();
1949                writeln!(&mut file, "INFO hello").unwrap();
1950                writeln!(&mut file, "INFO goodbye").unwrap();
1951                writeln!(&mut file, "part of goodbye").unwrap();
1952                writeln!(&mut file, "INFO hi again").unwrap();
1953                writeln!(&mut file, "and some more").unwrap();
1954                writeln!(&mut file, "INFO hello").unwrap();
1955                file.flush().unwrap();
1956
1957                // Block until event 5 is observed: the timeout fired and
1958                // "INFO hello" was emitted before we write "too slow".
1959                wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 5, 500).await;
1960
1961                writeln!(&mut file, "too slow").unwrap();
1962                writeln!(&mut file, "INFO doesn't have").unwrap();
1963                writeln!(&mut file, "to be INFO in").unwrap();
1964                writeln!(&mut file, "the middle").unwrap();
1965                file.flush().unwrap();
1966
1967                // Wait for events 6 ("too slow") and 7 ("INFO doesn't have")
1968                // before triggering shutdown.
1969                wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 7, 500).await;
1970            },
1971        )
1972        .await;
1973
1974        let received = extract_messages_value(received);
1975
1976        assert_eq!(
1977            received,
1978            vec![
1979                "leftover foo".into(),
1980                "INFO hello".into(),
1981                "INFO goodbye\npart of goodbye".into(),
1982                "INFO hi again\nand some more".into(),
1983                "INFO hello".into(),
1984                "too slow".into(),
1985                "INFO doesn't have".into(),
1986                "to be INFO in\nthe middle".into(),
1987            ]
1988        );
1989    }
1990
1991    #[tokio::test]
1992    async fn test_multi_line_aggregation() {
1993        let dir = tempdir().unwrap();
1994        let config = file::FileConfig {
1995            include: vec![dir.path().join("*")],
1996            multiline: Some(MultilineConfig {
1997                start_pattern: "INFO".to_owned(),
1998                condition_pattern: "INFO".to_owned(),
1999                mode: line_agg::Mode::HaltBefore,
2000                timeout_ms: Duration::from_millis(25),
2001            }),
2002            ..test_default_file_config(&dir)
2003        };
2004
2005        let path = dir.path().join("file");
2006        let event_count = Arc::new(AtomicUsize::new(0));
2007        let received = run_file_source(
2008            &config,
2009            false,
2010            NoAcks,
2011            LogNamespace::Legacy,
2012            Some(Arc::clone(&event_count)),
2013            async {
2014                let mut file = File::create(&path).unwrap();
2015
2016                // Write all lines through the second "INFO hello". Events 1-4
2017                // are emitted immediately by EndExclude; event 5 ("INFO hello"
2018                // standalone) requires the 25ms timeout to fire.
2019                writeln!(&mut file, "leftover foo").unwrap();
2020                writeln!(&mut file, "INFO hello").unwrap();
2021                writeln!(&mut file, "INFO goodbye").unwrap();
2022                writeln!(&mut file, "part of goodbye").unwrap();
2023                writeln!(&mut file, "INFO hi again").unwrap();
2024                writeln!(&mut file, "and some more").unwrap();
2025                writeln!(&mut file, "INFO hello").unwrap();
2026                file.flush().unwrap();
2027
2028                // Block until event 5 is observed: the timeout fired and
2029                // "INFO hello" was emitted before we write "too slow".
2030                wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 5, 500).await;
2031
2032                writeln!(&mut file, "too slow").unwrap();
2033                writeln!(&mut file, "INFO doesn't have").unwrap();
2034                writeln!(&mut file, "to be INFO in").unwrap();
2035                writeln!(&mut file, "the middle").unwrap();
2036                file.flush().unwrap();
2037
2038                // Wait for events 6 ("too slow") and 7 ("INFO doesn't have")
2039                // before triggering shutdown.
2040                wait_for_atomic_usize_timeout_ms(Arc::clone(&event_count), |n| n >= 7, 500).await;
2041            },
2042        )
2043        .await;
2044
2045        let received = extract_messages_value(received);
2046
2047        assert_eq!(
2048            received,
2049            vec![
2050                "leftover foo".into(),
2051                "INFO hello".into(),
2052                "INFO goodbye\npart of goodbye".into(),
2053                "INFO hi again\nand some more".into(),
2054                "INFO hello".into(),
2055                "too slow".into(),
2056                "INFO doesn't have".into(),
2057                "to be INFO in\nthe middle".into(),
2058            ]
2059        );
2060    }
2061
2062    #[tokio::test]
2063    async fn test_multi_line_checkpointing() {
2064        let dir = tempdir().unwrap();
2065        let config = file::FileConfig {
2066            include: vec![dir.path().join("*")],
2067            offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2068            multiline: Some(MultilineConfig {
2069                start_pattern: "INFO".to_owned(),
2070                condition_pattern: "INFO".to_owned(),
2071                mode: line_agg::Mode::HaltBefore,
2072                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
2073            }),
2074            ..test_default_file_config(&dir)
2075        };
2076
2077        let path = dir.path().join("file");
2078        let mut file = File::create(&path).unwrap();
2079
2080        writeln!(&mut file, "INFO hello").unwrap();
2081        writeln!(&mut file, "part of hello").unwrap();
2082
2083        file.sync_all().unwrap();
2084
2085        // Read and aggregate existing lines. wait_shutdown=true ensures the
2086        // checkpoint is fully written to disk before the second run reads it.
2087        let received = run_file_source(
2088            &config,
2089            true,
2090            Acks,
2091            LogNamespace::Legacy,
2092            None,
2093            sleep_500_millis(),
2094        )
2095        .await;
2096
2097        assert_eq!(received[0].as_log()["offset"], 0.into());
2098
2099        let lines = extract_messages_string(received);
2100        assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2101
2102        // After restart, we should not see any part of the previously aggregated lines
2103        let received_after_restart =
2104            run_file_source(&config, false, Acks, LogNamespace::Legacy, None, async {
2105                writeln!(&mut file, "INFO goodbye").unwrap();
2106                file.flush().unwrap();
2107                sleep_500_millis().await;
2108            })
2109            .await;
2110        assert_eq!(
2111            received_after_restart[0].as_log()["offset"],
2112            (lines[0].len() + 1).into()
2113        );
2114        let lines = extract_messages_string(received_after_restart);
2115        assert_eq!(lines, vec!["INFO goodbye"]);
2116    }
2117
2118    #[tokio::test]
2119    async fn test_fair_reads() {
2120        let dir = tempdir().unwrap();
2121        let config = file::FileConfig {
2122            include: vec![dir.path().join("*")],
2123            max_read_bytes: 1,
2124            oldest_first: false,
2125            ..test_default_file_config(&dir)
2126        };
2127
2128        let older_path = dir.path().join("z_older_file");
2129        let mut older = File::create(&older_path).unwrap();
2130
2131        writeln!(&mut older, "hello i am the old file").unwrap();
2132        writeln!(&mut older, "i have been around a while").unwrap();
2133        writeln!(&mut older, "you can read newer files at the same time").unwrap();
2134        older.sync_all().unwrap();
2135
2136        let newer_path = dir.path().join("a_newer_file");
2137        let mut newer = File::create(&newer_path).unwrap();
2138
2139        writeln!(&mut newer, "and i am the new file").unwrap();
2140        writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2141        writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2142        newer.sync_all().unwrap();
2143
2144        let received = run_file_source(
2145            &config,
2146            false,
2147            NoAcks,
2148            LogNamespace::Legacy,
2149            None,
2150            sleep_500_millis(),
2151        )
2152        .await;
2153
2154        let received = extract_messages_value(received);
2155
2156        let old_first = vec![
2157            "hello i am the old file".into(),
2158            "and i am the new file".into(),
2159            "i have been around a while".into(),
2160            "this should be interleaved with the old one".into(),
2161            "you can read newer files at the same time".into(),
2162            "which is fine because we want fairness".into(),
2163        ];
2164        let new_first: Vec<_> = old_first
2165            .chunks(2)
2166            .flat_map(|chunk| chunk.iter().rev().cloned().collect::<Vec<_>>())
2167            .collect();
2168
2169        if received[0] == old_first[0] {
2170            assert_eq!(received, old_first);
2171        } else {
2172            assert_eq!(received, new_first);
2173        }
2174    }
2175
2176    #[tokio::test]
2177    async fn test_oldest_first() {
2178        let dir = tempdir().unwrap();
2179        let config = file::FileConfig {
2180            include: vec![dir.path().join("*")],
2181            max_read_bytes: 1,
2182            oldest_first: true,
2183            ..test_default_file_config(&dir)
2184        };
2185
2186        let older_path = dir.path().join("z_older_file");
2187        let mut older = File::create(&older_path).unwrap();
2188        older.sync_all().unwrap();
2189
2190        // Sleep to ensure the creation timestamps are different
2191        sleep_500_millis().await;
2192
2193        let newer_path = dir.path().join("a_newer_file");
2194        let mut newer = File::create(&newer_path).unwrap();
2195        newer.sync_all().unwrap();
2196
2197        writeln!(&mut older, "hello i am the old file").unwrap();
2198        writeln!(&mut older, "i have been around a while").unwrap();
2199        writeln!(&mut older, "you should definitely read all of me first").unwrap();
2200        older.flush().unwrap();
2201
2202        writeln!(&mut newer, "i'm new").unwrap();
2203        writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2204        writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2205        newer.flush().unwrap();
2206
2207        let received = run_file_source(
2208            &config,
2209            false,
2210            NoAcks,
2211            LogNamespace::Legacy,
2212            None,
2213            sleep_500_millis(),
2214        )
2215        .await;
2216
2217        let received = extract_messages_value(received);
2218
2219        assert_eq!(
2220            received,
2221            vec![
2222                "hello i am the old file".into(),
2223                "i have been around a while".into(),
2224                "you should definitely read all of me first".into(),
2225                "i'm new".into(),
2226                "hopefully you read all the old stuff first".into(),
2227                "because otherwise i'm not going to make sense".into(),
2228            ]
2229        );
2230    }
2231
2232    #[tokio::test]
2233    async fn test_split_reads() {
2234        let dir = tempdir().unwrap();
2235        let config = file::FileConfig {
2236            include: vec![dir.path().join("*")],
2237            max_read_bytes: 1,
2238            ..test_default_file_config(&dir)
2239        };
2240
2241        let path = dir.path().join("file");
2242        let mut file = File::create(&path).unwrap();
2243
2244        writeln!(&mut file, "hello i am a normal line").unwrap();
2245        file.sync_all().unwrap();
2246
2247        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2248            sleep_500_millis().await;
2249
2250            write!(&mut file, "i am not a full line").unwrap();
2251
2252            file.flush().unwrap();
2253            // Longer than the EOF timeout
2254            sleep_500_millis().await;
2255
2256            writeln!(&mut file, " until now").unwrap();
2257
2258            file.flush().unwrap();
2259            sleep_500_millis().await;
2260        })
2261        .await;
2262
2263        let received = extract_messages_value(received);
2264
2265        assert_eq!(
2266            received,
2267            vec![
2268                "hello i am a normal line".into(),
2269                "i am not a full line until now".into(),
2270            ]
2271        );
2272    }
2273
2274    #[tokio::test]
2275    async fn test_gzipped_file() {
2276        let dir = tempdir().unwrap();
2277        let config = file::FileConfig {
2278            include: vec![PathBuf::from("tests/data/gzipped.log")],
2279            // TODO: remove this once files are fingerprinted after decompression
2280            //
2281            // Currently, this needs to be smaller than the total size of the compressed file
2282            // because the fingerprinter tries to read until a newline, which it's not going to see
2283            // in the compressed data, or this number of bytes. If it hits EOF before that, it
2284            // can't return a fingerprint because the value would change once more data is written.
2285            max_line_bytes: 100,
2286            ..test_default_file_config(&dir)
2287        };
2288
2289        let received = run_file_source(
2290            &config,
2291            false,
2292            NoAcks,
2293            LogNamespace::Legacy,
2294            None,
2295            sleep_500_millis(),
2296        )
2297        .await;
2298
2299        let received = extract_messages_value(received);
2300
2301        assert_eq!(
2302            received,
2303            vec![
2304                "this is a simple file".into(),
2305                "i have been compressed".into(),
2306                "in order to make me smaller".into(),
2307                "but you can still read me".into(),
2308                "hooray".into(),
2309            ]
2310        );
2311    }
2312
2313    #[tokio::test]
2314    async fn test_non_utf8_encoded_file() {
2315        let dir = tempdir().unwrap();
2316        let config = file::FileConfig {
2317            include: vec![PathBuf::from("tests/data/utf-16le.log")],
2318            encoding: Some(EncodingConfig { charset: UTF_16LE }),
2319            ..test_default_file_config(&dir)
2320        };
2321
2322        let received = run_file_source(
2323            &config,
2324            false,
2325            NoAcks,
2326            LogNamespace::Legacy,
2327            None,
2328            sleep_500_millis(),
2329        )
2330        .await;
2331
2332        let received = extract_messages_value(received);
2333
2334        assert_eq!(
2335            received,
2336            vec![
2337                "hello i am a file".into(),
2338                "i can unicode".into(),
2339                "but i do so in 16 bits".into(),
2340                "and when i byte".into(),
2341                "i become little-endian".into(),
2342            ]
2343        );
2344    }
2345
2346    #[tokio::test]
2347    async fn test_non_default_line_delimiter() {
2348        let dir = tempdir().unwrap();
2349        let config = file::FileConfig {
2350            include: vec![dir.path().join("*")],
2351            line_delimiter: "\r\n".to_string(),
2352            ..test_default_file_config(&dir)
2353        };
2354
2355        let path = dir.path().join("file");
2356        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2357            let mut file = File::create(&path).unwrap();
2358
2359            write!(&mut file, "hello i am a line\r\n").unwrap();
2360            write!(&mut file, "and i am too\r\n").unwrap();
2361            write!(&mut file, "CRLF is how we end\r\n").unwrap();
2362            write!(&mut file, "please treat us well\r\n").unwrap();
2363
2364            file.flush().unwrap();
2365            sleep_500_millis().await;
2366        })
2367        .await;
2368
2369        let received = extract_messages_value(received);
2370
2371        assert_eq!(
2372            received,
2373            vec![
2374                "hello i am a line".into(),
2375                "and i am too".into(),
2376                "CRLF is how we end".into(),
2377                "please treat us well".into()
2378            ]
2379        );
2380    }
2381
2382    // Regression test for https://github.com/vectordotdev/vector/issues/24027
2383    // Tests that multi-character delimiters (like \r\n) are correctly handled when
2384    // split across buffer boundaries. Without the fix, events would be merged together.
2385    #[tokio::test]
2386    async fn test_multi_char_delimiter_split_across_buffer_boundary() {
2387        let dir = tempdir().unwrap();
2388        let config = file::FileConfig {
2389            include: vec![dir.path().join("*")],
2390            line_delimiter: "\r\n".to_string(),
2391            ..test_default_file_config(&dir)
2392        };
2393
2394        let path = dir.path().join("file");
2395        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, None, async {
2396            let mut file = File::create(&path).unwrap();
2397
2398            sleep_500_millis().await;
2399
2400            // Create data where \r\n is split at 8KB buffer boundary
2401            // This reproduces the exact scenario that caused data corruption:
2402            // - Event 1 ends with \r at byte 8191
2403            // - The \n appears at byte 8192 (right at the buffer boundary)
2404            // - Without the fix, Event 1 and Event 2 would be merged
2405
2406            let buffer_size = 8192;
2407
2408            // Event 1: Position \r\n to split at first boundary
2409            let event1_prefix = "Event 1: ";
2410            let padding1_len = buffer_size - event1_prefix.len() - 1; // -1 for the \r
2411            write!(&mut file, "{}", event1_prefix).unwrap();
2412            file.write_all(&vec![b'X'; padding1_len]).unwrap();
2413            write!(&mut file, "\r\n").unwrap(); // \r at byte 8191, \n at byte 8192
2414
2415            // Event 2: Position \r\n to split at second boundary
2416            let event2_prefix = "Event 2: ";
2417            let padding2_len = buffer_size - event2_prefix.len() - 1;
2418            write!(&mut file, "{}", event2_prefix).unwrap();
2419            file.write_all(&vec![b'Y'; padding2_len]).unwrap();
2420            write!(&mut file, "\r\n").unwrap(); // \r at byte 16383, \n at byte 16384
2421
2422            // Event 3: Normal line without boundary split
2423            write!(&mut file, "Event 3: Final\r\n").unwrap();
2424
2425            sleep_500_millis().await;
2426        })
2427        .await;
2428
2429        let messages = extract_messages_value(received);
2430
2431        // The bug would cause Events 1 and 2 to be merged into a single message
2432        assert_eq!(
2433            messages.len(),
2434            3,
2435            "Should receive exactly 3 separate events (bug would merge them)"
2436        );
2437
2438        // Verify each event is correctly separated and starts with expected prefix
2439        let msg0 = messages[0].to_string_lossy();
2440        let msg1 = messages[1].to_string_lossy();
2441        let msg2 = messages[2].to_string_lossy();
2442
2443        assert!(
2444            msg0.starts_with("Event 1: "),
2445            "First event should start with 'Event 1: ', got: {}",
2446            msg0
2447        );
2448        assert!(
2449            msg1.starts_with("Event 2: "),
2450            "Second event should start with 'Event 2: ', got: {}",
2451            msg1
2452        );
2453        assert_eq!(msg2, "Event 3: Final");
2454
2455        // Ensure no event contains embedded CR/LF (sign of incorrect merging)
2456        for (i, msg) in messages.iter().enumerate() {
2457            let msg_str = msg.to_string_lossy();
2458            assert!(
2459                !msg_str.contains('\r'),
2460                "Event {} should not contain embedded \\r",
2461                i
2462            );
2463            assert!(
2464                !msg_str.contains('\n'),
2465                "Event {} should not contain embedded \\n",
2466                i
2467            );
2468        }
2469    }
2470
2471    #[tokio::test]
2472    async fn remove_file() {
2473        let n = 5;
2474        let remove_after_secs = 1;
2475
2476        let dir = tempdir().unwrap();
2477        let config = file::FileConfig {
2478            include: vec![dir.path().join("*")],
2479            remove_after_secs: Some(remove_after_secs),
2480            ..test_default_file_config(&dir)
2481        };
2482
2483        let path = dir.path().join("file");
2484        let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, None, async {
2485            let mut file = File::create(&path).unwrap();
2486
2487            for i in 0..n {
2488                writeln!(&mut file, "{i}").unwrap();
2489            }
2490            file.flush().unwrap();
2491            drop(file);
2492
2493            for _ in 0..10 {
2494                // Wait for remove grace period to end.
2495                sleep(Duration::from_secs(remove_after_secs + 1)).await;
2496
2497                if File::open(&path).is_err() {
2498                    break;
2499                }
2500            }
2501        })
2502        .await;
2503
2504        assert_eq!(received.len(), n);
2505
2506        match File::open(&path) {
2507            Ok(_) => panic!("File wasn't removed"),
2508            Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2509        }
2510    }
2511
2512    #[derive(Clone, Copy, Eq, PartialEq)]
2513    enum AckingMode {
2514        NoAcks,      // No acknowledgement handling and no finalization
2515        Unfinalized, // Acknowledgement handling but no finalization
2516        Acks,        // Full acknowledgements and proper finalization
2517    }
2518    use AckingMode::*;
2519    use vector_lib::lookup::OwnedTargetPath;
2520
2521    async fn run_file_source(
2522        config: &FileConfig,
2523        wait_shutdown: bool,
2524        acking_mode: AckingMode,
2525        log_namespace: LogNamespace,
2526        // When `Some`, events are relayed through an unbounded channel and the
2527        // counter is incremented for each event received.  The inner future can
2528        // call `wait_for_atomic_usize` on this counter to gate writes on
2529        // observed events instead of relying on wall-clock sleeps.
2530        event_counter: Option<Arc<AtomicUsize>>,
2531        inner: impl Future<Output = ()>,
2532    ) -> Vec<Event> {
2533        assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2534            let (tx, rx) = match acking_mode {
2535                Acks => {
2536                    let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2537                    (tx, rx.boxed())
2538                }
2539                Unfinalized => {
2540                    // Use Rejected so that events are finalized but checkpoints
2541                    // are NOT updated (only Delivered triggers checkpoint updates).
2542                    // This avoids a race where the default Delivered status on drop
2543                    // could leak checkpoint writes into the next run.
2544                    let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Rejected);
2545                    (tx, rx.boxed())
2546                }
2547                NoAcks => {
2548                    let (tx, rx) = SourceSender::new_test();
2549                    (tx, rx.boxed())
2550                }
2551            };
2552
2553            let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2554            let data_dir = config.data_dir.clone().unwrap();
2555            let acks = !matches!(acking_mode, NoAcks);
2556
2557            tokio::spawn(file::file_source(
2558                config,
2559                data_dir,
2560                shutdown,
2561                tx,
2562                acks,
2563                log_namespace,
2564            ));
2565
2566            let result = if let Some(counter) = event_counter {
2567                // Relay mode: a background task forwards events and increments
2568                // the counter so `inner` can observe them without arbitrary sleeps.
2569                let (relay_tx, mut relay_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
2570                tokio::spawn(async move {
2571                    let mut rx = rx;
2572                    while let Some(event) = rx.next().await {
2573                        counter.fetch_add(1, Ordering::SeqCst);
2574                        let _ = relay_tx.send(event);
2575                    }
2576                });
2577
2578                inner.await;
2579                drop(trigger_shutdown);
2580
2581                timeout(Duration::from_secs(5), async move {
2582                    let mut events = Vec::new();
2583                    while let Some(event) = relay_rx.recv().await {
2584                        events.push(event);
2585                    }
2586                    events
2587                })
2588                .await
2589                .expect("Unclosed channel: may indicate file-server could not shutdown gracefully.")
2590            } else {
2591                inner.await;
2592                drop(trigger_shutdown);
2593
2594                if acking_mode == Unfinalized {
2595                    rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2596                        .collect::<Vec<_>>()
2597                        .await
2598                } else {
2599                    timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2600                        .await
2601                        .expect(
2602                            "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2603                        )
2604                }
2605            };
2606
2607            if wait_shutdown {
2608                shutdown_done.await;
2609            }
2610
2611            result
2612        })
2613        .await
2614    }
2615
2616    fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2617        received
2618            .into_iter()
2619            .map(Event::into_log)
2620            .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2621            .collect()
2622    }
2623
2624    fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2625        received
2626            .into_iter()
2627            .map(Event::into_log)
2628            .map(|log| log.get_message().unwrap().clone())
2629            .collect()
2630    }
2631}