vector/sources/util/http/
prelude.rs

1use std::{collections::HashMap, convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, TryFutureExt};
5use hyper::{Server, service::make_service_fn};
6use tokio::net::TcpStream;
7use tower::ServiceBuilder;
8use tracing::Span;
9use vector_lib::{
10    EstimatedJsonEncodedSizeOf,
11    config::SourceAcknowledgementsConfig,
12    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
13};
14use vrl::value::ObjectMap;
15use warp::{
16    Filter,
17    filters::{
18        BoxedFilter,
19        path::{FullPath, Tail},
20    },
21    http::{HeaderMap, StatusCode},
22    reject::Rejection,
23};
24
25use super::encoding::decompress_body;
26use crate::{
27    SourceSender,
28    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
29    config::SourceContext,
30    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
31    internal_events::{
32        HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError,
33    },
34    sources::util::http::HttpMethod,
35    tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig},
36};
37
/// Shared behaviour for sources that receive events over HTTP.
///
/// Implementors must provide `build_events`; every other method has a default.
/// The provided `run` method assembles the warp/hyper server that
/// authenticates, decodes and forwards each incoming request.
pub trait HttpSource: Clone + Send + Sync + 'static {
    /// This function can be defined to enrich events with additional HTTP
    /// metadata. This function should be used rather than internal enrichment so
    /// that accurate byte count metrics can be emitted.
    ///
    /// The default implementation does nothing. `run` calls it once per request,
    /// after `HttpEventsReceived` has been emitted. `_source_ip` is only `Some`
    /// when `enable_source_ip` returns `true` and a peer address was attached to
    /// the request.
    fn enrich_events(
        &self,
        _events: &mut [Event],
        _request_path: &str,
        _headers_config: &HeaderMap,
        _query_parameters: &HashMap<String, String>,
        _source_ip: Option<&SocketAddr>,
    ) {
    }

    /// Converts one request into events.
    ///
    /// `run` passes the body returned by `decode`, the request headers, the
    /// parsed query parameters and the full request path. Returning an
    /// `ErrorMessage` causes the request to be rejected with that error.
    fn build_events(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
        query_parameters: &HashMap<String, String>,
        path: &str,
    ) -> Result<Vec<Event>, ErrorMessage>;

    /// Called after `enrich_events` when `custom` auth returned metadata enrichment fields.
    /// Sources that do not override this will emit a warning and drop the enrichment.
    fn inject_auth_enrichment(&self, _events: &mut [Event], enrichment: ObjectMap) {
        // An empty map means there is nothing to drop, so stay silent.
        if !enrichment.is_empty() {
            warn!(
                message = "Auth metadata enrichment is not supported by this source and will be dropped. \
                           Remove %field writes from the custom auth VRL program or switch to a source that supports enrichment.",
                fields = ?enrichment.keys().collect::<Vec<_>>(),
            );
        }
    }

    /// Decodes the raw request body before it is handed to `build_events`.
    ///
    /// `encoding_header` is the value of the request's `content-encoding`
    /// header, if present. The default delegates to `decompress_body`.
    fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
        decompress_body(encoding_header, body)
    }

    /// Builds the future that runs the HTTP server for this source.
    ///
    /// * `address` - socket address the listener binds to.
    /// * `path` - route prefix; it is split on `/` and every non-empty segment
    ///   must match, in order.
    /// * `method` - the only HTTP method the main route accepts.
    /// * `response_code` - status returned when a request is handled successfully.
    /// * `strict_path` - when `true`, requests with anything after `path` are
    ///   rejected with `404 Not Found`.
    /// * `tls` - optional TLS configuration for the listener.
    /// * `auth` - optional auth configuration, checked before the body is decoded.
    /// * `cx` - source context providing the output sender, shutdown signal and
    ///   the tables/storage used to build the auth matcher.
    /// * `acknowledgements` - acknowledgement settings, resolved through
    ///   `cx.do_acknowledgements`.
    /// * `keepalive_settings` - optional maximum connection age and jitter.
    ///
    /// # Errors
    ///
    /// Returns an error immediately if the TLS settings or the auth matcher
    /// cannot be built. Bind and serve failures happen inside the returned
    /// future, where they are logged and surfaced as `Err(())`.
    #[allow(clippy::too_many_arguments)]
    fn run(
        self,
        address: SocketAddr,
        path: &str,
        method: HttpMethod,
        response_code: StatusCode,
        strict_path: bool,
        tls: Option<&TlsEnableableConfig>,
        auth: Option<&HttpServerAuthConfig>,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        keepalive_settings: KeepaliveConfig,
    ) -> crate::Result<crate::sources::Source> {
        // Everything fallible at configuration time is resolved here, before the
        // server future is created, so those errors are returned from `run` itself.
        let tls = MaybeTlsSettings::from_config(tls, true)?;
        let protocol = tls.http_protocol_name();
        let auth_matcher = auth
            .map(|a| a.build(&cx.enrichment_tables, &cx.metrics_storage))
            .transpose()?;
        let path = path.to_owned();
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let enable_source_ip = self.enable_source_ip();

        Ok(Box::pin(async move {
            // Start from a filter that only matches the configured HTTP method.
            let mut filter: BoxedFilter<()> = match method {
                HttpMethod::Head => warp::head().boxed(),
                HttpMethod::Get => warp::get().boxed(),
                HttpMethod::Put => warp::put().boxed(),
                HttpMethod::Post => warp::post().boxed(),
                HttpMethod::Patch => warp::patch().boxed(),
                HttpMethod::Delete => warp::delete().boxed(),
                HttpMethod::Options => warp::options().boxed(),
            };

            // Require each non-empty segment of the configured path, in order.
            // The lint below is silenced because of the linked clippy issue:
            // https://github.com/rust-lang/rust-clippy/issues/8148
            #[allow(clippy::unnecessary_to_owned)]
            for s in path.split('/').filter(|&x| !x.is_empty()) {
                filter = filter.and(warp::path(s.to_string())).boxed()
            }
            let svc = filter
                .and(warp::path::tail())
                .and_then(move |tail: Tail| async move {
                    // With `strict_path`, any remaining path after the configured
                    // prefix turns the request into a 404.
                    if !strict_path || tail.as_str().is_empty() {
                        Ok(())
                    } else {
                        emit!(HttpInternalError {
                            message: "Path not found."
                        });
                        Err(warp::reject::custom(ErrorMessage::new(
                            StatusCode::NOT_FOUND,
                            "Not found".to_string(),
                        )))
                    }
                })
                .untuple_one()
                // The extractors below feed the handler closure; their order must
                // match the order of the closure's parameters.
                .and(warp::path::full())
                .and(warp::header::optional::<String>("content-encoding"))
                .and(warp::header::headers_cloned())
                .and(warp::body::bytes())
                .and(warp::query::<HashMap<String, String>>())
                .and(warp::filters::ext::optional())
                .and_then(
                    move |path: FullPath,
                          encoding_header: Option<String>,
                          headers: HeaderMap,
                          body: Bytes,
                          query_parameters: HashMap<String, String>,
                          addr: Option<PeerAddr>| {
                        debug!(message = "Handling HTTP request.", headers = ?headers);
                        let http_path = path.as_str();
                        // Pipeline: auth -> decode -> build events -> enrich.
                        // The first step that fails short-circuits the rest.
                        let events = auth_matcher
                            .as_ref()
                            // No auth configured: proceed with no auth enrichment.
                            .map_or(Ok(None), |a| {
                                a.handle_auth(
                                    addr.as_ref().map(|a| a.0).as_ref(),
                                    &headers,
                                    path.as_str(),
                                )
                            })
                            .and_then(|auth_enrichment| {
                                self.decode(encoding_header.as_deref(), body)
                                    .map(|body| (body, auth_enrichment))
                            })
                            .and_then(|(body, auth_enrichment)| {
                                // The reported size is that of the body returned by `decode`.
                                emit!(HttpBytesReceived {
                                    byte_size: body.len(),
                                    http_path,
                                    protocol,
                                });
                                self.build_events(body, &headers, &query_parameters, path.as_str())
                                    .map(|events| (events, auth_enrichment))
                            })
                            .map(|(mut events, auth_enrichment)| {
                                // Emitted before enrichment so the byte count is not
                                // affected by fields added in `enrich_events`.
                                emit!(HttpEventsReceived {
                                    count: events.len(),
                                    byte_size: events.estimated_json_encoded_size_of(),
                                    http_path,
                                    protocol,
                                });

                                self.enrich_events(
                                    &mut events,
                                    path.as_str(),
                                    &headers,
                                    &query_parameters,
                                    // The peer address is only forwarded when the
                                    // source opted in via `enable_source_ip`.
                                    addr.and_then(|a| enable_source_ip.then_some(a))
                                        .map(|PeerAddr(inner_addr)| inner_addr)
                                        .as_ref(),
                                );

                                if let Some(enrichment) = auth_enrichment {
                                    self.inject_auth_enrichment(&mut events, enrichment);
                                }

                                events
                            });

                        handle_request(events, acknowledgements, response_code, cx.out.clone())
                    },
                );

            // `GET /ping` replies `pong`; it is combined with the main route via `or`.
            let ping = warp::get().and(warp::path("ping")).map(|| "pong");
            let routes = svc.or(ping).recover(|r: Rejection| async move {
                if let Some(e_msg) = r.find::<ErrorMessage>() {
                    // Known errors are rendered as JSON with their own status code.
                    let json = warp::reply::json(e_msg);
                    Ok(warp::reply::with_status(json, e_msg.status_code()))
                } else {
                    // Any other rejection is reported as an internal error and handed
                    // back to warp unchanged.
                    // NOTE(review): the original comment said this yields a 500; the
                    // final status is chosen by warp's default rejection handling —
                    // confirm it is 500 for every rejection type that can reach here.
                    emit!(HttpInternalError {
                        message: &format!("Internal error: {r:?}")
                    });
                    Err(r)
                }
            });

            let span = Span::current();
            // One service is built per accepted connection so the peer address can
            // be captured and attached to every request on that connection.
            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
                let remote_addr = conn.peer_addr();
                let svc = ServiceBuilder::new()
                    .layer(build_http_trace_layer(span.clone()))
                    // The connection-age layer is only installed when a maximum age is set.
                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
                        MaxConnectionAgeLayer::new(
                            Duration::from_secs(secs),
                            keepalive_settings.max_connection_age_jitter_factor,
                            remote_addr,
                        )
                    }))
                    .map_request(move |mut request: hyper::Request<_>| {
                        // Read back in the handler through `warp::filters::ext::optional()`.
                        request.extensions_mut().insert(PeerAddr::new(remote_addr));

                        request
                    })
                    .service(warp::service(routes.clone()));
                futures_util::future::ok::<_, Infallible>(svc)
            });

            info!(message = "Building HTTP server.", address = %address);

            // Bind failures are logged here and surface as `Err(())`.
            let listener = tls.bind(&address).await.map_err(|err| {
                error!("An error occurred: {:?}.", err);
            })?;

            // Serve until the source's shutdown signal resolves.
            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(make_svc)
                .with_graceful_shutdown(cx.shutdown.map(|_| ()))
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })?;

            Ok(())
        }))
    }

    /// Whether `run` passes the client's socket address to `enrich_events`.
    ///
    /// Defaults to `false`.
    fn enable_source_ip(&self) -> bool {
        false
    }
}
254
/// Socket address of the remote peer of an accepted connection.
///
/// It is stored in a request's extensions and read back by the request
/// handler. The wrapped `SocketAddr` is `Copy`, so this newtype is `Copy` too,
/// and it derives `Debug` so it can appear in diagnostics.
#[derive(Clone, Copy, Debug)]
#[repr(transparent)]
struct PeerAddr(SocketAddr);

impl PeerAddr {
    /// Wraps `addr` as the peer address of a connection.
    const fn new(addr: SocketAddr) -> Self {
        Self(addr)
    }
}
264
/// Rejection cause used when a batch cannot be sent because the receiving end
/// of the source's output has gone away.
struct RejectShuttingDown;

impl fmt::Debug for RejectShuttingDown {
    /// Always renders as the fixed text `shutting down`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(formatter, "shutting down")
    }
}
272
// Marks `RejectShuttingDown` as a warp rejection cause; `handle_request`
// passes it to `warp::reject::custom` when the event batch cannot be sent.
impl warp::reject::Reject for RejectShuttingDown {}
274
275async fn handle_request(
276    events: Result<Vec<Event>, ErrorMessage>,
277    acknowledgements: bool,
278    response_code: StatusCode,
279    mut out: SourceSender,
280) -> Result<impl warp::Reply, Rejection> {
281    match events {
282        Ok(mut events) => {
283            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
284
285            let count = events.len();
286            out.send_batch(events)
287                .map_err(|_| {
288                    // can only fail if receiving end disconnected, so we are shutting down,
289                    // probably not gracefully.
290                    emit!(StreamClosedError { count });
291                    warp::reject::custom(RejectShuttingDown)
292                })
293                .and_then(|_| handle_batch_status(response_code, receiver))
294                .await
295        }
296        Err(error) => {
297            emit!(HttpBadRequest::new(error.code(), error.message()));
298            Err(warp::reject::custom(error))
299        }
300    }
301}
302
303async fn handle_batch_status(
304    success_response_code: StatusCode,
305    receiver: Option<BatchStatusReceiver>,
306) -> Result<impl warp::Reply, Rejection> {
307    match receiver {
308        None => Ok(success_response_code),
309        Some(receiver) => match receiver.await {
310            BatchStatus::Delivered => Ok(success_response_code),
311            BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
312                StatusCode::INTERNAL_SERVER_ERROR,
313                "Error delivering contents to sink".into(),
314            ))),
315            BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
316                StatusCode::BAD_REQUEST,
317                "Contents failed to deliver to sink".into(),
318            ))),
319        },
320    }
321}