vector_core/
fanout.rs

1use std::{collections::HashMap, fmt, task::Poll, time::Instant};
2
3use futures::{Stream, StreamExt};
4use futures_util::{pending, poll};
5use indexmap::IndexMap;
6use tokio::sync::mpsc;
7use tokio_util::sync::ReusableBoxFuture;
8use vector_buffers::topology::channel::BufferSender;
9
10use crate::{
11    config::ComponentKey,
12    event::{EventArray, EventContainer},
13};
14
/// An instruction that changes the set of sinks attached to a [`Fanout`].
///
/// Messages are delivered through a [`ControlChannel`]. Every variant carries the
/// [`ComponentKey`] of the sink it targets; `Add` and `Replace` additionally carry the sender
/// used to reach that sink.
pub enum ControlMessage {
    /// Adds a new sink to the fanout.
    Add(ComponentKey, BufferSender<EventArray>),

    /// Removes a sink from the fanout.
    Remove(ComponentKey),

    /// Pauses a sink in the fanout.
    ///
    /// If a fanout has any paused sinks, subsequent sends cannot proceed until all paused sinks
    /// have been replaced.
    Pause(ComponentKey),

    /// Replaces a paused sink with its new sender.
    Replace(ComponentKey, BufferSender<EventArray>),
}
31
32impl fmt::Debug for ControlMessage {
33    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34        write!(f, "ControlMessage::")?;
35        match self {
36            Self::Add(id, _) => write!(f, "Add({id:?})"),
37            Self::Remove(id) => write!(f, "Remove({id:?})"),
38            Self::Pause(id) => write!(f, "Pause({id:?})"),
39            Self::Replace(id, _) => write!(f, "Replace({id:?})"),
40        }
41    }
42}
43
// TODO: We should really wrap this in a custom type that has dedicated methods for each operation
// so that higher-level components don't need to do the raw channel sends, etc.
/// Sending half of the unbounded channel used to deliver [`ControlMessage`]s to a [`Fanout`].
pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
47
/// Sends each batch of events to every sink currently attached to it.
///
/// The set of attached sinks is changed at runtime by [`ControlMessage`]s received on the
/// control channel.
pub struct Fanout {
    /// Attached sinks keyed by component. A `None` value marks a slot whose sender is currently
    /// absent: either the sink has been paused and awaits a `Replace`, or the sender has been
    /// moved into an in-flight send.
    senders: IndexMap<ComponentKey, Option<Sender>>,
    /// Receiving half of the control channel created in `Fanout::new`.
    control_channel: mpsc::UnboundedReceiver<ControlMessage>,
    /// Key of the upstream component; reported when an empty event batch is received.
    upstream_component: ComponentKey,
}
53
54impl Fanout {
55    pub fn new(upstream_component: ComponentKey) -> (Self, ControlChannel) {
56        let (control_tx, control_rx) = mpsc::unbounded_channel();
57
58        let fanout = Self {
59            senders: Default::default(),
60            control_channel: control_rx,
61            upstream_component,
62        };
63
64        (fanout, control_tx)
65    }
66
67    /// Add a new sink as an output.
68    ///
69    /// # Panics
70    ///
71    /// Function will panic if a sink with the same ID is already present.
72    pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
73        assert!(
74            !self.senders.contains_key(&id),
75            "Adding duplicate output id to fanout: {id}"
76        );
77        self.senders.insert(id, Some(Sender::new(sink)));
78    }
79
80    fn remove(&mut self, id: &ComponentKey) {
81        assert!(
82            self.senders.shift_remove(id).is_some(),
83            "Removing nonexistent sink from fanout: {id}"
84        );
85    }
86
87    fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
88        match self.senders.get_mut(id) {
89            Some(sender) => {
90                // While a sink must be _known_ to be replaced, it must also be empty (previously
91                // paused or consumed when the `SendGroup` was created), otherwise an invalid
92                // sequence of control operations has been applied.
93                assert!(
94                    sender.replace(Sender::new(sink)).is_none(),
95                    "Replacing existing sink is not valid: {id}"
96                );
97            }
98            None => panic!("Replacing unknown sink from fanout: {id}"),
99        }
100    }
101
102    fn pause(&mut self, id: &ComponentKey) {
103        match self.senders.get_mut(id) {
104            Some(sender) => {
105                // A sink must be known and present to be replaced, otherwise an invalid sequence of
106                // control operations has been applied.
107                assert!(
108                    sender.take().is_some(),
109                    "Pausing nonexistent sink is not valid: {id}"
110                );
111            }
112            None => panic!("Pausing unknown sink from fanout: {id}"),
113        }
114    }
115
116    /// Waits for the next control message and applies it.
117    ///
118    /// Returns `true` if a message was processed, `false` if the control
119    /// channel was closed.
120    pub async fn recv_control_message(&mut self) -> bool {
121        match self.control_channel.recv().await {
122            Some(msg) => {
123                self.apply_control_message(msg);
124                true
125            }
126            None => false,
127        }
128    }
129
130    /// Apply a control message directly against this instance.
131    ///
132    /// This method should not be used if there is an active `SendGroup` being processed.
133    fn apply_control_message(&mut self, message: ControlMessage) {
134        trace!("Processing control message outside of send: {:?}", message);
135
136        match message {
137            ControlMessage::Add(id, sink) => self.add(id, sink),
138            ControlMessage::Remove(id) => self.remove(&id),
139            ControlMessage::Pause(id) => self.pause(&id),
140            ControlMessage::Replace(id, sink) => self.replace(&id, sink),
141        }
142    }
143
144    /// Waits for all paused sinks to be replaced.
145    ///
146    /// Control messages are processed until all senders have been replaced, so it is guaranteed
147    /// that when this method returns, all senders are ready for the next send to be triggered.
148    async fn wait_for_replacements(&mut self) {
149        while self.senders.values().any(Option::is_none) {
150            if let Some(msg) = self.control_channel.recv().await {
151                self.apply_control_message(msg);
152            } else {
153                // If the control channel is closed, there's nothing else we can do.
154
155                // TODO: It _seems_ like we should probably panic here, or at least return.
156                //
157                // Essentially, we should only land here if the control channel is closed but we
158                // haven't yet replaced all of the paused sinks... and we shouldn't have any paused
159                // sinks if Vector is stopping normally/gracefully, so like... we'd only get
160                // here during a configuration reload where we panicked in another thread due to
161                // an error of some sort, and the control channel got dropped, closed itself, and
162                // we're never going to be able to recover.
163                //
164                // The flipside is that by leaving it as-is, in the above hypothesized scenario,
165                // we'd avoid emitting additional panics/error logging when the root cause error was
166                // already doing so, like there's little value in knowing the fanout also hit an
167                // unrecoverable state if the whole process is about to come crashing down
168                // anyways... but it still does feel weird to have that encoded here by virtue of
169                // only a comment, and not an actual terminating expression. *shrug*
170            }
171        }
172    }
173
174    /// Send a stream of events to all connected sinks.
175    ///
176    /// This function will send events until the provided stream finishes. It will also block on the
177    /// resolution of any pending reload before proceeding with a send operation, similar to `send`.
178    ///
179    /// # Panics
180    ///
181    /// This method can panic if the fanout receives a control message that violates some invariant
182    /// about its current state (e.g. remove a nonexistent sink, etc.). This would imply a bug in
183    /// Vector's config reloading logic.
184    ///
185    /// # Errors
186    ///
187    /// If an error occurs while sending events to any of the connected sinks, an error variant will be
188    /// returned detailing the cause.
189    pub async fn send_stream(
190        &mut self,
191        events: impl Stream<Item = (EventArray, Instant)>,
192    ) -> crate::Result<()> {
193        tokio::pin!(events);
194        while let Some((event_array, send_reference)) = events.next().await {
195            self.send(event_array, Some(send_reference)).await?;
196        }
197        Ok(())
198    }
199
200    /// Send a batch of events to all connected sinks.
201    ///
202    /// This will block on the resolution of any pending reload before proceeding with the send
203    /// operation.
204    ///
205    /// # Panics
206    ///
207    /// This method can panic if the fanout receives a control message that violates some invariant
208    /// about its current state (e.g. remove a nonexistent sink, etc). This would imply a bug in
209    /// Vector's config reloading logic.
210    ///
211    /// # Errors
212    ///
213    /// If an error occurs while sending events to any of the connected sinks, an error variant will be
214    /// returned detailing the cause.
215    pub async fn send(
216        &mut self,
217        events: EventArray,
218        send_reference: Option<Instant>,
219    ) -> crate::Result<()> {
220        // First, process any available control messages in a non-blocking fashion.
221        while let Ok(message) = self.control_channel.try_recv() {
222            self.apply_control_message(message);
223        }
224
225        // Wait for any senders that are paused to be replaced first before continuing with the send.
226        self.wait_for_replacements().await;
227
228        // Drop empty event batches before they reach any downstream buffer, this is technically
229        // programmer error. In debug/test builds the `debug_assert!` makes the underlying bug fail
230        // loudly.
231        debug_assert!(
232            !events.is_empty(),
233            "Fanout received empty event batch from upstream component '{}'",
234            self.upstream_component,
235        );
236        // TODO: Wrap the conditional below with `std::hint::unlikely` once it stabilizes. This is an
237        // applicable situation to use it in since the following conditional should never evaluate to
238        // true.
239        #[cfg(not(debug_assertions))]
240        if events.is_empty() {
241            warn!(
242                message = "Dropping empty event batch emitted by upstream component. This is likely a bug in that component.",
243                component_id = %self.upstream_component,
244                downstream_count = self.senders.len(),
245            );
246            return Ok(());
247        }
248
249        // Nothing to send if we have no sender.
250        if self.senders.is_empty() {
251            trace!("No senders present.");
252            return Ok(());
253        }
254
255        // Keep track of whether the control channel has returned `Ready(None)`, and stop polling
256        // it once it has. If we don't do this check, it will continue to return `Ready(None)` any
257        // time it is polled, which can lead to a busy loop below.
258        //
259        // In real life this is likely a non-issue, but it can lead to strange behavior in tests if
260        // left unhandled.
261        let mut control_channel_open = true;
262
263        // Create our send group which arms all senders to send the given events, and handles
264        // adding/removing/replacing senders while the send is in-flight.
265        let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);
266
267        loop {
268            tokio::select! {
269                // Semantically, it's not hugely important that this select is biased. It does,
270                // however, make testing simpler when you can count on control messages being
271                // processed first.
272                biased;
273
274                maybe_msg = self.control_channel.recv(), if control_channel_open => {
275                    trace!("Processing control message inside of send: {:?}", maybe_msg);
276
277                    // During a send operation, control messages must be applied via the
278                    // `SendGroup`, since it has exclusive access to the senders.
279                    match maybe_msg {
280                        Some(ControlMessage::Add(id, sink)) => {
281                            send_group.add(id, sink);
282                        },
283                        Some(ControlMessage::Remove(id)) => {
284                            send_group.remove(&id);
285                        },
286                        Some(ControlMessage::Pause(id)) => {
287                            send_group.pause(&id);
288                        },
289                        Some(ControlMessage::Replace(id, sink)) => {
290                            send_group.replace(&id, Sender::new(sink));
291                        },
292                        None => {
293                            // Control channel is closed, which means Vector is shutting down.
294                            control_channel_open = false;
295                        }
296                    }
297                }
298
299                result = send_group.send() => match result {
300                    Ok(()) => {
301                        trace!("Sent item to fanout.");
302                        break;
303                    },
304                    Err(e) => return Err(e),
305                }
306            }
307        }
308
309        Ok(())
310    }
311}
312
/// State of a single in-flight send of one event batch to the senders of a `Fanout`.
struct SendGroup<'a> {
    /// Exclusive borrow of the fanout's sender slots. A slot is `None` while its sender is owned
    /// by a send future in `sends`, or while the sink is paused.
    senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
    /// Send futures that have not completed yet, keyed by component. Each one resolves to the
    /// `Sender` it took ownership of, or to the error hit while sending.
    sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
}
317
318impl<'a> SendGroup<'a> {
319    fn new(
320        senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
321        events: EventArray,
322        send_reference: Option<Instant>,
323    ) -> Self {
324        // If we don't have a valid `Sender` for all sinks, then something went wrong in our logic
325        // to ensure we were starting with all valid/idle senders prior to initiating the send.
326        debug_assert!(senders.values().all(Option::is_some));
327
328        let last_sender_idx = senders.len().saturating_sub(1);
329        let mut events = Some(events);
330
331        // We generate a send future for each sender we have, which arms them with the events to
332        // send but also takes ownership of the sender itself, which we give back when the sender completes.
333        let mut sends = HashMap::new();
334        for (i, (key, sender)) in senders.iter_mut().enumerate() {
335            let mut sender = sender
336                .take()
337                .expect("sender must be present to initialize SendGroup");
338
339            // First, arm each sender with the item to actually send.
340            if i == last_sender_idx {
341                sender.input = events.take();
342            } else {
343                sender.input.clone_from(&events);
344            }
345            sender.send_reference = send_reference;
346
347            // Now generate a send for that sender which we'll drive to completion.
348            let send = async move {
349                sender.flush().await?;
350                Ok(sender)
351            };
352
353            sends.insert(key.clone(), ReusableBoxFuture::new(send));
354        }
355
356        Self { senders, sends }
357    }
358
359    fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
360        if let Some(send) = self.sends.remove(id) {
361            tokio::spawn(async move {
362                if let Err(e) = send.await {
363                    warn!(
364                        cause = %e,
365                        message = "Encountered error writing to component after detaching from topology.",
366                    );
367                }
368            });
369            true
370        } else {
371            false
372        }
373    }
374
375    #[allow(clippy::needless_pass_by_value)]
376    fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
377        // When we're in the middle of a send, we can only keep track of the new sink, but can't
378        // actually send to it, as we don't have the item to send... so only add it to `senders`.
379        assert!(
380            self.senders
381                .insert(id.clone(), Some(Sender::new(sink)))
382                .is_none(),
383            "Adding duplicate output id to fanout: {id}"
384        );
385    }
386
387    fn remove(&mut self, id: &ComponentKey) {
388        // We may or may not be removing a sender that we're try to drive a send against, so we have
389        // to also detach the send future for the sender if it exists, otherwise we'd be hanging
390        // around still trying to send to it.
391        assert!(
392            self.senders.shift_remove(id).is_some(),
393            "Removing nonexistent sink from fanout: {id}"
394        );
395
396        // Now try and detach the in-flight send, if it exists.
397        //
398        // We don't ensure that a send was or wasn't detached because this could be called either
399        // during an in-flight send _or_ after the send has completed.
400        self.try_detach_send(id);
401    }
402
403    fn replace(&mut self, id: &ComponentKey, sink: Sender) {
404        match self.senders.get_mut(id) {
405            Some(sender) => {
406                // While a sink must be _known_ to be replaced, it must also be empty (previously
407                // paused or consumed when the `SendGroup` was created), otherwise an invalid
408                // sequence of control operations has been applied.
409                assert!(
410                    sender.replace(sink).is_none(),
411                    "Replacing existing sink is not valid: {id}"
412                );
413            }
414            None => panic!("Replacing unknown sink from fanout: {id}"),
415        }
416    }
417
418    fn pause(&mut self, id: &ComponentKey) {
419        match self.senders.get_mut(id) {
420            Some(sender) => {
421                // If we don't currently own the `Sender` for the given component, that implies
422                // there is an in-flight send: a `SendGroup` cannot be created without all
423                // participating components having a send operation triggered.
424                //
425                // As such, `try_detach_send` should always succeed here, as pausing only occurs
426                // when a component is being _replaced_, and should not be called multiple times.
427                if sender.take().is_none() {
428                    assert!(
429                        self.try_detach_send(id),
430                        "Pausing already-paused sink is invalid: {id}"
431                    );
432                }
433            }
434            None => panic!("Pausing unknown sink from fanout: {id}"),
435        }
436    }
437
438    async fn send(&mut self) -> crate::Result<()> {
439        // Right now, we do a linear scan of all sends, polling each send once in order to avoid
440        // waiting forever, such that we can let our control messages get picked up while sends are
441        // waiting.
442        loop {
443            if self.sends.is_empty() {
444                break;
445            }
446
447            let mut done = Vec::new();
448            for (key, send) in &mut self.sends {
449                if let Poll::Ready(result) = poll!(send.get_pin()) {
450                    let sender = result?;
451
452                    // The send completed, so we restore the sender and mark ourselves so that this
453                    // future gets dropped.
454                    done.push((key.clone(), sender));
455                }
456            }
457
458            for (key, sender) in done {
459                self.sends.remove(&key);
460                self.replace(&key, sender);
461            }
462
463            if !self.sends.is_empty() {
464                // We manually yield ourselves because we've polled all of the sends at this point,
465                // so if any are left, then we're scheduled for a wake-up... this is a really poor
466                // approximation of what `FuturesUnordered` is doing.
467                pending!();
468            }
469        }
470
471        Ok(())
472    }
473}
474
/// A sink's buffer sender together with the batch it has been armed with, if any.
struct Sender {
    /// Underlying buffer sender the events are written to.
    inner: BufferSender<EventArray>,
    /// Batch waiting to be sent; consumed by `flush`.
    input: Option<EventArray>,
    /// Optional instant passed along with the batch when it is sent; consumed by `flush`.
    send_reference: Option<Instant>,
}
480
481impl Sender {
482    fn new(inner: BufferSender<EventArray>) -> Self {
483        Self {
484            inner,
485            input: None,
486            send_reference: None,
487        }
488    }
489
490    async fn flush(&mut self) -> crate::Result<()> {
491        let send_reference = self.send_reference.take();
492        if let Some(input) = self.input.take() {
493            self.inner.send(input, send_reference).await?;
494            self.inner.flush().await?;
495        }
496
497        Ok(())
498    }
499}
500
501#[cfg(test)]
502mod tests {
503    use std::{mem, num::NonZeroUsize};
504
505    use futures::poll;
506    use tokio::sync::mpsc::UnboundedSender;
507    use tokio_test::{assert_pending, assert_ready, task::spawn};
508    use tracing::Span;
509    use vector_buffers::{
510        WhenFull,
511        topology::{
512            builder::TopologyBuilder,
513            channel::{BufferReceiver, BufferSender},
514        },
515    };
516    use vrl::value::Value;
517
518    use super::{ControlMessage, Fanout};
519    use crate::{
520        config::ComponentKey,
521        event::{Event, EventArray, EventContainer, LogEvent},
522        test_util::{collect_ready, collect_ready_events},
523    };
524
525    fn build_sender_pair(
526        capacity: usize,
527    ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
528        TopologyBuilder::standalone_memory(
529            NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
530            WhenFull::Block,
531            &Span::current(),
532            None,
533            None,
534        )
535    }
536
537    fn build_sender_pairs(
538        capacities: &[usize],
539    ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
540        let mut pairs = Vec::new();
541        for capacity in capacities {
542            pairs.push(build_sender_pair(*capacity));
543        }
544        pairs
545    }
546
547    fn fanout_from_senders(
548        capacities: &[usize],
549    ) -> (
550        Fanout,
551        UnboundedSender<ControlMessage>,
552        Vec<BufferReceiver<EventArray>>,
553    ) {
554        let (mut fanout, control) = Fanout::new(ComponentKey::from("test_upstream"));
555        let pairs = build_sender_pairs(capacities);
556
557        let mut receivers = Vec::new();
558        for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
559            fanout.add(ComponentKey::from(i.to_string()), sender);
560            receivers.push(receiver);
561        }
562
563        (fanout, control, receivers)
564    }
565
566    fn add_sender_to_fanout(
567        fanout: &mut Fanout,
568        receivers: &mut Vec<BufferReceiver<EventArray>>,
569        sender_id: usize,
570        capacity: usize,
571    ) {
572        let (sender, receiver) = build_sender_pair(capacity);
573        receivers.push(receiver);
574
575        fanout.add(ComponentKey::from(sender_id.to_string()), sender);
576    }
577
578    fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
579        control
580            .send(ControlMessage::Remove(ComponentKey::from(
581                sender_id.to_string(),
582            )))
583            .expect("sending control message should not fail");
584    }
585
586    fn replace_sender_in_fanout(
587        control: &UnboundedSender<ControlMessage>,
588        receivers: &mut [BufferReceiver<EventArray>],
589        sender_id: usize,
590        capacity: usize,
591    ) -> BufferReceiver<EventArray> {
592        let (sender, receiver) = build_sender_pair(capacity);
593        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
594
595        control
596            .send(ControlMessage::Pause(ComponentKey::from(
597                sender_id.to_string(),
598            )))
599            .expect("sending control message should not fail");
600
601        control
602            .send(ControlMessage::Replace(
603                ComponentKey::from(sender_id.to_string()),
604                sender,
605            ))
606            .expect("sending control message should not fail");
607
608        old_receiver
609    }
610
611    fn start_sender_replace(
612        control: &UnboundedSender<ControlMessage>,
613        receivers: &mut [BufferReceiver<EventArray>],
614        sender_id: usize,
615        capacity: usize,
616    ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
617        let (sender, receiver) = build_sender_pair(capacity);
618        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
619
620        control
621            .send(ControlMessage::Pause(ComponentKey::from(
622                sender_id.to_string(),
623            )))
624            .expect("sending control message should not fail");
625
626        (old_receiver, sender)
627    }
628
629    fn finish_sender_resume(
630        control: &UnboundedSender<ControlMessage>,
631        sender_id: usize,
632        sender: BufferSender<EventArray>,
633    ) {
634        control
635            .send(ControlMessage::Replace(
636                ComponentKey::from(sender_id.to_string()),
637                sender,
638            ))
639            .expect("sending control message should not fail");
640    }
641
642    fn unwrap_log_event_message<E>(event: E) -> String
643    where
644        E: EventContainer,
645    {
646        let event = event
647            .into_events()
648            .next()
649            .expect("must have at least one event");
650        let event = event.into_log();
651        event
652            .get("message")
653            .and_then(Value::as_bytes)
654            .and_then(|b| String::from_utf8(b.to_vec()).ok())
655            .expect("must be valid log event with `message` field")
656    }
657
658    #[tokio::test]
659    async fn fanout_writes_to_all() {
660        let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]);
661        let events = make_event_array(2);
662
663        let clones = events.clone();
664        fanout.send(clones, None).await.expect("should not fail");
665
666        for receiver in receivers {
667            assert_eq!(
668                collect_ready(receiver.into_stream()),
669                std::slice::from_ref(&events)
670            );
671        }
672    }
673
674    #[tokio::test]
675    async fn fanout_notready() {
676        let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]);
677        let events = make_events(2);
678
679        // First send should immediately complete because all senders have capacity:
680        let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
681        assert_ready!(first_send.poll()).expect("should not fail");
682        drop(first_send);
683
684        // Second send should return pending because sender B is now full:
685        let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
686        assert_pending!(second_send.poll());
687
688        // Now read an item from each receiver to free up capacity for the second sender:
689        for receiver in &mut receivers {
690            assert_eq!(Some(events[0].clone().into()), receiver.next().await);
691        }
692
693        // Now our second send should actually be able to complete:
694        assert_ready!(second_send.poll()).expect("should not fail");
695        drop(second_send);
696
697        // And make sure the second item comes through:
698        for receiver in &mut receivers {
699            assert_eq!(Some(events[1].clone().into()), receiver.next().await);
700        }
701    }
702
703    #[tokio::test]
704    async fn fanout_grow() {
705        let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]);
706        let events = make_events(3);
707
708        // Send in the first two events to our initial two senders:
709        fanout
710            .send(events[0].clone().into(), None)
711            .await
712            .expect("should not fail");
713        fanout
714            .send(events[1].clone().into(), None)
715            .await
716            .expect("should not fail");
717
718        // Now add a third sender:
719        add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4);
720
721        // Send in the last event which all three senders will now get:
722        fanout
723            .send(events[2].clone().into(), None)
724            .await
725            .expect("should not fail");
726
727        // Make sure the first two senders got all three events, but the third sender only got the
728        // last event:
729        let expected_events = [&events, &events, &events[2..]];
730        for (i, receiver) in receivers.into_iter().enumerate() {
731            assert_eq!(
732                collect_ready_events(receiver.into_stream()),
733                expected_events[i]
734            );
735        }
736    }
737
738    #[tokio::test]
739    async fn fanout_shrink() {
740        let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]);
741        let events = make_events(3);
742
743        // Send in the first two events to our initial two senders:
744        fanout
745            .send(events[0].clone().into(), None)
746            .await
747            .expect("should not fail");
748        fanout
749            .send(events[1].clone().into(), None)
750            .await
751            .expect("should not fail");
752
753        // Now remove the second sender:
754        remove_sender_from_fanout(&control, 1);
755
756        // Send in the last event which only the first sender will get:
757        fanout
758            .send(events[2].clone().into(), None)
759            .await
760            .expect("should not fail");
761
762        // Make sure the first sender got all three events, but the second sender only got the first two:
763        let expected_events = [&events, &events[..2]];
764        for (i, receiver) in receivers.into_iter().enumerate() {
765            assert_eq!(
766                collect_ready_events(receiver.into_stream()),
767                expected_events[i]
768            );
769        }
770    }
771
772    #[tokio::test]
773    async fn fanout_shrink_when_notready() {
774        // This test exercises that when we're waiting for a send to complete, we can correctly
775        // remove a sink whether or not it is the one that the send operation is still waiting on.
776        //
777        // This means that if we remove a sink that a current send is blocked on, we should be able
778        // to immediately proceed.
779        let events = make_events(2);
780        let expected_first_event = unwrap_log_event_message(events[0].clone());
781        let expected_second_event = unwrap_log_event_message(events[1].clone());
782
783        let cases = [
784            // Sender ID to drop, whether the second send should succeed after dropping, and the
785            // final "last event" a receiver should see after the second send:
786            (
787                0,
788                false,
789                [
790                    expected_second_event.clone(),
791                    expected_first_event.clone(),
792                    expected_second_event.clone(),
793                ],
794            ),
795            (
796                1,
797                true,
798                [
799                    expected_second_event.clone(),
800                    expected_second_event.clone(),
801                    expected_second_event.clone(),
802                ],
803            ),
804            (
805                2,
806                false,
807                [
808                    expected_second_event.clone(),
809                    expected_first_event.clone(),
810                    expected_second_event.clone(),
811                ],
812            ),
813        ];
814
815        for (sender_id, should_complete, expected_last_seen) in cases {
816            let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]);
817
818            // First send should immediately complete because all senders have capacity:
819            let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
820            assert_ready!(first_send.poll()).expect("should not fail");
821            drop(first_send);
822
823            // Second send should return pending because sender B is now full:
824            let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
825            assert_pending!(second_send.poll());
826
827            // Now drop our chosen sender and assert that polling the second send behaves as expected:
828            remove_sender_from_fanout(&control, sender_id);
829
830            if should_complete {
831                assert_ready!(second_send.poll()).expect("should not fail");
832            } else {
833                assert_pending!(second_send.poll());
834            }
835            drop(second_send);
836
837            // Now grab the last value available to each receiver and assert it's the second event.
838            drop(fanout);
839
840            let mut last_seen = Vec::new();
841            for receiver in &mut receivers {
842                let mut events = Vec::new();
843                while let Some(event) = receiver.next().await {
844                    events.insert(0, event);
845                }
846
847                last_seen.push(unwrap_log_event_message(events.remove(0)));
848            }
849
850            assert_eq!(&expected_last_seen[..], &last_seen);
851        }
852    }
853
854    #[tokio::test]
855    async fn fanout_no_sinks() {
856        let (mut fanout, _) = Fanout::new(ComponentKey::from("test_upstream"));
857        let events = make_events(2);
858
859        fanout
860            .send(events[0].clone().into(), None)
861            .await
862            .expect("should not fail");
863        fanout
864            .send(events[1].clone().into(), None)
865            .await
866            .expect("should not fail");
867    }
868
869    #[tokio::test]
870    #[should_panic(expected = "Fanout received empty event batch from upstream component")]
871    async fn fanout_panics_on_empty_event_array_in_debug_builds() {
872        let (mut fanout, _, _receivers) = fanout_from_senders(&[2, 2]);
873        let empty: EventArray = Vec::<LogEvent>::new().into();
874
875        let _ = fanout.send(empty, None).await;
876    }
877
878    #[tokio::test]
879    async fn fanout_replace() {
880        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]);
881        let events = make_events(3);
882
883        // First two sends should immediately complete because all senders have capacity:
884        fanout
885            .send(events[0].clone().into(), None)
886            .await
887            .expect("should not fail");
888        fanout
889            .send(events[1].clone().into(), None)
890            .await
891            .expect("should not fail");
892
893        // Replace the first sender with a brand new one before polling again:
894        let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4);
895
896        // And do the third send which should also complete since all senders still have capacity:
897        fanout
898            .send(events[2].clone().into(), None)
899            .await
900            .expect("should not fail");
901
902        // Now make sure that the new "first" sender only got the third event, but that the second and
903        // third sender got all three events:
904        let expected_events = [&events[2..], &events, &events];
905        for (i, receiver) in receivers.into_iter().enumerate() {
906            assert_eq!(
907                collect_ready_events(receiver.into_stream()),
908                expected_events[i]
909            );
910        }
911
912        // And make sure our original "first" sender got the first two events:
913        assert_eq!(
914            collect_ready_events(old_first_receiver.into_stream()),
915            &events[..2]
916        );
917    }
918
919    #[tokio::test]
920    async fn fanout_wait() {
921        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]);
922        let events = make_events(3);
923
924        // First two sends should immediately complete because all senders have capacity:
925        let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
926        assert_ready!(poll!(send1)).expect("should not fail");
927        let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
928        assert_ready!(poll!(send2)).expect("should not fail");
929
930        // Now do an empty replace on the second sender, which we'll test to make sure that `Fanout`
931        // doesn't let any writes through until we replace it properly.  We get back the receiver
932        // we've replaced, but also the sender that we want to eventually install:
933        let (old_first_receiver, new_first_sender) =
934            start_sender_replace(&control, &mut receivers, 0, 4);
935
936        // Third send should return pending because now we have an in-flight replacement:
937        let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
938        assert_pending!(third_send.poll());
939
940        // Finish our sender replacement, which should wake up the third send and allow it to
941        // actually complete:
942        finish_sender_resume(&control, 0, new_first_sender);
943        assert!(third_send.is_woken());
944        assert_ready!(third_send.poll()).expect("should not fail");
945
946        // Make sure the original first sender got the first two events, the new first sender got
947        // the last event, and the second sender got all three:
948        assert_eq!(
949            collect_ready_events(old_first_receiver.into_stream()),
950            &events[0..2]
951        );
952
953        let expected_events = [&events[2..], &events];
954        for (i, receiver) in receivers.into_iter().enumerate() {
955            assert_eq!(
956                collect_ready_events(receiver.into_stream()),
957                expected_events[i]
958            );
959        }
960    }
961
962    fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
963        (0..count).map(|i| LogEvent::from(format!("line {i}")))
964    }
965
966    fn make_events(count: usize) -> Vec<Event> {
967        make_events_inner(count).map(Into::into).collect()
968    }
969
970    fn make_event_array(count: usize) -> EventArray {
971        make_events_inner(count).collect::<Vec<_>>().into()
972    }
973}