vector/sources/util/http/
prelude.rs1use std::{collections::HashMap, convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, TryFutureExt};
5use hyper::{Server, service::make_service_fn};
6use tokio::net::TcpStream;
7use tower::ServiceBuilder;
8use tracing::Span;
9use vector_lib::{
10 EstimatedJsonEncodedSizeOf,
11 config::SourceAcknowledgementsConfig,
12 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
13};
14use vrl::value::ObjectMap;
15use warp::{
16 Filter,
17 filters::{
18 BoxedFilter,
19 path::{FullPath, Tail},
20 },
21 http::{HeaderMap, StatusCode},
22 reject::Rejection,
23};
24
25use super::encoding::decompress_body;
26use crate::{
27 SourceSender,
28 common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
29 config::SourceContext,
30 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
31 internal_events::{
32 HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError,
33 },
34 sources::util::http::HttpMethod,
35 tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig},
36};
37
38pub trait HttpSource: Clone + Send + Sync + 'static {
39 fn enrich_events(
43 &self,
44 _events: &mut [Event],
45 _request_path: &str,
46 _headers_config: &HeaderMap,
47 _query_parameters: &HashMap<String, String>,
48 _source_ip: Option<&SocketAddr>,
49 ) {
50 }
51
52 fn build_events(
53 &self,
54 body: Bytes,
55 header_map: &HeaderMap,
56 query_parameters: &HashMap<String, String>,
57 path: &str,
58 ) -> Result<Vec<Event>, ErrorMessage>;
59
60 fn inject_auth_enrichment(&self, _events: &mut [Event], enrichment: ObjectMap) {
63 if !enrichment.is_empty() {
64 warn!(
65 message = "Auth metadata enrichment is not supported by this source and will be dropped. \
66 Remove %field writes from the custom auth VRL program or switch to a source that supports enrichment.",
67 fields = ?enrichment.keys().collect::<Vec<_>>(),
68 );
69 }
70 }
71
72 fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
73 decompress_body(encoding_header, body)
74 }
75
76 #[allow(clippy::too_many_arguments)]
77 fn run(
78 self,
79 address: SocketAddr,
80 path: &str,
81 method: HttpMethod,
82 response_code: StatusCode,
83 strict_path: bool,
84 tls: Option<&TlsEnableableConfig>,
85 auth: Option<&HttpServerAuthConfig>,
86 cx: SourceContext,
87 acknowledgements: SourceAcknowledgementsConfig,
88 keepalive_settings: KeepaliveConfig,
89 ) -> crate::Result<crate::sources::Source> {
90 let tls = MaybeTlsSettings::from_config(tls, true)?;
91 let protocol = tls.http_protocol_name();
92 let auth_matcher = auth
93 .map(|a| a.build(&cx.enrichment_tables, &cx.metrics_storage))
94 .transpose()?;
95 let path = path.to_owned();
96 let acknowledgements = cx.do_acknowledgements(acknowledgements);
97 let enable_source_ip = self.enable_source_ip();
98
99 Ok(Box::pin(async move {
100 let mut filter: BoxedFilter<()> = match method {
101 HttpMethod::Head => warp::head().boxed(),
102 HttpMethod::Get => warp::get().boxed(),
103 HttpMethod::Put => warp::put().boxed(),
104 HttpMethod::Post => warp::post().boxed(),
105 HttpMethod::Patch => warp::patch().boxed(),
106 HttpMethod::Delete => warp::delete().boxed(),
107 HttpMethod::Options => warp::options().boxed(),
108 };
109
110 #[allow(clippy::unnecessary_to_owned)]
112 for s in path.split('/').filter(|&x| !x.is_empty()) {
113 filter = filter.and(warp::path(s.to_string())).boxed()
114 }
115 let svc = filter
116 .and(warp::path::tail())
117 .and_then(move |tail: Tail| async move {
118 if !strict_path || tail.as_str().is_empty() {
119 Ok(())
120 } else {
121 emit!(HttpInternalError {
122 message: "Path not found."
123 });
124 Err(warp::reject::custom(ErrorMessage::new(
125 StatusCode::NOT_FOUND,
126 "Not found".to_string(),
127 )))
128 }
129 })
130 .untuple_one()
131 .and(warp::path::full())
132 .and(warp::header::optional::<String>("content-encoding"))
133 .and(warp::header::headers_cloned())
134 .and(warp::body::bytes())
135 .and(warp::query::<HashMap<String, String>>())
136 .and(warp::filters::ext::optional())
137 .and_then(
138 move |path: FullPath,
139 encoding_header: Option<String>,
140 headers: HeaderMap,
141 body: Bytes,
142 query_parameters: HashMap<String, String>,
143 addr: Option<PeerAddr>| {
144 debug!(message = "Handling HTTP request.", headers = ?headers);
145 let http_path = path.as_str();
146 let events = auth_matcher
147 .as_ref()
148 .map_or(Ok(None), |a| {
149 a.handle_auth(
150 addr.as_ref().map(|a| a.0).as_ref(),
151 &headers,
152 path.as_str(),
153 )
154 })
155 .and_then(|auth_enrichment| {
156 self.decode(encoding_header.as_deref(), body)
157 .map(|body| (body, auth_enrichment))
158 })
159 .and_then(|(body, auth_enrichment)| {
160 emit!(HttpBytesReceived {
161 byte_size: body.len(),
162 http_path,
163 protocol,
164 });
165 self.build_events(body, &headers, &query_parameters, path.as_str())
166 .map(|events| (events, auth_enrichment))
167 })
168 .map(|(mut events, auth_enrichment)| {
169 emit!(HttpEventsReceived {
170 count: events.len(),
171 byte_size: events.estimated_json_encoded_size_of(),
172 http_path,
173 protocol,
174 });
175
176 self.enrich_events(
177 &mut events,
178 path.as_str(),
179 &headers,
180 &query_parameters,
181 addr.and_then(|a| enable_source_ip.then_some(a))
182 .map(|PeerAddr(inner_addr)| inner_addr)
183 .as_ref(),
184 );
185
186 if let Some(enrichment) = auth_enrichment {
187 self.inject_auth_enrichment(&mut events, enrichment);
188 }
189
190 events
191 });
192
193 handle_request(events, acknowledgements, response_code, cx.out.clone())
194 },
195 );
196
197 let ping = warp::get().and(warp::path("ping")).map(|| "pong");
198 let routes = svc.or(ping).recover(|r: Rejection| async move {
199 if let Some(e_msg) = r.find::<ErrorMessage>() {
200 let json = warp::reply::json(e_msg);
201 Ok(warp::reply::with_status(json, e_msg.status_code()))
202 } else {
203 emit!(HttpInternalError {
205 message: &format!("Internal error: {r:?}")
206 });
207 Err(r)
208 }
209 });
210
211 let span = Span::current();
212 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
213 let remote_addr = conn.peer_addr();
214 let svc = ServiceBuilder::new()
215 .layer(build_http_trace_layer(span.clone()))
216 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
217 MaxConnectionAgeLayer::new(
218 Duration::from_secs(secs),
219 keepalive_settings.max_connection_age_jitter_factor,
220 remote_addr,
221 )
222 }))
223 .map_request(move |mut request: hyper::Request<_>| {
224 request.extensions_mut().insert(PeerAddr::new(remote_addr));
225
226 request
227 })
228 .service(warp::service(routes.clone()));
229 futures_util::future::ok::<_, Infallible>(svc)
230 });
231
232 info!(message = "Building HTTP server.", address = %address);
233
234 let listener = tls.bind(&address).await.map_err(|err| {
235 error!("An error occurred: {:?}.", err);
236 })?;
237
238 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
239 .serve(make_svc)
240 .with_graceful_shutdown(cx.shutdown.map(|_| ()))
241 .await
242 .map_err(|err| {
243 error!("An error occurred: {:?}.", err);
244 })?;
245
246 Ok(())
247 }))
248 }
249
250 fn enable_source_ip(&self) -> bool {
251 false
252 }
253}
254
/// Socket address of the remote peer, stored in request extensions so the route handler can
/// read it back.
#[derive(Clone)]
#[repr(transparent)]
struct PeerAddr(SocketAddr);

impl PeerAddr {
    /// Wraps `addr` in a `PeerAddr`.
    const fn new(addr: SocketAddr) -> Self {
        PeerAddr(addr)
    }
}
264
/// Rejection marker used by `handle_request` when the event batch cannot be sent downstream.
struct RejectShuttingDown;

impl fmt::Debug for RejectShuttingDown {
    /// Always renders as the fixed text `shutting down`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(formatter, "shutting down")
    }
}
272
// Marker impl so `RejectShuttingDown` can be passed to `warp::reject::custom`.
impl warp::reject::Reject for RejectShuttingDown {}
274
275async fn handle_request(
276 events: Result<Vec<Event>, ErrorMessage>,
277 acknowledgements: bool,
278 response_code: StatusCode,
279 mut out: SourceSender,
280) -> Result<impl warp::Reply, Rejection> {
281 match events {
282 Ok(mut events) => {
283 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
284
285 let count = events.len();
286 out.send_batch(events)
287 .map_err(|_| {
288 emit!(StreamClosedError { count });
291 warp::reject::custom(RejectShuttingDown)
292 })
293 .and_then(|_| handle_batch_status(response_code, receiver))
294 .await
295 }
296 Err(error) => {
297 emit!(HttpBadRequest::new(error.code(), error.message()));
298 Err(warp::reject::custom(error))
299 }
300 }
301}
302
303async fn handle_batch_status(
304 success_response_code: StatusCode,
305 receiver: Option<BatchStatusReceiver>,
306) -> Result<impl warp::Reply, Rejection> {
307 match receiver {
308 None => Ok(success_response_code),
309 Some(receiver) => match receiver.await {
310 BatchStatus::Delivered => Ok(success_response_code),
311 BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
312 StatusCode::INTERNAL_SERVER_ERROR,
313 "Error delivering contents to sink".into(),
314 ))),
315 BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
316 StatusCode::BAD_REQUEST,
317 "Contents failed to deliver to sink".into(),
318 ))),
319 },
320 }
321}