vector/internal_events/
tcp.rs

1use std::net::SocketAddr;
2
3use vector_lib::{
4    NamedInternalEvent, counter,
5    internal_event::{CounterName, InternalEvent, error_stage, error_type},
6};
7
8use crate::{internal_events::SocketOutgoingConnectionError, tls::TlsError};
9
10#[derive(Debug, NamedInternalEvent)]
11pub struct TcpSocketConnectionEstablished {
12    pub peer_addr: Option<SocketAddr>,
13}
14
15impl InternalEvent for TcpSocketConnectionEstablished {
16    fn emit(self) {
17        if let Some(peer_addr) = self.peer_addr {
18            debug!(message = "Connected.", %peer_addr);
19        } else {
20            debug!(message = "Connected.", peer_addr = "unknown");
21        }
22        counter!(CounterName::ConnectionEstablishedTotal, "mode" => "tcp").increment(1);
23    }
24}
25
26#[derive(Debug, NamedInternalEvent)]
27pub struct TcpSocketOutgoingConnectionError<E> {
28    pub error: E,
29}
30
31impl<E: std::error::Error> InternalEvent for TcpSocketOutgoingConnectionError<E> {
32    fn emit(self) {
33        // ## skip check-duplicate-events ##
34        // ## skip check-validity-events ##
35        emit!(SocketOutgoingConnectionError { error: self.error });
36    }
37}
38
39#[derive(Debug, NamedInternalEvent)]
40pub struct TcpSocketConnectionShutdown;
41
42impl InternalEvent for TcpSocketConnectionShutdown {
43    fn emit(self) {
44        warn!(message = "Received EOF from the server, shutdown.");
45        counter!(CounterName::ConnectionShutdownTotal, "mode" => "tcp").increment(1);
46    }
47}
48
49/// Emitted once per accepted TCP source connection, after the per-connection
50/// task exits — regardless of cause. This includes pre-loop exits (TLS
51/// handshake failure, shutdown signal arriving during handshake) as well as
52/// every read/ack loop exit (graceful peer EOF, decoder failure, downstream
53/// closed, ack write failure, shutdown signal, tripwire, max connection
54/// duration). Pairs exactly with `ConnectionOpen`.
55#[derive(Debug, NamedInternalEvent)]
56pub struct TcpSourceConnectionClosed;
57
58impl InternalEvent for TcpSourceConnectionClosed {
59    fn emit(self) {
60        debug!(message = "Connection closed.");
61        counter!(CounterName::ConnectionShutdownTotal, "mode" => "tcp").increment(1);
62    }
63}
64
/// Internal event for an error on a TCP socket, borrowing the error value.
///
/// Only compiled on unix targets with the `sources-dnstap` feature enabled.
#[cfg(all(unix, feature = "sources-dnstap"))]
#[derive(Debug, NamedInternalEvent)]
pub struct TcpSocketError<'a, E> {
    /// Borrowed error; rendered with its `Display` implementation when logged.
    pub(crate) error: &'a E,
    /// Address of the remote peer; rendered with `Debug` when logged.
    pub peer_addr: SocketAddr,
}

#[cfg(all(unix, feature = "sources-dnstap"))]
impl<E> InternalEvent for TcpSocketError<'_, E>
where
    E: std::fmt::Display,
{
    /// Logs the error at error level and increments the component error
    /// counter, both labelled with the `CONNECTION_FAILED` error type and the
    /// `PROCESSING` stage.
    fn emit(self) {
        let Self { error, peer_addr } = self;

        error!(
            message = "TCP socket error.",
            %error,
            ?peer_addr,
            error_type = error_type::CONNECTION_FAILED,
            stage = error_stage::PROCESSING,
        );
        counter!(
            CounterName::ComponentErrorsTotal,
            "error_type" => error_type::CONNECTION_FAILED,
            "stage" => error_stage::PROCESSING,
        )
        .increment(1);
    }
}
90
91#[derive(Debug, NamedInternalEvent)]
92pub struct TcpSocketTlsConnectionError {
93    pub error: TlsError,
94}
95
96impl InternalEvent for TcpSocketTlsConnectionError {
97    fn emit(self) {
98        match self.error {
99            // Specific error that occurs when the other side is only
100            // doing SYN/SYN-ACK connections for healthcheck.
101            // https://github.com/vectordotdev/vector/issues/7318
102            TlsError::Handshake { ref source }
103                if source.code() == openssl::ssl::ErrorCode::SYSCALL
104                    && source.io_error().is_none() =>
105            {
106                debug!(
107                    message = "Connection error, probably a healthcheck.",
108                    error = %self.error,
109                );
110            }
111            _ => {
112                error!(
113                    message = "Connection error.",
114                    error = %self.error,
115                    error_code = "connection_failed",
116                    error_type = error_type::WRITER_FAILED,
117                    stage = error_stage::SENDING,
118                );
119                counter!(
120                    CounterName::ComponentErrorsTotal,
121                    "error_code" => "connection_failed",
122                    "error_type" => error_type::WRITER_FAILED,
123                    "stage" => error_stage::SENDING,
124                    "mode" => "tcp",
125                )
126                .increment(1);
127            }
128        }
129    }
130}
131
132#[derive(Debug, NamedInternalEvent)]
133pub struct TcpSendAckError {
134    pub error: std::io::Error,
135}
136
137impl InternalEvent for TcpSendAckError {
138    fn emit(self) {
139        error!(
140            message = "Error writing acknowledgement, dropping connection.",
141            error = %self.error,
142            error_code = "ack_failed",
143            error_type = error_type::WRITER_FAILED,
144            stage = error_stage::SENDING,
145        );
146        counter!(
147            CounterName::ComponentErrorsTotal,
148            "error_code" => "ack_failed",
149            "error_type" => error_type::WRITER_FAILED,
150            "stage" => error_stage::SENDING,
151            "mode" => "tcp",
152        )
153        .increment(1);
154    }
155}
156
157#[derive(Debug, NamedInternalEvent)]
158pub struct TcpBytesReceived {
159    pub byte_size: usize,
160    pub peer_addr: SocketAddr,
161}
162
163impl InternalEvent for TcpBytesReceived {
164    fn emit(self) {
165        trace!(
166            message = "Bytes received.",
167            protocol = "tcp",
168            byte_size = %self.byte_size,
169            peer_addr = %self.peer_addr,
170        );
171        counter!(
172            CounterName::ComponentReceivedBytesTotal, "protocol" => "tcp"
173        )
174        .increment(self.byte_size as u64);
175    }
176}
177
#[cfg(test)]
mod tests {
    use std::io;

    use serial_test::serial;
    use vector_lib::event::MetricValue;
    use vector_lib::internal_event::InternalEvent;
    use vector_lib::metrics::Controller;

    use super::{TcpSendAckError, TcpSourceConnectionClosed};

    /// Tags identifying the TCP connection-shutdown counter series.
    const SHUTDOWN_TAGS: &[(&str, &str)] = &[("mode", "tcp")];

    /// Tags identifying the error counter series bumped by `TcpSendAckError`.
    const ACK_ERROR_TAGS: &[(&str, &str)] = &[
        ("error_code", "ack_failed"),
        ("error_type", "writer_failed"),
        ("stage", "sending"),
        ("mode", "tcp"),
    ];

    /// Looks up the first captured metric called `name` that carries every
    /// `(key, value)` pair in `tags` and returns its counter value.
    ///
    /// A counter that has never been touched is absent from the snapshot, in
    /// which case this yields `0.0`. Panics if the matching metric is not a
    /// counter, or if the metrics controller is not initialized.
    fn counter_value(name: &str, tags: &[(&str, &str)]) -> f64 {
        let snapshot = Controller::get()
            .expect("metrics controller initialized")
            .capture_metrics();

        for metric in snapshot {
            if metric.name() != name {
                continue;
            }

            let carries_all_tags = tags.iter().all(|&(key, expected)| {
                metric
                    .tags()
                    .is_some_and(|present| present.get(key) == Some(expected))
            });
            if !carries_all_tags {
                continue;
            }

            return match metric.value() {
                MetricValue::Counter { value } => *value,
                other => panic!("expected counter for {name}, got {other:?}"),
            };
        }

        0.0
    }

    /// Each `TcpSourceConnectionClosed` emit MUST raise
    /// `connection_shutdown_total{mode="tcp"}` by exactly one. This pins the
    /// contract that the event is the sole owner of the connection-close
    /// counter on the source side.
    #[test]
    #[serial]
    fn tcp_source_connection_closed_increments_shutdown_total() {
        crate::test_util::trace_init();
        let before = counter_value("connection_shutdown_total", SHUTDOWN_TAGS);

        TcpSourceConnectionClosed.emit();

        let after = counter_value("connection_shutdown_total", SHUTDOWN_TAGS);
        assert_eq!(after - before, 1.0);
    }

    /// `TcpSendAckError` is an `*Error` event, and per the instrumentation
    /// spec it MUST only be emitted for real errors: every emit bumps
    /// `component_errors_total` with the `ack_failed` error_code, and leaves
    /// the connection-close counter untouched.
    #[test]
    #[serial]
    fn tcp_send_ack_error_emit_always_increments_component_errors_total() {
        crate::test_util::trace_init();
        let errors_before = counter_value("component_errors_total", ACK_ERROR_TAGS);
        let shutdown_before = counter_value("connection_shutdown_total", SHUTDOWN_TAGS);

        TcpSendAckError {
            error: io::Error::from(io::ErrorKind::ConnectionReset),
        }
        .emit();

        let errors_after = counter_value("component_errors_total", ACK_ERROR_TAGS);
        assert_eq!(errors_after - errors_before, 1.0);

        assert_eq!(
            counter_value("connection_shutdown_total", SHUTDOWN_TAGS),
            shutdown_before,
            "TcpSendAckError must not bump the connection-close counter — \
             that is TcpSourceConnectionClosed's responsibility.",
        );
    }
}