1use std::{collections::HashMap, fmt, task::Poll, time::Instant};
2
3use futures::{Stream, StreamExt};
4use futures_util::{pending, poll};
5use indexmap::IndexMap;
6use tokio::sync::mpsc;
7use tokio_util::sync::ReusableBoxFuture;
8use vector_buffers::topology::channel::BufferSender;
9
10use crate::{
11 config::ComponentKey,
12 event::{EventArray, EventContainer},
13};
14
15pub enum ControlMessage {
16 Add(ComponentKey, BufferSender<EventArray>),
18
19 Remove(ComponentKey),
21
22 Pause(ComponentKey),
27
28 Replace(ComponentKey, BufferSender<EventArray>),
30}
31
32impl fmt::Debug for ControlMessage {
33 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34 write!(f, "ControlMessage::")?;
35 match self {
36 Self::Add(id, _) => write!(f, "Add({id:?})"),
37 Self::Remove(id) => write!(f, "Remove({id:?})"),
38 Self::Pause(id) => write!(f, "Pause({id:?})"),
39 Self::Replace(id, _) => write!(f, "Replace({id:?})"),
40 }
41 }
42}
43
44pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
47
48pub struct Fanout {
49 senders: IndexMap<ComponentKey, Option<Sender>>,
50 control_channel: mpsc::UnboundedReceiver<ControlMessage>,
51 upstream_component: ComponentKey,
52}
53
54impl Fanout {
55 pub fn new(upstream_component: ComponentKey) -> (Self, ControlChannel) {
56 let (control_tx, control_rx) = mpsc::unbounded_channel();
57
58 let fanout = Self {
59 senders: Default::default(),
60 control_channel: control_rx,
61 upstream_component,
62 };
63
64 (fanout, control_tx)
65 }
66
67 pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
73 assert!(
74 !self.senders.contains_key(&id),
75 "Adding duplicate output id to fanout: {id}"
76 );
77 self.senders.insert(id, Some(Sender::new(sink)));
78 }
79
80 fn remove(&mut self, id: &ComponentKey) {
81 assert!(
82 self.senders.shift_remove(id).is_some(),
83 "Removing nonexistent sink from fanout: {id}"
84 );
85 }
86
87 fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
88 match self.senders.get_mut(id) {
89 Some(sender) => {
90 assert!(
94 sender.replace(Sender::new(sink)).is_none(),
95 "Replacing existing sink is not valid: {id}"
96 );
97 }
98 None => panic!("Replacing unknown sink from fanout: {id}"),
99 }
100 }
101
102 fn pause(&mut self, id: &ComponentKey) {
103 match self.senders.get_mut(id) {
104 Some(sender) => {
105 assert!(
108 sender.take().is_some(),
109 "Pausing nonexistent sink is not valid: {id}"
110 );
111 }
112 None => panic!("Pausing unknown sink from fanout: {id}"),
113 }
114 }
115
116 pub async fn recv_control_message(&mut self) -> bool {
121 match self.control_channel.recv().await {
122 Some(msg) => {
123 self.apply_control_message(msg);
124 true
125 }
126 None => false,
127 }
128 }
129
130 fn apply_control_message(&mut self, message: ControlMessage) {
134 trace!("Processing control message outside of send: {:?}", message);
135
136 match message {
137 ControlMessage::Add(id, sink) => self.add(id, sink),
138 ControlMessage::Remove(id) => self.remove(&id),
139 ControlMessage::Pause(id) => self.pause(&id),
140 ControlMessage::Replace(id, sink) => self.replace(&id, sink),
141 }
142 }
143
144 async fn wait_for_replacements(&mut self) {
149 while self.senders.values().any(Option::is_none) {
150 if let Some(msg) = self.control_channel.recv().await {
151 self.apply_control_message(msg);
152 } else {
153 }
171 }
172 }
173
174 pub async fn send_stream(
190 &mut self,
191 events: impl Stream<Item = (EventArray, Instant)>,
192 ) -> crate::Result<()> {
193 tokio::pin!(events);
194 while let Some((event_array, send_reference)) = events.next().await {
195 self.send(event_array, Some(send_reference)).await?;
196 }
197 Ok(())
198 }
199
200 pub async fn send(
216 &mut self,
217 events: EventArray,
218 send_reference: Option<Instant>,
219 ) -> crate::Result<()> {
220 while let Ok(message) = self.control_channel.try_recv() {
222 self.apply_control_message(message);
223 }
224
225 self.wait_for_replacements().await;
227
228 debug_assert!(
232 !events.is_empty(),
233 "Fanout received empty event batch from upstream component '{}'",
234 self.upstream_component,
235 );
236 #[cfg(not(debug_assertions))]
240 if events.is_empty() {
241 warn!(
242 message = "Dropping empty event batch emitted by upstream component. This is likely a bug in that component.",
243 component_id = %self.upstream_component,
244 downstream_count = self.senders.len(),
245 );
246 return Ok(());
247 }
248
249 if self.senders.is_empty() {
251 trace!("No senders present.");
252 return Ok(());
253 }
254
255 let mut control_channel_open = true;
262
263 let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);
266
267 loop {
268 tokio::select! {
269 biased;
273
274 maybe_msg = self.control_channel.recv(), if control_channel_open => {
275 trace!("Processing control message inside of send: {:?}", maybe_msg);
276
277 match maybe_msg {
280 Some(ControlMessage::Add(id, sink)) => {
281 send_group.add(id, sink);
282 },
283 Some(ControlMessage::Remove(id)) => {
284 send_group.remove(&id);
285 },
286 Some(ControlMessage::Pause(id)) => {
287 send_group.pause(&id);
288 },
289 Some(ControlMessage::Replace(id, sink)) => {
290 send_group.replace(&id, Sender::new(sink));
291 },
292 None => {
293 control_channel_open = false;
295 }
296 }
297 }
298
299 result = send_group.send() => match result {
300 Ok(()) => {
301 trace!("Sent item to fanout.");
302 break;
303 },
304 Err(e) => return Err(e),
305 }
306 }
307 }
308
309 Ok(())
310 }
311}
312
313struct SendGroup<'a> {
314 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
315 sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
316}
317
318impl<'a> SendGroup<'a> {
319 fn new(
320 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
321 events: EventArray,
322 send_reference: Option<Instant>,
323 ) -> Self {
324 debug_assert!(senders.values().all(Option::is_some));
327
328 let last_sender_idx = senders.len().saturating_sub(1);
329 let mut events = Some(events);
330
331 let mut sends = HashMap::new();
334 for (i, (key, sender)) in senders.iter_mut().enumerate() {
335 let mut sender = sender
336 .take()
337 .expect("sender must be present to initialize SendGroup");
338
339 if i == last_sender_idx {
341 sender.input = events.take();
342 } else {
343 sender.input.clone_from(&events);
344 }
345 sender.send_reference = send_reference;
346
347 let send = async move {
349 sender.flush().await?;
350 Ok(sender)
351 };
352
353 sends.insert(key.clone(), ReusableBoxFuture::new(send));
354 }
355
356 Self { senders, sends }
357 }
358
359 fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
360 if let Some(send) = self.sends.remove(id) {
361 tokio::spawn(async move {
362 if let Err(e) = send.await {
363 warn!(
364 cause = %e,
365 message = "Encountered error writing to component after detaching from topology.",
366 );
367 }
368 });
369 true
370 } else {
371 false
372 }
373 }
374
375 #[allow(clippy::needless_pass_by_value)]
376 fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
377 assert!(
380 self.senders
381 .insert(id.clone(), Some(Sender::new(sink)))
382 .is_none(),
383 "Adding duplicate output id to fanout: {id}"
384 );
385 }
386
387 fn remove(&mut self, id: &ComponentKey) {
388 assert!(
392 self.senders.shift_remove(id).is_some(),
393 "Removing nonexistent sink from fanout: {id}"
394 );
395
396 self.try_detach_send(id);
401 }
402
403 fn replace(&mut self, id: &ComponentKey, sink: Sender) {
404 match self.senders.get_mut(id) {
405 Some(sender) => {
406 assert!(
410 sender.replace(sink).is_none(),
411 "Replacing existing sink is not valid: {id}"
412 );
413 }
414 None => panic!("Replacing unknown sink from fanout: {id}"),
415 }
416 }
417
418 fn pause(&mut self, id: &ComponentKey) {
419 match self.senders.get_mut(id) {
420 Some(sender) => {
421 if sender.take().is_none() {
428 assert!(
429 self.try_detach_send(id),
430 "Pausing already-paused sink is invalid: {id}"
431 );
432 }
433 }
434 None => panic!("Pausing unknown sink from fanout: {id}"),
435 }
436 }
437
438 async fn send(&mut self) -> crate::Result<()> {
439 loop {
443 if self.sends.is_empty() {
444 break;
445 }
446
447 let mut done = Vec::new();
448 for (key, send) in &mut self.sends {
449 if let Poll::Ready(result) = poll!(send.get_pin()) {
450 let sender = result?;
451
452 done.push((key.clone(), sender));
455 }
456 }
457
458 for (key, sender) in done {
459 self.sends.remove(&key);
460 self.replace(&key, sender);
461 }
462
463 if !self.sends.is_empty() {
464 pending!();
468 }
469 }
470
471 Ok(())
472 }
473}
474
475struct Sender {
476 inner: BufferSender<EventArray>,
477 input: Option<EventArray>,
478 send_reference: Option<Instant>,
479}
480
481impl Sender {
482 fn new(inner: BufferSender<EventArray>) -> Self {
483 Self {
484 inner,
485 input: None,
486 send_reference: None,
487 }
488 }
489
490 async fn flush(&mut self) -> crate::Result<()> {
491 let send_reference = self.send_reference.take();
492 if let Some(input) = self.input.take() {
493 self.inner.send(input, send_reference).await?;
494 self.inner.flush().await?;
495 }
496
497 Ok(())
498 }
499}
500
501#[cfg(test)]
502mod tests {
503 use std::{mem, num::NonZeroUsize};
504
505 use futures::poll;
506 use tokio::sync::mpsc::UnboundedSender;
507 use tokio_test::{assert_pending, assert_ready, task::spawn};
508 use tracing::Span;
509 use vector_buffers::{
510 WhenFull,
511 topology::{
512 builder::TopologyBuilder,
513 channel::{BufferReceiver, BufferSender},
514 },
515 };
516 use vrl::value::Value;
517
518 use super::{ControlMessage, Fanout};
519 use crate::{
520 config::ComponentKey,
521 event::{Event, EventArray, EventContainer, LogEvent},
522 test_util::{collect_ready, collect_ready_events},
523 };
524
525 fn build_sender_pair(
526 capacity: usize,
527 ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
528 TopologyBuilder::standalone_memory(
529 NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
530 WhenFull::Block,
531 &Span::current(),
532 None,
533 None,
534 )
535 }
536
537 fn build_sender_pairs(
538 capacities: &[usize],
539 ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
540 let mut pairs = Vec::new();
541 for capacity in capacities {
542 pairs.push(build_sender_pair(*capacity));
543 }
544 pairs
545 }
546
547 fn fanout_from_senders(
548 capacities: &[usize],
549 ) -> (
550 Fanout,
551 UnboundedSender<ControlMessage>,
552 Vec<BufferReceiver<EventArray>>,
553 ) {
554 let (mut fanout, control) = Fanout::new(ComponentKey::from("test_upstream"));
555 let pairs = build_sender_pairs(capacities);
556
557 let mut receivers = Vec::new();
558 for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
559 fanout.add(ComponentKey::from(i.to_string()), sender);
560 receivers.push(receiver);
561 }
562
563 (fanout, control, receivers)
564 }
565
566 fn add_sender_to_fanout(
567 fanout: &mut Fanout,
568 receivers: &mut Vec<BufferReceiver<EventArray>>,
569 sender_id: usize,
570 capacity: usize,
571 ) {
572 let (sender, receiver) = build_sender_pair(capacity);
573 receivers.push(receiver);
574
575 fanout.add(ComponentKey::from(sender_id.to_string()), sender);
576 }
577
578 fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
579 control
580 .send(ControlMessage::Remove(ComponentKey::from(
581 sender_id.to_string(),
582 )))
583 .expect("sending control message should not fail");
584 }
585
586 fn replace_sender_in_fanout(
587 control: &UnboundedSender<ControlMessage>,
588 receivers: &mut [BufferReceiver<EventArray>],
589 sender_id: usize,
590 capacity: usize,
591 ) -> BufferReceiver<EventArray> {
592 let (sender, receiver) = build_sender_pair(capacity);
593 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
594
595 control
596 .send(ControlMessage::Pause(ComponentKey::from(
597 sender_id.to_string(),
598 )))
599 .expect("sending control message should not fail");
600
601 control
602 .send(ControlMessage::Replace(
603 ComponentKey::from(sender_id.to_string()),
604 sender,
605 ))
606 .expect("sending control message should not fail");
607
608 old_receiver
609 }
610
611 fn start_sender_replace(
612 control: &UnboundedSender<ControlMessage>,
613 receivers: &mut [BufferReceiver<EventArray>],
614 sender_id: usize,
615 capacity: usize,
616 ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
617 let (sender, receiver) = build_sender_pair(capacity);
618 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
619
620 control
621 .send(ControlMessage::Pause(ComponentKey::from(
622 sender_id.to_string(),
623 )))
624 .expect("sending control message should not fail");
625
626 (old_receiver, sender)
627 }
628
629 fn finish_sender_resume(
630 control: &UnboundedSender<ControlMessage>,
631 sender_id: usize,
632 sender: BufferSender<EventArray>,
633 ) {
634 control
635 .send(ControlMessage::Replace(
636 ComponentKey::from(sender_id.to_string()),
637 sender,
638 ))
639 .expect("sending control message should not fail");
640 }
641
642 fn unwrap_log_event_message<E>(event: E) -> String
643 where
644 E: EventContainer,
645 {
646 let event = event
647 .into_events()
648 .next()
649 .expect("must have at least one event");
650 let event = event.into_log();
651 event
652 .get("message")
653 .and_then(Value::as_bytes)
654 .and_then(|b| String::from_utf8(b.to_vec()).ok())
655 .expect("must be valid log event with `message` field")
656 }
657
658 #[tokio::test]
659 async fn fanout_writes_to_all() {
660 let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]);
661 let events = make_event_array(2);
662
663 let clones = events.clone();
664 fanout.send(clones, None).await.expect("should not fail");
665
666 for receiver in receivers {
667 assert_eq!(
668 collect_ready(receiver.into_stream()),
669 std::slice::from_ref(&events)
670 );
671 }
672 }
673
674 #[tokio::test]
675 async fn fanout_notready() {
676 let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]);
677 let events = make_events(2);
678
679 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
681 assert_ready!(first_send.poll()).expect("should not fail");
682 drop(first_send);
683
684 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
686 assert_pending!(second_send.poll());
687
688 for receiver in &mut receivers {
690 assert_eq!(Some(events[0].clone().into()), receiver.next().await);
691 }
692
693 assert_ready!(second_send.poll()).expect("should not fail");
695 drop(second_send);
696
697 for receiver in &mut receivers {
699 assert_eq!(Some(events[1].clone().into()), receiver.next().await);
700 }
701 }
702
703 #[tokio::test]
704 async fn fanout_grow() {
705 let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]);
706 let events = make_events(3);
707
708 fanout
710 .send(events[0].clone().into(), None)
711 .await
712 .expect("should not fail");
713 fanout
714 .send(events[1].clone().into(), None)
715 .await
716 .expect("should not fail");
717
718 add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4);
720
721 fanout
723 .send(events[2].clone().into(), None)
724 .await
725 .expect("should not fail");
726
727 let expected_events = [&events, &events, &events[2..]];
730 for (i, receiver) in receivers.into_iter().enumerate() {
731 assert_eq!(
732 collect_ready_events(receiver.into_stream()),
733 expected_events[i]
734 );
735 }
736 }
737
738 #[tokio::test]
739 async fn fanout_shrink() {
740 let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]);
741 let events = make_events(3);
742
743 fanout
745 .send(events[0].clone().into(), None)
746 .await
747 .expect("should not fail");
748 fanout
749 .send(events[1].clone().into(), None)
750 .await
751 .expect("should not fail");
752
753 remove_sender_from_fanout(&control, 1);
755
756 fanout
758 .send(events[2].clone().into(), None)
759 .await
760 .expect("should not fail");
761
762 let expected_events = [&events, &events[..2]];
764 for (i, receiver) in receivers.into_iter().enumerate() {
765 assert_eq!(
766 collect_ready_events(receiver.into_stream()),
767 expected_events[i]
768 );
769 }
770 }
771
772 #[tokio::test]
773 async fn fanout_shrink_when_notready() {
774 let events = make_events(2);
780 let expected_first_event = unwrap_log_event_message(events[0].clone());
781 let expected_second_event = unwrap_log_event_message(events[1].clone());
782
783 let cases = [
784 (
787 0,
788 false,
789 [
790 expected_second_event.clone(),
791 expected_first_event.clone(),
792 expected_second_event.clone(),
793 ],
794 ),
795 (
796 1,
797 true,
798 [
799 expected_second_event.clone(),
800 expected_second_event.clone(),
801 expected_second_event.clone(),
802 ],
803 ),
804 (
805 2,
806 false,
807 [
808 expected_second_event.clone(),
809 expected_first_event.clone(),
810 expected_second_event.clone(),
811 ],
812 ),
813 ];
814
815 for (sender_id, should_complete, expected_last_seen) in cases {
816 let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]);
817
818 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
820 assert_ready!(first_send.poll()).expect("should not fail");
821 drop(first_send);
822
823 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
825 assert_pending!(second_send.poll());
826
827 remove_sender_from_fanout(&control, sender_id);
829
830 if should_complete {
831 assert_ready!(second_send.poll()).expect("should not fail");
832 } else {
833 assert_pending!(second_send.poll());
834 }
835 drop(second_send);
836
837 drop(fanout);
839
840 let mut last_seen = Vec::new();
841 for receiver in &mut receivers {
842 let mut events = Vec::new();
843 while let Some(event) = receiver.next().await {
844 events.insert(0, event);
845 }
846
847 last_seen.push(unwrap_log_event_message(events.remove(0)));
848 }
849
850 assert_eq!(&expected_last_seen[..], &last_seen);
851 }
852 }
853
854 #[tokio::test]
855 async fn fanout_no_sinks() {
856 let (mut fanout, _) = Fanout::new(ComponentKey::from("test_upstream"));
857 let events = make_events(2);
858
859 fanout
860 .send(events[0].clone().into(), None)
861 .await
862 .expect("should not fail");
863 fanout
864 .send(events[1].clone().into(), None)
865 .await
866 .expect("should not fail");
867 }
868
869 #[tokio::test]
870 #[should_panic(expected = "Fanout received empty event batch from upstream component")]
871 async fn fanout_panics_on_empty_event_array_in_debug_builds() {
872 let (mut fanout, _, _receivers) = fanout_from_senders(&[2, 2]);
873 let empty: EventArray = Vec::<LogEvent>::new().into();
874
875 let _ = fanout.send(empty, None).await;
876 }
877
878 #[tokio::test]
879 async fn fanout_replace() {
880 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]);
881 let events = make_events(3);
882
883 fanout
885 .send(events[0].clone().into(), None)
886 .await
887 .expect("should not fail");
888 fanout
889 .send(events[1].clone().into(), None)
890 .await
891 .expect("should not fail");
892
893 let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4);
895
896 fanout
898 .send(events[2].clone().into(), None)
899 .await
900 .expect("should not fail");
901
902 let expected_events = [&events[2..], &events, &events];
905 for (i, receiver) in receivers.into_iter().enumerate() {
906 assert_eq!(
907 collect_ready_events(receiver.into_stream()),
908 expected_events[i]
909 );
910 }
911
912 assert_eq!(
914 collect_ready_events(old_first_receiver.into_stream()),
915 &events[..2]
916 );
917 }
918
919 #[tokio::test]
920 async fn fanout_wait() {
921 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]);
922 let events = make_events(3);
923
924 let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
926 assert_ready!(poll!(send1)).expect("should not fail");
927 let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
928 assert_ready!(poll!(send2)).expect("should not fail");
929
930 let (old_first_receiver, new_first_sender) =
934 start_sender_replace(&control, &mut receivers, 0, 4);
935
936 let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
938 assert_pending!(third_send.poll());
939
940 finish_sender_resume(&control, 0, new_first_sender);
943 assert!(third_send.is_woken());
944 assert_ready!(third_send.poll()).expect("should not fail");
945
946 assert_eq!(
949 collect_ready_events(old_first_receiver.into_stream()),
950 &events[0..2]
951 );
952
953 let expected_events = [&events[2..], &events];
954 for (i, receiver) in receivers.into_iter().enumerate() {
955 assert_eq!(
956 collect_ready_events(receiver.into_stream()),
957 expected_events[i]
958 );
959 }
960 }
961
962 fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
963 (0..count).map(|i| LogEvent::from(format!("line {i}")))
964 }
965
966 fn make_events(count: usize) -> Vec<Event> {
967 make_events_inner(count).map(Into::into).collect()
968 }
969
970 fn make_event_array(count: usize) -> EventArray {
971 make_events_inner(count).collect::<Vec<_>>().into()
972 }
973}