vector/internal_events/
http_client.rs

1use std::time::Duration;
2
3use http::{
4    Request, Response,
5    header::{self, HeaderMap, HeaderName, HeaderValue},
6};
7use hyper::{Error, body::HttpBody};
8use vector_lib::{
9    NamedInternalEvent, counter, histogram,
10    internal_event::{CounterName, HistogramName, InternalEvent, error_stage, error_type},
11};
12
13#[derive(Debug, NamedInternalEvent)]
14pub struct AboutToSendHttpRequest<'a, T> {
15    pub request: &'a Request<T>,
16}
17
18fn remove_sensitive(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
19    let mut headers = headers.clone();
20    let sensitive: &[HeaderName] = &[
21        header::AUTHORIZATION,
22        header::PROXY_AUTHORIZATION,
23        header::PROXY_AUTHENTICATE,
24        header::WWW_AUTHENTICATE,
25        header::COOKIE,
26        header::SET_COOKIE,
27        HeaderName::from_static("cookie2"),
28        HeaderName::from_static("dd-api-key"),
29        HeaderName::from_static("x-honeycomb-team"),
30        HeaderName::from_static("x-api-key"),
31        HeaderName::from_static("api-key"),
32    ];
33    for (name, value) in headers.iter_mut() {
34        if sensitive.contains(name) {
35            value.set_sensitive(true);
36        }
37    }
38    headers
39}
40
41impl<T: HttpBody> InternalEvent for AboutToSendHttpRequest<'_, T> {
42    fn emit(self) {
43        debug!(
44            message = "Sending HTTP request.",
45            uri = %self.request.uri(),
46            method = %self.request.method(),
47            version = ?self.request.version(),
48            headers = ?remove_sensitive(self.request.headers()),
49            body = %FormatBody(self.request.body()),
50        );
51        counter!(CounterName::HttpClientRequestsSentTotal, "method" => self.request.method().to_string())
52            .increment(1);
53    }
54}
55
56#[derive(Debug, NamedInternalEvent)]
57pub struct GotHttpResponse<'a, T> {
58    pub response: &'a Response<T>,
59    pub roundtrip: Duration,
60}
61
62impl<T: HttpBody> InternalEvent for GotHttpResponse<'_, T> {
63    fn emit(self) {
64        debug!(
65            message = "HTTP response.",
66            status = %self.response.status(),
67            version = ?self.response.version(),
68            headers = ?remove_sensitive(self.response.headers()),
69            body = %FormatBody(self.response.body()),
70        );
71        counter!(
72            CounterName::HttpClientResponsesTotal,
73            "status" => self.response.status().as_u16().to_string(),
74        )
75        .increment(1);
76        histogram!(HistogramName::HttpClientRttSeconds).record(self.roundtrip);
77        histogram!(
78            HistogramName::HttpClientResponseRttSeconds,
79            "status" => self.response.status().as_u16().to_string(),
80        )
81        .record(self.roundtrip);
82    }
83}
84
85#[derive(Debug, NamedInternalEvent)]
86pub struct GotHttpWarning<'a> {
87    pub error: &'a Error,
88    pub roundtrip: Duration,
89}
90
91impl InternalEvent for GotHttpWarning<'_> {
92    fn emit(self) {
93        warn!(
94            message = "HTTP error.",
95            error = %self.error,
96            error_type = error_type::REQUEST_FAILED,
97            stage = error_stage::PROCESSING,
98        );
99        counter!(CounterName::HttpClientErrorsTotal, "error_kind" => self.error.to_string())
100            .increment(1);
101        histogram!(HistogramName::HttpClientRttSeconds).record(self.roundtrip);
102        histogram!(HistogramName::HttpClientErrorRttSeconds, "error_kind" => self.error.to_string())
103            .record(self.roundtrip);
104    }
105}
106
107/// Newtype placeholder to provide a formatter for the request and response body.
108struct FormatBody<'a, B>(&'a B);
109
110impl<B: HttpBody> std::fmt::Display for FormatBody<'_, B> {
111    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
112        let size = self.0.size_hint();
113        match (size.lower(), size.upper()) {
114            (0, None) => write!(fmt, "[unknown]"),
115            (lower, None) => write!(fmt, "[>={lower} bytes]"),
116
117            (0, Some(0)) => write!(fmt, "[empty]"),
118            (0, Some(upper)) => write!(fmt, "[<={upper} bytes]"),
119
120            (lower, Some(upper)) if lower == upper => write!(fmt, "[{lower} bytes]"),
121            (lower, Some(upper)) => write!(fmt, "[{lower}..={upper} bytes]"),
122        }
123    }
124}
125
126#[cfg(test)]
127mod tests {
128    use http::header::{self, HeaderMap, HeaderName, HeaderValue};
129
130    use super::remove_sensitive;
131
132    fn is_sensitive(map: &HeaderMap, name: &HeaderName) -> Vec<bool> {
133        map.get_all(name)
134            .iter()
135            .map(HeaderValue::is_sensitive)
136            .collect()
137    }
138
139    #[test]
140    fn marks_single_sensitive_header() {
141        let mut headers = HeaderMap::new();
142        headers.insert(
143            header::AUTHORIZATION,
144            HeaderValue::from_static("Bearer token"),
145        );
146        let result = remove_sensitive(&headers);
147        assert!(
148            is_sensitive(&result, &header::AUTHORIZATION)
149                .iter()
150                .all(|&s| s)
151        );
152    }
153
154    #[test]
155    fn marks_all_duplicate_sensitive_headers() {
156        let x_api_key: HeaderName = HeaderName::from_static("x-api-key");
157        let mut headers = HeaderMap::new();
158        headers.insert(x_api_key.clone(), HeaderValue::from_static("key-one"));
159        headers.append(x_api_key.clone(), HeaderValue::from_static("key-two"));
160        headers.append(x_api_key.clone(), HeaderValue::from_static("key-three"));
161
162        let result = remove_sensitive(&headers);
163        let sensitive_flags = is_sensitive(&result, &x_api_key);
164        assert_eq!(sensitive_flags.len(), 3);
165        assert!(
166            sensitive_flags.iter().all(|&s| s),
167            "not all duplicate x-api-key values were marked sensitive: {sensitive_flags:?}"
168        );
169    }
170
171    #[test]
172    fn header_name_matching_is_case_insensitive() {
173        // HeaderName normalizes to lowercase, so mixed-case variants are identical.
174        let mut headers = HeaderMap::new();
175        headers.insert(
176            HeaderName::from_static("x-api-key"),
177            HeaderValue::from_static("secret"),
178        );
179        let result = remove_sensitive(&headers);
180        // Lookup with the mixed-case form resolves to the same normalized name.
181        let mixed_case = HeaderName::from_bytes(b"X-Api-Key").unwrap();
182        assert!(is_sensitive(&result, &mixed_case).iter().all(|&s| s));
183    }
184
185    #[test]
186    fn does_not_mark_non_sensitive_headers() {
187        let mut headers = HeaderMap::new();
188        headers.insert(
189            header::CONTENT_TYPE,
190            HeaderValue::from_static("application/json"),
191        );
192        let result = remove_sensitive(&headers);
193        assert!(
194            is_sensitive(&result, &header::CONTENT_TYPE)
195                .iter()
196                .all(|&s| !s)
197        );
198    }
199}