vector/sources/util/net/tcp/mod.rs

1pub mod request_limiter;
2
3use std::{io, mem::drop, net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use futures::{FutureExt, StreamExt, future::BoxFuture};
7use futures_util::future::OptionFuture;
8use ipnet::IpNet;
9use listenfd::ListenFd;
10use smallvec::SmallVec;
11use socket2::SockRef;
12use tokio::{
13    io::AsyncWriteExt,
14    net::{TcpListener, TcpStream},
15    time::sleep,
16};
17use tokio_util::codec::Decoder;
18use tracing::Instrument;
19use vector_lib::{
20    EstimatedJsonEncodedSizeOf,
21    codecs::{ReadyFrames, StreamDecodingError, internal_events::DecoderFramingError},
22    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig},
23    event::{BatchNotifier, BatchStatus, Event},
24    finalization::AddBatchNotifier,
25    lookup::{OwnedValuePath, path},
26    shutdown::ShutdownSignal,
27    source_sender::SourceSender,
28    tcp::TcpKeepaliveConfig,
29    tls::{CertificateMetadata, MaybeTlsIncomingStream, MaybeTlsListener, MaybeTlsSettings},
30};
31use vrl::value::ObjectMap;
32
33use self::request_limiter::RequestLimiter;
34use super::SocketListenAddr;
35use crate::{
36    config::SourceContext,
37    internal_events::{
38        ConnectionOpen, OpenGauge, SocketBindError, SocketEventsReceived, SocketMode,
39        SocketReceiveError, StreamClosedError, TcpBytesReceived, TcpSendAckError,
40        TcpSocketTlsConnectionError, TcpSourceConnectionClosed,
41    },
42    net::is_graceful_tls_shutdown,
43    sources::util::{AfterReadExt, LenientFramedRead},
44};
45
/// Target number of in-flight events handed to `RequestLimiter::new` (together
/// with the thread count) when a TCP source starts listening.
///
/// NOTE(review): the exact way the limiter uses this target lives in
/// `request_limiter`; confirm there before tuning it.
pub const MAX_IN_FLIGHT_EVENTS_TARGET: usize = 100_000;
47
48pub async fn try_bind_tcp_listener(
49    addr: SocketListenAddr,
50    mut listenfd: ListenFd,
51    tls: &MaybeTlsSettings,
52    allowlist: Option<Vec<IpNet>>,
53) -> crate::Result<MaybeTlsListener> {
54    match addr {
55        SocketListenAddr::SocketAddr(addr) => tls.bind(&addr).await.map_err(Into::into),
56        SocketListenAddr::SystemdFd(offset) => match listenfd.take_tcp_listener(offset)? {
57            Some(listener) => TcpListener::from_std(listener)
58                .map(Into::into)
59                .map_err(Into::into),
60            None => {
61                Err(io::Error::new(io::ErrorKind::AddrInUse, "systemd fd already consumed").into())
62            }
63        },
64    }
65    .map(|listener| listener.with_allowlist(allowlist))
66}
67
/// Outcome of a batch of events read from one TCP connection, handed to a
/// [`TcpSourceAcker`] to produce the reply for the peer.
///
/// Any value other than `Ack` makes the connection handler stop reading from
/// that connection after the reply is written.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TcpSourceAck {
    /// The batch was delivered, or end-to-end acknowledgements are disabled.
    Ack,
    /// Delivery of the batch reported an error.
    Error,
    /// The batch was rejected downstream.
    Reject,
}
74
75pub trait TcpSourceAcker {
76    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes>;
77}
78
79pub struct TcpNullAcker;
80
81impl TcpSourceAcker for TcpNullAcker {
82    // This function builds an acknowledgement from the source data in
83    // the acker and the given acknowledgement code.
84    fn build_ack(self, _ack: TcpSourceAck) -> Option<Bytes> {
85        None
86    }
87}
88
89pub trait TcpSource: Clone + Send + Sync + 'static
90where
91    <<Self as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
92{
93    // Should be default: `std::io::Error`.
94    // Right now this is unstable: https://github.com/rust-lang/rust/issues/29661
95    type Error: From<io::Error>
96        + StreamDecodingError
97        + std::fmt::Debug
98        + std::fmt::Display
99        + Send
100        + Unpin;
101    type Item: Into<SmallVec<[Event; 1]>> + Send + Unpin;
102    type Decoder: Decoder<Item = (Self::Item, usize), Error = Self::Error> + Send + 'static;
103    type Acker: TcpSourceAcker + Send;
104
105    fn decoder(&self) -> Self::Decoder;
106
107    fn handle_events(&self, _events: &mut [Event], _host: std::net::SocketAddr) {}
108
109    fn build_acker(&self, item: &[Self::Item]) -> Self::Acker;
110
111    #[allow(clippy::too_many_arguments)]
112    fn run(
113        self,
114        addr: SocketListenAddr,
115        keepalive: Option<TcpKeepaliveConfig>,
116        shutdown_timeout_secs: Duration,
117        tls: MaybeTlsSettings,
118        tls_client_metadata_key: Option<OwnedValuePath>,
119        receive_buffer_bytes: Option<usize>,
120        max_connection_duration_secs: Option<u64>,
121        cx: SourceContext,
122        acknowledgements: SourceAcknowledgementsConfig,
123        max_connections: Option<u32>,
124        allowlist: Option<Vec<IpNet>>,
125        source_name: &'static str,
126        log_namespace: LogNamespace,
127    ) -> crate::Result<crate::sources::Source> {
128        let acknowledgements = cx.do_acknowledgements(acknowledgements);
129
130        Ok(Box::pin(async move {
131            let listenfd = ListenFd::from_env();
132            let listener = try_bind_tcp_listener(addr, listenfd, &tls, allowlist)
133                .await
134                .map_err(|error| {
135                    emit!(SocketBindError {
136                        mode: SocketMode::Tcp,
137                        error: &error,
138                    })
139                })?;
140
141            info!(
142                message = "Listening.",
143                addr = %listener
144                    .local_addr()
145                    .map(SocketListenAddr::SocketAddr)
146                    .unwrap_or(addr)
147            );
148
149            let tripwire = cx.shutdown.clone();
150            let tripwire = async move {
151                _ = tripwire.await;
152                sleep(shutdown_timeout_secs).await;
153            }
154            .shared();
155
156            let connection_gauge = OpenGauge::new();
157            let shutdown_clone = cx.shutdown.clone();
158
159            let request_limiter =
160                RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, crate::num_threads());
161
162            listener
163                .accept_stream_limited(max_connections)
164                .take_until(shutdown_clone)
165                .for_each(move |(connection, tcp_connection_permit)| {
166                    let shutdown_signal = cx.shutdown.clone();
167                    let tripwire = tripwire.clone();
168                    let source = self.clone();
169                    let out = cx.out.clone();
170                    let connection_gauge = connection_gauge.clone();
171                    let request_limiter = request_limiter.clone();
172                    let tls_client_metadata_key = tls_client_metadata_key.clone();
173
174                    async move {
175                        let socket = match connection {
176                            Ok(socket) => socket,
177                            Err(error) => {
178                                emit!(SocketReceiveError {
179                                    mode: SocketMode::Tcp,
180                                    error: &error
181                                });
182                                return;
183                            }
184                        };
185
186                        let peer_addr = socket.peer_addr();
187                        let span = info_span!("connection", %peer_addr);
188
189                        let tripwire = tripwire
190                            .map(move |_| {
191                                info!(
192                                    message = "Resetting connection (still open after seconds).",
193                                    seconds = ?shutdown_timeout_secs
194                                );
195                            })
196                            .boxed();
197
198                        span.clone().in_scope(|| {
199                            debug!(message = "Accepted a new connection.", peer_addr = %peer_addr);
200
201                            let open_token =
202                                connection_gauge.open(|count| emit!(ConnectionOpen { count }));
203
204                            let fut = handle_stream(
205                                shutdown_signal,
206                                socket,
207                                keepalive,
208                                receive_buffer_bytes,
209                                max_connection_duration_secs,
210                                source,
211                                tripwire,
212                                peer_addr,
213                                out,
214                                acknowledgements,
215                                request_limiter,
216                                tls_client_metadata_key.clone(),
217                                source_name,
218                                log_namespace,
219                            );
220
221                            tokio::spawn(
222                                fut.map(move |()| {
223                                    drop(open_token);
224                                    // Paired with the ConnectionOpen emit above:
225                                    // fires exactly once per accepted connection,
226                                    // including paths that return early from
227                                    // handle_stream (TLS handshake failure,
228                                    // shutdown during handshake).
229                                    emit!(TcpSourceConnectionClosed);
230                                    drop(tcp_connection_permit);
231                                })
232                                .instrument(span.or_current()),
233                            );
234                        });
235                    }
236                })
237                .map(Ok)
238                .await
239        }))
240    }
241}
242
243#[allow(clippy::too_many_arguments)]
244async fn handle_stream<T>(
245    mut shutdown_signal: ShutdownSignal,
246    mut socket: MaybeTlsIncomingStream<TcpStream>,
247    keepalive: Option<TcpKeepaliveConfig>,
248    receive_buffer_bytes: Option<usize>,
249    max_connection_duration_secs: Option<u64>,
250    source: T,
251    mut tripwire: BoxFuture<'static, ()>,
252    peer_addr: SocketAddr,
253    mut out: SourceSender,
254    acknowledgements: bool,
255    request_limiter: RequestLimiter,
256    tls_client_metadata_key: Option<OwnedValuePath>,
257    source_name: &'static str,
258    log_namespace: LogNamespace,
259) where
260    <<T as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
261    T: TcpSource,
262{
263    tokio::select! {
264        result = socket.handshake() => {
265            if let Err(error) = result {
266                emit!(TcpSocketTlsConnectionError { error });
267                return;
268            }
269        },
270        _ = &mut shutdown_signal => {
271            return;
272        }
273    };
274
275    if let Some(keepalive) = keepalive
276        && let Err(error) = socket.set_keepalive(keepalive)
277    {
278        warn!(message = "Failed configuring TCP keepalive.", %error);
279    }
280
281    if let Some(receive_buffer_bytes) = receive_buffer_bytes
282        && let Err(error) = socket.set_receive_buffer_bytes(receive_buffer_bytes)
283    {
284        warn!(message = "Failed configuring receive buffer size on TCP socket.", %error);
285    }
286
287    let socket = socket.after_read(move |byte_size| {
288        emit!(TcpBytesReceived {
289            byte_size,
290            peer_addr
291        });
292    });
293
294    let certificate_metadata = socket
295        .get_ref()
296        .ssl_stream()
297        .and_then(|stream| stream.ssl().peer_certificate())
298        .map(CertificateMetadata::from);
299
300    let reader = LenientFramedRead::new(socket, source.decoder());
301
302    let mut reader = ReadyFrames::new(reader);
303
304    let connection_close_timeout = OptionFuture::from(
305        max_connection_duration_secs
306            .map(|timeout_secs| tokio::time::sleep(Duration::from_secs(timeout_secs))),
307    );
308
309    tokio::pin!(connection_close_timeout);
310
311    loop {
312        let mut permit = tokio::select! {
313            _ = &mut tripwire => break,
314            Some(_) = &mut connection_close_timeout  => {
315                if close_socket(reader.get_ref().get_ref().get_ref()) {
316                    break;
317                }
318                None
319            },
320            _ = &mut shutdown_signal => {
321                if close_socket(reader.get_ref().get_ref().get_ref()) {
322                    break;
323                }
324                None
325            },
326            permit = request_limiter.acquire() => {
327                Some(permit)
328            }
329            else => break,
330        };
331
332        let timeout = tokio::time::sleep(Duration::from_millis(10));
333        tokio::pin!(timeout);
334
335        tokio::select! {
336            _ = &mut tripwire => break,
337            _ = &mut shutdown_signal => {
338                if close_socket(reader.get_ref().get_ref().get_ref()) {
339                    break;
340                }
341            },
342            _ = &mut timeout => {
343                // This connection is currently holding a permit, but has not received data for some time. Release
344                // the permit to let another connection try
345                continue;
346            }
347            res = reader.next() => {
348                match res {
349                    Some(Ok((frames, _byte_size))) => {
350                        let _num_frames = frames.len();
351                        let acker = source.build_acker(&frames);
352                        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements);
353
354                        let mut events = frames.into_iter().flat_map(Into::into).collect::<Vec<Event>>();
355                        let count = events.len();
356
357                        emit!(SocketEventsReceived {
358                            mode: SocketMode::Tcp,
359                            byte_size: events.estimated_json_encoded_size_of(),
360                            count,
361                        });
362
363                        if let Some(permit) = &mut permit {
364                            // Note that this is intentionally not the "number of events in a single request", but rather
365                            // the "number of events currently available". This may contain events from multiple events,
366                            // but it should always contain all events from each request.
367                            permit.decoding_finished(events.len());
368                        }
369
370                        if let Some(batch) = batch {
371                            for event in &mut events {
372                                event.add_batch_notifier(batch.clone());
373                            }
374                        }
375
376
377                        if let Some(certificate_metadata) = &certificate_metadata {
378                            let mut metadata = ObjectMap::new();
379                            metadata.insert("subject".into(), certificate_metadata.subject().into());
380                            for event in &mut events {
381                                let log = event.as_mut_log();
382
383                                log_namespace.insert_source_metadata(
384                                    source_name,
385                                    log,
386                                    tls_client_metadata_key.as_ref().map(LegacyKey::Overwrite),
387                                    path!("tls_client_metadata"),
388                                    metadata.clone()
389                                );
390                            }
391                        }
392
393                        source.handle_events(&mut events, peer_addr);
394                        match out.send_batch(events).await {
395                            Ok(_) => {
396                                let ack = match receiver {
397                                    None => TcpSourceAck::Ack,
398                                    Some(receiver) =>
399                                        match receiver.await {
400                                            BatchStatus::Delivered => TcpSourceAck::Ack,
401                                            BatchStatus::Errored => {TcpSourceAck::Error},
402                                            BatchStatus::Rejected => {
403                                                // Sinks are responsible for emitting ComponentEventsDropped.
404                                                TcpSourceAck::Reject
405                                            }
406                                        }
407                                };
408                                if let Some(ack_bytes) = acker.build_ack(ack){
409                                    let stream = reader.get_mut().get_mut();
410                                    if let Err(error) = stream.write_all(&ack_bytes).await {
411                                        // Per spec, `*Error` events MUST only be
412                                        // emitted on real errors. A peer-initiated
413                                        // graceful TLS shutdown during the ack
414                                        // write is a lifecycle event, not an error
415                                        // — log at warn and skip the emit.
416                                        if is_graceful_tls_shutdown(&error) {
417                                            warn!(
418                                                message = "Connection closed by peer before acknowledgement could be sent.",
419                                                error = %error,
420                                            );
421                                        } else {
422                                            emit!(TcpSendAckError { error });
423                                        }
424                                        break;
425                                    }
426                                }
427                                if ack != TcpSourceAck::Ack {
428                                    break;
429                                }
430                            }
431                            Err(_) => {
432                                emit!(StreamClosedError { count });
433                                break;
434                            }
435                        }
436                    }
437                    Some(Err(error)) => {
438                        if !<<T as TcpSource>::Error as StreamDecodingError>::can_continue(&error) {
439                            emit!(DecoderFramingError { error });
440                            break;
441                        }
442                    }
443                    None => {
444                        debug!("Connection closed.");
445                        break
446                    },
447                }
448            }
449            else => break,
450        }
451
452        drop(permit);
453    }
454}
455
456fn close_socket(socket: &MaybeTlsIncomingStream<TcpStream>) -> bool {
457    debug!("Start graceful shutdown.");
458    // Close our write part of TCP socket to signal the other side
459    // that it should stop writing and close the channel.
460    if let Some(stream) = socket.get_ref() {
461        let socket = SockRef::from(stream);
462        if let Err(error) = socket.shutdown(std::net::Shutdown::Write) {
463            warn!(message = "Failed in signalling to the other side to close the TCP channel.", %error);
464        }
465        false
466    } else {
467        // Connection hasn't yet been established so we are done here.
468        debug!("Closing connection that hasn't yet been fully established.");
469        true
470    }
471}