1use std::{collections::HashMap, net::SocketAddr};
2
3use bytes::{Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use http_serde;
7use tokio_util::codec::Decoder as _;
8use vector_lib::{
9 codecs::{
10 BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
11 NewlineDelimitedDecoderConfig,
12 decoding::{DeserializerConfig, FramingConfig},
13 },
14 config::{DataType, LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 lookup::{PathPrefix, lookup_v2::OptionalValuePath, owned_value_path, path},
17 schema::Definition,
18};
19use vrl::{
20 path::ValuePath as _,
21 value::{Kind, ObjectMap, kind::Collection},
22};
23use warp::http::HeaderMap;
24
25use crate::{
26 codecs::{Decoder, DecodingConfig},
27 common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
28 config::{
29 GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
30 SourceOutput,
31 },
32 event::Event,
33 http::KeepaliveConfig,
34 serde::{bool_or_struct, default_decoding},
35 sources::util::{
36 Encoding, HttpSource,
37 http::{HttpMethod, add_headers, add_query_parameters},
38 },
39 tls::TlsEnableableConfig,
40};
41
#[configurable_component(source("http", "Host an HTTP endpoint to receive logs."))]
/// Configuration for the `http` source.
///
/// This is a newtype around [`SimpleHttpConfig`]: it is registered under the `http` name, is
/// flagged as deprecated in its configurable metadata, and forwards everything to the wrapped
/// `http_server` configuration.
#[configurable(metadata(deprecated))]
#[derive(Clone, Debug)]
pub struct HttpConfig(SimpleHttpConfig);
47
48impl GenerateConfig for HttpConfig {
49 fn generate_config() -> toml::Value {
50 <SimpleHttpConfig as GenerateConfig>::generate_config()
51 }
52}
53
54#[async_trait::async_trait]
55#[typetag::serde(name = "http")]
56impl SourceConfig for HttpConfig {
57 async fn build(&self, cx: SourceContext) -> vector_lib::Result<super::Source> {
58 self.0.build(cx).await
59 }
60
61 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
62 self.0.outputs(global_log_namespace)
63 }
64
65 fn resources(&self) -> Vec<Resource> {
66 self.0.resources()
67 }
68
69 fn can_acknowledge(&self) -> bool {
70 self.0.can_acknowledge()
71 }
72}
73
#[configurable_component(source("http_server", "Host an HTTP endpoint to receive logs."))]
/// Configuration for the `http_server` source.
#[derive(Clone, Debug)]
pub struct SimpleHttpConfig {
    /// The socket address the HTTP server is run on.
    ///
    /// The same address is reported as the TCP resource claimed by this source.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// The expected encoding of received data (deprecated).
    ///
    /// When set, it selects a fixed framing/decoding pair. It cannot be combined with `framing`
    /// or `decoding`; doing so is rejected when the decoding configuration is built.
    #[configurable(deprecated)]
    #[serde(default)]
    encoding: Option<Encoding>,

    /// A list of HTTP header names to add to each event.
    ///
    /// Entries containing the wildcard character (`*`) are compiled as glob patterns; all other
    /// entries are kept as exact names. Duplicate entries are removed with a warning.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "User-Agent"))]
    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
    #[configurable(metadata(docs::examples = "X-*"))]
    #[configurable(metadata(docs::examples = "*"))]
    headers: Vec<String>,

    /// A list of URL query parameter names to add to each event.
    ///
    /// Entries containing the wildcard character (`*`) are compiled as glob patterns; all other
    /// entries are kept as exact names. Duplicate entries are removed with a warning.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,

    /// Optional authentication settings handed to the HTTP server.
    ///
    /// When the `Custom` variant is used, unknown fields in the legacy schema are typed as "any"
    /// instead of bytes.
    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    /// Whether requests must use exactly the configured `path`. Defaults to `true`.
    ///
    /// NOTE(review): with `false`, requests to paths below `path` appear to be accepted (see the
    /// `http_path_no_restriction` test) — confirm the exact prefix semantics in the server code.
    #[serde(default = "crate::serde::default_true")]
    strict_path: bool,

    /// The URL path on which requests are accepted. Defaults to `/`.
    #[serde(default = "default_path")]
    #[configurable(metadata(docs::examples = "/event/path"))]
    #[configurable(metadata(docs::examples = "/logs"))]
    path: String,

    /// The event key in which the requested URL path is stored. Defaults to `path`.
    ///
    /// In the legacy namespace the value is only inserted if the key is not already present.
    #[serde(default = "default_path_key")]
    #[configurable(metadata(docs::examples = "vector_http_path"))]
    path_key: OptionalValuePath,

    /// If set, the event key in which the remote IP address of the request is stored.
    ///
    /// Unset by default. Source IP collection is only enabled when this key is set, and in the
    /// legacy namespace the value overwrites an existing field of the same name.
    #[serde(default = "default_host_key")]
    #[configurable(metadata(docs::examples = "hostname"))]
    host_key: OptionalValuePath,

    /// The HTTP method handed to the server for this endpoint. Defaults to `POST`.
    #[serde(default = "default_http_method")]
    method: HttpMethod,

    /// The HTTP status code handed to the server as the response code. Defaults to `200`.
    #[configurable(metadata(docs::examples = 202))]
    #[configurable(metadata(docs::numeric_type = "uint"))]
    #[serde(with = "http_serde::status_code")]
    #[serde(default = "default_http_response_code")]
    response_code: StatusCode,

    /// Optional TLS settings handed to the HTTP server.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    /// Framing used to split a request body into frames.
    ///
    /// When unset, the default stream framing of the selected decoder is used.
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    /// Decoder used to turn each frame into events.
    ///
    /// When unset, the crate-wide default decoding is used.
    #[configurable(derived)]
    decoding: Option<DeserializerConfig>,

    /// End-to-end acknowledgement settings; accepts either a boolean or a full struct.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Per-source log namespace override, merged with the global setting. Hidden from the docs.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Keepalive settings handed to the HTTP server.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
190
191impl SimpleHttpConfig {
192 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
194 let mut schema_definition = self
195 .decoding
196 .as_ref()
197 .unwrap_or(&default_decoding())
198 .schema_definition(log_namespace)
199 .with_source_metadata(
200 SimpleHttpConfig::NAME,
201 self.path_key.path.clone().map(LegacyKey::InsertIfEmpty),
202 &owned_value_path!("path"),
203 Kind::bytes(),
204 None,
205 )
206 .with_source_metadata(
208 SimpleHttpConfig::NAME,
209 None,
210 &owned_value_path!("headers"),
211 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
212 None,
213 )
214 .with_source_metadata(
216 SimpleHttpConfig::NAME,
217 None,
218 &owned_value_path!("query_parameters"),
219 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
220 None,
221 )
222 .with_source_metadata(
223 SimpleHttpConfig::NAME,
224 self.host_key.path.clone().map(LegacyKey::Overwrite),
225 &owned_value_path!("host"),
226 Kind::bytes().or_undefined(),
227 None,
228 )
229 .with_standard_vector_source_metadata();
230
231 if log_namespace == LogNamespace::Legacy {
233 let unknown_kind = if matches!(self.auth, Some(HttpServerAuthConfig::Custom { .. })) {
236 Kind::any()
237 } else {
238 Kind::bytes()
239 };
240 schema_definition = schema_definition.unknown_fields(unknown_kind);
241 }
242
243 schema_definition
244 }
245
246 fn get_decoding_config(&self) -> crate::Result<DecodingConfig> {
247 if self.encoding.is_some() && (self.framing.is_some() || self.decoding.is_some()) {
248 return Err("Using `encoding` is deprecated and does not have any effect when `decoding` or `framing` is provided. Configure `framing` and `decoding` instead.".into());
249 }
250
251 let (framing, decoding) = if let Some(encoding) = self.encoding {
252 match encoding {
253 Encoding::Text => (
254 NewlineDelimitedDecoderConfig::new().into(),
255 BytesDeserializerConfig::new().into(),
256 ),
257 Encoding::Json => (
258 BytesDecoderConfig::new().into(),
259 JsonDeserializerConfig::default().into(),
260 ),
261 Encoding::Ndjson => (
262 NewlineDelimitedDecoderConfig::new().into(),
263 JsonDeserializerConfig::default().into(),
264 ),
265 Encoding::Binary => (
266 BytesDecoderConfig::new().into(),
267 BytesDeserializerConfig::new().into(),
268 ),
269 }
270 } else {
271 let decoding = self.decoding.clone().unwrap_or_else(default_decoding);
272 let framing = self
273 .framing
274 .clone()
275 .unwrap_or_else(|| decoding.default_stream_framing());
276 (framing, decoding)
277 };
278
279 Ok(DecodingConfig::new(
280 framing,
281 decoding,
282 self.log_namespace.unwrap_or(false).into(),
283 ))
284 }
285}
286
287impl Default for SimpleHttpConfig {
288 fn default() -> Self {
289 Self {
290 address: "0.0.0.0:8080".parse().unwrap(),
291 encoding: None,
292 headers: Vec::new(),
293 query_parameters: Vec::new(),
294 tls: None,
295 auth: None,
296 path: default_path(),
297 path_key: default_path_key(),
298 host_key: default_host_key(),
299 method: default_http_method(),
300 response_code: default_http_response_code(),
301 strict_path: true,
302 framing: None,
303 decoding: Some(default_decoding()),
304 acknowledgements: SourceAcknowledgementsConfig::default(),
305 log_namespace: None,
306 keepalive: KeepaliveConfig::default(),
307 }
308 }
309}
310
// Presumably supplies the `GenerateConfig` implementation for `SimpleHttpConfig` (which
// `HttpConfig::generate_config` relies on), derived from its `Default` value — confirm against
// the macro definition.
impl_generate_config_from_default!(SimpleHttpConfig);
312
/// Serde default for `SimpleHttpConfig::method`: `POST`.
const fn default_http_method() -> HttpMethod {
    HttpMethod::Post
}
316
/// Serde default for `SimpleHttpConfig::path`: the root path `/`.
fn default_path() -> String {
    String::from("/")
}
320
321fn default_path_key() -> OptionalValuePath {
322 OptionalValuePath::from(owned_value_path!("path"))
323}
324
/// Serde default for `SimpleHttpConfig::host_key`: no key, which leaves source IP collection
/// disabled (see `enable_source_ip`).
fn default_host_key() -> OptionalValuePath {
    OptionalValuePath::none()
}
328
/// Serde default for `SimpleHttpConfig::response_code`: `200 OK`.
const fn default_http_response_code() -> StatusCode {
    StatusCode::OK
}
332
333pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
335 list.sort();
336
337 let mut dedup = false;
338 for (idx, name) in list.iter().enumerate() {
339 if idx < list.len() - 1 && list[idx] == list[idx + 1] {
340 warn!(
341 "`{}` configuration contains duplicate entry for `{}`. Removing duplicate.",
342 list_name, name
343 );
344 dedup = true;
345 }
346 }
347
348 if dedup {
349 list.dedup();
350 }
351 list
352}
353
/// Renders only the IP part of `addr` as a string, dropping the port.
fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
    match addr {
        SocketAddr::V4(v4) => v4.ip().to_string(),
        SocketAddr::V6(v6) => v6.ip().to_string(),
    }
}
358
/// A configured header or query-parameter name, as produced by [`build_param_matcher`].
#[derive(Clone)]
pub enum HttpConfigParamKind {
    /// An entry that contained `*` and was compiled into a glob pattern.
    Glob(glob::Pattern),
    /// An entry without `*`, kept verbatim.
    Exact(String),
}
364
365pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
366 list.iter()
367 .map(|s| match s.contains('*') {
368 true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
369 false => Ok(HttpConfigParamKind::Exact(s.to_string())),
370 })
371 .collect::<crate::Result<Vec<HttpConfigParamKind>>>()
372}
373
374#[async_trait::async_trait]
375#[typetag::serde(name = "http_server")]
376impl SourceConfig for SimpleHttpConfig {
377 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
378 let log_namespace = cx.log_namespace(self.log_namespace);
379 let decoder = self
380 .get_decoding_config()?
381 .build()?
382 .with_log_namespace(log_namespace);
383
384 let source = SimpleHttpSource {
385 headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
386 query_parameters: build_param_matcher(&remove_duplicates(
387 self.query_parameters.clone(),
388 "query_parameters",
389 ))?,
390 path_key: self.path_key.clone(),
391 host_key: self.host_key.clone(),
392 decoder,
393 log_namespace,
394 };
395 source.run(
396 self.address,
397 self.path.as_str(),
398 self.method,
399 self.response_code,
400 self.strict_path,
401 self.tls.as_ref(),
402 self.auth.as_ref(),
403 cx,
404 self.acknowledgements,
405 self.keepalive.clone(),
406 )
407 }
408
409 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
410 let log_namespace = global_log_namespace.merge(self.log_namespace);
413
414 let schema_definition = self.schema_definition(log_namespace);
415
416 vec![SourceOutput::new_maybe_logs(
417 self.decoding
418 .as_ref()
419 .map(|d| d.output_type())
420 .unwrap_or(DataType::Log),
421 schema_definition,
422 )]
423 }
424
425 fn resources(&self) -> Vec<Resource> {
426 vec![Resource::tcp(self.address)]
427 }
428
429 fn can_acknowledge(&self) -> bool {
430 true
431 }
432}
433
/// Runtime state of the `http_server` source, assembled by `SimpleHttpConfig::build`.
#[derive(Clone)]
struct SimpleHttpSource {
    /// Compiled matchers for the header names to add to events.
    headers: Vec<HttpConfigParamKind>,
    /// Compiled matchers for the query parameter names to add to events.
    query_parameters: Vec<HttpConfigParamKind>,
    /// Legacy event key for the request path.
    path_key: OptionalValuePath,
    /// Legacy event key for the remote IP; source IP collection is enabled only when it is set.
    host_key: OptionalValuePath,
    /// Decoder used to turn request bodies into events; cloned for every request body.
    decoder: Decoder,
    /// Log namespace resolved at build time.
    log_namespace: LogNamespace,
}
443
444impl HttpSource for SimpleHttpSource {
445 fn enrich_events(
448 &self,
449 events: &mut [Event],
450 request_path: &str,
451 headers: &HeaderMap,
452 query_parameters: &HashMap<String, String>,
453 source_ip: Option<&SocketAddr>,
454 ) {
455 let now = Utc::now();
456 for event in events.iter_mut() {
457 match event {
458 Event::Log(log) => {
459 self.log_namespace.insert_source_metadata(
461 SimpleHttpConfig::NAME,
462 log,
463 self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
464 path!("path"),
465 request_path.to_owned(),
466 );
467
468 self.log_namespace.insert_standard_vector_source_metadata(
469 log,
470 SimpleHttpConfig::NAME,
471 now,
472 );
473
474 if let Some(addr) = source_ip {
475 self.log_namespace.insert_source_metadata(
476 SimpleHttpConfig::NAME,
477 log,
478 self.host_key.path.as_ref().map(LegacyKey::Overwrite),
479 path!("host"),
480 socket_addr_to_ip_string(addr),
481 );
482 }
483 }
484 _ => {
485 continue;
486 }
487 }
488 }
489
490 add_headers(
491 events,
492 &self.headers,
493 headers,
494 self.log_namespace,
495 SimpleHttpConfig::NAME,
496 );
497
498 add_query_parameters(
499 events,
500 &self.query_parameters,
501 query_parameters,
502 self.log_namespace,
503 SimpleHttpConfig::NAME,
504 );
505 }
506
507 fn build_events(
508 &self,
509 body: Bytes,
510 _header_map: &HeaderMap,
511 _query_parameters: &HashMap<String, String>,
512 _request_path: &str,
513 ) -> Result<Vec<Event>, ErrorMessage> {
514 let mut decoder = self.decoder.clone();
515 let mut events = Vec::new();
516 let mut bytes = BytesMut::new();
517 bytes.extend_from_slice(&body);
518
519 loop {
520 match decoder.decode_eof(&mut bytes) {
521 Ok(Some((next, _))) => {
522 events.extend(next);
523 }
524 Ok(None) => break,
525 Err(error) => {
526 return Err(ErrorMessage::new(
529 StatusCode::BAD_REQUEST,
530 format!("Failed decoding body: {error}"),
531 ));
532 }
533 }
534 }
535
536 Ok(events)
537 }
538
539 fn enable_source_ip(&self) -> bool {
540 self.host_key.path.is_some()
541 }
542
543 fn inject_auth_enrichment(&self, events: &mut [Event], enrichment: ObjectMap) {
551 for event in events.iter_mut() {
552 match self.log_namespace {
553 LogNamespace::Vector => {
554 let meta = event.metadata_mut().value_mut();
557 for (key, value) in &enrichment {
558 let key_str = key.as_str();
559 let name_part = path!(SimpleHttpConfig::NAME);
560 let key_part = path!(key_str);
561 let full_path = name_part.concat(key_part);
562 if meta.get(full_path.clone()).is_none() {
563 meta.insert(full_path, value.clone());
564 }
565 }
566 }
567 LogNamespace::Legacy => {
568 if let Event::Log(log) = event {
570 for (key, value) in &enrichment {
571 log.try_insert((PathPrefix::Event, path!(key.as_str())), value.clone());
572 }
573 }
574 }
575 }
576 }
577 }
578}
579
580#[cfg(test)]
581mod tests {
582 use std::{io::Write, net::SocketAddr, str::FromStr};
583
584 use flate2::{
585 Compression,
586 write::{GzEncoder, ZlibEncoder},
587 };
588 use futures::Stream;
589 use headers::{Authorization, authorization::Credentials};
590 use http::{HeaderMap, Method, StatusCode, Uri, header::AUTHORIZATION};
591 use similar_asserts::assert_eq;
592 use vector_lib::{
593 codecs::{
594 BytesDecoderConfig, JsonDeserializerConfig,
595 decoding::{DeserializerConfig, FramingConfig},
596 },
597 config::LogNamespace,
598 event::LogEvent,
599 lookup::{
600 OwnedTargetPath, PathPrefix, event_path, lookup_v2::OptionalValuePath,
601 owned_value_path, path,
602 },
603 schema::Definition,
604 };
605 use vrl::{
606 path::ValuePath as _,
607 value::{Kind, ObjectMap, kind::Collection},
608 };
609
610 use super::{SimpleHttpConfig, remove_duplicates};
611 use crate::{
612 SourceSender,
613 common::http::server_auth::HttpServerAuthConfig,
614 components::validation::prelude::*,
615 config::{SourceConfig, SourceContext, log_schema},
616 event::{Event, EventStatus, Value},
617 sources::http_server::HttpMethod,
618 test_util::{
619 addr::next_addr,
620 components::{self, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
621 spawn_collect_n, wait_for_tcp,
622 },
623 };
624
    /// Runs the shared `test_generate_config` check against `SimpleHttpConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SimpleHttpConfig>();
    }
629
630 #[allow(clippy::too_many_arguments)]
631 async fn source<'a>(
632 headers: Vec<String>,
633 query_parameters: Vec<String>,
634 path_key: &'a str,
635 host_key: &'a str,
636 path: &'a str,
637 method: &'a str,
638 response_code: StatusCode,
639 auth: Option<HttpServerAuthConfig>,
640 strict_path: bool,
641 status: EventStatus,
642 acknowledgements: bool,
643 framing: Option<FramingConfig>,
644 decoding: Option<DeserializerConfig>,
645 ) -> (impl Stream<Item = Event> + 'a, SocketAddr) {
646 let (sender, recv) = SourceSender::new_test_finalize(status);
647 let (_guard, address) = next_addr();
648 let path = path.to_owned();
649 let host_key = OptionalValuePath::from(owned_value_path!(host_key));
650 let path_key = OptionalValuePath::from(owned_value_path!(path_key));
651 let context = SourceContext::new_test(sender, None);
652 let method = match Method::from_str(method).unwrap() {
653 Method::GET => HttpMethod::Get,
654 Method::POST => HttpMethod::Post,
655 _ => HttpMethod::Post,
656 };
657
658 tokio::spawn(async move {
659 SimpleHttpConfig {
660 address,
661 headers,
662 encoding: None,
663 query_parameters,
664 response_code,
665 tls: None,
666 auth,
667 strict_path,
668 path_key,
669 host_key,
670 path,
671 method,
672 framing,
673 decoding,
674 acknowledgements: acknowledgements.into(),
675 log_namespace: None,
676 keepalive: Default::default(),
677 }
678 .build(context)
679 .await
680 .unwrap()
681 .await
682 .unwrap();
683 });
684 wait_for_tcp(address).await;
685 (recv, address)
686 }
687
688 async fn send(address: SocketAddr, body: &str) -> u16 {
689 reqwest::Client::new()
690 .post(format!("http://{address}/"))
691 .body(body.to_owned())
692 .send()
693 .await
694 .unwrap()
695 .status()
696 .as_u16()
697 }
698
699 async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 {
700 reqwest::Client::new()
701 .post(format!("http://{address}/"))
702 .headers(headers)
703 .body(body.to_owned())
704 .send()
705 .await
706 .unwrap()
707 .status()
708 .as_u16()
709 }
710
711 async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 {
712 reqwest::Client::new()
713 .post(format!("http://{address}?{query}"))
714 .body(body.to_owned())
715 .send()
716 .await
717 .unwrap()
718 .status()
719 .as_u16()
720 }
721
722 async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 {
723 reqwest::Client::new()
724 .post(format!("http://{address}{path}"))
725 .body(body.to_owned())
726 .send()
727 .await
728 .unwrap()
729 .status()
730 .as_u16()
731 }
732
733 async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 {
734 let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap();
735 reqwest::Client::new()
736 .request(method, format!("http://{address}{path}"))
737 .body(body.to_owned())
738 .send()
739 .await
740 .unwrap()
741 .status()
742 .as_u16()
743 }
744
745 async fn send_bytes(address: SocketAddr, body: Vec<u8>, headers: HeaderMap) -> u16 {
746 reqwest::Client::new()
747 .post(format!("http://{address}/"))
748 .headers(headers)
749 .body(body)
750 .send()
751 .await
752 .unwrap()
753 .status()
754 .as_u16()
755 }
756
757 async fn spawn_ok_collect_n(
758 send: impl std::future::Future<Output = u16> + Send + 'static,
759 rx: impl Stream<Item = Event> + Unpin,
760 n: usize,
761 ) -> Vec<Event> {
762 spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await
763 }
764
765 #[tokio::test]
766 async fn http_multiline_text() {
767 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
768 let body = "test body\ntest body 2";
769
770 let (rx, addr) = source(
771 vec![],
772 vec![],
773 "http_path",
774 "remote_ip",
775 "/",
776 "POST",
777 StatusCode::OK,
778 None,
779 true,
780 EventStatus::Delivered,
781 true,
782 None,
783 None,
784 )
785 .await;
786
787 spawn_ok_collect_n(send(addr, body), rx, 2).await
788 })
789 .await;
790
791 {
792 let event = events.remove(0);
793 let log = event.as_log();
794 assert_eq!(*log.get_message().unwrap(), "test body".into());
795 assert!(log.get_timestamp().is_some());
796 assert_eq!(
797 *log.get_source_type().unwrap(),
798 SimpleHttpConfig::NAME.into()
799 );
800 assert_eq!(log["http_path"], "/".into());
801 assert_event_metadata(log).await;
802 }
803 {
804 let event = events.remove(0);
805 let log = event.as_log();
806 assert_eq!(*log.get_message().unwrap(), "test body 2".into());
807 assert_event_metadata(log).await;
808 }
809 }
810
811 #[tokio::test]
812 async fn http_multiline_text2() {
813 let body = "test body\ntest body 2\n";
815
816 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
817 let (rx, addr) = source(
818 vec![],
819 vec![],
820 "http_path",
821 "remote_ip",
822 "/",
823 "POST",
824 StatusCode::OK,
825 None,
826 true,
827 EventStatus::Delivered,
828 true,
829 None,
830 None,
831 )
832 .await;
833
834 spawn_ok_collect_n(send(addr, body), rx, 2).await
835 })
836 .await;
837
838 {
839 let event = events.remove(0);
840 let log = event.as_log();
841 assert_eq!(*log.get_message().unwrap(), "test body".into());
842 assert_event_metadata(log).await;
843 }
844 {
845 let event = events.remove(0);
846 let log = event.as_log();
847 assert_eq!(*log.get_message().unwrap(), "test body 2".into());
848 assert_event_metadata(log).await;
849 }
850 }
851
852 #[tokio::test]
853 async fn http_bytes_codec_preserves_newlines() {
854 let body = "foo\nbar";
855
856 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
857 let (rx, addr) = source(
858 vec![],
859 vec![],
860 "http_path",
861 "remote_ip",
862 "/",
863 "POST",
864 StatusCode::OK,
865 None,
866 true,
867 EventStatus::Delivered,
868 true,
869 Some(BytesDecoderConfig::new().into()),
870 None,
871 )
872 .await;
873
874 spawn_ok_collect_n(send(addr, body), rx, 1).await
875 })
876 .await;
877
878 assert_eq!(events.len(), 1);
879
880 {
881 let event = events.remove(0);
882 let log = event.as_log();
883 assert_eq!(*log.get_message().unwrap(), "foo\nbar".into());
884 assert_event_metadata(log).await;
885 }
886 }
887
888 #[tokio::test]
889 async fn http_json_parsing() {
890 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
891 let (rx, addr) = source(
892 vec![],
893 vec![],
894 "http_path",
895 "remote_ip",
896 "/",
897 "POST",
898 StatusCode::OK,
899 None,
900 true,
901 EventStatus::Delivered,
902 true,
903 None,
904 Some(JsonDeserializerConfig::default().into()),
905 )
906 .await;
907
908 spawn_collect_n(
909 async move {
910 assert_eq!(400, send(addr, "{").await); assert_eq!(400, send(addr, r#"{"key"}"#).await); assert_eq!(200, send(addr, "{}").await); assert_eq!(200, send(addr, "[{},{},{}]").await);
915 },
916 rx,
917 2,
918 )
919 .await
920 })
921 .await;
922
923 assert!(events.remove(1).as_log().get_timestamp().is_some());
924 assert!(events.remove(0).as_log().get_timestamp().is_some());
925 }
926
927 #[tokio::test]
928 async fn http_json_values() {
929 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
930 let (rx, addr) = source(
931 vec![],
932 vec![],
933 "http_path",
934 "remote_ip",
935 "/",
936 "POST",
937 StatusCode::OK,
938 None,
939 true,
940 EventStatus::Delivered,
941 true,
942 None,
943 Some(JsonDeserializerConfig::default().into()),
944 )
945 .await;
946
947 spawn_collect_n(
948 async move {
949 assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await);
950 assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await);
951 },
952 rx,
953 2,
954 )
955 .await
956 })
957 .await;
958
959 {
960 let event = events.remove(0);
961 let log = event.as_log();
962 assert_eq!(log["key"], "value".into());
963 assert_event_metadata(log).await;
964 }
965 {
966 let event = events.remove(0);
967 let log = event.as_log();
968 assert_eq!(log["key2"], "value2".into());
969 assert_event_metadata(log).await;
970 }
971 }
972
973 #[tokio::test]
974 async fn http_json_dotted_keys() {
975 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
976 let (rx, addr) = source(
977 vec![],
978 vec![],
979 "http_path",
980 "remote_ip",
981 "/",
982 "POST",
983 StatusCode::OK,
984 None,
985 true,
986 EventStatus::Delivered,
987 true,
988 None,
989 Some(JsonDeserializerConfig::default().into()),
990 )
991 .await;
992
993 spawn_collect_n(
994 async move {
995 assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await);
996 assert_eq!(
997 200,
998 send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await
999 );
1000 },
1001 rx,
1002 2,
1003 )
1004 .await
1005 })
1006 .await;
1007
1008 {
1009 let event = events.remove(0);
1010 let log = event.as_log();
1011 assert_eq!(
1012 log.get(event_path!("dotted.key")).unwrap(),
1013 &Value::from("value")
1014 );
1015 }
1016 {
1017 let event = events.remove(0);
1018 let log = event.as_log();
1019 let mut map = ObjectMap::new();
1020 map.insert("dotted.key2".into(), Value::from("value2"));
1021 assert_eq!(log["nested"], map.into());
1022 }
1023 }
1024
1025 #[tokio::test]
1026 async fn http_ndjson() {
1027 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1028 let (rx, addr) = source(
1029 vec![],
1030 vec![],
1031 "http_path",
1032 "remote_ip",
1033 "/",
1034 "POST",
1035 StatusCode::OK,
1036 None,
1037 true,
1038 EventStatus::Delivered,
1039 true,
1040 None,
1041 Some(JsonDeserializerConfig::default().into()),
1042 )
1043 .await;
1044
1045 spawn_collect_n(
1046 async move {
1047 assert_eq!(
1048 200,
1049 send(addr, r#"[{"key1":"value1"},{"key2":"value2"}]"#).await
1050 );
1051
1052 assert_eq!(
1053 200,
1054 send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await
1055 );
1056 },
1057 rx,
1058 4,
1059 )
1060 .await
1061 })
1062 .await;
1063
1064 {
1065 let event = events.remove(0);
1066 let log = event.as_log();
1067 assert_eq!(log["key1"], "value1".into());
1068 assert_event_metadata(log).await;
1069 }
1070 {
1071 let event = events.remove(0);
1072 let log = event.as_log();
1073 assert_eq!(log["key2"], "value2".into());
1074 assert_event_metadata(log).await;
1075 }
1076 {
1077 let event = events.remove(0);
1078 let log = event.as_log();
1079 assert_eq!(log["key1"], "value1".into());
1080 assert_event_metadata(log).await;
1081 }
1082 {
1083 let event = events.remove(0);
1084 let log = event.as_log();
1085 assert_eq!(log["key2"], "value2".into());
1086 assert_event_metadata(log).await;
1087 }
1088 }
1089
1090 async fn assert_event_metadata(log: &LogEvent) {
1091 assert!(log.get_timestamp().is_some());
1092
1093 let source_type_key_value = log
1094 .get((PathPrefix::Event, log_schema().source_type_key().unwrap()))
1095 .unwrap()
1096 .as_str()
1097 .unwrap();
1098 assert_eq!(source_type_key_value, SimpleHttpConfig::NAME);
1099 assert_eq!(log["http_path"], "/".into());
1100 }
1101
1102 #[tokio::test]
1103 async fn http_headers() {
1104 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1105 let mut headers = HeaderMap::new();
1106 headers.insert("User-Agent", "test_client".parse().unwrap());
1107 headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
1108 headers.insert("X-Test-Header", "true".parse().unwrap());
1109
1110 let (rx, addr) = source(
1111 vec![
1112 "User-Agent".to_string(),
1113 "Upgrade-Insecure-Requests".to_string(),
1114 "X-*".to_string(),
1115 "AbsentHeader".to_string(),
1116 ],
1117 vec![],
1118 "http_path",
1119 "remote_ip",
1120 "/",
1121 "POST",
1122 StatusCode::OK,
1123 None,
1124 true,
1125 EventStatus::Delivered,
1126 true,
1127 None,
1128 Some(JsonDeserializerConfig::default().into()),
1129 )
1130 .await;
1131
1132 spawn_ok_collect_n(
1133 send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1134 rx,
1135 1,
1136 )
1137 .await
1138 })
1139 .await;
1140
1141 {
1142 let event = events.remove(0);
1143 let log = event.as_log();
1144 assert_eq!(log["key1"], "value1".into());
1145 assert_eq!(log["\"User-Agent\""], "test_client".into());
1146 assert_eq!(log["\"Upgrade-Insecure-Requests\""], "false".into());
1147 assert_eq!(log["\"x-test-header\""], "true".into());
1148 assert_eq!(log["AbsentHeader"], Value::Null);
1149 assert_event_metadata(log).await;
1150 }
1151 }
1152
1153 #[tokio::test]
1154 async fn http_headers_wildcard() {
1155 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1156 let mut headers = HeaderMap::new();
1157 headers.insert("User-Agent", "test_client".parse().unwrap());
1158 headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
1159 headers.insert("key1", "value_from_header".parse().unwrap());
1161
1162 let (rx, addr) = source(
1163 vec!["*".to_string()],
1164 vec![],
1165 "http_path",
1166 "remote_ip",
1167 "/",
1168 "POST",
1169 StatusCode::OK,
1170 None,
1171 true,
1172 EventStatus::Delivered,
1173 true,
1174 None,
1175 Some(JsonDeserializerConfig::default().into()),
1176 )
1177 .await;
1178
1179 spawn_ok_collect_n(
1180 send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1181 rx,
1182 1,
1183 )
1184 .await
1185 })
1186 .await;
1187
1188 {
1189 let event = events.remove(0);
1190 let log = event.as_log();
1191 assert_eq!(log["key1"], "value1".into());
1192 assert_eq!(log["\"user-agent\""], "test_client".into());
1193 assert_eq!(log["\"x-case-sensitive-value\""], "CaseSensitive".into());
1194 assert_event_metadata(log).await;
1195 }
1196 }
1197
1198 #[tokio::test]
1199 async fn http_query() {
1200 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1201 let (rx, addr) = source(
1202 vec![],
1203 vec![
1204 "source".to_string(),
1205 "region".to_string(),
1206 "absent".to_string(),
1207 ],
1208 "http_path",
1209 "remote_ip",
1210 "/",
1211 "POST",
1212 StatusCode::OK,
1213 None,
1214 true,
1215 EventStatus::Delivered,
1216 true,
1217 None,
1218 Some(JsonDeserializerConfig::default().into()),
1219 )
1220 .await;
1221
1222 spawn_ok_collect_n(
1223 send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging®ion=gb"),
1224 rx,
1225 1,
1226 )
1227 .await
1228 })
1229 .await;
1230
1231 {
1232 let event = events.remove(0);
1233 let log = event.as_log();
1234 assert_eq!(log["key1"], "value1".into());
1235 assert_eq!(log["source"], "staging".into());
1236 assert_eq!(log["region"], "gb".into());
1237 assert_eq!(log["absent"], Value::Null);
1238 assert_event_metadata(log).await;
1239 }
1240 }
1241
1242 #[tokio::test]
1243 async fn http_query_wildcard() {
1244 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1245 let (rx, addr) = source(
1246 vec![],
1247 vec!["*".to_string()],
1248 "http_path",
1249 "remote_ip",
1250 "/",
1251 "POST",
1252 StatusCode::OK,
1253 None,
1254 true,
1255 EventStatus::Delivered,
1256 true,
1257 None,
1258 Some(JsonDeserializerConfig::default().into()),
1259 )
1260 .await;
1261
1262 spawn_ok_collect_n(
1263 send_with_query(
1264 addr,
1265 "{\"key1\":\"value1\",\"key2\":\"value2\"}",
1266 "source=staging®ion=gb&key1=value_from_query",
1267 ),
1268 rx,
1269 1,
1270 )
1271 .await
1272 })
1273 .await;
1274
1275 {
1276 let event = events.remove(0);
1277 let log = event.as_log();
1278 assert_eq!(log["key1"], "value_from_query".into());
1279 assert_eq!(log["key2"], "value2".into());
1280 assert_eq!(log["source"], "staging".into());
1281 assert_eq!(log["region"], "gb".into());
1282 assert_event_metadata(log).await;
1283 }
1284 }
1285
1286 #[tokio::test]
1287 async fn http_gzip_deflate() {
1288 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1289 let body = "test body";
1290
1291 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1292 encoder.write_all(body.as_bytes()).unwrap();
1293 let body = encoder.finish().unwrap();
1294
1295 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1296 encoder.write_all(body.as_slice()).unwrap();
1297 let body = encoder.finish().unwrap();
1298
1299 let mut headers = HeaderMap::new();
1300 headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap());
1301
1302 let (rx, addr) = source(
1303 vec![],
1304 vec![],
1305 "http_path",
1306 "remote_ip",
1307 "/",
1308 "POST",
1309 StatusCode::OK,
1310 None,
1311 true,
1312 EventStatus::Delivered,
1313 true,
1314 None,
1315 None,
1316 )
1317 .await;
1318
1319 spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await
1320 })
1321 .await;
1322
1323 {
1324 let event = events.remove(0);
1325 let log = event.as_log();
1326 assert_eq!(*log.get_message().unwrap(), "test body".into());
1327 assert_event_metadata(log).await;
1328 }
1329 }
1330
1331 #[tokio::test]
1332 async fn http_path() {
1333 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1334 let (rx, addr) = source(
1335 vec![],
1336 vec![],
1337 "vector_http_path",
1338 "vector_remote_ip",
1339 "/event/path",
1340 "POST",
1341 StatusCode::OK,
1342 None,
1343 true,
1344 EventStatus::Delivered,
1345 true,
1346 None,
1347 Some(JsonDeserializerConfig::default().into()),
1348 )
1349 .await;
1350
1351 spawn_ok_collect_n(
1352 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"),
1353 rx,
1354 1,
1355 )
1356 .await
1357 })
1358 .await;
1359
1360 {
1361 let event = events.remove(0);
1362 let log = event.as_log();
1363 assert_eq!(log["key1"], "value1".into());
1364 assert_eq!(log["vector_http_path"], "/event/path".into());
1365 assert!(log.get_timestamp().is_some());
1366 assert_eq!(
1367 *log.get_source_type().unwrap(),
1368 SimpleHttpConfig::NAME.into()
1369 );
1370 }
1371 }
1372
1373 #[tokio::test]
1374 async fn http_path_no_restriction() {
1375 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1376 let (rx, addr) = source(
1377 vec![],
1378 vec![],
1379 "vector_http_path",
1380 "vector_remote_ip",
1381 "/event",
1382 "POST",
1383 StatusCode::OK,
1384 None,
1385 false,
1386 EventStatus::Delivered,
1387 true,
1388 None,
1389 Some(JsonDeserializerConfig::default().into()),
1390 )
1391 .await;
1392
1393 spawn_collect_n(
1394 async move {
1395 assert_eq!(
1396 200,
1397 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await
1398 );
1399 assert_eq!(
1400 200,
1401 send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await
1402 );
1403 },
1404 rx,
1405 2,
1406 )
1407 .await
1408 })
1409 .await;
1410
1411 {
1412 let event = events.remove(0);
1413 let log = event.as_log();
1414 assert_eq!(log["key1"], "value1".into());
1415 assert_eq!(log["vector_http_path"], "/event/path1".into());
1416 assert!(log.get_timestamp().is_some());
1417 assert_eq!(
1418 *log.get_source_type().unwrap(),
1419 SimpleHttpConfig::NAME.into()
1420 );
1421 }
1422 {
1423 let event = events.remove(0);
1424 let log = event.as_log();
1425 assert_eq!(log["key2"], "value2".into());
1426 assert_eq!(log["vector_http_path"], "/event/path2".into());
1427 assert!(log.get_timestamp().is_some());
1428 assert_eq!(
1429 *log.get_source_type().unwrap(),
1430 SimpleHttpConfig::NAME.into()
1431 );
1432 }
1433 }
1434
1435 #[tokio::test]
1436 async fn http_wrong_path() {
1437 components::init_test();
1438 let (_rx, addr) = source(
1439 vec![],
1440 vec![],
1441 "vector_http_path",
1442 "vector_remote_ip",
1443 "/",
1444 "POST",
1445 StatusCode::OK,
1446 None,
1447 true,
1448 EventStatus::Delivered,
1449 true,
1450 None,
1451 Some(JsonDeserializerConfig::default().into()),
1452 )
1453 .await;
1454
1455 assert_eq!(
1456 404,
1457 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await
1458 );
1459 }
1460
1461 #[tokio::test]
1462 async fn http_status_code() {
1463 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
1464 let (rx, addr) = source(
1465 vec![],
1466 vec![],
1467 "http_path",
1468 "remote_ip",
1469 "/",
1470 "POST",
1471 StatusCode::ACCEPTED,
1472 None,
1473 true,
1474 EventStatus::Delivered,
1475 true,
1476 None,
1477 None,
1478 )
1479 .await;
1480
1481 spawn_collect_n(
1482 async move {
1483 assert_eq!(
1484 StatusCode::ACCEPTED,
1485 send(addr, "{\"key1\":\"value1\"}").await
1486 );
1487 },
1488 rx,
1489 1,
1490 )
1491 .await;
1492 })
1493 .await;
1494 }
1495
1496 #[tokio::test]
1497 async fn http_delivery_failure() {
1498 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1499 let (rx, addr) = source(
1500 vec![],
1501 vec![],
1502 "http_path",
1503 "remote_ip",
1504 "/",
1505 "POST",
1506 StatusCode::OK,
1507 None,
1508 true,
1509 EventStatus::Rejected,
1510 true,
1511 None,
1512 None,
1513 )
1514 .await;
1515
1516 spawn_collect_n(
1517 async move {
1518 assert_eq!(400, send(addr, "test body\n").await);
1519 },
1520 rx,
1521 1,
1522 )
1523 .await;
1524 })
1525 .await;
1526 }
1527
1528 #[tokio::test]
1529 async fn ignores_disabled_acknowledgements() {
1530 let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1531 let (rx, addr) = source(
1532 vec![],
1533 vec![],
1534 "http_path",
1535 "remote_ip",
1536 "/",
1537 "POST",
1538 StatusCode::OK,
1539 None,
1540 true,
1541 EventStatus::Rejected,
1542 false,
1543 None,
1544 None,
1545 )
1546 .await;
1547
1548 spawn_collect_n(
1549 async move {
1550 assert_eq!(200, send(addr, "test body\n").await);
1551 },
1552 rx,
1553 1,
1554 )
1555 .await
1556 })
1557 .await;
1558
1559 assert_eq!(events.len(), 1);
1560 }
1561
1562 #[tokio::test]
1563 async fn http_get_method() {
1564 components::init_test();
1565 let (_rx, addr) = source(
1566 vec![],
1567 vec![],
1568 "http_path",
1569 "remote_ip",
1570 "/",
1571 "GET",
1572 StatusCode::OK,
1573 None,
1574 true,
1575 EventStatus::Delivered,
1576 true,
1577 None,
1578 None,
1579 )
1580 .await;
1581
1582 assert_eq!(200, send_request(addr, "GET", "", "/").await);
1583 }
1584
1585 #[tokio::test]
1586 async fn returns_401_when_required_auth_is_missing() {
1587 components::init_test();
1588 let (_rx, addr) = source(
1589 vec![],
1590 vec![],
1591 "http_path",
1592 "remote_ip",
1593 "/",
1594 "GET",
1595 StatusCode::OK,
1596 Some(HttpServerAuthConfig::Basic {
1597 username: "test".to_string(),
1598 password: "test".to_string().into(),
1599 }),
1600 true,
1601 EventStatus::Delivered,
1602 true,
1603 None,
1604 None,
1605 )
1606 .await;
1607
1608 assert_eq!(401, send_request(addr, "GET", "", "/").await);
1609 }
1610
1611 #[tokio::test]
1612 async fn returns_401_when_required_auth_is_wrong() {
1613 components::init_test();
1614 let (_rx, addr) = source(
1615 vec![],
1616 vec![],
1617 "http_path",
1618 "remote_ip",
1619 "/",
1620 "POST",
1621 StatusCode::OK,
1622 Some(HttpServerAuthConfig::Basic {
1623 username: "test".to_string(),
1624 password: "test".to_string().into(),
1625 }),
1626 true,
1627 EventStatus::Delivered,
1628 true,
1629 None,
1630 None,
1631 )
1632 .await;
1633
1634 let mut headers = HeaderMap::new();
1635 headers.insert(
1636 AUTHORIZATION,
1637 Authorization::basic("wrong", "test").0.encode(),
1638 );
1639 assert_eq!(401, send_with_headers(addr, "", headers).await);
1640 }
1641
1642 #[tokio::test]
1643 async fn http_get_with_correct_auth() {
1644 components::init_test();
1645 let (_rx, addr) = source(
1646 vec![],
1647 vec![],
1648 "http_path",
1649 "remote_ip",
1650 "/",
1651 "POST",
1652 StatusCode::OK,
1653 Some(HttpServerAuthConfig::Basic {
1654 username: "test".to_string(),
1655 password: "test".to_string().into(),
1656 }),
1657 true,
1658 EventStatus::Delivered,
1659 true,
1660 None,
1661 None,
1662 )
1663 .await;
1664
1665 let mut headers = HeaderMap::new();
1666 headers.insert(
1667 AUTHORIZATION,
1668 Authorization::basic("test", "test").0.encode(),
1669 );
1670 assert_eq!(200, send_with_headers(addr, "", headers).await);
1671 }
1672
/// Checks the schema definition of the first output when the config opts in
/// to the Vector log namespace: a bytes root with the `message` meaning plus
/// the metadata fields listed below.
#[test]
fn output_schema_definition_vector_namespace() {
    let config = SimpleHttpConfig {
        log_namespace: Some(true),
        ..Default::default()
    };

    let mut outputs = config.outputs(LogNamespace::Vector);
    let actual = outputs.remove(0).schema_definition(true);

    // Kind shared by the `headers` and `query_parameters` metadata fields.
    let optional_bytes_map =
        Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

    // Metadata fields in the order they are registered on the definition.
    let metadata_fields = [
        (owned_value_path!("vector", "source_type"), Kind::bytes()),
        (
            owned_value_path!(SimpleHttpConfig::NAME, "path"),
            Kind::bytes(),
        ),
        (
            owned_value_path!(SimpleHttpConfig::NAME, "headers"),
            optional_bytes_map.clone(),
        ),
        (
            owned_value_path!(SimpleHttpConfig::NAME, "query_parameters"),
            optional_bytes_map,
        ),
        (
            owned_value_path!(SimpleHttpConfig::NAME, "host"),
            Kind::bytes().or_undefined(),
        ),
        (
            owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
        ),
    ];

    let mut expected =
        Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
            .with_meaning(OwnedTargetPath::event_root(), "message");
    for (field_path, field_kind) in metadata_fields {
        expected = expected.with_metadata_field(&field_path, field_kind, None);
    }

    assert_eq!(actual, Some(expected));
}
1721
/// Checks the schema definition of the first output for a default config in
/// the legacy log namespace: an object root with the event fields listed
/// below and bytes for any unknown field.
#[test]
fn output_schema_definition_legacy_namespace() {
    let config = SimpleHttpConfig::default();

    let mut outputs = config.outputs(LogNamespace::Legacy);
    let actual = outputs.remove(0).schema_definition(true);

    // (path, kind, meaning) triples in the order they are registered.
    let event_fields = [
        (
            owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        ),
        (owned_value_path!("source_type"), Kind::bytes(), None),
        (owned_value_path!("timestamp"), Kind::timestamp(), None),
        (owned_value_path!("path"), Kind::bytes(), None),
        (
            owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            None,
        ),
    ];

    let base = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    );
    let expected = event_fields
        .into_iter()
        .fold(base, |definition, (field_path, field_kind, meaning)| {
            definition.with_event_field(&field_path, field_kind, meaning)
        })
        .unknown_fields(Kind::bytes());

    assert_eq!(actual, Some(expected));
}
1752
/// Exercises `remove_duplicates`: a list without repeats is expected to come
/// back unchanged, and a list with a repeated trailing `"b"` is expected to
/// come back as the original four entries.
#[test]
fn validate_remove_duplicates() {
    let unique: Vec<String> = ["a", "b", "c", "d"]
        .iter()
        .map(|entry| (*entry).to_owned())
        .collect();

    // No duplicates: the call should be a no-op.
    let untouched = remove_duplicates(unique.clone(), "foo");
    assert_eq!(unique, untouched);

    // A repeated entry appended at the end should be dropped.
    let mut with_repeat = unique.clone();
    with_repeat.push("b".to_owned());
    let deduplicated = remove_duplicates(with_repeat, "foo");
    assert_eq!(unique, deduplicated);
}
1785
/// Seeds a log with source metadata at `<source name>.path`, then injects
/// auth enrichment containing both a `path` key and a new `tenant_id` key.
/// The pre-existing `path` value is expected to survive and `tenant_id` is
/// expected to be added.
#[test]
fn inject_auth_enrichment_does_not_clobber_vector_namespace_builtin_fields() {
    use crate::codecs::DecodingConfig;
    use crate::sources::util::HttpSource as _;
    use vector_lib::codecs::BytesDeserializerConfig;
    use vrl::value::KeyString;

    let framing = BytesDecoderConfig::new().into();
    let deserializer = BytesDeserializerConfig::new().into();
    let decoder = DecodingConfig::new(framing, deserializer, LogNamespace::Vector)
        .build()
        .unwrap()
        .with_log_namespace(LogNamespace::Vector);

    let source = super::SimpleHttpSource {
        headers: vec![],
        query_parameters: vec![],
        path_key: OptionalValuePath::none(),
        host_key: OptionalValuePath::none(),
        decoder,
        log_namespace: LogNamespace::Vector,
    };

    // Metadata that is already on the event before enrichment runs.
    let mut seeded = LogEvent::default();
    seeded.insert(
        (
            PathPrefix::Metadata,
            path!(SimpleHttpConfig::NAME).concat(path!("path")),
        ),
        "/real/path",
    );
    let mut batch = vec![Event::Log(seeded)];

    let mut enrichment = ObjectMap::new();
    // Collides with the metadata seeded above.
    enrichment.insert(KeyString::from("path"), Value::from("/clobbered"));
    // Does not collide with anything on the event.
    enrichment.insert(KeyString::from("tenant_id"), Value::from("t-123"));

    source.inject_auth_enrichment(&mut batch, enrichment);

    match &batch[0] {
        Event::Log(log) => {
            assert_eq!(
                log.get((
                    PathPrefix::Metadata,
                    path!(SimpleHttpConfig::NAME).concat(path!("path")),
                )),
                Some(&Value::from("/real/path")),
                "auth enrichment must not overwrite built-in source metadata"
            );
            assert_eq!(
                log.get((
                    PathPrefix::Metadata,
                    path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
                )),
                Some(&Value::from("t-123")),
                "new auth enrichment field must be injected"
            );
        }
        _ => panic!("expected log event"),
    }
}
1848
/// Seeds a log with `<source name>.tenant_id = "existing"` in metadata, then
/// injects auth enrichment carrying a different `tenant_id`. The seeded value
/// is expected to remain.
#[test]
fn inject_auth_enrichment_does_not_overwrite_existing_metadata_in_vector_namespace() {
    use crate::codecs::DecodingConfig;
    use crate::sources::util::HttpSource as _;
    use vector_lib::codecs::BytesDeserializerConfig;
    use vrl::value::KeyString;

    let framing = BytesDecoderConfig::new().into();
    let deserializer = BytesDeserializerConfig::new().into();
    let decoder = DecodingConfig::new(framing, deserializer, LogNamespace::Vector)
        .build()
        .unwrap()
        .with_log_namespace(LogNamespace::Vector);

    let source = super::SimpleHttpSource {
        headers: vec![],
        query_parameters: vec![],
        path_key: OptionalValuePath::none(),
        host_key: OptionalValuePath::none(),
        decoder,
        log_namespace: LogNamespace::Vector,
    };

    // Metadata that is already on the event before enrichment runs.
    let mut seeded = LogEvent::default();
    seeded.insert(
        (
            PathPrefix::Metadata,
            path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
        ),
        "existing",
    );
    let mut batch = vec![Event::Log(seeded)];

    let mut enrichment = ObjectMap::new();
    enrichment.insert(KeyString::from("tenant_id"), Value::from("auth-value"));

    source.inject_auth_enrichment(&mut batch, enrichment);

    match &batch[0] {
        Event::Log(log) => {
            assert_eq!(
                log.get((
                    PathPrefix::Metadata,
                    path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
                )),
                Some(&Value::from("existing")),
                "auth enrichment must not overwrite already-present metadata keys"
            );
        }
        _ => panic!("expected log event"),
    }
}
1901
/// Injects auth enrichment into a batch holding a single metric event and
/// expects `tenant_id` to appear in the metric's metadata under
/// `<source name>.tenant_id`.
#[test]
fn inject_auth_enrichment_applies_to_non_log_events_in_vector_namespace() {
    use crate::codecs::DecodingConfig;
    use crate::sources::util::HttpSource as _;
    use vector_lib::codecs::BytesDeserializerConfig;
    use vector_lib::event::{Metric, MetricKind, MetricValue};
    use vrl::value::KeyString;

    let framing = BytesDecoderConfig::new().into();
    let deserializer = BytesDeserializerConfig::new().into();
    let decoder = DecodingConfig::new(framing, deserializer, LogNamespace::Vector)
        .build()
        .unwrap()
        .with_log_namespace(LogNamespace::Vector);

    let source = super::SimpleHttpSource {
        headers: vec![],
        query_parameters: vec![],
        path_key: OptionalValuePath::none(),
        host_key: OptionalValuePath::none(),
        decoder,
        log_namespace: LogNamespace::Vector,
    };

    let counter = Metric::new(
        "requests",
        MetricKind::Incremental,
        MetricValue::Counter { value: 1.0 },
    );
    let mut batch = vec![Event::Metric(counter)];

    let mut enrichment = ObjectMap::new();
    enrichment.insert(KeyString::from("tenant_id"), Value::from("t-456"));

    source.inject_auth_enrichment(&mut batch, enrichment);

    match &batch[0] {
        Event::Metric(metric) => {
            let stored = metric
                .metadata()
                .value()
                .get(path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")));
            assert_eq!(
                stored,
                Some(&Value::from("t-456")),
                "auth enrichment must be written to non-log event metadata"
            );
        }
        _ => panic!("expected metric event"),
    }
}
1953
1954 impl ValidatableComponent for SimpleHttpConfig {
1955 fn validation_configuration() -> ValidationConfiguration {
1956 let config = Self {
1957 decoding: Some(DeserializerConfig::Json(Default::default())),
1958 ..Default::default()
1959 };
1960
1961 let log_namespace: LogNamespace = config.log_namespace.unwrap_or(false).into();
1962
1963 let listen_addr_http = format!("http://{}/", config.address);
1964 let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
1965
1966 let external_resource = ExternalResource::new(
1967 ResourceDirection::Push,
1968 HttpResourceConfig::from_parts(uri, Some(config.method.into())),
1969 config
1970 .get_decoding_config()
1971 .expect("should not fail to get decoding config"),
1972 );
1973
1974 ValidationConfiguration::from_source(
1975 Self::NAME,
1976 log_namespace,
1977 vec![ComponentTestCaseConfig::from_source(
1978 config,
1979 None,
1980 Some(external_resource),
1981 )],
1982 )
1983 }
1984 }
1985
1986 register_validatable_component!(SimpleHttpConfig);
1987}