vector/sources/
http_server.rs

1use std::{collections::HashMap, net::SocketAddr};
2
3use bytes::{Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use http_serde;
7use tokio_util::codec::Decoder as _;
8use vector_lib::{
9    codecs::{
10        BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
11        NewlineDelimitedDecoderConfig,
12        decoding::{DeserializerConfig, FramingConfig},
13    },
14    config::{DataType, LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    lookup::{PathPrefix, lookup_v2::OptionalValuePath, owned_value_path, path},
17    schema::Definition,
18};
19use vrl::{
20    path::ValuePath as _,
21    value::{Kind, ObjectMap, kind::Collection},
22};
23use warp::http::HeaderMap;
24
25use crate::{
26    codecs::{Decoder, DecodingConfig},
27    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
28    config::{
29        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
30        SourceOutput,
31    },
32    event::Event,
33    http::KeepaliveConfig,
34    serde::{bool_or_struct, default_decoding},
35    sources::util::{
36        Encoding, HttpSource,
37        http::{HttpMethod, add_headers, add_query_parameters},
38    },
39    tls::TlsEnableableConfig,
40};
41
42/// Configuration for the `http` source.
43#[configurable_component(source("http", "Host an HTTP endpoint to receive logs."))]
44#[configurable(metadata(deprecated))]
45#[derive(Clone, Debug)]
46pub struct HttpConfig(SimpleHttpConfig);
47
48impl GenerateConfig for HttpConfig {
49    fn generate_config() -> toml::Value {
50        <SimpleHttpConfig as GenerateConfig>::generate_config()
51    }
52}
53
54#[async_trait::async_trait]
55#[typetag::serde(name = "http")]
56impl SourceConfig for HttpConfig {
57    async fn build(&self, cx: SourceContext) -> vector_lib::Result<super::Source> {
58        self.0.build(cx).await
59    }
60
61    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
62        self.0.outputs(global_log_namespace)
63    }
64
65    fn resources(&self) -> Vec<Resource> {
66        self.0.resources()
67    }
68
69    fn can_acknowledge(&self) -> bool {
70        self.0.can_acknowledge()
71    }
72}
73
74/// Configuration for the `http_server` source.
75#[configurable_component(source("http_server", "Host an HTTP endpoint to receive logs."))]
76#[derive(Clone, Debug)]
77pub struct SimpleHttpConfig {
78    /// The socket address to listen for connections on.
79    ///
80    /// It _must_ include a port.
81    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
82    #[configurable(metadata(docs::examples = "localhost:80"))]
83    address: SocketAddr,
84
85    /// The expected encoding of received data.
86    ///
87    /// For `json` and `ndjson` encodings, the fields of the JSON objects are output as separate fields.
88    #[configurable(deprecated)]
89    #[serde(default)]
90    encoding: Option<Encoding>,
91
92    /// A list of HTTP headers to include in the log event.
93    ///
94    /// Accepts the wildcard (`*`) character for headers matching a specified pattern.
95    ///
96    /// Specifying "*" results in all headers included in the log event.
97    ///
98    /// These headers are not included in the JSON payload if a field with a conflicting name exists.
99    #[serde(default)]
100    #[configurable(metadata(docs::examples = "User-Agent"))]
101    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
102    #[configurable(metadata(docs::examples = "X-*"))]
103    #[configurable(metadata(docs::examples = "*"))]
104    headers: Vec<String>,
105
106    /// A list of URL query parameters to include in the log event.
107    ///
108    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
109    ///
110    /// Specifying "*" results in all query parameters included in the log event.
111    ///
112    /// These override any values included in the body with conflicting names.
113    #[serde(default)]
114    #[configurable(metadata(docs::examples = "application"))]
115    #[configurable(metadata(docs::examples = "source"))]
116    #[configurable(metadata(docs::examples = "param*"))]
117    #[configurable(metadata(docs::examples = "*"))]
118    query_parameters: Vec<String>,
119
120    /// HTTP authentication configuration.
121    ///
122    /// Use HTTP authentication with HTTPS only. The authentication credentials are passed as an
123    /// HTTP header without any additional encryption beyond what is provided by the transport itself.
124    ///
125    /// When using the `custom` strategy, the VRL program may write `%field = value` to enrich
126    /// authenticated events. These metadata fields are injected into the event body (legacy
127    /// namespace) or under `http_server.<field>` in event metadata (Vector namespace).
128    #[configurable(derived)]
129    auth: Option<HttpServerAuthConfig>,
130
131    /// Whether or not to treat the configured `path` as an absolute path.
132    ///
133    /// If set to `true`, only requests using the exact URL path specified in `path` are accepted. Otherwise,
134    /// requests sent to a URL path that starts with the value of `path` are accepted.
135    ///
136    /// With `strict_path` set to `false` and `path` set to `""`, the configured HTTP source accepts requests from
137    /// any URL path.
138    #[serde(default = "crate::serde::default_true")]
139    strict_path: bool,
140
141    /// The URL path on which log event POST requests are sent.
142    #[serde(default = "default_path")]
143    #[configurable(metadata(docs::examples = "/event/path"))]
144    #[configurable(metadata(docs::examples = "/logs"))]
145    path: String,
146
147    /// The event key in which the requested URL path used to send the request is stored.
148    #[serde(default = "default_path_key")]
149    #[configurable(metadata(docs::examples = "vector_http_path"))]
150    path_key: OptionalValuePath,
151
152    /// If set, the name of the log field used to add the remote IP to each event
153    #[serde(default = "default_host_key")]
154    #[configurable(metadata(docs::examples = "hostname"))]
155    host_key: OptionalValuePath,
156
157    /// Specifies the action of the HTTP request.
158    #[serde(default = "default_http_method")]
159    method: HttpMethod,
160
161    /// Specifies the HTTP response status code that will be returned on successful requests.
162    #[configurable(metadata(docs::examples = 202))]
163    #[configurable(metadata(docs::numeric_type = "uint"))]
164    #[serde(with = "http_serde::status_code")]
165    #[serde(default = "default_http_response_code")]
166    response_code: StatusCode,
167
168    #[configurable(derived)]
169    tls: Option<TlsEnableableConfig>,
170
171    #[configurable(derived)]
172    framing: Option<FramingConfig>,
173
174    #[configurable(derived)]
175    decoding: Option<DeserializerConfig>,
176
177    #[configurable(derived)]
178    #[serde(default, deserialize_with = "bool_or_struct")]
179    acknowledgements: SourceAcknowledgementsConfig,
180
181    /// The namespace to use for logs. This overrides the global setting.
182    #[configurable(metadata(docs::hidden))]
183    #[serde(default)]
184    log_namespace: Option<bool>,
185
186    #[configurable(derived)]
187    #[serde(default)]
188    keepalive: KeepaliveConfig,
189}
190
191impl SimpleHttpConfig {
192    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
193    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
194        let mut schema_definition = self
195            .decoding
196            .as_ref()
197            .unwrap_or(&default_decoding())
198            .schema_definition(log_namespace)
199            .with_source_metadata(
200                SimpleHttpConfig::NAME,
201                self.path_key.path.clone().map(LegacyKey::InsertIfEmpty),
202                &owned_value_path!("path"),
203                Kind::bytes(),
204                None,
205            )
206            // for metadata that is added to the events dynamically from the self.headers
207            .with_source_metadata(
208                SimpleHttpConfig::NAME,
209                None,
210                &owned_value_path!("headers"),
211                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
212                None,
213            )
214            // for metadata that is added to the events dynamically from the self.query_parameters
215            .with_source_metadata(
216                SimpleHttpConfig::NAME,
217                None,
218                &owned_value_path!("query_parameters"),
219                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
220                None,
221            )
222            .with_source_metadata(
223                SimpleHttpConfig::NAME,
224                self.host_key.path.clone().map(LegacyKey::Overwrite),
225                &owned_value_path!("host"),
226                Kind::bytes().or_undefined(),
227                None,
228            )
229            .with_standard_vector_source_metadata();
230
231        // For metadata that is added to the events dynamically from config options.
232        if log_namespace == LogNamespace::Legacy {
233            // Custom auth programs can inject any VRL value, not just bytes; widen the unknown
234            // field kind accordingly so schema-aware downstream components don't reject events.
235            let unknown_kind = if matches!(self.auth, Some(HttpServerAuthConfig::Custom { .. })) {
236                Kind::any()
237            } else {
238                Kind::bytes()
239            };
240            schema_definition = schema_definition.unknown_fields(unknown_kind);
241        }
242
243        schema_definition
244    }
245
246    fn get_decoding_config(&self) -> crate::Result<DecodingConfig> {
247        if self.encoding.is_some() && (self.framing.is_some() || self.decoding.is_some()) {
248            return Err("Using `encoding` is deprecated and does not have any effect when `decoding` or `framing` is provided. Configure `framing` and `decoding` instead.".into());
249        }
250
251        let (framing, decoding) = if let Some(encoding) = self.encoding {
252            match encoding {
253                Encoding::Text => (
254                    NewlineDelimitedDecoderConfig::new().into(),
255                    BytesDeserializerConfig::new().into(),
256                ),
257                Encoding::Json => (
258                    BytesDecoderConfig::new().into(),
259                    JsonDeserializerConfig::default().into(),
260                ),
261                Encoding::Ndjson => (
262                    NewlineDelimitedDecoderConfig::new().into(),
263                    JsonDeserializerConfig::default().into(),
264                ),
265                Encoding::Binary => (
266                    BytesDecoderConfig::new().into(),
267                    BytesDeserializerConfig::new().into(),
268                ),
269            }
270        } else {
271            let decoding = self.decoding.clone().unwrap_or_else(default_decoding);
272            let framing = self
273                .framing
274                .clone()
275                .unwrap_or_else(|| decoding.default_stream_framing());
276            (framing, decoding)
277        };
278
279        Ok(DecodingConfig::new(
280            framing,
281            decoding,
282            self.log_namespace.unwrap_or(false).into(),
283        ))
284    }
285}
286
287impl Default for SimpleHttpConfig {
288    fn default() -> Self {
289        Self {
290            address: "0.0.0.0:8080".parse().unwrap(),
291            encoding: None,
292            headers: Vec::new(),
293            query_parameters: Vec::new(),
294            tls: None,
295            auth: None,
296            path: default_path(),
297            path_key: default_path_key(),
298            host_key: default_host_key(),
299            method: default_http_method(),
300            response_code: default_http_response_code(),
301            strict_path: true,
302            framing: None,
303            decoding: Some(default_decoding()),
304            acknowledgements: SourceAcknowledgementsConfig::default(),
305            log_namespace: None,
306            keepalive: KeepaliveConfig::default(),
307        }
308    }
309}
310
311impl_generate_config_from_default!(SimpleHttpConfig);
312
313const fn default_http_method() -> HttpMethod {
314    HttpMethod::Post
315}
316
/// Serde default for `path`: the root path `/`.
fn default_path() -> String {
    String::from("/")
}
320
321fn default_path_key() -> OptionalValuePath {
322    OptionalValuePath::from(owned_value_path!("path"))
323}
324
325fn default_host_key() -> OptionalValuePath {
326    OptionalValuePath::none()
327}
328
329const fn default_http_response_code() -> StatusCode {
330    StatusCode::OK
331}
332
333/// Removes duplicates from the list, and logs a `warn!()` for each duplicate removed.
334pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
335    list.sort();
336
337    let mut dedup = false;
338    for (idx, name) in list.iter().enumerate() {
339        if idx < list.len() - 1 && list[idx] == list[idx + 1] {
340            warn!(
341                "`{}` configuration contains duplicate entry for `{}`. Removing duplicate.",
342                list_name, name
343            );
344            dedup = true;
345        }
346    }
347
348    if dedup {
349        list.dedup();
350    }
351    list
352}
353
/// Renders only the IP portion of a [`SocketAddr`] as a string; the port is discarded.
fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
    match addr {
        SocketAddr::V4(v4) => v4.ip().to_string(),
        SocketAddr::V6(v6) => v6.ip().to_string(),
    }
}
358
359#[derive(Clone)]
360pub enum HttpConfigParamKind {
361    Glob(glob::Pattern),
362    Exact(String),
363}
364
365pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
366    list.iter()
367        .map(|s| match s.contains('*') {
368            true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
369            false => Ok(HttpConfigParamKind::Exact(s.to_string())),
370        })
371        .collect::<crate::Result<Vec<HttpConfigParamKind>>>()
372}
373
374#[async_trait::async_trait]
375#[typetag::serde(name = "http_server")]
376impl SourceConfig for SimpleHttpConfig {
377    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
378        let log_namespace = cx.log_namespace(self.log_namespace);
379        let decoder = self
380            .get_decoding_config()?
381            .build()?
382            .with_log_namespace(log_namespace);
383
384        let source = SimpleHttpSource {
385            headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
386            query_parameters: build_param_matcher(&remove_duplicates(
387                self.query_parameters.clone(),
388                "query_parameters",
389            ))?,
390            path_key: self.path_key.clone(),
391            host_key: self.host_key.clone(),
392            decoder,
393            log_namespace,
394        };
395        source.run(
396            self.address,
397            self.path.as_str(),
398            self.method,
399            self.response_code,
400            self.strict_path,
401            self.tls.as_ref(),
402            self.auth.as_ref(),
403            cx,
404            self.acknowledgements,
405            self.keepalive.clone(),
406        )
407    }
408
409    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
410        // There is a global and per-source `log_namespace` config.
411        // The source config overrides the global setting and is merged here.
412        let log_namespace = global_log_namespace.merge(self.log_namespace);
413
414        let schema_definition = self.schema_definition(log_namespace);
415
416        vec![SourceOutput::new_maybe_logs(
417            self.decoding
418                .as_ref()
419                .map(|d| d.output_type())
420                .unwrap_or(DataType::Log),
421            schema_definition,
422        )]
423    }
424
425    fn resources(&self) -> Vec<Resource> {
426        vec![Resource::tcp(self.address)]
427    }
428
429    fn can_acknowledge(&self) -> bool {
430        true
431    }
432}
433
434#[derive(Clone)]
435struct SimpleHttpSource {
436    headers: Vec<HttpConfigParamKind>,
437    query_parameters: Vec<HttpConfigParamKind>,
438    path_key: OptionalValuePath,
439    host_key: OptionalValuePath,
440    decoder: Decoder,
441    log_namespace: LogNamespace,
442}
443
444impl HttpSource for SimpleHttpSource {
445    /// Enriches the log events with metadata for the `request_path` and for each of the headers.
446    /// Non-log events are skipped.
447    fn enrich_events(
448        &self,
449        events: &mut [Event],
450        request_path: &str,
451        headers: &HeaderMap,
452        query_parameters: &HashMap<String, String>,
453        source_ip: Option<&SocketAddr>,
454    ) {
455        let now = Utc::now();
456        for event in events.iter_mut() {
457            match event {
458                Event::Log(log) => {
459                    // add request_path to each event
460                    self.log_namespace.insert_source_metadata(
461                        SimpleHttpConfig::NAME,
462                        log,
463                        self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
464                        path!("path"),
465                        request_path.to_owned(),
466                    );
467
468                    self.log_namespace.insert_standard_vector_source_metadata(
469                        log,
470                        SimpleHttpConfig::NAME,
471                        now,
472                    );
473
474                    if let Some(addr) = source_ip {
475                        self.log_namespace.insert_source_metadata(
476                            SimpleHttpConfig::NAME,
477                            log,
478                            self.host_key.path.as_ref().map(LegacyKey::Overwrite),
479                            path!("host"),
480                            socket_addr_to_ip_string(addr),
481                        );
482                    }
483                }
484                _ => {
485                    continue;
486                }
487            }
488        }
489
490        add_headers(
491            events,
492            &self.headers,
493            headers,
494            self.log_namespace,
495            SimpleHttpConfig::NAME,
496        );
497
498        add_query_parameters(
499            events,
500            &self.query_parameters,
501            query_parameters,
502            self.log_namespace,
503            SimpleHttpConfig::NAME,
504        );
505    }
506
507    fn build_events(
508        &self,
509        body: Bytes,
510        _header_map: &HeaderMap,
511        _query_parameters: &HashMap<String, String>,
512        _request_path: &str,
513    ) -> Result<Vec<Event>, ErrorMessage> {
514        let mut decoder = self.decoder.clone();
515        let mut events = Vec::new();
516        let mut bytes = BytesMut::new();
517        bytes.extend_from_slice(&body);
518
519        loop {
520            match decoder.decode_eof(&mut bytes) {
521                Ok(Some((next, _))) => {
522                    events.extend(next);
523                }
524                Ok(None) => break,
525                Err(error) => {
526                    // Error is logged / emitted by `vector_lib::codecs::Decoder`, no further
527                    // handling is needed here
528                    return Err(ErrorMessage::new(
529                        StatusCode::BAD_REQUEST,
530                        format!("Failed decoding body: {error}"),
531                    ));
532                }
533            }
534        }
535
536        Ok(events)
537    }
538
539    fn enable_source_ip(&self) -> bool {
540        self.host_key.path.is_some()
541    }
542
543    /// Injects `%field` enrichment from a `custom` auth VRL program into events.
544    /// Both namespaces use insert-if-empty semantics so auth enrichment never
545    /// overwrites built-in source metadata (`path`, `host`, `headers`, …) that
546    /// `enrich_events` already populated.
547    /// Vector namespace: inserted into event metadata under `http_server.<field>` for
548    ///   all event types (Log, Metric, Trace).
549    /// Legacy namespace: inserted into the Log event body only (Metric/Trace are skipped).
550    fn inject_auth_enrichment(&self, events: &mut [Event], enrichment: ObjectMap) {
551        for event in events.iter_mut() {
552            match self.log_namespace {
553                LogNamespace::Vector => {
554                    // metadata_mut() dispatches to Log, Metric, and Trace so every
555                    // decoded event type receives the auth enrichment fields.
556                    let meta = event.metadata_mut().value_mut();
557                    for (key, value) in &enrichment {
558                        let key_str = key.as_str();
559                        let name_part = path!(SimpleHttpConfig::NAME);
560                        let key_part = path!(key_str);
561                        let full_path = name_part.concat(key_part);
562                        if meta.get(full_path.clone()).is_none() {
563                            meta.insert(full_path, value.clone());
564                        }
565                    }
566                }
567                LogNamespace::Legacy => {
568                    // Legacy enrichment targets the event body; only Log events have one.
569                    if let Event::Log(log) = event {
570                        for (key, value) in &enrichment {
571                            log.try_insert((PathPrefix::Event, path!(key.as_str())), value.clone());
572                        }
573                    }
574                }
575            }
576        }
577    }
578}
579
580#[cfg(test)]
581mod tests {
582    use std::{io::Write, net::SocketAddr, str::FromStr};
583
584    use flate2::{
585        Compression,
586        write::{GzEncoder, ZlibEncoder},
587    };
588    use futures::Stream;
589    use headers::{Authorization, authorization::Credentials};
590    use http::{HeaderMap, Method, StatusCode, Uri, header::AUTHORIZATION};
591    use similar_asserts::assert_eq;
592    use vector_lib::{
593        codecs::{
594            BytesDecoderConfig, JsonDeserializerConfig,
595            decoding::{DeserializerConfig, FramingConfig},
596        },
597        config::LogNamespace,
598        event::LogEvent,
599        lookup::{
600            OwnedTargetPath, PathPrefix, event_path, lookup_v2::OptionalValuePath,
601            owned_value_path, path,
602        },
603        schema::Definition,
604    };
605    use vrl::{
606        path::ValuePath as _,
607        value::{Kind, ObjectMap, kind::Collection},
608    };
609
610    use super::{SimpleHttpConfig, remove_duplicates};
611    use crate::{
612        SourceSender,
613        common::http::server_auth::HttpServerAuthConfig,
614        components::validation::prelude::*,
615        config::{SourceConfig, SourceContext, log_schema},
616        event::{Event, EventStatus, Value},
617        sources::http_server::HttpMethod,
618        test_util::{
619            addr::next_addr,
620            components::{self, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
621            spawn_collect_n, wait_for_tcp,
622        },
623    };
624
625    #[test]
626    fn generate_config() {
627        crate::test_util::test_generate_config::<SimpleHttpConfig>();
628    }
629
630    #[allow(clippy::too_many_arguments)]
631    async fn source<'a>(
632        headers: Vec<String>,
633        query_parameters: Vec<String>,
634        path_key: &'a str,
635        host_key: &'a str,
636        path: &'a str,
637        method: &'a str,
638        response_code: StatusCode,
639        auth: Option<HttpServerAuthConfig>,
640        strict_path: bool,
641        status: EventStatus,
642        acknowledgements: bool,
643        framing: Option<FramingConfig>,
644        decoding: Option<DeserializerConfig>,
645    ) -> (impl Stream<Item = Event> + 'a, SocketAddr) {
646        let (sender, recv) = SourceSender::new_test_finalize(status);
647        let (_guard, address) = next_addr();
648        let path = path.to_owned();
649        let host_key = OptionalValuePath::from(owned_value_path!(host_key));
650        let path_key = OptionalValuePath::from(owned_value_path!(path_key));
651        let context = SourceContext::new_test(sender, None);
652        let method = match Method::from_str(method).unwrap() {
653            Method::GET => HttpMethod::Get,
654            Method::POST => HttpMethod::Post,
655            _ => HttpMethod::Post,
656        };
657
658        tokio::spawn(async move {
659            SimpleHttpConfig {
660                address,
661                headers,
662                encoding: None,
663                query_parameters,
664                response_code,
665                tls: None,
666                auth,
667                strict_path,
668                path_key,
669                host_key,
670                path,
671                method,
672                framing,
673                decoding,
674                acknowledgements: acknowledgements.into(),
675                log_namespace: None,
676                keepalive: Default::default(),
677            }
678            .build(context)
679            .await
680            .unwrap()
681            .await
682            .unwrap();
683        });
684        wait_for_tcp(address).await;
685        (recv, address)
686    }
687
688    async fn send(address: SocketAddr, body: &str) -> u16 {
689        reqwest::Client::new()
690            .post(format!("http://{address}/"))
691            .body(body.to_owned())
692            .send()
693            .await
694            .unwrap()
695            .status()
696            .as_u16()
697    }
698
699    async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 {
700        reqwest::Client::new()
701            .post(format!("http://{address}/"))
702            .headers(headers)
703            .body(body.to_owned())
704            .send()
705            .await
706            .unwrap()
707            .status()
708            .as_u16()
709    }
710
711    async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 {
712        reqwest::Client::new()
713            .post(format!("http://{address}?{query}"))
714            .body(body.to_owned())
715            .send()
716            .await
717            .unwrap()
718            .status()
719            .as_u16()
720    }
721
722    async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 {
723        reqwest::Client::new()
724            .post(format!("http://{address}{path}"))
725            .body(body.to_owned())
726            .send()
727            .await
728            .unwrap()
729            .status()
730            .as_u16()
731    }
732
733    async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 {
734        let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap();
735        reqwest::Client::new()
736            .request(method, format!("http://{address}{path}"))
737            .body(body.to_owned())
738            .send()
739            .await
740            .unwrap()
741            .status()
742            .as_u16()
743    }
744
745    async fn send_bytes(address: SocketAddr, body: Vec<u8>, headers: HeaderMap) -> u16 {
746        reqwest::Client::new()
747            .post(format!("http://{address}/"))
748            .headers(headers)
749            .body(body)
750            .send()
751            .await
752            .unwrap()
753            .status()
754            .as_u16()
755    }
756
757    async fn spawn_ok_collect_n(
758        send: impl std::future::Future<Output = u16> + Send + 'static,
759        rx: impl Stream<Item = Event> + Unpin,
760        n: usize,
761    ) -> Vec<Event> {
762        spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await
763    }
764
765    #[tokio::test]
766    async fn http_multiline_text() {
767        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
768            let body = "test body\ntest body 2";
769
770            let (rx, addr) = source(
771                vec![],
772                vec![],
773                "http_path",
774                "remote_ip",
775                "/",
776                "POST",
777                StatusCode::OK,
778                None,
779                true,
780                EventStatus::Delivered,
781                true,
782                None,
783                None,
784            )
785            .await;
786
787            spawn_ok_collect_n(send(addr, body), rx, 2).await
788        })
789        .await;
790
791        {
792            let event = events.remove(0);
793            let log = event.as_log();
794            assert_eq!(*log.get_message().unwrap(), "test body".into());
795            assert!(log.get_timestamp().is_some());
796            assert_eq!(
797                *log.get_source_type().unwrap(),
798                SimpleHttpConfig::NAME.into()
799            );
800            assert_eq!(log["http_path"], "/".into());
801            assert_event_metadata(log).await;
802        }
803        {
804            let event = events.remove(0);
805            let log = event.as_log();
806            assert_eq!(*log.get_message().unwrap(), "test body 2".into());
807            assert_event_metadata(log).await;
808        }
809    }
810
811    #[tokio::test]
812    async fn http_multiline_text2() {
813        //same as above test but with a newline at the end
814        let body = "test body\ntest body 2\n";
815
816        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
817            let (rx, addr) = source(
818                vec![],
819                vec![],
820                "http_path",
821                "remote_ip",
822                "/",
823                "POST",
824                StatusCode::OK,
825                None,
826                true,
827                EventStatus::Delivered,
828                true,
829                None,
830                None,
831            )
832            .await;
833
834            spawn_ok_collect_n(send(addr, body), rx, 2).await
835        })
836        .await;
837
838        {
839            let event = events.remove(0);
840            let log = event.as_log();
841            assert_eq!(*log.get_message().unwrap(), "test body".into());
842            assert_event_metadata(log).await;
843        }
844        {
845            let event = events.remove(0);
846            let log = event.as_log();
847            assert_eq!(*log.get_message().unwrap(), "test body 2".into());
848            assert_event_metadata(log).await;
849        }
850    }
851
852    #[tokio::test]
853    async fn http_bytes_codec_preserves_newlines() {
854        let body = "foo\nbar";
855
856        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
857            let (rx, addr) = source(
858                vec![],
859                vec![],
860                "http_path",
861                "remote_ip",
862                "/",
863                "POST",
864                StatusCode::OK,
865                None,
866                true,
867                EventStatus::Delivered,
868                true,
869                Some(BytesDecoderConfig::new().into()),
870                None,
871            )
872            .await;
873
874            spawn_ok_collect_n(send(addr, body), rx, 1).await
875        })
876        .await;
877
878        assert_eq!(events.len(), 1);
879
880        {
881            let event = events.remove(0);
882            let log = event.as_log();
883            assert_eq!(*log.get_message().unwrap(), "foo\nbar".into());
884            assert_event_metadata(log).await;
885        }
886    }
887
888    #[tokio::test]
889    async fn http_json_parsing() {
890        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
891            let (rx, addr) = source(
892                vec![],
893                vec![],
894                "http_path",
895                "remote_ip",
896                "/",
897                "POST",
898                StatusCode::OK,
899                None,
900                true,
901                EventStatus::Delivered,
902                true,
903                None,
904                Some(JsonDeserializerConfig::default().into()),
905            )
906            .await;
907
908            spawn_collect_n(
909                async move {
910                    assert_eq!(400, send(addr, "{").await); //malformed
911                    assert_eq!(400, send(addr, r#"{"key"}"#).await); //key without value
912
913                    assert_eq!(200, send(addr, "{}").await); //can be one object or array of objects
914                    assert_eq!(200, send(addr, "[{},{},{}]").await);
915                },
916                rx,
917                2,
918            )
919            .await
920        })
921        .await;
922
923        assert!(events.remove(1).as_log().get_timestamp().is_some());
924        assert!(events.remove(0).as_log().get_timestamp().is_some());
925    }
926
927    #[tokio::test]
928    async fn http_json_values() {
929        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
930            let (rx, addr) = source(
931                vec![],
932                vec![],
933                "http_path",
934                "remote_ip",
935                "/",
936                "POST",
937                StatusCode::OK,
938                None,
939                true,
940                EventStatus::Delivered,
941                true,
942                None,
943                Some(JsonDeserializerConfig::default().into()),
944            )
945            .await;
946
947            spawn_collect_n(
948                async move {
949                    assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await);
950                    assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await);
951                },
952                rx,
953                2,
954            )
955            .await
956        })
957        .await;
958
959        {
960            let event = events.remove(0);
961            let log = event.as_log();
962            assert_eq!(log["key"], "value".into());
963            assert_event_metadata(log).await;
964        }
965        {
966            let event = events.remove(0);
967            let log = event.as_log();
968            assert_eq!(log["key2"], "value2".into());
969            assert_event_metadata(log).await;
970        }
971    }
972
973    #[tokio::test]
974    async fn http_json_dotted_keys() {
975        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
976            let (rx, addr) = source(
977                vec![],
978                vec![],
979                "http_path",
980                "remote_ip",
981                "/",
982                "POST",
983                StatusCode::OK,
984                None,
985                true,
986                EventStatus::Delivered,
987                true,
988                None,
989                Some(JsonDeserializerConfig::default().into()),
990            )
991            .await;
992
993            spawn_collect_n(
994                async move {
995                    assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await);
996                    assert_eq!(
997                        200,
998                        send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await
999                    );
1000                },
1001                rx,
1002                2,
1003            )
1004            .await
1005        })
1006        .await;
1007
1008        {
1009            let event = events.remove(0);
1010            let log = event.as_log();
1011            assert_eq!(
1012                log.get(event_path!("dotted.key")).unwrap(),
1013                &Value::from("value")
1014            );
1015        }
1016        {
1017            let event = events.remove(0);
1018            let log = event.as_log();
1019            let mut map = ObjectMap::new();
1020            map.insert("dotted.key2".into(), Value::from("value2"));
1021            assert_eq!(log["nested"], map.into());
1022        }
1023    }
1024
1025    #[tokio::test]
1026    async fn http_ndjson() {
1027        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1028            let (rx, addr) = source(
1029                vec![],
1030                vec![],
1031                "http_path",
1032                "remote_ip",
1033                "/",
1034                "POST",
1035                StatusCode::OK,
1036                None,
1037                true,
1038                EventStatus::Delivered,
1039                true,
1040                None,
1041                Some(JsonDeserializerConfig::default().into()),
1042            )
1043            .await;
1044
1045            spawn_collect_n(
1046                async move {
1047                    assert_eq!(
1048                        200,
1049                        send(addr, r#"[{"key1":"value1"},{"key2":"value2"}]"#).await
1050                    );
1051
1052                    assert_eq!(
1053                        200,
1054                        send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await
1055                    );
1056                },
1057                rx,
1058                4,
1059            )
1060            .await
1061        })
1062        .await;
1063
1064        {
1065            let event = events.remove(0);
1066            let log = event.as_log();
1067            assert_eq!(log["key1"], "value1".into());
1068            assert_event_metadata(log).await;
1069        }
1070        {
1071            let event = events.remove(0);
1072            let log = event.as_log();
1073            assert_eq!(log["key2"], "value2".into());
1074            assert_event_metadata(log).await;
1075        }
1076        {
1077            let event = events.remove(0);
1078            let log = event.as_log();
1079            assert_eq!(log["key1"], "value1".into());
1080            assert_event_metadata(log).await;
1081        }
1082        {
1083            let event = events.remove(0);
1084            let log = event.as_log();
1085            assert_eq!(log["key2"], "value2".into());
1086            assert_event_metadata(log).await;
1087        }
1088    }
1089
1090    async fn assert_event_metadata(log: &LogEvent) {
1091        assert!(log.get_timestamp().is_some());
1092
1093        let source_type_key_value = log
1094            .get((PathPrefix::Event, log_schema().source_type_key().unwrap()))
1095            .unwrap()
1096            .as_str()
1097            .unwrap();
1098        assert_eq!(source_type_key_value, SimpleHttpConfig::NAME);
1099        assert_eq!(log["http_path"], "/".into());
1100    }
1101
1102    #[tokio::test]
1103    async fn http_headers() {
1104        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1105            let mut headers = HeaderMap::new();
1106            headers.insert("User-Agent", "test_client".parse().unwrap());
1107            headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
1108            headers.insert("X-Test-Header", "true".parse().unwrap());
1109
1110            let (rx, addr) = source(
1111                vec![
1112                    "User-Agent".to_string(),
1113                    "Upgrade-Insecure-Requests".to_string(),
1114                    "X-*".to_string(),
1115                    "AbsentHeader".to_string(),
1116                ],
1117                vec![],
1118                "http_path",
1119                "remote_ip",
1120                "/",
1121                "POST",
1122                StatusCode::OK,
1123                None,
1124                true,
1125                EventStatus::Delivered,
1126                true,
1127                None,
1128                Some(JsonDeserializerConfig::default().into()),
1129            )
1130            .await;
1131
1132            spawn_ok_collect_n(
1133                send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1134                rx,
1135                1,
1136            )
1137            .await
1138        })
1139        .await;
1140
1141        {
1142            let event = events.remove(0);
1143            let log = event.as_log();
1144            assert_eq!(log["key1"], "value1".into());
1145            assert_eq!(log["\"User-Agent\""], "test_client".into());
1146            assert_eq!(log["\"Upgrade-Insecure-Requests\""], "false".into());
1147            assert_eq!(log["\"x-test-header\""], "true".into());
1148            assert_eq!(log["AbsentHeader"], Value::Null);
1149            assert_event_metadata(log).await;
1150        }
1151    }
1152
1153    #[tokio::test]
1154    async fn http_headers_wildcard() {
1155        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1156            let mut headers = HeaderMap::new();
1157            headers.insert("User-Agent", "test_client".parse().unwrap());
1158            headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
1159            // Header that conflicts with an existing field.
1160            headers.insert("key1", "value_from_header".parse().unwrap());
1161
1162            let (rx, addr) = source(
1163                vec!["*".to_string()],
1164                vec![],
1165                "http_path",
1166                "remote_ip",
1167                "/",
1168                "POST",
1169                StatusCode::OK,
1170                None,
1171                true,
1172                EventStatus::Delivered,
1173                true,
1174                None,
1175                Some(JsonDeserializerConfig::default().into()),
1176            )
1177            .await;
1178
1179            spawn_ok_collect_n(
1180                send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1181                rx,
1182                1,
1183            )
1184            .await
1185        })
1186        .await;
1187
1188        {
1189            let event = events.remove(0);
1190            let log = event.as_log();
1191            assert_eq!(log["key1"], "value1".into());
1192            assert_eq!(log["\"user-agent\""], "test_client".into());
1193            assert_eq!(log["\"x-case-sensitive-value\""], "CaseSensitive".into());
1194            assert_event_metadata(log).await;
1195        }
1196    }
1197
1198    #[tokio::test]
1199    async fn http_query() {
1200        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1201            let (rx, addr) = source(
1202                vec![],
1203                vec![
1204                    "source".to_string(),
1205                    "region".to_string(),
1206                    "absent".to_string(),
1207                ],
1208                "http_path",
1209                "remote_ip",
1210                "/",
1211                "POST",
1212                StatusCode::OK,
1213                None,
1214                true,
1215                EventStatus::Delivered,
1216                true,
1217                None,
1218                Some(JsonDeserializerConfig::default().into()),
1219            )
1220            .await;
1221
1222            spawn_ok_collect_n(
1223                send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging&region=gb"),
1224                rx,
1225                1,
1226            )
1227            .await
1228        })
1229        .await;
1230
1231        {
1232            let event = events.remove(0);
1233            let log = event.as_log();
1234            assert_eq!(log["key1"], "value1".into());
1235            assert_eq!(log["source"], "staging".into());
1236            assert_eq!(log["region"], "gb".into());
1237            assert_eq!(log["absent"], Value::Null);
1238            assert_event_metadata(log).await;
1239        }
1240    }
1241
1242    #[tokio::test]
1243    async fn http_query_wildcard() {
1244        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1245            let (rx, addr) = source(
1246                vec![],
1247                vec!["*".to_string()],
1248                "http_path",
1249                "remote_ip",
1250                "/",
1251                "POST",
1252                StatusCode::OK,
1253                None,
1254                true,
1255                EventStatus::Delivered,
1256                true,
1257                None,
1258                Some(JsonDeserializerConfig::default().into()),
1259            )
1260            .await;
1261
1262            spawn_ok_collect_n(
1263                send_with_query(
1264                    addr,
1265                    "{\"key1\":\"value1\",\"key2\":\"value2\"}",
1266                    "source=staging&region=gb&key1=value_from_query",
1267                ),
1268                rx,
1269                1,
1270            )
1271            .await
1272        })
1273        .await;
1274
1275        {
1276            let event = events.remove(0);
1277            let log = event.as_log();
1278            assert_eq!(log["key1"], "value_from_query".into());
1279            assert_eq!(log["key2"], "value2".into());
1280            assert_eq!(log["source"], "staging".into());
1281            assert_eq!(log["region"], "gb".into());
1282            assert_event_metadata(log).await;
1283        }
1284    }
1285
1286    #[tokio::test]
1287    async fn http_gzip_deflate() {
1288        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1289            let body = "test body";
1290
1291            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1292            encoder.write_all(body.as_bytes()).unwrap();
1293            let body = encoder.finish().unwrap();
1294
1295            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1296            encoder.write_all(body.as_slice()).unwrap();
1297            let body = encoder.finish().unwrap();
1298
1299            let mut headers = HeaderMap::new();
1300            headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap());
1301
1302            let (rx, addr) = source(
1303                vec![],
1304                vec![],
1305                "http_path",
1306                "remote_ip",
1307                "/",
1308                "POST",
1309                StatusCode::OK,
1310                None,
1311                true,
1312                EventStatus::Delivered,
1313                true,
1314                None,
1315                None,
1316            )
1317            .await;
1318
1319            spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await
1320        })
1321        .await;
1322
1323        {
1324            let event = events.remove(0);
1325            let log = event.as_log();
1326            assert_eq!(*log.get_message().unwrap(), "test body".into());
1327            assert_event_metadata(log).await;
1328        }
1329    }
1330
1331    #[tokio::test]
1332    async fn http_path() {
1333        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1334            let (rx, addr) = source(
1335                vec![],
1336                vec![],
1337                "vector_http_path",
1338                "vector_remote_ip",
1339                "/event/path",
1340                "POST",
1341                StatusCode::OK,
1342                None,
1343                true,
1344                EventStatus::Delivered,
1345                true,
1346                None,
1347                Some(JsonDeserializerConfig::default().into()),
1348            )
1349            .await;
1350
1351            spawn_ok_collect_n(
1352                send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"),
1353                rx,
1354                1,
1355            )
1356            .await
1357        })
1358        .await;
1359
1360        {
1361            let event = events.remove(0);
1362            let log = event.as_log();
1363            assert_eq!(log["key1"], "value1".into());
1364            assert_eq!(log["vector_http_path"], "/event/path".into());
1365            assert!(log.get_timestamp().is_some());
1366            assert_eq!(
1367                *log.get_source_type().unwrap(),
1368                SimpleHttpConfig::NAME.into()
1369            );
1370        }
1371    }
1372
1373    #[tokio::test]
1374    async fn http_path_no_restriction() {
1375        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1376            let (rx, addr) = source(
1377                vec![],
1378                vec![],
1379                "vector_http_path",
1380                "vector_remote_ip",
1381                "/event",
1382                "POST",
1383                StatusCode::OK,
1384                None,
1385                false,
1386                EventStatus::Delivered,
1387                true,
1388                None,
1389                Some(JsonDeserializerConfig::default().into()),
1390            )
1391            .await;
1392
1393            spawn_collect_n(
1394                async move {
1395                    assert_eq!(
1396                        200,
1397                        send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await
1398                    );
1399                    assert_eq!(
1400                        200,
1401                        send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await
1402                    );
1403                },
1404                rx,
1405                2,
1406            )
1407            .await
1408        })
1409        .await;
1410
1411        {
1412            let event = events.remove(0);
1413            let log = event.as_log();
1414            assert_eq!(log["key1"], "value1".into());
1415            assert_eq!(log["vector_http_path"], "/event/path1".into());
1416            assert!(log.get_timestamp().is_some());
1417            assert_eq!(
1418                *log.get_source_type().unwrap(),
1419                SimpleHttpConfig::NAME.into()
1420            );
1421        }
1422        {
1423            let event = events.remove(0);
1424            let log = event.as_log();
1425            assert_eq!(log["key2"], "value2".into());
1426            assert_eq!(log["vector_http_path"], "/event/path2".into());
1427            assert!(log.get_timestamp().is_some());
1428            assert_eq!(
1429                *log.get_source_type().unwrap(),
1430                SimpleHttpConfig::NAME.into()
1431            );
1432        }
1433    }
1434
1435    #[tokio::test]
1436    async fn http_wrong_path() {
1437        components::init_test();
1438        let (_rx, addr) = source(
1439            vec![],
1440            vec![],
1441            "vector_http_path",
1442            "vector_remote_ip",
1443            "/",
1444            "POST",
1445            StatusCode::OK,
1446            None,
1447            true,
1448            EventStatus::Delivered,
1449            true,
1450            None,
1451            Some(JsonDeserializerConfig::default().into()),
1452        )
1453        .await;
1454
1455        assert_eq!(
1456            404,
1457            send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await
1458        );
1459    }
1460
1461    #[tokio::test]
1462    async fn http_status_code() {
1463        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
1464            let (rx, addr) = source(
1465                vec![],
1466                vec![],
1467                "http_path",
1468                "remote_ip",
1469                "/",
1470                "POST",
1471                StatusCode::ACCEPTED,
1472                None,
1473                true,
1474                EventStatus::Delivered,
1475                true,
1476                None,
1477                None,
1478            )
1479            .await;
1480
1481            spawn_collect_n(
1482                async move {
1483                    assert_eq!(
1484                        StatusCode::ACCEPTED,
1485                        send(addr, "{\"key1\":\"value1\"}").await
1486                    );
1487                },
1488                rx,
1489                1,
1490            )
1491            .await;
1492        })
1493        .await;
1494    }
1495
1496    #[tokio::test]
1497    async fn http_delivery_failure() {
1498        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1499            let (rx, addr) = source(
1500                vec![],
1501                vec![],
1502                "http_path",
1503                "remote_ip",
1504                "/",
1505                "POST",
1506                StatusCode::OK,
1507                None,
1508                true,
1509                EventStatus::Rejected,
1510                true,
1511                None,
1512                None,
1513            )
1514            .await;
1515
1516            spawn_collect_n(
1517                async move {
1518                    assert_eq!(400, send(addr, "test body\n").await);
1519                },
1520                rx,
1521                1,
1522            )
1523            .await;
1524        })
1525        .await;
1526    }
1527
1528    #[tokio::test]
1529    async fn ignores_disabled_acknowledgements() {
1530        let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1531            let (rx, addr) = source(
1532                vec![],
1533                vec![],
1534                "http_path",
1535                "remote_ip",
1536                "/",
1537                "POST",
1538                StatusCode::OK,
1539                None,
1540                true,
1541                EventStatus::Rejected,
1542                false,
1543                None,
1544                None,
1545            )
1546            .await;
1547
1548            spawn_collect_n(
1549                async move {
1550                    assert_eq!(200, send(addr, "test body\n").await);
1551                },
1552                rx,
1553                1,
1554            )
1555            .await
1556        })
1557        .await;
1558
1559        assert_eq!(events.len(), 1);
1560    }
1561
1562    #[tokio::test]
1563    async fn http_get_method() {
1564        components::init_test();
1565        let (_rx, addr) = source(
1566            vec![],
1567            vec![],
1568            "http_path",
1569            "remote_ip",
1570            "/",
1571            "GET",
1572            StatusCode::OK,
1573            None,
1574            true,
1575            EventStatus::Delivered,
1576            true,
1577            None,
1578            None,
1579        )
1580        .await;
1581
1582        assert_eq!(200, send_request(addr, "GET", "", "/").await);
1583    }
1584
1585    #[tokio::test]
1586    async fn returns_401_when_required_auth_is_missing() {
1587        components::init_test();
1588        let (_rx, addr) = source(
1589            vec![],
1590            vec![],
1591            "http_path",
1592            "remote_ip",
1593            "/",
1594            "GET",
1595            StatusCode::OK,
1596            Some(HttpServerAuthConfig::Basic {
1597                username: "test".to_string(),
1598                password: "test".to_string().into(),
1599            }),
1600            true,
1601            EventStatus::Delivered,
1602            true,
1603            None,
1604            None,
1605        )
1606        .await;
1607
1608        assert_eq!(401, send_request(addr, "GET", "", "/").await);
1609    }
1610
1611    #[tokio::test]
1612    async fn returns_401_when_required_auth_is_wrong() {
1613        components::init_test();
1614        let (_rx, addr) = source(
1615            vec![],
1616            vec![],
1617            "http_path",
1618            "remote_ip",
1619            "/",
1620            "POST",
1621            StatusCode::OK,
1622            Some(HttpServerAuthConfig::Basic {
1623                username: "test".to_string(),
1624                password: "test".to_string().into(),
1625            }),
1626            true,
1627            EventStatus::Delivered,
1628            true,
1629            None,
1630            None,
1631        )
1632        .await;
1633
1634        let mut headers = HeaderMap::new();
1635        headers.insert(
1636            AUTHORIZATION,
1637            Authorization::basic("wrong", "test").0.encode(),
1638        );
1639        assert_eq!(401, send_with_headers(addr, "", headers).await);
1640    }
1641
1642    #[tokio::test]
1643    async fn http_get_with_correct_auth() {
1644        components::init_test();
1645        let (_rx, addr) = source(
1646            vec![],
1647            vec![],
1648            "http_path",
1649            "remote_ip",
1650            "/",
1651            "POST",
1652            StatusCode::OK,
1653            Some(HttpServerAuthConfig::Basic {
1654                username: "test".to_string(),
1655                password: "test".to_string().into(),
1656            }),
1657            true,
1658            EventStatus::Delivered,
1659            true,
1660            None,
1661            None,
1662        )
1663        .await;
1664
1665        let mut headers = HeaderMap::new();
1666        headers.insert(
1667            AUTHORIZATION,
1668            Authorization::basic("test", "test").0.encode(),
1669        );
1670        assert_eq!(200, send_with_headers(addr, "", headers).await);
1671    }
1672
1673    #[test]
1674    fn output_schema_definition_vector_namespace() {
1675        let config = SimpleHttpConfig {
1676            log_namespace: Some(true),
1677            ..Default::default()
1678        };
1679
1680        let definitions = config
1681            .outputs(LogNamespace::Vector)
1682            .remove(0)
1683            .schema_definition(true);
1684
1685        let expected_definition =
1686            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1687                .with_meaning(OwnedTargetPath::event_root(), "message")
1688                .with_metadata_field(
1689                    &owned_value_path!("vector", "source_type"),
1690                    Kind::bytes(),
1691                    None,
1692                )
1693                .with_metadata_field(
1694                    &owned_value_path!(SimpleHttpConfig::NAME, "path"),
1695                    Kind::bytes(),
1696                    None,
1697                )
1698                .with_metadata_field(
1699                    &owned_value_path!(SimpleHttpConfig::NAME, "headers"),
1700                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1701                    None,
1702                )
1703                .with_metadata_field(
1704                    &owned_value_path!(SimpleHttpConfig::NAME, "query_parameters"),
1705                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1706                    None,
1707                )
1708                .with_metadata_field(
1709                    &owned_value_path!(SimpleHttpConfig::NAME, "host"),
1710                    Kind::bytes().or_undefined(),
1711                    None,
1712                )
1713                .with_metadata_field(
1714                    &owned_value_path!("vector", "ingest_timestamp"),
1715                    Kind::timestamp(),
1716                    None,
1717                );
1718
1719        assert_eq!(definitions, Some(expected_definition))
1720    }
1721
1722    #[test]
1723    fn output_schema_definition_legacy_namespace() {
1724        let config = SimpleHttpConfig::default();
1725
1726        let definitions = config
1727            .outputs(LogNamespace::Legacy)
1728            .remove(0)
1729            .schema_definition(true);
1730
1731        let expected_definition = Definition::new_with_default_metadata(
1732            Kind::object(Collection::empty()),
1733            [LogNamespace::Legacy],
1734        )
1735        .with_event_field(
1736            &owned_value_path!("message"),
1737            Kind::bytes(),
1738            Some("message"),
1739        )
1740        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1741        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1742        .with_event_field(&owned_value_path!("path"), Kind::bytes(), None)
1743        .with_event_field(
1744            &owned_value_path!("host"),
1745            Kind::bytes().or_undefined(),
1746            None,
1747        )
1748        .unknown_fields(Kind::bytes());
1749
1750        assert_eq!(definitions, Some(expected_definition))
1751    }
1752
1753    #[test]
1754    fn validate_remove_duplicates() {
1755        let mut list = vec![
1756            "a".to_owned(),
1757            "b".to_owned(),
1758            "c".to_owned(),
1759            "d".to_owned(),
1760        ];
1761
1762        // no duplicates should be identical
1763        {
1764            let list_dedup = remove_duplicates(list.clone(), "foo");
1765
1766            assert_eq!(list, list_dedup);
1767        }
1768
1769        list.push("b".to_owned());
1770
1771        // remove duplicate "b"
1772        {
1773            let list_dedup = remove_duplicates(list.clone(), "foo");
1774            assert_eq!(
1775                vec![
1776                    "a".to_owned(),
1777                    "b".to_owned(),
1778                    "c".to_owned(),
1779                    "d".to_owned()
1780                ],
1781                list_dedup
1782            );
1783        }
1784    }
1785
1786    #[test]
1787    fn inject_auth_enrichment_does_not_clobber_vector_namespace_builtin_fields() {
1788        use crate::{codecs::DecodingConfig, sources::util::HttpSource as _};
1789        use vector_lib::codecs::BytesDeserializerConfig;
1790        use vrl::value::KeyString;
1791
1792        let decoder = DecodingConfig::new(
1793            BytesDecoderConfig::new().into(),
1794            BytesDeserializerConfig::new().into(),
1795            LogNamespace::Vector,
1796        )
1797        .build()
1798        .unwrap()
1799        .with_log_namespace(LogNamespace::Vector);
1800
1801        let source = super::SimpleHttpSource {
1802            headers: vec![],
1803            query_parameters: vec![],
1804            path_key: OptionalValuePath::none(),
1805            host_key: OptionalValuePath::none(),
1806            decoder,
1807            log_namespace: LogNamespace::Vector,
1808        };
1809
1810        let mut log = LogEvent::default();
1811        // Pre-populate %http_server.path as enrich_events would.
1812        log.insert(
1813            (
1814                PathPrefix::Metadata,
1815                path!(SimpleHttpConfig::NAME).concat(path!("path")),
1816            ),
1817            "/real/path",
1818        );
1819
1820        let mut events = vec![Event::Log(log)];
1821        let mut enrichment = ObjectMap::new();
1822        // Attempt to clobber the built-in `path` field and inject a new field.
1823        enrichment.insert(KeyString::from("path"), Value::from("/clobbered"));
1824        enrichment.insert(KeyString::from("tenant_id"), Value::from("t-123"));
1825
1826        source.inject_auth_enrichment(&mut events, enrichment);
1827
1828        let Event::Log(log) = &events[0] else {
1829            panic!("expected log event");
1830        };
1831        assert_eq!(
1832            log.get((
1833                PathPrefix::Metadata,
1834                path!(SimpleHttpConfig::NAME).concat(path!("path")),
1835            )),
1836            Some(&Value::from("/real/path")),
1837            "auth enrichment must not overwrite built-in source metadata"
1838        );
1839        assert_eq!(
1840            log.get((
1841                PathPrefix::Metadata,
1842                path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
1843            )),
1844            Some(&Value::from("t-123")),
1845            "new auth enrichment field must be injected"
1846        );
1847    }
1848
1849    #[test]
1850    fn inject_auth_enrichment_does_not_overwrite_existing_metadata_in_vector_namespace() {
1851        use crate::{codecs::DecodingConfig, sources::util::HttpSource as _};
1852        use vector_lib::codecs::BytesDeserializerConfig;
1853        use vrl::value::KeyString;
1854
1855        let decoder = DecodingConfig::new(
1856            BytesDecoderConfig::new().into(),
1857            BytesDeserializerConfig::new().into(),
1858            LogNamespace::Vector,
1859        )
1860        .build()
1861        .unwrap()
1862        .with_log_namespace(LogNamespace::Vector);
1863
1864        let source = super::SimpleHttpSource {
1865            headers: vec![],
1866            query_parameters: vec![],
1867            path_key: OptionalValuePath::none(),
1868            host_key: OptionalValuePath::none(),
1869            decoder,
1870            log_namespace: LogNamespace::Vector,
1871        };
1872
1873        let mut log = LogEvent::default();
1874        // Pre-populate a key (e.g. already written by enrich_events or the decoded event).
1875        log.insert(
1876            (
1877                PathPrefix::Metadata,
1878                path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
1879            ),
1880            "existing",
1881        );
1882
1883        let mut events = vec![Event::Log(log)];
1884        let mut enrichment = ObjectMap::new();
1885        enrichment.insert(KeyString::from("tenant_id"), Value::from("auth-value"));
1886
1887        source.inject_auth_enrichment(&mut events, enrichment);
1888
1889        let Event::Log(log) = &events[0] else {
1890            panic!("expected log event");
1891        };
1892        assert_eq!(
1893            log.get((
1894                PathPrefix::Metadata,
1895                path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
1896            )),
1897            Some(&Value::from("existing")),
1898            "auth enrichment must not overwrite already-present metadata keys"
1899        );
1900    }
1901
1902    #[test]
1903    fn inject_auth_enrichment_applies_to_non_log_events_in_vector_namespace() {
1904        use crate::{codecs::DecodingConfig, sources::util::HttpSource as _};
1905        use vector_lib::{
1906            codecs::BytesDeserializerConfig,
1907            event::{Metric, MetricKind, MetricValue},
1908        };
1909        use vrl::value::KeyString;
1910
1911        let decoder = DecodingConfig::new(
1912            BytesDecoderConfig::new().into(),
1913            BytesDeserializerConfig::new().into(),
1914            LogNamespace::Vector,
1915        )
1916        .build()
1917        .unwrap()
1918        .with_log_namespace(LogNamespace::Vector);
1919
1920        let source = super::SimpleHttpSource {
1921            headers: vec![],
1922            query_parameters: vec![],
1923            path_key: OptionalValuePath::none(),
1924            host_key: OptionalValuePath::none(),
1925            decoder,
1926            log_namespace: LogNamespace::Vector,
1927        };
1928
1929        let metric = Metric::new(
1930            "requests",
1931            MetricKind::Incremental,
1932            MetricValue::Counter { value: 1.0 },
1933        );
1934        let mut events = vec![Event::Metric(metric)];
1935
1936        let mut enrichment = ObjectMap::new();
1937        enrichment.insert(KeyString::from("tenant_id"), Value::from("t-456"));
1938
1939        source.inject_auth_enrichment(&mut events, enrichment);
1940
1941        let Event::Metric(metric) = &events[0] else {
1942            panic!("expected metric event");
1943        };
1944        assert_eq!(
1945            metric
1946                .metadata()
1947                .value()
1948                .get(path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),),
1949            Some(&Value::from("t-456")),
1950            "auth enrichment must be written to non-log event metadata"
1951        );
1952    }
1953
1954    impl ValidatableComponent for SimpleHttpConfig {
1955        fn validation_configuration() -> ValidationConfiguration {
1956            let config = Self {
1957                decoding: Some(DeserializerConfig::Json(Default::default())),
1958                ..Default::default()
1959            };
1960
1961            let log_namespace: LogNamespace = config.log_namespace.unwrap_or(false).into();
1962
1963            let listen_addr_http = format!("http://{}/", config.address);
1964            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
1965
1966            let external_resource = ExternalResource::new(
1967                ResourceDirection::Push,
1968                HttpResourceConfig::from_parts(uri, Some(config.method.into())),
1969                config
1970                    .get_decoding_config()
1971                    .expect("should not fail to get decoding config"),
1972            );
1973
1974            ValidationConfiguration::from_source(
1975                Self::NAME,
1976                log_namespace,
1977                vec![ComponentTestCaseConfig::from_source(
1978                    config,
1979                    None,
1980                    Some(external_resource),
1981                )],
1982            )
1983        }
1984    }
1985
1986    register_validatable_component!(SimpleHttpConfig);
1987}