vector/sinks/databricks_zerobus/
config.rs1use vector_lib::configurable::configurable_component;
4use vector_lib::sensitive_string::SensitiveString;
5
6use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext};
7use crate::sinks::{
8 prelude::*,
9 util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings},
10};
11
12use super::{error::ZerobusSinkError, service::ZerobusService, sink::ZerobusSink};
13
14#[configurable_component]
16#[derive(Clone, Debug)]
17#[serde(tag = "strategy", rename_all = "snake_case")]
18#[configurable(metadata(
19 docs::enum_tag_description = "The authentication strategy to use for Databricks."
20))]
21pub enum DatabricksAuthentication {
22 #[serde(rename = "oauth")]
24 OAuth {
25 #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_ID}"))]
27 #[configurable(metadata(docs::examples = "abc123..."))]
28 client_id: SensitiveString,
29
30 #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_SECRET}"))]
32 #[configurable(metadata(docs::examples = "secret123..."))]
33 client_secret: SensitiveString,
34 },
35}
36
37impl DatabricksAuthentication {
38 pub fn credentials(&self) -> (&str, &str) {
40 match self {
41 DatabricksAuthentication::OAuth {
42 client_id,
43 client_secret,
44 } => (client_id.inner(), client_secret.inner()),
45 }
46 }
47}
48
49#[configurable_component]
54#[derive(Clone, Debug)]
55#[serde(deny_unknown_fields)]
56pub struct ZerobusStreamOptions {
57 #[serde(default = "default_flush_timeout_ms")]
59 #[configurable(metadata(docs::examples = 30000))]
60 pub flush_timeout_ms: u64,
61
62 #[serde(default = "default_server_ack_timeout_ms")]
64 #[configurable(metadata(docs::examples = 60000))]
65 pub server_lack_of_ack_timeout_ms: u64,
66}
67
68impl Default for ZerobusStreamOptions {
69 fn default() -> Self {
70 Self {
71 flush_timeout_ms: default_flush_timeout_ms(),
72 server_lack_of_ack_timeout_ms: default_server_ack_timeout_ms(),
73 }
74 }
75}
76
77#[configurable_component(sink(
79 "databricks_zerobus",
80 "Stream observability data to Databricks Unity Catalog via Zerobus."
81))]
82#[derive(Clone, Debug)]
83#[serde(deny_unknown_fields)]
84pub struct ZerobusSinkConfig {
85 #[configurable(metadata(docs::examples = "https://ingest.dev.databricks.com"))]
89 #[configurable(metadata(docs::examples = "https://ingest.prod.databricks.com"))]
90 pub ingestion_endpoint: String,
91
92 #[configurable(metadata(docs::examples = "logging_platform.my_team.logs"))]
96 #[configurable(metadata(docs::examples = "main.default.vector_logs"))]
97 pub table_name: String,
98
99 #[configurable(metadata(
103 docs::examples = "https://dbc-e2f0eb31-2b0e.staging.cloud.databricks.com"
104 ))]
105 #[configurable(metadata(docs::examples = "https://your-workspace.cloud.databricks.com"))]
106 pub unity_catalog_endpoint: String,
107
108 #[configurable(derived)]
110 pub auth: DatabricksAuthentication,
111
112 #[configurable(derived)]
113 #[serde(default)]
114 pub stream_options: ZerobusStreamOptions,
115
116 #[configurable(derived)]
117 #[serde(default)]
118 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
119
120 #[configurable(derived)]
121 #[serde(default)]
122 pub request: TowerRequestConfig,
123
124 #[configurable(derived)]
125 #[serde(
126 default,
127 deserialize_with = "crate::serde::bool_or_struct",
128 skip_serializing_if = "crate::serde::is_default"
129 )]
130 pub acknowledgements: AcknowledgementsConfig,
131}
132
133impl GenerateConfig for ZerobusSinkConfig {
134 fn generate_config() -> toml::Value {
135 toml::Value::try_from(Self {
136 ingestion_endpoint: "https://ingest.dev.databricks.com".to_string(),
137 table_name: "catalog.schema.table".to_string(),
138 unity_catalog_endpoint: "https://your-workspace.cloud.databricks.com".to_string(),
139 auth: DatabricksAuthentication::OAuth {
140 client_id: SensitiveString::from("${DATABRICKS_CLIENT_ID}".to_string()),
141 client_secret: SensitiveString::from("${DATABRICKS_CLIENT_SECRET}".to_string()),
142 },
143 stream_options: ZerobusStreamOptions::default(),
144 batch: BatchConfig::default(),
145 request: TowerRequestConfig::default(),
146 acknowledgements: AcknowledgementsConfig::default(),
147 })
148 .unwrap()
149 }
150}
151
152#[async_trait::async_trait]
153#[typetag::serde(name = "databricks_zerobus")]
154impl SinkConfig for ZerobusSinkConfig {
155 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
156 self.validate()?;
157
158 let service = ZerobusService::new(self.clone(), cx.proxy()).await?;
159 let healthcheck_service = service.clone();
160
161 let request_limits = self.request.into_settings();
162
163 let sink = ZerobusSink::new(service, request_limits, self.batch)?;
164
165 let healthcheck = async move {
166 healthcheck_service
167 .ensure_stream()
168 .await
169 .map_err(|e| e.into())
170 };
171
172 Ok((
173 VectorSink::from_event_streamsink(sink),
174 Box::pin(healthcheck),
175 ))
176 }
177
178 fn input(&self) -> Input {
179 Input::log()
180 }
181
182 fn acknowledgements(&self) -> &AcknowledgementsConfig {
183 &self.acknowledgements
184 }
185}
186
187impl ZerobusSinkConfig {
188 pub fn validate(&self) -> Result<(), ZerobusSinkError> {
189 if self.ingestion_endpoint.is_empty() {
190 return Err(ZerobusSinkError::ConfigError {
191 message: "ingestion_endpoint cannot be empty".to_string(),
192 });
193 }
194
195 if self.table_name.is_empty() {
196 return Err(ZerobusSinkError::ConfigError {
197 message: "table_name cannot be empty".to_string(),
198 });
199 }
200
201 let parts: Vec<&str> = self.table_name.split('.').collect();
202 if parts.len() != 3 || parts.iter().any(|p| p.is_empty()) {
203 return Err(ZerobusSinkError::ConfigError {
204 message: "table_name must be in format 'catalog.schema.table' (exactly 3 non-empty parts)"
205 .to_string(),
206 });
207 }
208
209 if self.unity_catalog_endpoint.is_empty() {
210 return Err(ZerobusSinkError::ConfigError {
211 message: "unity_catalog_endpoint cannot be empty".to_string(),
212 });
213 }
214
215 match &self.auth {
217 DatabricksAuthentication::OAuth {
218 client_id,
219 client_secret,
220 } => {
221 if client_id.inner().is_empty() {
222 return Err(ZerobusSinkError::ConfigError {
223 message: "OAuth client_id cannot be empty".to_string(),
224 });
225 }
226 if client_secret.inner().is_empty() {
227 return Err(ZerobusSinkError::ConfigError {
228 message: "OAuth client_secret cannot be empty".to_string(),
229 });
230 }
231 }
232 }
233
234 if let Some(max_bytes) = self.batch.max_bytes {
235 if max_bytes > 10_000_000 {
241 return Err(ZerobusSinkError::ConfigError {
242 message: "max_bytes must be less than or equal to 10MB".to_string(),
243 });
244 }
245 }
246
247 Ok(())
248 }
249}
250
/// Serde default for `stream_options.flush_timeout_ms`: 30 seconds, in milliseconds.
const fn default_flush_timeout_ms() -> u64 {
    30_000
}
255
/// Serde default for `stream_options.server_lack_of_ack_timeout_ms`: 60 seconds, in milliseconds.
const fn default_server_ack_timeout_ms() -> u64 {
    60_000
}
259
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sinks::databricks_zerobus::error::ZerobusSinkError;
    use vector_lib::sensitive_string::SensitiveString;

    /// Builds a configuration that passes validation; each test mutates one field.
    fn create_test_config() -> ZerobusSinkConfig {
        ZerobusSinkConfig {
            ingestion_endpoint: "https://test.databricks.com".to_string(),
            table_name: "test.default.logs".to_string(),
            unity_catalog_endpoint: "https://test-workspace.databricks.com".to_string(),
            auth: DatabricksAuthentication::OAuth {
                client_id: SensitiveString::from("test-client-id".to_string()),
                client_secret: SensitiveString::from("test-client-secret".to_string()),
            },
            stream_options: ZerobusStreamOptions::default(),
            batch: Default::default(),
            request: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Asserts that `result` is a `ConfigError` whose message contains `expected`.
    #[track_caller]
    fn assert_config_error(result: Result<(), ZerobusSinkError>, expected: &str) {
        match result {
            Ok(()) => panic!("expected ConfigError containing {expected:?}, but validation passed"),
            Err(ZerobusSinkError::ConfigError { message }) => assert!(
                message.contains(expected),
                "expected message containing {expected:?}, got {message:?}"
            ),
            Err(other) => panic!("expected ConfigError containing {expected:?}, got: {other}"),
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ZerobusSinkConfig>();
    }

    #[test]
    fn test_config_validation_success() {
        let config = create_test_config();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_config_validation_empty_endpoint() {
        let mut config = create_test_config();
        config.ingestion_endpoint = "".to_string();

        assert_config_error(config.validate(), "ingestion_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_table_name() {
        let mut config = create_test_config();
        config.table_name = "".to_string();

        assert_config_error(config.validate(), "table_name cannot be empty");
    }

    #[test]
    fn test_config_validation_invalid_table_name() {
        let mut config = create_test_config();
        config.table_name = "invalid_table".to_string();

        assert_config_error(config.validate(), "catalog.schema.table");
    }

    #[test]
    fn test_config_validation_table_name_empty_segments() {
        for bad in [
            "catalog..table",
            ".schema.table",
            "catalog.schema.",
            "..",
            "catalog.schema.table.extra",
        ] {
            let mut config = create_test_config();
            config.table_name = bad.to_string();

            assert_config_error(config.validate(), "catalog.schema.table");
        }
    }

    #[test]
    fn test_config_validation_empty_unity_catalog_endpoint() {
        let mut config = create_test_config();
        config.unity_catalog_endpoint = "".to_string();

        assert_config_error(config.validate(), "unity_catalog_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_credentials() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("".to_string()),
            client_secret: SensitiveString::from("test-secret".to_string()),
        };

        assert_config_error(config.validate(), "OAuth client_id cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_client_secret() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("test-client-id".to_string()),
            client_secret: SensitiveString::from("".to_string()),
        };

        assert_config_error(config.validate(), "OAuth client_secret cannot be empty");
    }

    #[test]
    fn test_config_validation_batch_max_bytes_limit() {
        // Exactly 10 MB is allowed ...
        let mut config = create_test_config();
        config.batch.max_bytes = Some(10_000_000);
        assert!(config.validate().is_ok());

        // ... but one byte more is rejected.
        config.batch.max_bytes = Some(10_000_001);
        assert_config_error(
            config.validate(),
            "max_bytes must be less than or equal to 10MB",
        );
    }

    #[test]
    fn test_stream_options_defaults() {
        let options = ZerobusStreamOptions::default();
        assert_eq!(options.flush_timeout_ms, 30_000);
        assert_eq!(options.server_lack_of_ack_timeout_ms, 60_000);
    }

    #[test]
    fn test_batch_max_bytes_none_defaults_to_10mb() {
        let mut config = create_test_config();
        config.batch.max_bytes = None;

        let settings = config
            .batch
            .into_batcher_settings()
            .expect("batch settings should build");

        assert_eq!(settings.size_limit, 10_000_000);
    }
}