1use std::net::SocketAddr;
2
3use vector_lib::{
4 NamedInternalEvent, counter,
5 internal_event::{CounterName, InternalEvent, error_stage, error_type},
6};
7
8use crate::{internal_events::SocketOutgoingConnectionError, tls::TlsError};
9
10#[derive(Debug, NamedInternalEvent)]
11pub struct TcpSocketConnectionEstablished {
12 pub peer_addr: Option<SocketAddr>,
13}
14
15impl InternalEvent for TcpSocketConnectionEstablished {
16 fn emit(self) {
17 if let Some(peer_addr) = self.peer_addr {
18 debug!(message = "Connected.", %peer_addr);
19 } else {
20 debug!(message = "Connected.", peer_addr = "unknown");
21 }
22 counter!(CounterName::ConnectionEstablishedTotal, "mode" => "tcp").increment(1);
23 }
24}
25
26#[derive(Debug, NamedInternalEvent)]
27pub struct TcpSocketOutgoingConnectionError<E> {
28 pub error: E,
29}
30
31impl<E: std::error::Error> InternalEvent for TcpSocketOutgoingConnectionError<E> {
32 fn emit(self) {
33 emit!(SocketOutgoingConnectionError { error: self.error });
36 }
37}
38
39#[derive(Debug, NamedInternalEvent)]
40pub struct TcpSocketConnectionShutdown;
41
42impl InternalEvent for TcpSocketConnectionShutdown {
43 fn emit(self) {
44 warn!(message = "Received EOF from the server, shutdown.");
45 counter!(CounterName::ConnectionShutdownTotal, "mode" => "tcp").increment(1);
46 }
47}
48
49#[derive(Debug, NamedInternalEvent)]
56pub struct TcpSourceConnectionClosed;
57
58impl InternalEvent for TcpSourceConnectionClosed {
59 fn emit(self) {
60 debug!(message = "Connection closed.");
61 counter!(CounterName::ConnectionShutdownTotal, "mode" => "tcp").increment(1);
62 }
63}
64
/// Internal event for an error on a TCP socket, reported together with the
/// peer's address.
///
/// Only compiled on Unix targets with the `sources-dnstap` feature enabled.
#[cfg(all(unix, feature = "sources-dnstap"))]
#[derive(Debug, NamedInternalEvent)]
pub struct TcpSocketError<'a, E> {
    pub(crate) error: &'a E,
    pub peer_addr: SocketAddr,
}

#[cfg(all(unix, feature = "sources-dnstap"))]
impl<E: std::fmt::Display> InternalEvent for TcpSocketError<'_, E> {
    /// Logs the error at error level and adds one to
    /// `CounterName::ComponentErrorsTotal`, both classified as
    /// `CONNECTION_FAILED` at the `PROCESSING` stage.
    fn emit(self) {
        let Self { error, peer_addr } = self;

        error!(
            message = "TCP socket error.",
            error = %error,
            peer_addr = ?peer_addr,
            error_type = error_type::CONNECTION_FAILED,
            stage = error_stage::PROCESSING,
        );

        let errors = counter!(
            CounterName::ComponentErrorsTotal,
            "error_type" => error_type::CONNECTION_FAILED,
            "stage" => error_stage::PROCESSING,
        );
        errors.increment(1);
    }
}
90
91#[derive(Debug, NamedInternalEvent)]
92pub struct TcpSocketTlsConnectionError {
93 pub error: TlsError,
94}
95
96impl InternalEvent for TcpSocketTlsConnectionError {
97 fn emit(self) {
98 match self.error {
99 TlsError::Handshake { ref source }
103 if source.code() == openssl::ssl::ErrorCode::SYSCALL
104 && source.io_error().is_none() =>
105 {
106 debug!(
107 message = "Connection error, probably a healthcheck.",
108 error = %self.error,
109 );
110 }
111 _ => {
112 error!(
113 message = "Connection error.",
114 error = %self.error,
115 error_code = "connection_failed",
116 error_type = error_type::WRITER_FAILED,
117 stage = error_stage::SENDING,
118 );
119 counter!(
120 CounterName::ComponentErrorsTotal,
121 "error_code" => "connection_failed",
122 "error_type" => error_type::WRITER_FAILED,
123 "stage" => error_stage::SENDING,
124 "mode" => "tcp",
125 )
126 .increment(1);
127 }
128 }
129 }
130}
131
132#[derive(Debug, NamedInternalEvent)]
133pub struct TcpSendAckError {
134 pub error: std::io::Error,
135}
136
137impl InternalEvent for TcpSendAckError {
138 fn emit(self) {
139 error!(
140 message = "Error writing acknowledgement, dropping connection.",
141 error = %self.error,
142 error_code = "ack_failed",
143 error_type = error_type::WRITER_FAILED,
144 stage = error_stage::SENDING,
145 );
146 counter!(
147 CounterName::ComponentErrorsTotal,
148 "error_code" => "ack_failed",
149 "error_type" => error_type::WRITER_FAILED,
150 "stage" => error_stage::SENDING,
151 "mode" => "tcp",
152 )
153 .increment(1);
154 }
155}
156
157#[derive(Debug, NamedInternalEvent)]
158pub struct TcpBytesReceived {
159 pub byte_size: usize,
160 pub peer_addr: SocketAddr,
161}
162
163impl InternalEvent for TcpBytesReceived {
164 fn emit(self) {
165 trace!(
166 message = "Bytes received.",
167 protocol = "tcp",
168 byte_size = %self.byte_size,
169 peer_addr = %self.peer_addr,
170 );
171 counter!(
172 CounterName::ComponentReceivedBytesTotal, "protocol" => "tcp"
173 )
174 .increment(self.byte_size as u64);
175 }
176}
177
#[cfg(test)]
mod tests {
    use std::io;

    use serial_test::serial;
    use vector_lib::event::MetricValue;
    use vector_lib::internal_event::InternalEvent;
    use vector_lib::metrics::Controller;

    use super::{TcpSendAckError, TcpSourceConnectionClosed};

    /// Metric name of the connection-shutdown counter.
    const SHUTDOWN_TOTAL: &str = "connection_shutdown_total";
    /// Metric name of the component error counter.
    const ERRORS_TOTAL: &str = "component_errors_total";

    /// Tag set carried by the TCP connection-shutdown counter.
    const TCP_MODE_TAGS: &[(&str, &str)] = &[("mode", "tcp")];
    /// Tag set carried by the counter that `TcpSendAckError` increments.
    const ACK_FAILED_TAGS: &[(&str, &str)] = &[
        ("error_code", "ack_failed"),
        ("error_type", "writer_failed"),
        ("stage", "sending"),
        ("mode", "tcp"),
    ];

    /// Returns the current value of the first captured metric called `name`
    /// that carries every `(key, value)` pair in `tags`.
    ///
    /// Yields `0.0` when no such metric has been captured.
    ///
    /// # Panics
    ///
    /// Panics if the metrics controller is not initialized, or if the
    /// matching metric is not a counter.
    fn counter_value(name: &str, tags: &[(&str, &str)]) -> f64 {
        let controller = Controller::get().expect("metrics controller initialized");

        let matching = controller.capture_metrics().into_iter().find(|metric| {
            metric.name() == name
                && tags.iter().all(|(key, expected)| {
                    metric
                        .tags()
                        .is_some_and(|set| set.get(key) == Some(*expected))
                })
        });

        match matching {
            None => 0.0,
            Some(metric) => match metric.value() {
                MetricValue::Counter { value } => *value,
                other => panic!("expected counter for {name}, got {other:?}"),
            },
        }
    }

    /// Emitting `TcpSourceConnectionClosed` adds exactly one to the TCP
    /// shutdown counter.
    #[test]
    #[serial]
    fn tcp_source_connection_closed_increments_shutdown_total() {
        crate::test_util::trace_init();
        let before = counter_value(SHUTDOWN_TOTAL, TCP_MODE_TAGS);

        TcpSourceConnectionClosed.emit();

        let after = counter_value(SHUTDOWN_TOTAL, TCP_MODE_TAGS);
        assert_eq!(after - before, 1.0);
    }

    /// Emitting `TcpSendAckError` adds exactly one to the `ack_failed` error
    /// counter and leaves the TCP shutdown counter untouched.
    #[test]
    #[serial]
    fn tcp_send_ack_error_emit_always_increments_component_errors_total() {
        crate::test_util::trace_init();
        let errors_before = counter_value(ERRORS_TOTAL, ACK_FAILED_TAGS);
        let shutdown_before = counter_value(SHUTDOWN_TOTAL, TCP_MODE_TAGS);

        let event = TcpSendAckError {
            error: io::Error::from(io::ErrorKind::ConnectionReset),
        };
        event.emit();

        let errors_after = counter_value(ERRORS_TOTAL, ACK_FAILED_TAGS);
        assert_eq!(errors_after - errors_before, 1.0);

        let shutdown_after = counter_value(SHUTDOWN_TOTAL, TCP_MODE_TAGS);
        assert_eq!(
            shutdown_after, shutdown_before,
            "TcpSendAckError must not bump the connection-close counter — \
             that is TcpSourceConnectionClosed's responsibility.",
        );
    }
}