1use std::{
2 collections::HashMap,
3 future::ready,
4 num::NonZeroUsize,
5 sync::{Arc, LazyLock, Mutex},
6 time::Instant,
7};
8
9use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
10use futures_util::stream::FuturesUnordered;
11use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
12use tokio::{
13 select,
14 sync::{mpsc::UnboundedSender, oneshot},
15 time::timeout,
16};
17use tracing::{Instrument, Span};
18use vector_lib::{
19 EstimatedJsonEncodedSizeOf,
20 buffers::{
21 BufferType, WhenFull,
22 topology::{
23 builder::TopologyBuilder,
24 channel::{
25 BufferChannelKind, BufferReceiver, BufferSender, ChannelMetricMetadata,
26 LimitedReceiver,
27 },
28 },
29 },
30 internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
31 latency::LatencyRecorder,
32 schema::Definition,
33 source_sender::{CHUNK_SIZE, SourceSenderItem},
34 transform::update_runtime_schema_definition,
35};
36use vector_lib::{gauge, internal_event::GaugeName};
37use vector_vrl_metrics::MetricsStorage;
38
39use super::{
40 BuiltBuffer, ConfigDiff,
41 fanout::{self, Fanout},
42 schema,
43 task::{Task, TaskOutput, TaskResult},
44};
45use crate::{
46 SourceSender,
47 config::{
48 ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
49 ProxyConfig, SinkContext, SinkOuter, SourceContext, SourceOuter, TransformContext,
50 TransformOuter, TransformOutput,
51 },
52 event::{EventArray, EventContainer},
53 extra_context::ExtraContext,
54 internal_events::EventsReceived,
55 shutdown::SourceShutdownCoordinator,
56 spawn_named,
57 topology::task::TaskError,
58 transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
59 utilization::{
60 OutputUtilization, Utilization, UtilizationComponentSender, UtilizationEmitter,
61 UtilizationRegistry,
62 },
63};
64
/// Process-wide enrichment table registry, created lazily on first access and
/// shared by every topology build and reload in this file.
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
/// Process-wide metrics storage; a clone is handed to every component context
/// built in this file and to the resulting `TopologyPieces`.
static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);

/// Buffer size given to the `SourceSender` builder: the transform concurrency
/// limit multiplied by `CHUNK_SIZE`.
pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

/// Capacity passed to `ReadyArrays::with_capacity` by the concurrent transform
/// runner. The `unwrap` is evaluated at compile time.
const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
/// Size of the standalone in-memory buffer placed in front of each transform.
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

/// Upper bound on the number of in-flight tasks of a concurrent transform: the
/// configured worker thread count when there is one, otherwise
/// `crate::num_threads()`.
static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

/// Source component names that are flagged as internal when they are
/// registered with the shutdown coordinator.
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
82
/// Accumulates everything produced while turning a configuration (restricted
/// by a diff) into runnable topology pieces.
struct Builder<'a> {
    /// Configuration the pieces are built from.
    config: &'a super::Config,
    /// Decides which components get built: only those reported by `contains_new`.
    diff: &'a ConfigDiff,
    /// Every built source is registered here to obtain its shutdown signal and
    /// force-shutdown tripwire.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Build errors collected across all components; `build` fails if non-empty.
    errors: Vec<String>,
    /// Fanout control channel for each source/transform output built so far.
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    /// Tasks keyed by component: source pump supervisors, transforms and sinks.
    tasks: HashMap<ComponentKey, Task>,
    /// Pre-built sink buffers; an entry is removed and reused when its sink is built.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Buffer sender plus configured inputs for each transform and sink.
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// One healthcheck task per built sink.
    healthchecks: HashMap<ComponentKey, Task>,
    /// Trigger paired with the tripwire that bounds each sink's input stream.
    detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Cloned into every source, transform and sink context.
    extra_context: ExtraContext,
    /// `Some` only when this builder created its own emitter/registry pair.
    utilization_emitter: Option<UtilizationEmitter>,
    /// Registry with which transforms and sinks register their utilization gauge.
    utilization_registry: UtilizationRegistry,
}
98
99impl<'a> Builder<'a> {
100 fn new(
101 config: &'a super::Config,
102 diff: &'a ConfigDiff,
103 buffers: HashMap<ComponentKey, BuiltBuffer>,
104 extra_context: ExtraContext,
105 utilization_registry: Option<UtilizationRegistry>,
106 ) -> Self {
107 let (emitter, registry) = if let Some(registry) = utilization_registry {
110 (None, registry)
111 } else {
112 let (emitter, registry) = UtilizationEmitter::new();
113 (Some(emitter), registry)
114 };
115 Self {
116 config,
117 diff,
118 buffers,
119 shutdown_coordinator: SourceShutdownCoordinator::default(),
120 errors: vec![],
121 outputs: HashMap::new(),
122 tasks: HashMap::new(),
123 inputs: HashMap::new(),
124 healthchecks: HashMap::new(),
125 detach_triggers: HashMap::new(),
126 extra_context,
127 utilization_emitter: emitter,
128 utilization_registry: registry,
129 }
130 }
131
132 async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
134 let enrichment_tables = self.load_enrichment_tables().await;
135 let source_tasks = self.build_sources(enrichment_tables).await;
136 self.build_transforms(enrichment_tables).await;
137 self.build_sinks(enrichment_tables).await;
138
139 enrichment_tables.finish_load();
142
143 if self.errors.is_empty() {
144 Ok(TopologyPieces {
145 inputs: self.inputs,
146 outputs: Self::finalize_outputs(self.outputs),
147 tasks: self.tasks,
148 source_tasks,
149 healthchecks: self.healthchecks,
150 shutdown_coordinator: self.shutdown_coordinator,
151 detach_triggers: self.detach_triggers,
152 metrics_storage: METRICS_STORAGE.clone(),
153 utilization: self
154 .utilization_emitter
155 .map(|e| (e, self.utilization_registry)),
156 })
157 } else {
158 Err(self.errors)
159 }
160 }
161
162 fn finalize_outputs(
163 outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
164 ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
165 {
166 let mut finalized_outputs = HashMap::new();
167 for (id, output) in outputs {
168 let entry = finalized_outputs
169 .entry(id.component)
170 .or_insert_with(HashMap::new);
171 entry.insert(id.port, output);
172 }
173
174 finalized_outputs
175 }
176
177 async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
180 let mut enrichment_tables = HashMap::new();
181
182 'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
184 let table_name = name.to_string();
185 if ENRICHMENT_TABLES.needs_reload(&table_name) {
186 let indexes = if !self.diff.enrichment_tables.is_added(name) {
187 Some(ENRICHMENT_TABLES.index_fields(&table_name))
190 } else {
191 None
192 };
193
194 let mut table = match table_outer.inner.build(&self.config.global).await {
195 Ok(table) => table,
196 Err(error) => {
197 self.errors
198 .push(format!("Enrichment Table \"{name}\": {error}"));
199 continue;
200 }
201 };
202
203 if let Some(indexes) = indexes {
204 for (case, index) in indexes {
205 match table
206 .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
207 {
208 Ok(_) => (),
209 Err(error) => {
210 error!(message = "Unable to add index to reloaded enrichment table.",
214 table = ?name.to_string(),
215 %error);
216 continue 'tables;
217 }
218 }
219 }
220 }
221
222 enrichment_tables.insert(table_name, table);
223 }
224 }
225
226 ENRICHMENT_TABLES.load(enrichment_tables);
227
228 &ENRICHMENT_TABLES
229 }
230
231 async fn build_sources(
232 &mut self,
233 enrichment_tables: &vector_lib::enrichment::TableRegistry,
234 ) -> HashMap<ComponentKey, Task> {
235 let mut source_tasks = HashMap::new();
236
237 let table_sources = self
238 .config
239 .enrichment_tables
240 .iter()
241 .filter_map(|(key, table)| table.as_source(key))
242 .collect::<Vec<_>>();
243 for (key, source) in self
244 .config
245 .sources()
246 .filter(|(key, _)| self.diff.sources.contains_new(key))
247 .chain(
248 table_sources
249 .iter()
250 .map(|(key, source)| (key, source))
251 .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
252 )
253 {
254 debug!(component_id = %key, "Building new source.");
255
256 let span = error_span!(
257 "source",
258 component_kind = "source",
259 component_id = %key.id(),
260 component_type = %source.inner.get_component_name(),
261 );
262
263 if let Ok(server) = self
264 .build_instrumented_source(key, source, enrichment_tables)
265 .instrument(span)
266 .await
267 {
268 source_tasks.insert(key.clone(), server);
269 }
270 }
271
272 source_tasks
273 }
274
/// Builds a single source: one pump per declared output, a supervisor task for
/// those pumps, and the task that drives the source itself.
///
/// On success the fanout control channels are added to `self.outputs`, the
/// pump supervisor is stored in `self.tasks` under `key`, and the source task
/// is returned to the caller.
///
/// # Errors
///
/// If the source fails to build, the message is pushed onto `self.errors` and
/// `Err(())` is returned; no outputs or tasks are recorded in that case. The
/// source has already been registered with the shutdown coordinator by then.
async fn build_instrumented_source(
    &mut self,
    key: &ComponentKey,
    source: &SourceOuter,
    enrichment_tables: &vector_lib::enrichment::TableRegistry,
) -> Result<Task, ()> {
    let typetag = source.inner.get_component_name();
    let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

    // Name given to every spawned per-output pump task.
    let task_name = format!(
        ">> {} ({}, pump) >>",
        source.inner.get_component_name(),
        key.id()
    );

    let mut builder = SourceSender::builder()
        .with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
        .with_timeout(source.inner.send_timeout())
        .with_ewma_half_life_seconds(
            self.config.global.buffer_utilization_ewma_half_life_seconds,
        );
    let mut pumps = Vec::new();
    let mut controls = HashMap::new();
    let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

    // Each output gets its own receiver from the sender builder, its own
    // fanout, and its own pump future forwarding from the former to the latter.
    for output in source_outputs.into_iter() {
        let rx = builder.add_source_output(output.clone(), key.clone());

        let (fanout, control) = Fanout::new(key.clone());
        let source_type = source.inner.get_component_name();
        let source = Arc::new(key.clone());

        let pump = run_source_output_pump(rx, fanout, source, source_type);

        pumps.push(pump.instrument(Span::current()));
        controls.insert(
            OutputId {
                component: key.clone(),
                port: output.port.clone(),
            },
            control,
        );

        let port = output.port.clone();
        if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
            schema_definitions.insert(port, definition);
        }
    }

    // Carries the first pump error from the supervisor to the source task.
    let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
    let pump = async move {
        debug!("Source pump supervisor starting.");

        // Spawn every per-output pump and wait for them in completion order.
        let mut handles = FuturesUnordered::new();
        for pump in pumps {
            handles.push(spawn_named(pump, task_name.as_ref()));
        }

        let mut had_pump_error = false;
        // `?` propagates a failure of the spawned task itself; an `Err` inside
        // `output` is the pump's own error.
        while let Some(output) = handles.try_next().await? {
            if let Err(e) = output {
                // The send result is deliberately ignored.
                _ = pump_error_tx.send(e);
                had_pump_error = true;
                break;
            }
        }

        if had_pump_error {
            debug!("Source pump supervisor task finished with an error.");
        } else {
            debug!("Source pump supervisor task finished normally.");
        }
        // The supervisor reports success even after a pump error; the error
        // itself reaches the source task through the oneshot channel.
        Ok(TaskOutput::Source)
    };
    let pump = Task::new(key.clone(), typetag, pump);

    let (shutdown_signal, force_shutdown_tripwire) = self
        .shutdown_coordinator
        .register_source(key, INTERNAL_SOURCES.contains(&typetag));

    let context = SourceContext {
        key: key.clone(),
        globals: self.config.global.clone(),
        enrichment_tables: enrichment_tables.clone(),
        metrics_storage: METRICS_STORAGE.clone(),
        shutdown: shutdown_signal,
        out: builder.build(),
        proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
        acknowledgements: source.sink_acknowledgements,
        schema_definitions,
        schema: self.config.schema,
        extra_context: self.extra_context.clone(),
    };
    let server = match source.inner.build(context).await {
        Err(error) => {
            self.errors.push(format!("Source \"{key}\": {error}"));
            return Err(());
        }
        Ok(server) => server,
    };

    let server = async move {
        debug!("Source starting.");

        // `biased` polls the branches in the order written: forced shutdown,
        // then a pump error, then the source future itself.
        let mut result = select! {
            biased;

            // Forced shutdown counts as a normal finish.
            _ = force_shutdown_tripwire => Ok(()),

            // Only matches when a pump error was actually received; if the
            // sender was dropped without sending, this branch is disabled.
            Ok(e) = &mut pump_error_rx => Err(e),

            // The source's own error value is discarded in favour of `Opaque`.
            result = server => result.map_err(|_| TaskError::Opaque),
        };

        // A pump error may have arrived without winning the select above; if
        // so, it replaces whatever result was chosen.
        if let Ok(e) = pump_error_rx.try_recv() {
            result = Err(e);
        }

        match result {
            Ok(()) => {
                debug!("Source finished normally.");
                Ok(TaskOutput::Source)
            }
            Err(e) => {
                debug!("Source finished with an error.");
                Err(e)
            }
        }
    };
    let server = Task::new(key.clone(), typetag, server);

    self.outputs.extend(controls);
    self.tasks.insert(key.clone(), pump);

    Ok(server)
}
439
440 async fn build_transforms(
441 &mut self,
442 enrichment_tables: &vector_lib::enrichment::TableRegistry,
443 ) {
444 let mut definition_cache = HashMap::default();
445
446 for (key, transform) in self
447 .config
448 .transforms()
449 .filter(|(key, _)| self.diff.transforms.contains_new(key))
450 {
451 debug!(component_id = %key, "Building new transform.");
452
453 let input_definitions = match schema::input_definitions(
454 &transform.inputs,
455 self.config,
456 enrichment_tables.clone(),
457 &mut definition_cache,
458 ) {
459 Ok(definitions) => definitions,
460 Err(_) => {
461 return;
465 }
466 };
467
468 let span = error_span!(
469 "transform",
470 component_kind = "transform",
471 component_id = %key.id(),
472 component_type = %transform.inner.get_component_name(),
473 );
474
475 self.build_instrumented_transform(key, transform, enrichment_tables, input_definitions)
476 .instrument(span)
477 .await;
478 }
479 }
480
481 async fn build_instrumented_transform(
482 &mut self,
483 key: &ComponentKey,
484 transform: &TransformOuter<OutputId>,
485 enrichment_tables: &vector_lib::enrichment::TableRegistry,
486 input_definitions: Vec<(OutputId, Definition)>,
487 ) {
488 let merged_definition: Definition = input_definitions
489 .iter()
490 .map(|(_output_id, definition)| definition.clone())
491 .reduce(Definition::merge)
492 .unwrap_or_else(Definition::any);
494
495 let schema_definitions = transform
497 .inner
498 .outputs(
499 &TransformContext {
500 enrichment_tables: enrichment_tables.clone(),
501 metrics_storage: METRICS_STORAGE.clone(),
502 schema: self.config.schema,
503 ..Default::default()
504 },
505 &input_definitions,
506 )
507 .into_iter()
508 .map(|output| {
509 let definitions = output.schema_definitions(self.config.schema.enabled);
510 (output.port, definitions)
511 })
512 .collect::<HashMap<_, _>>();
513
514 let context = TransformContext {
515 key: Some(key.clone()),
516 globals: self.config.global.clone(),
517 enrichment_tables: enrichment_tables.clone(),
518 metrics_storage: METRICS_STORAGE.clone(),
519 schema_definitions,
520 merged_schema_definition: merged_definition.clone(),
521 schema: self.config.schema,
522 extra_context: self.extra_context.clone(),
523 };
524
525 let node = TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);
526
527 let transform = match transform
528 .inner
529 .build(&context)
530 .instrument(Span::current())
531 .await
532 {
533 Err(error) => {
534 self.errors.push(format!("Transform \"{key}\": {error}"));
535 return;
536 }
537 Ok(transform) => transform,
538 };
539
540 let metrics = ChannelMetricMetadata::new(BufferChannelKind::Transform, None);
541 let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
542 TOPOLOGY_BUFFER_SIZE,
543 WhenFull::Block,
544 &Span::current(),
545 Some(metrics),
546 self.config.global.buffer_utilization_ewma_half_life_seconds,
547 );
548
549 self.inputs
550 .insert(key.clone(), (input_tx, node.inputs.clone()));
551
552 let (transform_task, transform_outputs) = self.build_transform(transform, node, input_rx);
553
554 self.outputs.extend(transform_outputs);
555 self.tasks.insert(key.clone(), transform_task);
556 }
557
558 async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
559 let table_sinks = self
560 .config
561 .enrichment_tables
562 .iter()
563 .filter_map(|(key, table)| table.as_sink(key))
564 .collect::<Vec<_>>();
565 for (key, sink) in self
566 .config
567 .sinks()
568 .filter(|(key, _)| self.diff.sinks.contains_new(key))
569 .chain(
570 table_sinks
571 .iter()
572 .map(|(key, sink)| (key, sink))
573 .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
574 )
575 {
576 debug!(component_id = %key, "Building new sink.");
577
578 let span = error_span!(
579 "sink",
580 component_kind = "sink",
581 component_id = %key.id(),
582 component_type = %sink.inner.get_component_name(),
583 );
584
585 self.build_instrumented_sink(key, sink, enrichment_tables)
586 .instrument(span)
587 .await;
588 }
589 }
590
/// Builds a single sink: its input buffer, the sink task, and its healthcheck
/// task.
///
/// On success the buffer sender and configured inputs go into `self.inputs`,
/// the healthcheck into `self.healthchecks`, the sink task into `self.tasks`,
/// and the detach trigger into `self.detach_triggers`. A failure to build the
/// buffer or the sink pushes a message onto `self.errors` and records nothing.
async fn build_instrumented_sink(
    &mut self,
    key: &ComponentKey,
    sink: &SinkOuter<OutputId>,
    enrichment_tables: &vector_lib::enrichment::TableRegistry,
) {
    let sink_inputs = &sink.inputs;
    let healthcheck = sink.healthcheck();
    // The healthcheck runs only if enabled both on the sink and globally.
    let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
    let healthcheck_timeout = healthcheck.timeout;

    let typetag = sink.inner.get_component_name();
    let input_type = sink.inner.input().data_type();

    // Schema validation errors are recorded, but the build still continues.
    if let Err(mut err) =
        schema::validate_sink_expectations(key, sink, self.config, enrichment_tables.clone())
    {
        self.errors.append(&mut err);
    };

    // Reuse a pre-built buffer for this sink if one was supplied; otherwise
    // build one from the sink's buffer configuration.
    let (tx, rx) = match self.buffers.remove(key) {
        Some(buffer) => buffer,
        _ => {
            // The span is labelled after the type of the first buffer stage.
            let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") {
                BufferType::Memory { .. } => "memory",
                BufferType::DiskV2 { .. } => "disk",
            };
            let buffer_span = error_span!("sink", buffer_type);
            let buffer = sink
                .buffer
                .build(
                    self.config.global.data_dir.clone(),
                    key.to_string(),
                    buffer_span,
                )
                .await;
            match buffer {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    return;
                }
                // Wrapped so the receiver can be taken out by the sink task.
                Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
            }
        }
    };

    let cx = SinkContext {
        healthcheck,
        globals: self.config.global.clone(),
        enrichment_tables: enrichment_tables.clone(),
        metrics_storage: METRICS_STORAGE.clone(),
        proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
        schema: self.config.schema,
        app_name: crate::get_app_name().to_string(),
        app_name_slug: crate::get_slugified_app_name(),
        extra_context: self.extra_context.clone(),
    };

    let (sink, healthcheck) = match sink.inner.build(cx).await {
        Err(error) => {
            self.errors.push(format!("Sink \"{key}\": {error}"));
            return;
        }
        Ok(built) => built,
    };

    // The tripwire bounds the sink's input stream below; the trigger is kept
    // in `self.detach_triggers`.
    let (trigger, tripwire) = Tripwire::new();

    let utilization_sender = self
        .utilization_registry
        .add_component(key.clone(), gauge!(GaugeName::Utilization));
    let component_key = key.clone();
    let sink = async move {
        debug!("Sink starting.");

        // Take the receiver out of the shared slot; panics if the lock is
        // poisoned or the receiver has already been taken.
        let rx = rx
            .lock()
            .unwrap()
            .take()
            .expect("Task started but input has been taken.");

        let mut rx = Utilization::new(utilization_sender, component_key.clone(), rx);

        let events_received = register!(EventsReceived);
        // `by_ref` keeps ownership of `rx` here so it can be handed back in
        // `TaskOutput::Sink` once the sink finishes normally.
        sink.run(
            rx.by_ref()
                // Drop arrays whose event type the sink does not accept.
                .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                .inspect(|events| {
                    events_received.emit(CountByteSize(
                        events.len(),
                        events.estimated_json_encoded_size_of(),
                    ))
                })
                // NOTE(review): presumably ends the input once the detach
                // trigger fires — confirm exact conditions in stream_cancel.
                .take_until_if(tripwire),
        )
        .await
        .map(|_| {
            debug!("Sink finished normally.");
            TaskOutput::Sink(rx)
        })
        .map_err(|_| {
            debug!("Sink finished with an error.");
            TaskError::Opaque
        })
    };

    let task = Task::new(key.clone(), typetag, sink);

    let component_key = key.clone();
    let healthcheck_task = async move {
        if enable_healthcheck {
            // Three outcomes: passed, failed with an error, or timed out.
            timeout(healthcheck_timeout, healthcheck)
                .map(|result| match result {
                    Ok(Ok(_)) => {
                        info!("Healthcheck passed.");
                        Ok(TaskOutput::Healthcheck)
                    }
                    Ok(Err(error)) => {
                        error!(
                            msg = "Healthcheck failed.",
                            %error,
                            component_kind = "sink",
                            component_type = typetag,
                            component_id = %component_key.id(),
                        );
                        Err(TaskError::wrapped(error))
                    }
                    Err(e) => {
                        error!(
                            msg = "Healthcheck timed out.",
                            component_kind = "sink",
                            component_type = typetag,
                            component_id = %component_key.id(),
                        );
                        Err(TaskError::wrapped(Box::new(e)))
                    }
                })
                .await
        } else {
            info!("Healthcheck disabled.");
            Ok(TaskOutput::Healthcheck)
        }
    };

    let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

    self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
    self.healthchecks.insert(key.clone(), healthcheck_task);
    self.tasks.insert(key.clone(), task);
    self.detach_triggers.insert(key.clone(), trigger);
}
751
752 fn build_transform(
753 &self,
754 transform: Transform,
755 node: TransformNode,
756 input_rx: BufferReceiver<EventArray>,
757 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
758 match transform {
759 Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
761 Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
762 Transform::Task(t) => self.build_task_transform(
763 t,
764 input_rx,
765 node.input_details.data_type(),
766 node.typetag,
767 &node.key,
768 &node.outputs,
769 ),
770 }
771 }
772
773 fn build_sync_transform(
774 &self,
775 t: Box<dyn SyncTransform>,
776 node: TransformNode,
777 input_rx: BufferReceiver<EventArray>,
778 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
779 let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
780
781 let sender = self
782 .utilization_registry
783 .add_component(node.key.clone(), gauge!(GaugeName::Utilization));
784 let runner = Runner::new(
785 t,
786 input_rx,
787 sender,
788 node.input_details.data_type(),
789 outputs,
790 LatencyRecorder::new(self.config.global.latency_ewma_alpha),
791 );
792 let transform = if node.enable_concurrency {
793 runner.run_concurrently().boxed()
794 } else {
795 runner.run_inline().boxed()
796 };
797
798 let transform = async move {
799 debug!("Synchronous transform starting.");
800
801 match transform.await {
802 Ok(v) => {
803 debug!("Synchronous transform finished normally.");
804 Ok(v)
805 }
806 Err(e) => {
807 debug!("Synchronous transform finished with an error.");
808 Err(e)
809 }
810 }
811 };
812
813 let mut output_controls = HashMap::new();
814 for (name, control) in controls {
815 let id = name
816 .map(|name| OutputId::from((&node.key, name)))
817 .unwrap_or_else(|| OutputId::from(&node.key));
818 output_controls.insert(id, control);
819 }
820
821 let task = Task::new(node.key.clone(), node.typetag, transform);
822
823 (task, output_controls)
824 }
825
/// Builds the task for a stream-based (task) transform and returns it together
/// with the control channel of its single, default output.
///
/// The input stream is wrapped for utilization tracking, filtered by accepted
/// event type and counted; the transform's output stream is stamped with
/// schema and latency information, counted, and sent through a fanout.
///
/// # Panics
///
/// Panics if `outputs` contains no entry for the default (unnamed) port.
fn build_task_transform(
    &self,
    t: Box<dyn TaskTransform<EventArray>>,
    input_rx: BufferReceiver<EventArray>,
    input_type: DataType,
    typetag: &str,
    key: &ComponentKey,
    outputs: &[TransformOutput],
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (mut fanout, control) = Fanout::new(key.clone());

    // One registered component sender is shared by the input-side and
    // output-side utilization wrappers.
    let sender = self
        .utilization_registry
        .add_component(key.clone(), gauge!(GaugeName::Utilization));
    let output_sender = sender.clone();
    let input_rx = Utilization::new(sender, key.clone(), input_rx.into_stream());

    let events_received = register!(EventsReceived);
    let filtered = input_rx
        // Drop arrays whose event type the transform does not accept.
        .filter(move |events| ready(filter_events_type(events, input_type)))
        .inspect(move |events| {
            events_received.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ))
        });
    let events_sent = register!(EventsSent::from(internal_event::Output(None)));
    let output_id = Arc::new(OutputId {
        component: key.clone(),
        port: None,
    });
    let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);

    // Log schema definitions of the default output, each wrapped in an `Arc`.
    let schema_definition_map = outputs
        .iter()
        .find(|x| x.port.is_none())
        .expect("output for default port required for task transforms")
        .log_schema_definitions
        .clone()
        .into_iter()
        .map(|(key, value)| (key, Arc::new(value)))
        .collect();

    let stream = t
        .transform(Box::pin(filtered))
        .map(move |mut events| {
            for event in events.iter_events_mut() {
                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
            }
            // The same instant is recorded for latency and passed downstream.
            let now = Instant::now();
            latency_recorder.on_send(&mut events, now);
            (events, now)
        })
        .inspect(move |(events, _): &(EventArray, Instant)| {
            events_sent.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ));
        });
    let stream = OutputUtilization::new(output_sender, stream);
    let transform = async move {
        debug!("Task transform starting.");

        match fanout.send_stream(stream).await {
            Ok(()) => {
                debug!("Task transform finished normally.");
                Ok(TaskOutput::Transform)
            }
            Err(e) => {
                debug!("Task transform finished with an error.");
                Err(TaskError::wrapped(e))
            }
        }
    }
    .boxed();

    // Task transforms expose only the default output.
    let mut outputs = HashMap::new();
    outputs.insert(OutputId::from(key), control);

    let task = Task::new(key.clone(), typetag, transform);

    (task, outputs)
}
910}
911
/// Forwards event arrays from one source output receiver into its fanout until
/// the receiver is exhausted, handling fanout control messages along the way.
///
/// Every forwarded array has its metadata stamped with the source id, the
/// source type, and the current time as the last-transform timestamp.
///
/// # Errors
///
/// Returns a wrapped error as soon as a fanout send fails.
async fn run_source_output_pump(
    mut rx: LimitedReceiver<SourceSenderItem>,
    mut fanout: Fanout,
    source: Arc<ComponentKey>,
    source_type: &'static str,
) -> TaskResult {
    debug!("Source pump starting.");

    let mut control_channel_open = true;
    loop {
        tokio::select! {
            // `biased` polls control messages before incoming events.
            biased;
            // Once `recv_control_message` reports `false`, this branch stays
            // disabled for the rest of the loop.
            alive = fanout.recv_control_message(), if control_channel_open => {
                control_channel_open = alive;
            }
            item = rx.next() => {
                match item {
                    Some(SourceSenderItem { events: mut array, send_reference }) => {
                        // Timestamp taken when the array is pulled from `rx`,
                        // independent of `send_reference`.
                        let now = Instant::now();
                        array.for_each_metadata_mut(|metadata| {
                            metadata.set_source_id(Arc::clone(&source));
                            metadata.set_source_type(source_type);
                            metadata.set_last_transform_timestamp(now);
                        });
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }
                    // The receiver is exhausted: the pump is done.
                    None => break,
                }
            }
        }
    }

    debug!("Source pump finished normally.");
    Ok(TaskOutput::Source)
}
963
964pub async fn reload_enrichment_tables(config: &Config) {
965 let mut enrichment_tables = HashMap::new();
966 'tables: for (name, table_outer) in config.enrichment_tables.iter() {
968 let table_name = name.to_string();
969 if ENRICHMENT_TABLES.needs_reload(&table_name) {
970 let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
971
972 let mut table = match table_outer.inner.build(&config.global).await {
973 Ok(table) => table,
974 Err(error) => {
975 error!("Enrichment table \"{name}\" reload failed: {error}");
976 continue;
977 }
978 };
979
980 if let Some(indexes) = indexes {
981 for (case, index) in indexes {
982 match table
983 .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
984 {
985 Ok(_) => (),
986 Err(error) => {
987 error!(
991 message = "Unable to add index to reloaded enrichment table.",
992 table = ?name.to_string(),
993 %error
994 );
995 continue 'tables;
996 }
997 }
998 }
999 }
1000
1001 enrichment_tables.insert(table_name, table);
1002 }
1003 }
1004
1005 ENRICHMENT_TABLES.load(enrichment_tables);
1006 ENRICHMENT_TABLES.finish_load();
1007}
1008
/// Everything produced by a successful topology build.
pub struct TopologyPieces {
    /// Buffer sender plus configured inputs for each built transform and sink.
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Fanout control channels, grouped by component and then by output port.
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    /// Tasks for source pump supervisors, transforms and sinks.
    pub(super) tasks: HashMap<ComponentKey, Task>,
    /// Tasks that drive the sources themselves.
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    /// One healthcheck task per built sink.
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    /// Coordinator with which every built source was registered.
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    /// Trigger paired with the tripwire that bounds each sink's input stream.
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Clone of the process-wide metrics storage.
    pub(crate) metrics_storage: MetricsStorage,
    /// Emitter/registry pair; `Some` only when the build created its own
    /// instead of reusing a supplied registry.
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}
1020
/// Fluent builder that gathers the inputs of a topology build and then
/// produces `TopologyPieces`.
pub struct TopologyPiecesBuilder<'a> {
    /// Configuration to build from.
    config: &'a Config,
    /// Diff selecting which components are built.
    diff: &'a ConfigDiff,
    /// Pre-built sink buffers to reuse; empty unless set via `with_buffers`.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Extra context cloned into component contexts; default unless set.
    extra_context: ExtraContext,
    /// Existing utilization registry to reuse; `None` makes the build create
    /// its own emitter/registry pair.
    utilization_registry: Option<UtilizationRegistry>,
}
1039
1040impl<'a> TopologyPiecesBuilder<'a> {
1041 pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
1043 Self {
1044 config,
1045 diff,
1046 buffers: HashMap::new(),
1047 extra_context: ExtraContext::default(),
1048 utilization_registry: None,
1049 }
1050 }
1051
1052 pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
1054 self.buffers = buffers;
1055 self
1056 }
1057
1058 pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
1060 self.extra_context = extra_context;
1061 self
1062 }
1063
1064 pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
1066 self.utilization_registry = registry;
1067 self
1068 }
1069
1070 pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
1075 Builder::new(
1076 self.config,
1077 self.diff,
1078 self.buffers,
1079 self.extra_context,
1080 self.utilization_registry,
1081 )
1082 .build()
1083 .await
1084 }
1085
1086 pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
1091 match self.build().await {
1092 Err(errors) => {
1093 for error in errors {
1094 error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
1095 }
1096 None
1097 }
1098 Ok(new_pieces) => Some(new_pieces),
1099 }
1100 }
1101}
1102
1103impl TopologyPieces {
1104 pub async fn build_or_log_errors(
1105 config: &Config,
1106 diff: &ConfigDiff,
1107 buffers: HashMap<ComponentKey, BuiltBuffer>,
1108 extra_context: ExtraContext,
1109 utilization_registry: Option<UtilizationRegistry>,
1110 ) -> Option<Self> {
1111 TopologyPiecesBuilder::new(config, diff)
1112 .with_buffers(buffers)
1113 .with_extra_context(extra_context)
1114 .with_utilization_registry(utilization_registry)
1115 .build_or_log_errors()
1116 .await
1117 }
1118
1119 pub async fn build(
1121 config: &super::Config,
1122 diff: &ConfigDiff,
1123 buffers: HashMap<ComponentKey, BuiltBuffer>,
1124 extra_context: ExtraContext,
1125 utilization_registry: Option<UtilizationRegistry>,
1126 ) -> Result<Self, Vec<String>> {
1127 TopologyPiecesBuilder::new(config, diff)
1128 .with_buffers(buffers)
1129 .with_extra_context(extra_context)
1130 .with_utilization_registry(utilization_registry)
1131 .build()
1132 .await
1133 }
1134}
1135
1136const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
1137 match events {
1138 EventArray::Logs(_) => data_type.contains(DataType::Log),
1139 EventArray::Metrics(_) => data_type.contains(DataType::Metric),
1140 EventArray::Traces(_) => data_type.contains(DataType::Trace),
1141 }
1142}
1143
/// Snapshot of the facts about one transform that are needed to wire up its task.
#[derive(Debug, Clone)]
struct TransformNode {
    /// Component key of the transform.
    key: ComponentKey,
    /// Component name reported by the transform's configuration.
    typetag: &'static str,
    /// Outputs this transform is configured to read from.
    inputs: Inputs<OutputId>,
    /// Input description; its data type decides which event arrays are accepted.
    input_details: Input,
    /// Outputs declared by the transform for the given input definitions.
    outputs: Vec<TransformOutput>,
    /// Whether a synchronous transform should be run concurrently.
    enable_concurrency: bool,
}
1153
1154impl TransformNode {
1155 pub fn from_parts(
1156 key: ComponentKey,
1157 context: &TransformContext,
1158 transform: &TransformOuter<OutputId>,
1159 schema_definition: &[(OutputId, Definition)],
1160 ) -> Self {
1161 Self {
1162 key,
1163 typetag: transform.inner.get_component_name(),
1164 inputs: transform.inputs.clone(),
1165 input_details: transform.inner.input(),
1166 outputs: transform.inner.outputs(context, schema_definition),
1167 enable_concurrency: transform.inner.enable_concurrency(),
1168 }
1169 }
1170}
1171
/// Drives a synchronous transform: pulls event arrays from its input buffer,
/// runs the transform, and sends the results to the transform's outputs.
struct Runner {
    /// The transform being driven.
    transform: Box<dyn SyncTransform>,
    /// Input buffer; taken (left as `None`) when a run method starts.
    input_rx: Option<BufferReceiver<EventArray>>,
    /// Event types accepted from the input; other arrays are filtered out.
    input_type: DataType,
    /// Destination for transformed events.
    outputs: TransformOutputs,
    /// Receives start-wait/stop-wait notifications for utilization tracking.
    timer_tx: UtilizationComponentSender,
    /// Applied to every output array just before it is sent.
    latency_recorder: LatencyRecorder,
    /// Handle used to emit the `EventsReceived` internal event.
    events_received: Registered<EventsReceived>,
}
1181
1182impl Runner {
1183 fn new(
1184 transform: Box<dyn SyncTransform>,
1185 input_rx: BufferReceiver<EventArray>,
1186 timer_tx: UtilizationComponentSender,
1187 input_type: DataType,
1188 outputs: TransformOutputs,
1189 latency_recorder: LatencyRecorder,
1190 ) -> Self {
1191 Self {
1192 transform,
1193 input_rx: Some(input_rx),
1194 input_type,
1195 outputs,
1196 timer_tx,
1197 latency_recorder,
1198 events_received: register!(EventsReceived),
1199 }
1200 }
1201
1202 fn on_events_received(&mut self, events: &EventArray) {
1203 self.timer_tx.try_send_stop_wait();
1204
1205 self.events_received.emit(CountByteSize(
1206 events.len(),
1207 events.estimated_json_encoded_size_of(),
1208 ));
1209 }
1210
1211 async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
1212 self.timer_tx.try_send_start_wait();
1213 let now = Instant::now();
1214 outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array, now));
1215 self.outputs.send(outputs_buf).await
1216 }
1217
1218 async fn run_inline(mut self) -> TaskResult {
1219 const INLINE_BATCH_SIZE: usize = 128;
1221
1222 let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);
1223
1224 let mut input_rx = self
1225 .input_rx
1226 .take()
1227 .expect("can't run runner twice")
1228 .into_stream()
1229 .filter(move |events| ready(filter_events_type(events, self.input_type)));
1230
1231 self.timer_tx.try_send_start_wait();
1232 while let Some(events) = input_rx.next().await {
1233 self.on_events_received(&events);
1234 self.transform.transform_all(events, &mut outputs_buf);
1235 self.send_outputs(&mut outputs_buf)
1236 .await
1237 .map_err(TaskError::wrapped)?;
1238 }
1239
1240 Ok(TaskOutput::Transform)
1241 }
1242
/// Processes input concurrently: each batch of ready event arrays is
/// transformed on a spawned task using a clone of the transform, while results
/// are sent to the outputs from this task.
///
/// At most `TRANSFORM_CONCURRENCY_LIMIT` transform tasks are in flight at once.
/// Finishes with `TaskOutput::Transform` once the input is exhausted and all
/// in-flight tasks have been drained.
///
/// # Errors
///
/// Returns a wrapped error as soon as sending outputs fails.
///
/// # Panics
///
/// Panics if the input receiver has already been taken, or if an in-flight
/// task does not yield `Some(Ok(_))`.
async fn run_concurrently(mut self) -> TaskResult {
    let input_rx = self
        .input_rx
        .take()
        .expect("can't run runner twice")
        .into_stream()
        // Drop arrays whose event type the transform does not accept.
        .filter(move |events| ready(filter_events_type(events, self.input_type)));

    let mut input_rx =
        super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

    // `FuturesOrdered` yields results in the order the tasks were pushed.
    let mut in_flight = FuturesOrdered::new();
    let mut shutting_down = false;

    self.timer_tx.try_send_start_wait();
    loop {
        tokio::select! {
            // `biased` polls finished transform tasks before new input.
            biased;

            result = in_flight.next(), if !in_flight.is_empty() => {
                match result {
                    Some(Ok(mut outputs_buf)) => {
                        self.send_outputs(&mut outputs_buf).await
                            .map_err(TaskError::wrapped)?;
                    }
                    // NOTE(review): a join error from the spawned task (for
                    // example if `transform_all` panics) also lands here and
                    // panics this task — confirm that is intended.
                    _ => unreachable!("join error or bad poll"),
                }
            }

            // New input is accepted only while below the concurrency limit and
            // before the input has ended.
            input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                match input_arrays {
                    Some(input_arrays) => {
                        // Total event count, used to size the output buffer.
                        let mut len = 0;
                        for events in &input_arrays {
                            self.on_events_received(events);
                            len += events.len();
                        }

                        // The spawned task works on its own clone of the transform.
                        let mut t = self.transform.clone();
                        let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                        let task = tokio::spawn(async move {
                            for events in input_arrays {
                                t.transform_all(events, &mut outputs_buf);
                            }
                            outputs_buf
                        }.in_current_span());
                        in_flight.push_back(task);
                    }
                    None => {
                        // Input has ended: stop reading and drain in-flight tasks.
                        shutting_down = true;
                        continue
                    }
                }
            }

            // Runs only when both branches above are disabled.
            else => {
                if shutting_down {
                    break
                }
            }
        }
    }

    Ok(TaskOutput::Transform)
}
1308}