vector/sinks/databricks_zerobus/config.rs

1//! Configuration for the Zerobus sink.
2
3use vector_lib::configurable::configurable_component;
4use vector_lib::sensitive_string::SensitiveString;
5
6use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext};
7use crate::sinks::{
8    prelude::*,
9    util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings},
10};
11
12use super::{error::ZerobusSinkError, service::ZerobusService, sink::ZerobusSink};
13
/// Authentication configuration for Databricks.
//
// Internally tagged: the variant is selected by the `strategy` key in the config
// (e.g. `strategy = "oauth"`), per the `serde(tag = "strategy")` attribute below.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "strategy", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The authentication strategy to use for Databricks."
))]
pub enum DatabricksAuthentication {
    /// Authenticate using OAuth 2.0 client credentials.
    // Explicit rename: serde's `snake_case` rule would otherwise render `OAuth` as `o_auth`.
    #[serde(rename = "oauth")]
    OAuth {
        /// OAuth 2.0 client ID.
        // Stored as `SensitiveString`; the raw value is read via `.inner()` in `credentials()`.
        #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_ID}"))]
        #[configurable(metadata(docs::examples = "abc123..."))]
        client_id: SensitiveString,

        /// OAuth 2.0 client secret.
        #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_SECRET}"))]
        #[configurable(metadata(docs::examples = "secret123..."))]
        client_secret: SensitiveString,
    },
}
36
37impl DatabricksAuthentication {
38    /// Extract the client ID and client secret as string references.
39    pub fn credentials(&self) -> (&str, &str) {
40        match self {
41            DatabricksAuthentication::OAuth {
42                client_id,
43                client_secret,
44            } => (client_id.inner(), client_secret.inner()),
45        }
46    }
47}
48
/// Zerobus stream configuration options.
///
/// This is a thin wrapper around the SDK's `StreamConfigurationOptions` with Vector-specific
/// configuration attributes and custom defaults suitable for Vector's use case.
//
// Every field has a `serde(default = ...)`, so an empty or partial `stream_options` table is
// accepted; `deny_unknown_fields` makes misspelled keys a hard configuration error instead of
// being silently ignored.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ZerobusStreamOptions {
    /// Timeout in milliseconds for flush operations.
    // Defaults to 30000 ms (`default_flush_timeout_ms`).
    #[serde(default = "default_flush_timeout_ms")]
    #[configurable(metadata(docs::examples = 30000))]
    pub flush_timeout_ms: u64,

    /// Timeout in milliseconds for server acknowledgements.
    // Defaults to 60000 ms (`default_server_ack_timeout_ms`). Note the field name and the
    // default function name differ (`server_lack_of_ack` vs `server_ack`).
    #[serde(default = "default_server_ack_timeout_ms")]
    #[configurable(metadata(docs::examples = 60000))]
    pub server_lack_of_ack_timeout_ms: u64,
}
67
68impl Default for ZerobusStreamOptions {
69    fn default() -> Self {
70        Self {
71            flush_timeout_ms: default_flush_timeout_ms(),
72            server_lack_of_ack_timeout_ms: default_server_ack_timeout_ms(),
73        }
74    }
75}
76
77/// Configuration for the Databricks Zerobus sink.
78#[configurable_component(sink(
79    "databricks_zerobus",
80    "Stream observability data to Databricks Unity Catalog via Zerobus."
81))]
82#[derive(Clone, Debug)]
83#[serde(deny_unknown_fields)]
84pub struct ZerobusSinkConfig {
85    /// The Zerobus ingestion endpoint URL.
86    ///
87    /// This should be the full URL to the Zerobus ingestion service.
88    #[configurable(metadata(docs::examples = "https://ingest.dev.databricks.com"))]
89    #[configurable(metadata(docs::examples = "https://ingest.prod.databricks.com"))]
90    pub ingestion_endpoint: String,
91
92    /// The Unity Catalog table name to write to.
93    ///
94    /// This should be in the format `catalog.schema.table`.
95    #[configurable(metadata(docs::examples = "logging_platform.my_team.logs"))]
96    #[configurable(metadata(docs::examples = "main.default.vector_logs"))]
97    pub table_name: String,
98
99    /// The Unity Catalog endpoint URL.
100    ///
101    /// This is used for authentication and table metadata.
102    #[configurable(metadata(
103        docs::examples = "https://dbc-e2f0eb31-2b0e.staging.cloud.databricks.com"
104    ))]
105    #[configurable(metadata(docs::examples = "https://your-workspace.cloud.databricks.com"))]
106    pub unity_catalog_endpoint: String,
107
108    /// Databricks authentication configuration.
109    #[configurable(derived)]
110    pub auth: DatabricksAuthentication,
111
112    #[configurable(derived)]
113    #[serde(default)]
114    pub stream_options: ZerobusStreamOptions,
115
116    #[configurable(derived)]
117    #[serde(default)]
118    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
119
120    #[configurable(derived)]
121    #[serde(default)]
122    pub request: TowerRequestConfig,
123
124    #[configurable(derived)]
125    #[serde(
126        default,
127        deserialize_with = "crate::serde::bool_or_struct",
128        skip_serializing_if = "crate::serde::is_default"
129    )]
130    pub acknowledgements: AcknowledgementsConfig,
131}
132
133impl GenerateConfig for ZerobusSinkConfig {
134    fn generate_config() -> toml::Value {
135        toml::Value::try_from(Self {
136            ingestion_endpoint: "https://ingest.dev.databricks.com".to_string(),
137            table_name: "catalog.schema.table".to_string(),
138            unity_catalog_endpoint: "https://your-workspace.cloud.databricks.com".to_string(),
139            auth: DatabricksAuthentication::OAuth {
140                client_id: SensitiveString::from("${DATABRICKS_CLIENT_ID}".to_string()),
141                client_secret: SensitiveString::from("${DATABRICKS_CLIENT_SECRET}".to_string()),
142            },
143            stream_options: ZerobusStreamOptions::default(),
144            batch: BatchConfig::default(),
145            request: TowerRequestConfig::default(),
146            acknowledgements: AcknowledgementsConfig::default(),
147        })
148        .unwrap()
149    }
150}
151
152#[async_trait::async_trait]
153#[typetag::serde(name = "databricks_zerobus")]
154impl SinkConfig for ZerobusSinkConfig {
155    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
156        self.validate()?;
157
158        let service = ZerobusService::new(self.clone(), cx.proxy()).await?;
159        let healthcheck_service = service.clone();
160
161        let request_limits = self.request.into_settings();
162
163        let sink = ZerobusSink::new(service, request_limits, self.batch)?;
164
165        let healthcheck = async move {
166            healthcheck_service
167                .ensure_stream()
168                .await
169                .map_err(|e| e.into())
170        };
171
172        Ok((
173            VectorSink::from_event_streamsink(sink),
174            Box::pin(healthcheck),
175        ))
176    }
177
178    fn input(&self) -> Input {
179        Input::log()
180    }
181
182    fn acknowledgements(&self) -> &AcknowledgementsConfig {
183        &self.acknowledgements
184    }
185}
186
187impl ZerobusSinkConfig {
188    pub fn validate(&self) -> Result<(), ZerobusSinkError> {
189        if self.ingestion_endpoint.is_empty() {
190            return Err(ZerobusSinkError::ConfigError {
191                message: "ingestion_endpoint cannot be empty".to_string(),
192            });
193        }
194
195        if self.table_name.is_empty() {
196            return Err(ZerobusSinkError::ConfigError {
197                message: "table_name cannot be empty".to_string(),
198            });
199        }
200
201        let parts: Vec<&str> = self.table_name.split('.').collect();
202        if parts.len() != 3 || parts.iter().any(|p| p.is_empty()) {
203            return Err(ZerobusSinkError::ConfigError {
204                message: "table_name must be in format 'catalog.schema.table' (exactly 3 non-empty parts)"
205                    .to_string(),
206            });
207        }
208
209        if self.unity_catalog_endpoint.is_empty() {
210            return Err(ZerobusSinkError::ConfigError {
211                message: "unity_catalog_endpoint cannot be empty".to_string(),
212            });
213        }
214
215        // Validate authentication credentials
216        match &self.auth {
217            DatabricksAuthentication::OAuth {
218                client_id,
219                client_secret,
220            } => {
221                if client_id.inner().is_empty() {
222                    return Err(ZerobusSinkError::ConfigError {
223                        message: "OAuth client_id cannot be empty".to_string(),
224                    });
225                }
226                if client_secret.inner().is_empty() {
227                    return Err(ZerobusSinkError::ConfigError {
228                        message: "OAuth client_secret cannot be empty".to_string(),
229                    });
230                }
231            }
232        }
233
234        if let Some(max_bytes) = self.batch.max_bytes {
235            // Zerobus SDK limits max bytes to 10MB. This cap is a conservative safety limit:
236            // it's measured against Vector's pre-serialization sizing, not the protobuf bytes
237            // the SDK actually sends. Vector's pre-serialization size is generally larger than
238            // the SDK's protobuf-encoded size, so enforcing the 10MB cap here ensures the SDK's
239            // 10MB limit cannot be exceeded at runtime.
240            if max_bytes > 10_000_000 {
241                return Err(ZerobusSinkError::ConfigError {
242                    message: "max_bytes must be less than or equal to 10MB".to_string(),
243                });
244            }
245        }
246
247        Ok(())
248    }
249}
250
// Defaults for `ZerobusStreamOptions`, shared by its `serde(default = ...)` attributes and
// its `Default` impl so both paths agree.

/// Default flush timeout: 30 seconds, in milliseconds.
const fn default_flush_timeout_ms() -> u64 {
    30 * 1_000
}

/// Default server acknowledgement timeout: 60 seconds, in milliseconds.
const fn default_server_ack_timeout_ms() -> u64 {
    60 * 1_000
}
259
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::sensitive_string::SensitiveString;

    /// A configuration that passes `validate()`; each test breaks exactly one field.
    fn create_test_config() -> ZerobusSinkConfig {
        ZerobusSinkConfig {
            ingestion_endpoint: "https://test.databricks.com".to_string(),
            table_name: "test.default.logs".to_string(),
            unity_catalog_endpoint: "https://test-workspace.databricks.com".to_string(),
            auth: DatabricksAuthentication::OAuth {
                client_id: SensitiveString::from("test-client-id".to_string()),
                client_secret: SensitiveString::from("test-client-secret".to_string()),
            },
            stream_options: ZerobusStreamOptions::default(),
            batch: Default::default(),
            request: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Run `validate()` and return the `ConfigError` message.
    ///
    /// Panics (failing the test) if validation succeeds or yields a different error variant;
    /// `context` is included in the panic message to identify the failing case.
    fn config_error_message(config: &ZerobusSinkConfig, context: &str) -> String {
        match config.validate() {
            Err(crate::sinks::databricks_zerobus::error::ZerobusSinkError::ConfigError {
                message,
            }) => message,
            other => panic!("{context}: expected ConfigError, got {other:?}"),
        }
    }

    #[test]
    fn test_config_validation_success() {
        let config = create_test_config();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_config_validation_empty_endpoint() {
        let mut config = create_test_config();
        config.ingestion_endpoint = "".to_string();

        let message = config_error_message(&config, "empty ingestion_endpoint");
        assert!(message.contains("ingestion_endpoint cannot be empty"));
    }

    #[test]
    fn test_config_validation_empty_table_name() {
        let mut config = create_test_config();
        config.table_name = "".to_string();

        let message = config_error_message(&config, "empty table_name");
        assert!(message.contains("table_name cannot be empty"));
    }

    #[test]
    fn test_config_validation_invalid_table_name() {
        let mut config = create_test_config();
        config.table_name = "invalid_table".to_string(); // Missing dots

        let message = config_error_message(&config, "invalid table_name format");
        assert!(message.contains("catalog.schema.table"));
    }

    #[test]
    fn test_config_validation_table_name_empty_segments() {
        for bad in [
            "catalog..table",
            ".schema.table",
            "catalog.schema.",
            "..",
            "catalog.schema.table.extra",
        ] {
            let mut config = create_test_config();
            config.table_name = bad.to_string();

            let message = config_error_message(&config, &format!("table_name={bad:?}"));
            assert!(
                message.contains("catalog.schema.table"),
                "unexpected message for table_name={bad:?}: {message}"
            );
        }
    }

    #[test]
    fn test_config_validation_empty_unity_catalog_endpoint() {
        let mut config = create_test_config();
        config.unity_catalog_endpoint = "".to_string();

        let message = config_error_message(&config, "empty unity_catalog_endpoint");
        assert!(message.contains("unity_catalog_endpoint cannot be empty"));
    }

    #[test]
    fn test_config_validation_empty_oauth_credentials() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("".to_string()),
            client_secret: SensitiveString::from("test-secret".to_string()),
        };

        let message = config_error_message(&config, "empty OAuth client_id");
        assert!(message.contains("OAuth client_id cannot be empty"));
    }

    #[test]
    fn test_config_validation_empty_oauth_client_secret() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("test-client-id".to_string()),
            client_secret: SensitiveString::from("".to_string()),
        };

        let message = config_error_message(&config, "empty OAuth client_secret");
        assert!(message.contains("OAuth client_secret cannot be empty"));
    }

    /// The 10MB cap is inclusive: exactly 10MB is accepted, one byte more is rejected.
    #[test]
    fn test_config_validation_max_bytes_cap() {
        let mut config = create_test_config();

        config.batch.max_bytes = Some(10_000_000);
        assert!(
            config.validate().is_ok(),
            "max_bytes of exactly 10MB must be accepted"
        );

        config.batch.max_bytes = Some(10_000_001);
        let message = config_error_message(&config, "max_bytes above 10MB");
        assert!(message.contains("10MB"), "unexpected message: {message}");
    }

    /// When `batch.max_bytes` is `None` (user omitted the field or set it to `null`),
    /// `into_batcher_settings()` must merge it against
    /// `RealtimeSizeBasedDefaultBatchSettings::MAX_BYTES` (10MB) — never unbounded.
    /// This guarantees the Zerobus SDK's 10MB limit cannot be exceeded at runtime
    /// even without an explicit user cap.
    #[test]
    fn test_batch_max_bytes_none_defaults_to_10mb() {
        let mut config = create_test_config();
        config.batch.max_bytes = None;

        let settings = config
            .batch
            .into_batcher_settings()
            .expect("batch settings should build");

        assert_eq!(settings.size_limit, 10_000_000);
    }
}