vector/sources/util/net/tcp/
mod.rs1pub mod request_limiter;
2
3use std::{io, mem::drop, net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use futures::{FutureExt, StreamExt, future::BoxFuture};
7use futures_util::future::OptionFuture;
8use ipnet::IpNet;
9use listenfd::ListenFd;
10use smallvec::SmallVec;
11use socket2::SockRef;
12use tokio::{
13 io::AsyncWriteExt,
14 net::{TcpListener, TcpStream},
15 time::sleep,
16};
17use tokio_util::codec::Decoder;
18use tracing::Instrument;
19use vector_lib::{
20 EstimatedJsonEncodedSizeOf,
21 codecs::{ReadyFrames, StreamDecodingError, internal_events::DecoderFramingError},
22 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig},
23 event::{BatchNotifier, BatchStatus, Event},
24 finalization::AddBatchNotifier,
25 lookup::{OwnedValuePath, path},
26 shutdown::ShutdownSignal,
27 source_sender::SourceSender,
28 tcp::TcpKeepaliveConfig,
29 tls::{CertificateMetadata, MaybeTlsIncomingStream, MaybeTlsListener, MaybeTlsSettings},
30};
31use vrl::value::ObjectMap;
32
33use self::request_limiter::RequestLimiter;
34use super::SocketListenAddr;
35use crate::{
36 config::SourceContext,
37 internal_events::{
38 ConnectionOpen, OpenGauge, SocketBindError, SocketEventsReceived, SocketMode,
39 SocketReceiveError, StreamClosedError, TcpBytesReceived, TcpSendAckError,
40 TcpSocketTlsConnectionError, TcpSourceConnectionClosed,
41 },
42 net::is_graceful_tls_shutdown,
43 sources::util::{AfterReadExt, LenientFramedRead},
44};
45
/// Value passed as the first argument to `RequestLimiter::new` when a TCP source starts.
///
/// NOTE(review): presumably the target ceiling on the number of decoded events that may be
/// in flight across all connections of one source -- confirm against `request_limiter`.
pub const MAX_IN_FLIGHT_EVENTS_TARGET: usize = 100_000;
47
48pub async fn try_bind_tcp_listener(
49 addr: SocketListenAddr,
50 mut listenfd: ListenFd,
51 tls: &MaybeTlsSettings,
52 allowlist: Option<Vec<IpNet>>,
53) -> crate::Result<MaybeTlsListener> {
54 match addr {
55 SocketListenAddr::SocketAddr(addr) => tls.bind(&addr).await.map_err(Into::into),
56 SocketListenAddr::SystemdFd(offset) => match listenfd.take_tcp_listener(offset)? {
57 Some(listener) => TcpListener::from_std(listener)
58 .map(Into::into)
59 .map_err(Into::into),
60 None => {
61 Err(io::Error::new(io::ErrorKind::AddrInUse, "systemd fd already consumed").into())
62 }
63 },
64 }
65 .map(|listener| listener.with_allowlist(allowlist))
66}
67
/// Outcome of a batch, as reported to a [`TcpSourceAcker`] so it can build the reply that is
/// written back to the peer.
///
/// `Debug` is derived so the value can appear in log fields and in `assert_eq!` failures.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TcpSourceAck {
    /// The batch was sent downstream and, when acknowledgements are enabled, was reported
    /// as delivered.
    Ack,
    /// The batch finished with an errored status.
    Error,
    /// The batch was rejected.
    Reject,
}
74
/// Produces the bytes (if any) that are written back to the peer once the outcome of a
/// batch is known.
pub trait TcpSourceAcker {
    /// Consumes the acker and returns the payload to write for `ack`.
    ///
    /// Returning `None` means nothing is written to the connection for this batch.
    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes>;
}
78
79pub struct TcpNullAcker;
80
81impl TcpSourceAcker for TcpNullAcker {
82 fn build_ack(self, _ack: TcpSourceAck) -> Option<Bytes> {
85 None
86 }
87}
88
89pub trait TcpSource: Clone + Send + Sync + 'static
90where
91 <<Self as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
92{
93 type Error: From<io::Error>
96 + StreamDecodingError
97 + std::fmt::Debug
98 + std::fmt::Display
99 + Send
100 + Unpin;
101 type Item: Into<SmallVec<[Event; 1]>> + Send + Unpin;
102 type Decoder: Decoder<Item = (Self::Item, usize), Error = Self::Error> + Send + 'static;
103 type Acker: TcpSourceAcker + Send;
104
105 fn decoder(&self) -> Self::Decoder;
106
107 fn handle_events(&self, _events: &mut [Event], _host: std::net::SocketAddr) {}
108
109 fn build_acker(&self, item: &[Self::Item]) -> Self::Acker;
110
111 #[allow(clippy::too_many_arguments)]
112 fn run(
113 self,
114 addr: SocketListenAddr,
115 keepalive: Option<TcpKeepaliveConfig>,
116 shutdown_timeout_secs: Duration,
117 tls: MaybeTlsSettings,
118 tls_client_metadata_key: Option<OwnedValuePath>,
119 receive_buffer_bytes: Option<usize>,
120 max_connection_duration_secs: Option<u64>,
121 cx: SourceContext,
122 acknowledgements: SourceAcknowledgementsConfig,
123 max_connections: Option<u32>,
124 allowlist: Option<Vec<IpNet>>,
125 source_name: &'static str,
126 log_namespace: LogNamespace,
127 ) -> crate::Result<crate::sources::Source> {
128 let acknowledgements = cx.do_acknowledgements(acknowledgements);
129
130 Ok(Box::pin(async move {
131 let listenfd = ListenFd::from_env();
132 let listener = try_bind_tcp_listener(addr, listenfd, &tls, allowlist)
133 .await
134 .map_err(|error| {
135 emit!(SocketBindError {
136 mode: SocketMode::Tcp,
137 error: &error,
138 })
139 })?;
140
141 info!(
142 message = "Listening.",
143 addr = %listener
144 .local_addr()
145 .map(SocketListenAddr::SocketAddr)
146 .unwrap_or(addr)
147 );
148
149 let tripwire = cx.shutdown.clone();
150 let tripwire = async move {
151 _ = tripwire.await;
152 sleep(shutdown_timeout_secs).await;
153 }
154 .shared();
155
156 let connection_gauge = OpenGauge::new();
157 let shutdown_clone = cx.shutdown.clone();
158
159 let request_limiter =
160 RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, crate::num_threads());
161
162 listener
163 .accept_stream_limited(max_connections)
164 .take_until(shutdown_clone)
165 .for_each(move |(connection, tcp_connection_permit)| {
166 let shutdown_signal = cx.shutdown.clone();
167 let tripwire = tripwire.clone();
168 let source = self.clone();
169 let out = cx.out.clone();
170 let connection_gauge = connection_gauge.clone();
171 let request_limiter = request_limiter.clone();
172 let tls_client_metadata_key = tls_client_metadata_key.clone();
173
174 async move {
175 let socket = match connection {
176 Ok(socket) => socket,
177 Err(error) => {
178 emit!(SocketReceiveError {
179 mode: SocketMode::Tcp,
180 error: &error
181 });
182 return;
183 }
184 };
185
186 let peer_addr = socket.peer_addr();
187 let span = info_span!("connection", %peer_addr);
188
189 let tripwire = tripwire
190 .map(move |_| {
191 info!(
192 message = "Resetting connection (still open after seconds).",
193 seconds = ?shutdown_timeout_secs
194 );
195 })
196 .boxed();
197
198 span.clone().in_scope(|| {
199 debug!(message = "Accepted a new connection.", peer_addr = %peer_addr);
200
201 let open_token =
202 connection_gauge.open(|count| emit!(ConnectionOpen { count }));
203
204 let fut = handle_stream(
205 shutdown_signal,
206 socket,
207 keepalive,
208 receive_buffer_bytes,
209 max_connection_duration_secs,
210 source,
211 tripwire,
212 peer_addr,
213 out,
214 acknowledgements,
215 request_limiter,
216 tls_client_metadata_key.clone(),
217 source_name,
218 log_namespace,
219 );
220
221 tokio::spawn(
222 fut.map(move |()| {
223 drop(open_token);
224 emit!(TcpSourceConnectionClosed);
230 drop(tcp_connection_permit);
231 })
232 .instrument(span.or_current()),
233 );
234 });
235 }
236 })
237 .map(Ok)
238 .await
239 }))
240 }
241}
242
/// Drives a single accepted connection until it ends.
///
/// Steps, in order:
/// 1. Complete the handshake on `socket`, unless `shutdown_signal` fires first.
/// 2. Apply the optional keepalive and receive-buffer settings (failures only warn).
/// 3. Loop: acquire a permit from `request_limiter`, read a batch of decoded frames,
///    convert them to events, send them through `out`, and write an acknowledgement to the
///    peer when the source's acker produces one.
///
/// The loop ends when `tripwire` resolves, the peer closes the connection, a decoding error
/// cannot be continued from, sending downstream fails, writing the acknowledgement fails, or
/// a batch ends with a status other than `TcpSourceAck::Ack`.
#[allow(clippy::too_many_arguments)]
async fn handle_stream<T>(
    mut shutdown_signal: ShutdownSignal,
    mut socket: MaybeTlsIncomingStream<TcpStream>,
    keepalive: Option<TcpKeepaliveConfig>,
    receive_buffer_bytes: Option<usize>,
    max_connection_duration_secs: Option<u64>,
    source: T,
    mut tripwire: BoxFuture<'static, ()>,
    peer_addr: SocketAddr,
    mut out: SourceSender,
    acknowledgements: bool,
    request_limiter: RequestLimiter,
    tls_client_metadata_key: Option<OwnedValuePath>,
    source_name: &'static str,
    log_namespace: LogNamespace,
) where
    <<T as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
    T: TcpSource,
{
    // Race the handshake against shutdown so a stalled handshake cannot delay shutdown.
    tokio::select! {
        result = socket.handshake() => {
            if let Err(error) = result {
                emit!(TcpSocketTlsConnectionError { error });
                return;
            }
        },
        _ = &mut shutdown_signal => {
            return;
        }
    };

    // Socket tuning is best-effort: a failure is logged and the connection carries on.
    if let Some(keepalive) = keepalive
        && let Err(error) = socket.set_keepalive(keepalive)
    {
        warn!(message = "Failed configuring TCP keepalive.", %error);
    }

    if let Some(receive_buffer_bytes) = receive_buffer_bytes
        && let Err(error) = socket.set_receive_buffer_bytes(receive_buffer_bytes)
    {
        warn!(message = "Failed configuring receive buffer size on TCP socket.", %error);
    }

    // Emit a `TcpBytesReceived` event from the read hook installed by `after_read`.
    let socket = socket.after_read(move |byte_size| {
        emit!(TcpBytesReceived {
            byte_size,
            peer_addr
        });
    });

    // `None` when `ssl_stream()` or `peer_certificate()` yields nothing.
    let certificate_metadata = socket
        .get_ref()
        .ssl_stream()
        .and_then(|stream| stream.ssl().peer_certificate())
        .map(CertificateMetadata::from);

    let reader = LenientFramedRead::new(socket, source.decoder());

    let mut reader = ReadyFrames::new(reader);

    // With no maximum duration configured this wraps `None`, so the `Some(_)` pattern in the
    // select below does not match and that branch never runs.
    let connection_close_timeout = OptionFuture::from(
        max_connection_duration_secs
            .map(|timeout_secs| tokio::time::sleep(Duration::from_secs(timeout_secs))),
    );

    tokio::pin!(connection_close_timeout);

    loop {
        // Stage 1: obtain a permit from the limiter, or react to tripwire / connection
        // timeout / shutdown. After a timeout or shutdown the loop proceeds without a permit
        // (`None`) unless `close_socket` says the connection should be dropped right away.
        let mut permit = tokio::select! {
            _ = &mut tripwire => break,
            Some(_) = &mut connection_close_timeout => {
                if close_socket(reader.get_ref().get_ref().get_ref()) {
                    break;
                }
                None
            },
            _ = &mut shutdown_signal => {
                if close_socket(reader.get_ref().get_ref().get_ref()) {
                    break;
                }
                None
            },
            permit = request_limiter.acquire() => {
                Some(permit)
            }
            else => break,
        };

        // Upper bound on how long stage 2 waits for data before going back to stage 1.
        let timeout = tokio::time::sleep(Duration::from_millis(10));
        tokio::pin!(timeout);

        // Stage 2: wait for the next batch of frames, still watching tripwire and shutdown.
        tokio::select! {
            _ = &mut tripwire => break,
            _ = &mut shutdown_signal => {
                if close_socket(reader.get_ref().get_ref().get_ref()) {
                    break;
                }
            },
            _ = &mut timeout => {
                // Nothing arrived in time; restart the loop (the permit goes out of scope).
                continue;
            }
            res = reader.next() => {
                match res {
                    Some(Ok((frames, _byte_size))) => {
                        let _num_frames = frames.len();
                        // The acker is built from the raw frames before they are consumed.
                        let acker = source.build_acker(&frames);
                        // `batch`/`receiver` are only present when acknowledgements are enabled.
                        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements);

                        let mut events = frames.into_iter().flat_map(Into::into).collect::<Vec<Event>>();
                        let count = events.len();

                        emit!(SocketEventsReceived {
                            mode: SocketMode::Tcp,
                            byte_size: events.estimated_json_encoded_size_of(),
                            count,
                        });

                        // Report the number of decoded events to the limiter's permit.
                        if let Some(permit) = &mut permit {
                            permit.decoding_finished(events.len());
                        }

                        if let Some(batch) = batch {
                            for event in &mut events {
                                event.add_batch_notifier(batch.clone());
                            }
                        }


                        // Attach the client certificate subject to every event.
                        // NOTE(review): `as_mut_log` is applied to every event here; this
                        // presumably relies on all events being logs -- confirm.
                        if let Some(certificate_metadata) = &certificate_metadata {
                            let mut metadata = ObjectMap::new();
                            metadata.insert("subject".into(), certificate_metadata.subject().into());
                            for event in &mut events {
                                let log = event.as_mut_log();

                                log_namespace.insert_source_metadata(
                                    source_name,
                                    log,
                                    tls_client_metadata_key.as_ref().map(LegacyKey::Overwrite),
                                    path!("tls_client_metadata"),
                                    metadata.clone()
                                );
                            }
                        }

                        source.handle_events(&mut events, peer_addr);
                        match out.send_batch(events).await {
                            Ok(_) => {
                                // Without a receiver the batch counts as acknowledged once
                                // it has been sent; otherwise wait for the batch status.
                                let ack = match receiver {
                                    None => TcpSourceAck::Ack,
                                    Some(receiver) =>
                                        match receiver.await {
                                            BatchStatus::Delivered => TcpSourceAck::Ack,
                                            BatchStatus::Errored => {TcpSourceAck::Error},
                                            BatchStatus::Rejected => {
                                                TcpSourceAck::Reject
                                            }
                                        }
                                };
                                if let Some(ack_bytes) = acker.build_ack(ack){
                                    let stream = reader.get_mut().get_mut();
                                    if let Err(error) = stream.write_all(&ack_bytes).await {
                                        // A write failure that `is_graceful_tls_shutdown`
                                        // recognises is logged as a warning instead of being
                                        // emitted as an error event. Either way the
                                        // connection is finished.
                                        if is_graceful_tls_shutdown(&error) {
                                            warn!(
                                                message = "Connection closed by peer before acknowledgement could be sent.",
                                                error = %error,
                                            );
                                        } else {
                                            emit!(TcpSendAckError { error });
                                        }
                                        break;
                                    }
                                }
                                // Any outcome other than a positive ack ends the connection.
                                if ack != TcpSourceAck::Ack {
                                    break;
                                }
                            }
                            Err(_) => {
                                emit!(StreamClosedError { count });
                                break;
                            }
                        }
                    }
                    Some(Err(error)) => {
                        // Only errors the decoder cannot continue from are reported here and
                        // end the connection; continuable ones fall through to the next loop.
                        if !<<T as TcpSource>::Error as StreamDecodingError>::can_continue(&error) {
                            emit!(DecoderFramingError { error });
                            break;
                        }
                    }
                    None => {
                        debug!("Connection closed.");
                        break
                    },
                }
            }
            else => break,
        }

        // Release the limiter permit before the next iteration acquires a new one.
        drop(permit);
    }
}
455
456fn close_socket(socket: &MaybeTlsIncomingStream<TcpStream>) -> bool {
457 debug!("Start graceful shutdown.");
458 if let Some(stream) = socket.get_ref() {
461 let socket = SockRef::from(stream);
462 if let Err(error) = socket.shutdown(std::net::Shutdown::Write) {
463 warn!(message = "Failed in signalling to the other side to close the TCP channel.", %error);
464 }
465 false
466 } else {
467 debug!("Closing connection that hasn't yet been fully established.");
469 true
470 }
471}