vector/topology/
builder.rs

1use std::{
2    collections::HashMap,
3    future::ready,
4    num::NonZeroUsize,
5    sync::{Arc, LazyLock, Mutex},
6    time::Instant,
7};
8
9use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
10use futures_util::stream::FuturesUnordered;
11use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
12use tokio::{
13    select,
14    sync::{mpsc::UnboundedSender, oneshot},
15    time::timeout,
16};
17use tracing::{Instrument, Span};
18use vector_lib::{
19    EstimatedJsonEncodedSizeOf,
20    buffers::{
21        BufferType, WhenFull,
22        topology::{
23            builder::TopologyBuilder,
24            channel::{
25                BufferChannelKind, BufferReceiver, BufferSender, ChannelMetricMetadata,
26                LimitedReceiver,
27            },
28        },
29    },
30    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
31    latency::LatencyRecorder,
32    schema::Definition,
33    source_sender::{CHUNK_SIZE, SourceSenderItem},
34    transform::update_runtime_schema_definition,
35};
36use vector_lib::{gauge, internal_event::GaugeName};
37use vector_vrl_metrics::MetricsStorage;
38
39use super::{
40    BuiltBuffer, ConfigDiff,
41    fanout::{self, Fanout},
42    schema,
43    task::{Task, TaskOutput, TaskResult},
44};
45use crate::{
46    SourceSender,
47    config::{
48        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
49        ProxyConfig, SinkContext, SinkOuter, SourceContext, SourceOuter, TransformContext,
50        TransformOuter, TransformOutput,
51    },
52    event::{EventArray, EventContainer},
53    extra_context::ExtraContext,
54    internal_events::EventsReceived,
55    shutdown::SourceShutdownCoordinator,
56    spawn_named,
57    topology::task::TaskError,
58    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
59    utilization::{
60        OutputUtilization, Utilization, UtilizationComponentSender, UtilizationEmitter,
61        UtilizationRegistry,
62    },
63};
64
65static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
66    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
67static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);
68
69pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
70    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);
71
72const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
73pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
74
75static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
76    crate::app::worker_threads()
77        .map(std::num::NonZeroUsize::get)
78        .unwrap_or_else(crate::num_threads)
79});
80
81const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
82
83struct Builder<'a> {
84    config: &'a super::Config,
85    diff: &'a ConfigDiff,
86    shutdown_coordinator: SourceShutdownCoordinator,
87    errors: Vec<String>,
88    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
89    tasks: HashMap<ComponentKey, Task>,
90    buffers: HashMap<ComponentKey, BuiltBuffer>,
91    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
92    healthchecks: HashMap<ComponentKey, Task>,
93    detach_triggers: HashMap<ComponentKey, Trigger>,
94    extra_context: ExtraContext,
95    utilization_emitter: Option<UtilizationEmitter>,
96    utilization_registry: UtilizationRegistry,
97}
98
99impl<'a> Builder<'a> {
100    fn new(
101        config: &'a super::Config,
102        diff: &'a ConfigDiff,
103        buffers: HashMap<ComponentKey, BuiltBuffer>,
104        extra_context: ExtraContext,
105        utilization_registry: Option<UtilizationRegistry>,
106    ) -> Self {
107        // If registry is not passed, we need to build a whole new utilization emitter + registry
108        // Otherwise, we just store the registry and reuse it for this build
109        let (emitter, registry) = if let Some(registry) = utilization_registry {
110            (None, registry)
111        } else {
112            let (emitter, registry) = UtilizationEmitter::new();
113            (Some(emitter), registry)
114        };
115        Self {
116            config,
117            diff,
118            buffers,
119            shutdown_coordinator: SourceShutdownCoordinator::default(),
120            errors: vec![],
121            outputs: HashMap::new(),
122            tasks: HashMap::new(),
123            inputs: HashMap::new(),
124            healthchecks: HashMap::new(),
125            detach_triggers: HashMap::new(),
126            extra_context,
127            utilization_emitter: emitter,
128            utilization_registry: registry,
129        }
130    }
131
132    /// Builds the new pieces of the topology found in `self.diff`.
133    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
134        let enrichment_tables = self.load_enrichment_tables().await;
135        let source_tasks = self.build_sources(enrichment_tables).await;
136        self.build_transforms(enrichment_tables).await;
137        self.build_sinks(enrichment_tables).await;
138
139        // We should have all the data for the enrichment tables loaded now, so switch them over to
140        // readonly.
141        enrichment_tables.finish_load();
142
143        if self.errors.is_empty() {
144            Ok(TopologyPieces {
145                inputs: self.inputs,
146                outputs: Self::finalize_outputs(self.outputs),
147                tasks: self.tasks,
148                source_tasks,
149                healthchecks: self.healthchecks,
150                shutdown_coordinator: self.shutdown_coordinator,
151                detach_triggers: self.detach_triggers,
152                metrics_storage: METRICS_STORAGE.clone(),
153                utilization: self
154                    .utilization_emitter
155                    .map(|e| (e, self.utilization_registry)),
156            })
157        } else {
158            Err(self.errors)
159        }
160    }
161
162    fn finalize_outputs(
163        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
164    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
165    {
166        let mut finalized_outputs = HashMap::new();
167        for (id, output) in outputs {
168            let entry = finalized_outputs
169                .entry(id.component)
170                .or_insert_with(HashMap::new);
171            entry.insert(id.port, output);
172        }
173
174        finalized_outputs
175    }
176
177    /// Loads, or reloads the enrichment tables.
178    /// The tables are stored in the `ENRICHMENT_TABLES` global variable.
179    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
180        let mut enrichment_tables = HashMap::new();
181
182        // Build enrichment tables
183        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
184            let table_name = name.to_string();
185            if ENRICHMENT_TABLES.needs_reload(&table_name) {
186                let indexes = if !self.diff.enrichment_tables.is_added(name) {
187                    // If this is an existing enrichment table, we need to store the indexes to reapply
188                    // them again post load.
189                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
190                } else {
191                    None
192                };
193
194                let mut table = match table_outer.inner.build(&self.config.global).await {
195                    Ok(table) => table,
196                    Err(error) => {
197                        self.errors
198                            .push(format!("Enrichment Table \"{name}\": {error}"));
199                        continue;
200                    }
201                };
202
203                if let Some(indexes) = indexes {
204                    for (case, index) in indexes {
205                        match table
206                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
207                        {
208                            Ok(_) => (),
209                            Err(error) => {
210                                // If there is an error adding an index we do not want to use the reloaded
211                                // data, the previously loaded data will still need to be used.
212                                // Just report the error and continue.
213                                error!(message = "Unable to add index to reloaded enrichment table.",
214                                    table = ?name.to_string(),
215                                    %error);
216                                continue 'tables;
217                            }
218                        }
219                    }
220                }
221
222                enrichment_tables.insert(table_name, table);
223            }
224        }
225
226        ENRICHMENT_TABLES.load(enrichment_tables);
227
228        &ENRICHMENT_TABLES
229    }
230
231    async fn build_sources(
232        &mut self,
233        enrichment_tables: &vector_lib::enrichment::TableRegistry,
234    ) -> HashMap<ComponentKey, Task> {
235        let mut source_tasks = HashMap::new();
236
237        let table_sources = self
238            .config
239            .enrichment_tables
240            .iter()
241            .filter_map(|(key, table)| table.as_source(key))
242            .collect::<Vec<_>>();
243        for (key, source) in self
244            .config
245            .sources()
246            .filter(|(key, _)| self.diff.sources.contains_new(key))
247            .chain(
248                table_sources
249                    .iter()
250                    .map(|(key, source)| (key, source))
251                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
252            )
253        {
254            debug!(component_id = %key, "Building new source.");
255
256            let span = error_span!(
257                "source",
258                component_kind = "source",
259                component_id = %key.id(),
260                component_type = %source.inner.get_component_name(),
261            );
262
263            if let Ok(server) = self
264                .build_instrumented_source(key, source, enrichment_tables)
265                .instrument(span)
266                .await
267            {
268                source_tasks.insert(key.clone(), server);
269            }
270        }
271
272        source_tasks
273    }
274
275    async fn build_instrumented_source(
276        &mut self,
277        key: &ComponentKey,
278        source: &SourceOuter,
279        enrichment_tables: &vector_lib::enrichment::TableRegistry,
280    ) -> Result<Task, ()> {
281        let typetag = source.inner.get_component_name();
282        let source_outputs = source.inner.outputs(self.config.schema.log_namespace());
283
284        let task_name = format!(
285            ">> {} ({}, pump) >>",
286            source.inner.get_component_name(),
287            key.id()
288        );
289
290        let mut builder = SourceSender::builder()
291            .with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
292            .with_timeout(source.inner.send_timeout())
293            .with_ewma_half_life_seconds(
294                self.config.global.buffer_utilization_ewma_half_life_seconds,
295            );
296        let mut pumps = Vec::new();
297        let mut controls = HashMap::new();
298        let mut schema_definitions = HashMap::with_capacity(source_outputs.len());
299
300        for output in source_outputs.into_iter() {
301            let rx = builder.add_source_output(output.clone(), key.clone());
302
303            let (fanout, control) = Fanout::new(key.clone());
304            let source_type = source.inner.get_component_name();
305            let source = Arc::new(key.clone());
306
307            let pump = run_source_output_pump(rx, fanout, source, source_type);
308
309            pumps.push(pump.instrument(Span::current()));
310            controls.insert(
311                OutputId {
312                    component: key.clone(),
313                    port: output.port.clone(),
314                },
315                control,
316            );
317
318            let port = output.port.clone();
319            if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
320                schema_definitions.insert(port, definition);
321            }
322        }
323
324        let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
325        let pump = async move {
326            debug!("Source pump supervisor starting.");
327
328            // Spawn all of the per-output pumps and then await their completion.
329            //
330            // If any of the pumps complete with an error, or panic/are cancelled, we return
331            // immediately.
332            let mut handles = FuturesUnordered::new();
333            for pump in pumps {
334                handles.push(spawn_named(pump, task_name.as_ref()));
335            }
336
337            let mut had_pump_error = false;
338            while let Some(output) = handles.try_next().await? {
339                if let Err(e) = output {
340                    // Immediately send the error to the source's wrapper future, but ignore any
341                    // errors during the send, since nested errors wouldn't make any sense here.
342                    _ = pump_error_tx.send(e);
343                    had_pump_error = true;
344                    break;
345                }
346            }
347
348            if had_pump_error {
349                debug!("Source pump supervisor task finished with an error.");
350            } else {
351                debug!("Source pump supervisor task finished normally.");
352            }
353            Ok(TaskOutput::Source)
354        };
355        let pump = Task::new(key.clone(), typetag, pump);
356
357        let (shutdown_signal, force_shutdown_tripwire) = self
358            .shutdown_coordinator
359            .register_source(key, INTERNAL_SOURCES.contains(&typetag));
360
361        let context = SourceContext {
362            key: key.clone(),
363            globals: self.config.global.clone(),
364            enrichment_tables: enrichment_tables.clone(),
365            metrics_storage: METRICS_STORAGE.clone(),
366            shutdown: shutdown_signal,
367            out: builder.build(),
368            proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
369            acknowledgements: source.sink_acknowledgements,
370            schema_definitions,
371            schema: self.config.schema,
372            extra_context: self.extra_context.clone(),
373        };
374        let server = match source.inner.build(context).await {
375            Err(error) => {
376                self.errors.push(format!("Source \"{key}\": {error}"));
377                return Err(());
378            }
379            Ok(server) => server,
380        };
381
382        // Build a wrapper future that drives the actual source future, but returns early if we've
383        // been signalled to forcefully shutdown, or if the source pump encounters an error.
384        //
385        // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully
386        // within the allotted time window. This can occur normally for certain sources, like stdin,
387        // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time
388        // to shutdown unless some input is given.
389        let server = async move {
390            debug!("Source starting.");
391
392            let mut result = select! {
393                biased;
394
395                // We've been told that we must forcefully shut down.
396                _ = force_shutdown_tripwire => Ok(()),
397
398                // The source pump encountered an error, which we're now bubbling up here to stop
399                // the source as well, since the source running makes no sense without the pump.
400                //
401                // We only match receiving a message, not the error of the sender being dropped,
402                // just to keep things simpler.
403                Ok(e) = &mut pump_error_rx => Err(e),
404
405                // The source finished normally.
406                result = server => result.map_err(|_| TaskError::Opaque),
407            };
408
409            // Even though we already tried to receive any pump task error above, we may have exited
410            // on the source itself returning an error due to task scheduling, where the pump task
411            // encountered an error, sent it over the oneshot, but we were polling the source
412            // already and hit an error trying to send to the now-shutdown pump task.
413            //
414            // Since the error from the source is opaque at the moment (i.e. `()`), we try a final
415            // time to see if the pump task encountered an error, using _that_ instead if so, to
416            // propagate the true error that caused the source to have to stop.
417            if let Ok(e) = pump_error_rx.try_recv() {
418                result = Err(e);
419            }
420
421            match result {
422                Ok(()) => {
423                    debug!("Source finished normally.");
424                    Ok(TaskOutput::Source)
425                }
426                Err(e) => {
427                    debug!("Source finished with an error.");
428                    Err(e)
429                }
430            }
431        };
432        let server = Task::new(key.clone(), typetag, server);
433
434        self.outputs.extend(controls);
435        self.tasks.insert(key.clone(), pump);
436
437        Ok(server)
438    }
439
440    async fn build_transforms(
441        &mut self,
442        enrichment_tables: &vector_lib::enrichment::TableRegistry,
443    ) {
444        let mut definition_cache = HashMap::default();
445
446        for (key, transform) in self
447            .config
448            .transforms()
449            .filter(|(key, _)| self.diff.transforms.contains_new(key))
450        {
451            debug!(component_id = %key, "Building new transform.");
452
453            let input_definitions = match schema::input_definitions(
454                &transform.inputs,
455                self.config,
456                enrichment_tables.clone(),
457                &mut definition_cache,
458            ) {
459                Ok(definitions) => definitions,
460                Err(_) => {
461                    // We have received an error whilst retrieving the definitions,
462                    // there is no point in continuing.
463
464                    return;
465                }
466            };
467
468            let span = error_span!(
469                "transform",
470                component_kind = "transform",
471                component_id = %key.id(),
472                component_type = %transform.inner.get_component_name(),
473            );
474
475            self.build_instrumented_transform(key, transform, enrichment_tables, input_definitions)
476                .instrument(span)
477                .await;
478        }
479    }
480
481    async fn build_instrumented_transform(
482        &mut self,
483        key: &ComponentKey,
484        transform: &TransformOuter<OutputId>,
485        enrichment_tables: &vector_lib::enrichment::TableRegistry,
486        input_definitions: Vec<(OutputId, Definition)>,
487    ) {
488        let merged_definition: Definition = input_definitions
489            .iter()
490            .map(|(_output_id, definition)| definition.clone())
491            .reduce(Definition::merge)
492            // We may not have any definitions if all the inputs are from metrics sources.
493            .unwrap_or_else(Definition::any);
494
495        // Create a map of the outputs to the list of possible definitions from those outputs.
496        let schema_definitions = transform
497            .inner
498            .outputs(
499                &TransformContext {
500                    enrichment_tables: enrichment_tables.clone(),
501                    metrics_storage: METRICS_STORAGE.clone(),
502                    schema: self.config.schema,
503                    ..Default::default()
504                },
505                &input_definitions,
506            )
507            .into_iter()
508            .map(|output| {
509                let definitions = output.schema_definitions(self.config.schema.enabled);
510                (output.port, definitions)
511            })
512            .collect::<HashMap<_, _>>();
513
514        let context = TransformContext {
515            key: Some(key.clone()),
516            globals: self.config.global.clone(),
517            enrichment_tables: enrichment_tables.clone(),
518            metrics_storage: METRICS_STORAGE.clone(),
519            schema_definitions,
520            merged_schema_definition: merged_definition.clone(),
521            schema: self.config.schema,
522            extra_context: self.extra_context.clone(),
523        };
524
525        let node = TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);
526
527        let transform = match transform
528            .inner
529            .build(&context)
530            .instrument(Span::current())
531            .await
532        {
533            Err(error) => {
534                self.errors.push(format!("Transform \"{key}\": {error}"));
535                return;
536            }
537            Ok(transform) => transform,
538        };
539
540        let metrics = ChannelMetricMetadata::new(BufferChannelKind::Transform, None);
541        let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
542            TOPOLOGY_BUFFER_SIZE,
543            WhenFull::Block,
544            &Span::current(),
545            Some(metrics),
546            self.config.global.buffer_utilization_ewma_half_life_seconds,
547        );
548
549        self.inputs
550            .insert(key.clone(), (input_tx, node.inputs.clone()));
551
552        let (transform_task, transform_outputs) = self.build_transform(transform, node, input_rx);
553
554        self.outputs.extend(transform_outputs);
555        self.tasks.insert(key.clone(), transform_task);
556    }
557
558    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
559        let table_sinks = self
560            .config
561            .enrichment_tables
562            .iter()
563            .filter_map(|(key, table)| table.as_sink(key))
564            .collect::<Vec<_>>();
565        for (key, sink) in self
566            .config
567            .sinks()
568            .filter(|(key, _)| self.diff.sinks.contains_new(key))
569            .chain(
570                table_sinks
571                    .iter()
572                    .map(|(key, sink)| (key, sink))
573                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
574            )
575        {
576            debug!(component_id = %key, "Building new sink.");
577
578            let span = error_span!(
579                "sink",
580                component_kind = "sink",
581                component_id = %key.id(),
582                component_type = %sink.inner.get_component_name(),
583            );
584
585            self.build_instrumented_sink(key, sink, enrichment_tables)
586                .instrument(span)
587                .await;
588        }
589    }
590
591    async fn build_instrumented_sink(
592        &mut self,
593        key: &ComponentKey,
594        sink: &SinkOuter<OutputId>,
595        enrichment_tables: &vector_lib::enrichment::TableRegistry,
596    ) {
597        let sink_inputs = &sink.inputs;
598        let healthcheck = sink.healthcheck();
599        let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
600        let healthcheck_timeout = healthcheck.timeout;
601
602        let typetag = sink.inner.get_component_name();
603        let input_type = sink.inner.input().data_type();
604
605        // At this point, we've validated that all transforms are valid, including any
606        // transform that mutates the schema provided by their sources. We can now validate the
607        // schema expectations of each individual sink.
608        if let Err(mut err) =
609            schema::validate_sink_expectations(key, sink, self.config, enrichment_tables.clone())
610        {
611            self.errors.append(&mut err);
612        };
613
614        let (tx, rx) = match self.buffers.remove(key) {
615            Some(buffer) => buffer,
616            _ => {
617                let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") {
618                    BufferType::Memory { .. } => "memory",
619                    BufferType::DiskV2 { .. } => "disk",
620                };
621                let buffer_span = error_span!("sink", buffer_type);
622                let buffer = sink
623                    .buffer
624                    .build(
625                        self.config.global.data_dir.clone(),
626                        key.to_string(),
627                        buffer_span,
628                    )
629                    .await;
630                match buffer {
631                    Err(error) => {
632                        self.errors.push(format!("Sink \"{key}\": {error}"));
633                        return;
634                    }
635                    Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
636                }
637            }
638        };
639
640        let cx = SinkContext {
641            healthcheck,
642            globals: self.config.global.clone(),
643            enrichment_tables: enrichment_tables.clone(),
644            metrics_storage: METRICS_STORAGE.clone(),
645            proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
646            schema: self.config.schema,
647            app_name: crate::get_app_name().to_string(),
648            app_name_slug: crate::get_slugified_app_name(),
649            extra_context: self.extra_context.clone(),
650        };
651
652        let (sink, healthcheck) = match sink.inner.build(cx).await {
653            Err(error) => {
654                self.errors.push(format!("Sink \"{key}\": {error}"));
655                return;
656            }
657            Ok(built) => built,
658        };
659
660        let (trigger, tripwire) = Tripwire::new();
661
662        let utilization_sender = self
663            .utilization_registry
664            .add_component(key.clone(), gauge!(GaugeName::Utilization));
665        let component_key = key.clone();
666        let sink = async move {
667            debug!("Sink starting.");
668
669            // Why is this Arc<Mutex<Option<_>>> needed you ask.
670            // In case when this function build_pieces errors
671            // this future won't be run so this rx won't be taken
672            // which will enable us to reuse rx to rebuild
673            // old configuration by passing this Arc<Mutex<Option<_>>>
674            // yet again.
675            let rx = rx
676                .lock()
677                .unwrap()
678                .take()
679                .expect("Task started but input has been taken.");
680
681            let mut rx = Utilization::new(utilization_sender, component_key.clone(), rx);
682
683            let events_received = register!(EventsReceived);
684            sink.run(
685                rx.by_ref()
686                    .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
687                    .inspect(|events| {
688                        events_received.emit(CountByteSize(
689                            events.len(),
690                            events.estimated_json_encoded_size_of(),
691                        ))
692                    })
693                    .take_until_if(tripwire),
694            )
695            .await
696            .map(|_| {
697                debug!("Sink finished normally.");
698                TaskOutput::Sink(rx)
699            })
700            .map_err(|_| {
701                debug!("Sink finished with an error.");
702                TaskError::Opaque
703            })
704        };
705
706        let task = Task::new(key.clone(), typetag, sink);
707
708        let component_key = key.clone();
709        let healthcheck_task = async move {
710            if enable_healthcheck {
711                timeout(healthcheck_timeout, healthcheck)
712                    .map(|result| match result {
713                        Ok(Ok(_)) => {
714                            info!("Healthcheck passed.");
715                            Ok(TaskOutput::Healthcheck)
716                        }
717                        Ok(Err(error)) => {
718                            error!(
719                                msg = "Healthcheck failed.",
720                                %error,
721                                component_kind = "sink",
722                                component_type = typetag,
723                                component_id = %component_key.id(),
724                            );
725                            Err(TaskError::wrapped(error))
726                        }
727                        Err(e) => {
728                            error!(
729                                msg = "Healthcheck timed out.",
730                                component_kind = "sink",
731                                component_type = typetag,
732                                component_id = %component_key.id(),
733                            );
734                            Err(TaskError::wrapped(Box::new(e)))
735                        }
736                    })
737                    .await
738            } else {
739                info!("Healthcheck disabled.");
740                Ok(TaskOutput::Healthcheck)
741            }
742        };
743
744        let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);
745
746        self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
747        self.healthchecks.insert(key.clone(), healthcheck_task);
748        self.tasks.insert(key.clone(), task);
749        self.detach_triggers.insert(key.clone(), trigger);
750    }
751
752    fn build_transform(
753        &self,
754        transform: Transform,
755        node: TransformNode,
756        input_rx: BufferReceiver<EventArray>,
757    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
758        match transform {
759            // TODO: avoid the double boxing for function transforms here
760            Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
761            Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
762            Transform::Task(t) => self.build_task_transform(
763                t,
764                input_rx,
765                node.input_details.data_type(),
766                node.typetag,
767                &node.key,
768                &node.outputs,
769            ),
770        }
771    }
772
773    fn build_sync_transform(
774        &self,
775        t: Box<dyn SyncTransform>,
776        node: TransformNode,
777        input_rx: BufferReceiver<EventArray>,
778    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
779        let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
780
781        let sender = self
782            .utilization_registry
783            .add_component(node.key.clone(), gauge!(GaugeName::Utilization));
784        let runner = Runner::new(
785            t,
786            input_rx,
787            sender,
788            node.input_details.data_type(),
789            outputs,
790            LatencyRecorder::new(self.config.global.latency_ewma_alpha),
791        );
792        let transform = if node.enable_concurrency {
793            runner.run_concurrently().boxed()
794        } else {
795            runner.run_inline().boxed()
796        };
797
798        let transform = async move {
799            debug!("Synchronous transform starting.");
800
801            match transform.await {
802                Ok(v) => {
803                    debug!("Synchronous transform finished normally.");
804                    Ok(v)
805                }
806                Err(e) => {
807                    debug!("Synchronous transform finished with an error.");
808                    Err(e)
809                }
810            }
811        };
812
813        let mut output_controls = HashMap::new();
814        for (name, control) in controls {
815            let id = name
816                .map(|name| OutputId::from((&node.key, name)))
817                .unwrap_or_else(|| OutputId::from(&node.key));
818            output_controls.insert(id, control);
819        }
820
821        let task = Task::new(node.key.clone(), node.typetag, transform);
822
823        (task, output_controls)
824    }
825
    /// Builds the task for a task-style transform, which maps a whole input stream of
    /// `EventArray`s to an output stream.
    ///
    /// The input side works as follows:
    /// - the input stream is wrapped for utilization tracking;
    /// - it is filtered down to arrays whose event type is accepted by `input_type`;
    /// - the remaining arrays are counted with `EventsReceived`.
    ///
    /// The output side works as follows:
    /// - each output array gets the default output's runtime log schema definitions;
    /// - it is stamped for latency tracking;
    /// - it is counted with `EventsSent` and passed through output utilization tracking;
    /// - it is then sent through a single `Fanout`.
    ///
    /// Returns the task and a map that holds only the default output's control channel.
    ///
    /// # Panics
    ///
    /// Panics if `outputs` has no entry for the default (`None`) port.
    fn build_task_transform(
        &self,
        t: Box<dyn TaskTransform<EventArray>>,
        input_rx: BufferReceiver<EventArray>,
        input_type: DataType,
        typetag: &str,
        key: &ComponentKey,
        outputs: &[TransformOutput],
    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
        let (mut fanout, control) = Fanout::new(key.clone());

        // The input wrapper gets the sender and the output wrapper gets a clone of it.
        let sender = self
            .utilization_registry
            .add_component(key.clone(), gauge!(GaugeName::Utilization));
        let output_sender = sender.clone();
        let input_rx = Utilization::new(sender, key.clone(), input_rx.into_stream());

        let events_received = register!(EventsReceived);
        // Drop arrays whose event type this transform does not accept, then count the rest.
        let filtered = input_rx
            .filter(move |events| ready(filter_events_type(events, input_type)))
            .inspect(move |events| {
                events_received.emit(CountByteSize(
                    events.len(),
                    events.estimated_json_encoded_size_of(),
                ))
            });
        let events_sent = register!(EventsSent::from(internal_event::Output(None)));
        let output_id = Arc::new(OutputId {
            component: key.clone(),
            port: None,
        });
        let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);

        // Task transforms can only write to the default output, so only a single schema def map is needed
        let schema_definition_map = outputs
            .iter()
            .find(|x| x.port.is_none())
            .expect("output for default port required for task transforms")
            .log_schema_definitions
            .clone()
            .into_iter()
            .map(|(key, value)| (key, Arc::new(value)))
            .collect();

        let stream = t
            .transform(Box::pin(filtered))
            // Attach runtime schema definitions, then stamp the send time for latency tracking.
            // The timestamp travels alongside the array for the downstream stages.
            .map(move |mut events| {
                for event in events.iter_events_mut() {
                    update_runtime_schema_definition(event, &output_id, &schema_definition_map);
                }
                let now = Instant::now();
                latency_recorder.on_send(&mut events, now);
                (events, now)
            })
            .inspect(move |(events, _): &(EventArray, Instant)| {
                events_sent.emit(CountByteSize(
                    events.len(),
                    events.estimated_json_encoded_size_of(),
                ));
            });
        let stream = OutputUtilization::new(output_sender, stream);
        let transform = async move {
            debug!("Task transform starting.");

            match fanout.send_stream(stream).await {
                Ok(()) => {
                    debug!("Task transform finished normally.");
                    Ok(TaskOutput::Transform)
                }
                Err(e) => {
                    debug!("Task transform finished with an error.");
                    Err(TaskError::wrapped(e))
                }
            }
        }
        .boxed();

        // Only the default output exists for task transforms.
        let mut outputs = HashMap::new();
        outputs.insert(OutputId::from(key), control);

        let task = Task::new(key.clone(), typetag, transform);

        (task, outputs)
    }
910}
911
912async fn run_source_output_pump(
913    mut rx: LimitedReceiver<SourceSenderItem>,
914    mut fanout: Fanout,
915    source: Arc<ComponentKey>,
916    source_type: &'static str,
917) -> TaskResult {
918    debug!("Source pump starting.");
919
920    let mut control_channel_open = true;
921    loop {
922        tokio::select! {
923            biased;
924            // Process control messages (e.g. Remove/Pause) even when the source
925            // is idle, so that config reloads can proceed without waiting for the
926            // next event.
927            alive = fanout.recv_control_message(), if control_channel_open => {
928                control_channel_open = alive;
929            }
930            item = rx.next() => {
931                match item {
932                    Some(SourceSenderItem { events: mut array, send_reference }) => {
933                        // Even though we have a `send_reference` timestamp above, that reference
934                        // time is when the events were enqueued in the `SourceSender`, not when
935                        // they were pulled out of the `rx` stream on this end. Since those times
936                        // can be quite different (due to blocking inherent to the fanout send
937                        // operation), we set the `last_transform_timestamp` to the current time
938                        // instead to get an accurate reference for when the events started
939                        // waiting for the first transform.
940                        let now = Instant::now();
941                        array.for_each_metadata_mut(|metadata| {
942                            metadata.set_source_id(Arc::clone(&source));
943                            metadata.set_source_type(source_type);
944                            metadata.set_last_transform_timestamp(now);
945                        });
946                        fanout
947                            .send(array, Some(send_reference))
948                            .await
949                            .map_err(|e| {
950                                debug!("Source pump finished with an error.");
951                                TaskError::wrapped(e)
952                            })?;
953                    }
954                    None => break,
955                }
956            }
957        }
958    }
959
960    debug!("Source pump finished normally.");
961    Ok(TaskOutput::Source)
962}
963
964pub async fn reload_enrichment_tables(config: &Config) {
965    let mut enrichment_tables = HashMap::new();
966    // Build enrichment tables
967    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
968        let table_name = name.to_string();
969        if ENRICHMENT_TABLES.needs_reload(&table_name) {
970            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
971
972            let mut table = match table_outer.inner.build(&config.global).await {
973                Ok(table) => table,
974                Err(error) => {
975                    error!("Enrichment table \"{name}\" reload failed: {error}");
976                    continue;
977                }
978            };
979
980            if let Some(indexes) = indexes {
981                for (case, index) in indexes {
982                    match table
983                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
984                    {
985                        Ok(_) => (),
986                        Err(error) => {
987                            // If there is an error adding an index we do not want to use the reloaded
988                            // data, the previously loaded data will still need to be used.
989                            // Just report the error and continue.
990                            error!(
991                                message = "Unable to add index to reloaded enrichment table.",
992                                table = ?name.to_string(),
993                                %error
994                            );
995                            continue 'tables;
996                        }
997                    }
998                }
999            }
1000
1001            enrichment_tables.insert(table_name, table);
1002        }
1003    }
1004
1005    ENRICHMENT_TABLES.load(enrichment_tables);
1006    ENRICHMENT_TABLES.finish_load();
1007}
1008
/// Components built for the new or changed parts of a config diff.
///
/// `TopologyPieces::build` produces these without checking their topology.
pub struct TopologyPieces {
    /// For each component that receives events: the buffer sender feeding it and the outputs it
    /// is configured to read from.
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Fanout control channels for each component's outputs, keyed by output port name
    /// (`None` is the default port).
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    /// Runnable tasks for the built components (e.g. each sink's run task).
    pub(super) tasks: HashMap<ComponentKey, Task>,
    /// Tasks for sources, kept separate from `tasks`.
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    /// Healthcheck tasks, keyed by sink component.
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    /// Shutdown coordinator for the built sources.
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    /// Per-component triggers (inserted for sinks by the builder).
    /// NOTE(review): presumably fired to detach a component from its inputs — confirm.
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    /// `vector_vrl_metrics` storage handle carried along with the pieces.
    pub(crate) metrics_storage: MetricsStorage,
    /// Utilization emitter and registry, if utilization tracking was set up for this build.
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}
1020
1021/// Builder for constructing TopologyPieces with a fluent API.
1022///
1023/// # Examples
1024///
1025/// ```ignore
1026/// let pieces = TopologyPiecesBuilder::new(&config, &diff)
1027///     .with_buffers(buffers)
1028///     .with_extra_context(extra_context)
1029///     .build()
1030///     .await?;
1031/// ```
1032pub struct TopologyPiecesBuilder<'a> {
1033    config: &'a Config,
1034    diff: &'a ConfigDiff,
1035    buffers: HashMap<ComponentKey, BuiltBuffer>,
1036    extra_context: ExtraContext,
1037    utilization_registry: Option<UtilizationRegistry>,
1038}
1039
1040impl<'a> TopologyPiecesBuilder<'a> {
1041    /// Creates a new builder with required parameters.
1042    pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
1043        Self {
1044            config,
1045            diff,
1046            buffers: HashMap::new(),
1047            extra_context: ExtraContext::default(),
1048            utilization_registry: None,
1049        }
1050    }
1051
1052    /// Sets the buffers for the topology.
1053    pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
1054        self.buffers = buffers;
1055        self
1056    }
1057
1058    /// Sets the extra context for the topology.
1059    pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
1060        self.extra_context = extra_context;
1061        self
1062    }
1063
1064    /// Sets the utilization registry for the topology.
1065    pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
1066        self.utilization_registry = registry;
1067        self
1068    }
1069
1070    /// Builds the topology pieces, returning errors if any occur.
1071    ///
1072    /// Use this method when you need to handle errors explicitly,
1073    /// such as in tests or validation code.
1074    pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
1075        Builder::new(
1076            self.config,
1077            self.diff,
1078            self.buffers,
1079            self.extra_context,
1080            self.utilization_registry,
1081        )
1082        .build()
1083        .await
1084    }
1085
1086    /// Builds the topology pieces, logging any errors that occur.
1087    ///
1088    /// Use this method for runtime configuration loading where
1089    /// errors should be logged and execution should continue.
1090    pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
1091        match self.build().await {
1092            Err(errors) => {
1093                for error in errors {
1094                    error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
1095                }
1096                None
1097            }
1098            Ok(new_pieces) => Some(new_pieces),
1099        }
1100    }
1101}
1102
1103impl TopologyPieces {
1104    pub async fn build_or_log_errors(
1105        config: &Config,
1106        diff: &ConfigDiff,
1107        buffers: HashMap<ComponentKey, BuiltBuffer>,
1108        extra_context: ExtraContext,
1109        utilization_registry: Option<UtilizationRegistry>,
1110    ) -> Option<Self> {
1111        TopologyPiecesBuilder::new(config, diff)
1112            .with_buffers(buffers)
1113            .with_extra_context(extra_context)
1114            .with_utilization_registry(utilization_registry)
1115            .build_or_log_errors()
1116            .await
1117    }
1118
1119    /// Builds only the new pieces, and doesn't check their topology.
1120    pub async fn build(
1121        config: &super::Config,
1122        diff: &ConfigDiff,
1123        buffers: HashMap<ComponentKey, BuiltBuffer>,
1124        extra_context: ExtraContext,
1125        utilization_registry: Option<UtilizationRegistry>,
1126    ) -> Result<Self, Vec<String>> {
1127        TopologyPiecesBuilder::new(config, diff)
1128            .with_buffers(buffers)
1129            .with_extra_context(extra_context)
1130            .with_utilization_registry(utilization_registry)
1131            .build()
1132            .await
1133    }
1134}
1135
1136const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
1137    match events {
1138        EventArray::Logs(_) => data_type.contains(DataType::Log),
1139        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
1140        EventArray::Traces(_) => data_type.contains(DataType::Trace),
1141    }
1142}
1143
/// The configuration-derived parts of a transform that the builder needs in order to wire it up.
#[derive(Debug, Clone)]
struct TransformNode {
    /// Component key of the transform.
    key: ComponentKey,
    /// Component type name, as returned by the transform config's `get_component_name`.
    typetag: &'static str,
    /// Upstream outputs this transform reads from.
    inputs: Inputs<OutputId>,
    /// The transform's accepted input; its data type is used to filter incoming arrays.
    input_details: Input,
    /// The transform's declared outputs, including their log schema definitions.
    outputs: Vec<TransformOutput>,
    /// Whether a synchronous transform is run with `Runner::run_concurrently` (otherwise inline).
    enable_concurrency: bool,
}
1153
1154impl TransformNode {
1155    pub fn from_parts(
1156        key: ComponentKey,
1157        context: &TransformContext,
1158        transform: &TransformOuter<OutputId>,
1159        schema_definition: &[(OutputId, Definition)],
1160    ) -> Self {
1161        Self {
1162            key,
1163            typetag: transform.inner.get_component_name(),
1164            inputs: transform.inputs.clone(),
1165            input_details: transform.inner.input(),
1166            outputs: transform.inner.outputs(context, schema_definition),
1167            enable_concurrency: transform.inner.enable_concurrency(),
1168        }
1169    }
1170}
1171
/// Drives a synchronous transform. It pulls input arrays, applies the transform, and sends the
/// results to the transform's outputs. Along the way it signals the utilization timer, stamps
/// outputs for latency tracking, and emits `EventsReceived`.
struct Runner {
    /// The transform; cloned once per batch when running concurrently.
    transform: Box<dyn SyncTransform>,
    /// Input buffer. It is taken (leaving `None`) when the runner starts, so a runner can only
    /// run once.
    input_rx: Option<BufferReceiver<EventArray>>,
    /// Event types the transform accepts; arrays of other types are filtered out before
    /// transforming.
    input_type: DataType,
    /// Destinations for transformed events.
    outputs: TransformOutputs,
    /// Utilization timer, signalled via `try_send_start_wait` / `try_send_stop_wait`.
    timer_tx: UtilizationComponentSender,
    /// Stamps outgoing arrays with their send time for latency tracking.
    latency_recorder: LatencyRecorder,
    /// Registered handle for emitting the `EventsReceived` internal event.
    events_received: Registered<EventsReceived>,
}
1181
1182impl Runner {
1183    fn new(
1184        transform: Box<dyn SyncTransform>,
1185        input_rx: BufferReceiver<EventArray>,
1186        timer_tx: UtilizationComponentSender,
1187        input_type: DataType,
1188        outputs: TransformOutputs,
1189        latency_recorder: LatencyRecorder,
1190    ) -> Self {
1191        Self {
1192            transform,
1193            input_rx: Some(input_rx),
1194            input_type,
1195            outputs,
1196            timer_tx,
1197            latency_recorder,
1198            events_received: register!(EventsReceived),
1199        }
1200    }
1201
1202    fn on_events_received(&mut self, events: &EventArray) {
1203        self.timer_tx.try_send_stop_wait();
1204
1205        self.events_received.emit(CountByteSize(
1206            events.len(),
1207            events.estimated_json_encoded_size_of(),
1208        ));
1209    }
1210
1211    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
1212        self.timer_tx.try_send_start_wait();
1213        let now = Instant::now();
1214        outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array, now));
1215        self.outputs.send(outputs_buf).await
1216    }
1217
1218    async fn run_inline(mut self) -> TaskResult {
1219        // 128 is an arbitrary, smallish constant
1220        const INLINE_BATCH_SIZE: usize = 128;
1221
1222        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);
1223
1224        let mut input_rx = self
1225            .input_rx
1226            .take()
1227            .expect("can't run runner twice")
1228            .into_stream()
1229            .filter(move |events| ready(filter_events_type(events, self.input_type)));
1230
1231        self.timer_tx.try_send_start_wait();
1232        while let Some(events) = input_rx.next().await {
1233            self.on_events_received(&events);
1234            self.transform.transform_all(events, &mut outputs_buf);
1235            self.send_outputs(&mut outputs_buf)
1236                .await
1237                .map_err(TaskError::wrapped)?;
1238        }
1239
1240        Ok(TaskOutput::Transform)
1241    }
1242
1243    async fn run_concurrently(mut self) -> TaskResult {
1244        let input_rx = self
1245            .input_rx
1246            .take()
1247            .expect("can't run runner twice")
1248            .into_stream()
1249            .filter(move |events| ready(filter_events_type(events, self.input_type)));
1250
1251        let mut input_rx =
1252            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);
1253
1254        let mut in_flight = FuturesOrdered::new();
1255        let mut shutting_down = false;
1256
1257        self.timer_tx.try_send_start_wait();
1258        loop {
1259            tokio::select! {
1260                biased;
1261
1262                result = in_flight.next(), if !in_flight.is_empty() => {
1263                    match result {
1264                        Some(Ok(mut outputs_buf)) => {
1265                            self.send_outputs(&mut outputs_buf).await
1266                                .map_err(TaskError::wrapped)?;
1267                        }
1268                        _ => unreachable!("join error or bad poll"),
1269                    }
1270                }
1271
1272                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
1273                    match input_arrays {
1274                        Some(input_arrays) => {
1275                            let mut len = 0;
1276                            for events in &input_arrays {
1277                                self.on_events_received(events);
1278                                len += events.len();
1279                            }
1280
1281                            let mut t = self.transform.clone();
1282                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
1283                            let task = tokio::spawn(async move {
1284                                for events in input_arrays {
1285                                    t.transform_all(events, &mut outputs_buf);
1286                                }
1287                                outputs_buf
1288                            }.in_current_span());
1289                            in_flight.push_back(task);
1290                        }
1291                        None => {
1292                            shutting_down = true;
1293                            continue
1294                        }
1295                    }
1296                }
1297
1298                else => {
1299                    if shutting_down {
1300                        break
1301                    }
1302                }
1303            }
1304        }
1305
1306        Ok(TaskOutput::Transform)
1307    }
1308}