@@ -7,9 +7,59 @@ use serde::{Deserialize, Serialize};
7
7
use tokio:: { io:: AsyncWriteExt , process:: Command , time} ;
8
8
use tokio_util:: codec;
9
9
use vector_lib:: configurable:: { component:: GenerateConfig , configurable_component} ;
10
+ use vrl:: value:: Value ;
10
11
11
12
use crate :: { config:: SecretBackend , signal} ;
12
13
14
+ /// Configuration for the command that will be `exec`ed
15
+ #[ configurable_component( secrets( "exec" ) ) ]
16
+ #[ configurable( metadata( docs:: enum_tag_description = "The protocol version." ) ) ]
17
+ #[ derive( Clone , Debug ) ]
18
+ #[ serde( rename_all = "snake_case" , tag = "version" ) ]
19
+ pub enum ExecVersion {
20
+ /// Expect the command to fetch the configuration options itself.
21
+ V1 ,
22
+
23
+ /// Configuration options to the command are to be curried upon each request.
24
+ V1_1 {
25
+ /// The name of the backend. This is `type` field in the backend request.
26
+ backend_type : String ,
27
+ /// The configuration to pass to the secrets executable. This is the `config` field in the
28
+ /// backend request. Refer to the documentation of your `backend_type `to see which options
29
+ /// are required to be set.
30
+ backend_config : Value ,
31
+ } ,
32
+ }
33
+
34
+ impl ExecVersion {
35
+ fn new_query ( & self , secrets : HashSet < String > ) -> ExecQuery {
36
+ match & self {
37
+ ExecVersion :: V1 => ExecQuery {
38
+ version : "1.0" . to_string ( ) ,
39
+ secrets,
40
+ r#type : None ,
41
+ config : None ,
42
+ } ,
43
+ ExecVersion :: V1_1 {
44
+ backend_type,
45
+ backend_config,
46
+ ..
47
+ } => ExecQuery {
48
+ version : "1.1" . to_string ( ) ,
49
+ secrets,
50
+ r#type : Some ( backend_type. clone ( ) ) ,
51
+ config : Some ( backend_config. clone ( ) ) ,
52
+ } ,
53
+ }
54
+ }
55
+ }
56
+
57
+ impl GenerateConfig for ExecVersion {
58
+ fn generate_config ( ) -> toml:: Value {
59
+ toml:: Value :: try_from ( ExecVersion :: V1 ) . unwrap ( )
60
+ }
61
+ }
62
+
13
63
/// Configuration for the `exec` secrets backend.
14
64
#[ configurable_component( secrets( "exec" ) ) ]
15
65
#[ derive( Clone , Debug ) ]
@@ -22,13 +72,18 @@ pub struct ExecBackend {
22
72
/// The timeout, in seconds, to wait for the command to complete.
23
73
#[ serde( default = "default_timeout_secs" ) ]
24
74
pub timeout : u64 ,
75
+
76
+ /// Settings for the protocol between Vector and the secrets executable.
77
+ #[ serde( default = "default_protocol_version" ) ]
78
+ pub protocol : ExecVersion ,
25
79
}
26
80
27
81
impl GenerateConfig for ExecBackend {
28
82
fn generate_config ( ) -> toml:: Value {
29
83
toml:: Value :: try_from ( ExecBackend {
30
84
command : vec ! [ String :: from( "/path/to/script" ) ] ,
31
85
timeout : 5 ,
86
+ protocol : ExecVersion :: V1 ,
32
87
} )
33
88
. unwrap ( )
34
89
}
@@ -38,17 +93,20 @@ const fn default_timeout_secs() -> u64 {
38
93
5
39
94
}
40
95
41
- #[ derive( Clone , Debug , Deserialize , Serialize ) ]
96
+ const fn default_protocol_version ( ) -> ExecVersion {
97
+ ExecVersion :: V1
98
+ }
99
+
100
+ #[ derive( Clone , Debug , Serialize ) ]
42
101
struct ExecQuery {
102
+ // Fields in all versions starting from v1
43
103
version : String ,
44
104
secrets : HashSet < String > ,
45
- }
46
-
47
- fn new_query ( secrets : HashSet < String > ) -> ExecQuery {
48
- ExecQuery {
49
- version : "1.0" . to_string ( ) ,
50
- secrets,
51
- }
105
+ // Fields added in v1.1
106
+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
107
+ r#type : Option < String > ,
108
+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
109
+ config : Option < Value > ,
52
110
}
53
111
54
112
#[ derive( Clone , Debug , Deserialize , Serialize ) ]
@@ -66,7 +124,7 @@ impl SecretBackend for ExecBackend {
66
124
let mut output = executor:: block_on ( async {
67
125
query_backend (
68
126
& self . command ,
69
- new_query ( secret_keys. clone ( ) ) ,
127
+ self . protocol . new_query ( secret_keys. clone ( ) ) ,
70
128
self . timeout ,
71
129
signal_rx,
72
130
)
@@ -159,3 +217,79 @@ async fn query_backend(
159
217
let response = serde_json:: from_slice :: < HashMap < String , ExecResponse > > ( & output) ?;
160
218
Ok ( response)
161
219
}
220
+
221
+ #[ cfg( test) ]
222
+ mod tests {
223
+ use crate :: {
224
+ config:: SecretBackend ,
225
+ secrets:: exec:: { ExecBackend , ExecVersion } ,
226
+ } ;
227
+ use rstest:: rstest;
228
+ use std:: {
229
+ collections:: { HashMap , HashSet } ,
230
+ path:: PathBuf ,
231
+ } ;
232
+ use tokio:: sync:: broadcast;
233
+ use vrl:: value;
234
+
235
+ fn make_test_backend ( protocol : ExecVersion ) -> ExecBackend {
236
+ let command_path = PathBuf :: from ( env ! ( "CARGO_MANIFEST_DIR" ) )
237
+ . join ( "tests/behavior/secrets/mock_secrets_exec.py" ) ;
238
+ ExecBackend {
239
+ command : [ "python" , command_path. to_str ( ) . unwrap ( ) ]
240
+ . map ( String :: from)
241
+ . to_vec ( ) ,
242
+ timeout : 5 ,
243
+ protocol,
244
+ }
245
+ }
246
+
247
+ #[ tokio:: test( flavor = "multi_thread" ) ]
248
+ #[ rstest(
249
+ protocol,
250
+ case( ExecVersion :: V1 ) ,
251
+ case( ExecVersion :: V1_1 {
252
+ backend_type: "file.json" . to_string( ) ,
253
+ backend_config: value!( { "file_path" : "/abc.json" } ) ,
254
+ } )
255
+ ) ]
256
+ async fn test_exec_backend ( protocol : ExecVersion ) {
257
+ let mut backend = make_test_backend ( protocol) ;
258
+ let ( _tx, mut rx) = broadcast:: channel ( 1 ) ;
259
+ // These fake secrets are statically contained in mock_secrets_exec.py
260
+ let fake_secret_values: HashMap < String , String > = [
261
+ ( "fake_secret_1" , "123456" ) ,
262
+ ( "fake_secret_2" , "123457" ) ,
263
+ ( "fake_secret_3" , "123458" ) ,
264
+ ( "fake_secret_4" , "123459" ) ,
265
+ ( "fake_secret_5" , "123460" ) ,
266
+ ]
267
+ . into_iter ( )
268
+ . map ( |( k, v) | ( k. to_string ( ) , v. to_string ( ) ) )
269
+ . collect ( ) ;
270
+ // Calling the mock_secrets_exec.py program with the expected secret keys should provide
271
+ // the values expected above in `fake_secret_values`
272
+ let fetched_keys = backend
273
+ . retrieve ( fake_secret_values. keys ( ) . cloned ( ) . collect ( ) , & mut rx)
274
+ . await
275
+ . unwrap ( ) ;
276
+ // Assert response is as expected
277
+ assert_eq ! ( fetched_keys. len( ) , 5 ) ;
278
+ for ( fake_secret_key, fake_secret_value) in fake_secret_values {
279
+ assert_eq ! ( fetched_keys. get( & fake_secret_key) , Some ( & fake_secret_value) ) ;
280
+ }
281
+ }
282
+
283
+ #[ tokio:: test( flavor = "multi_thread" ) ]
284
+ async fn test_exec_backend_missing_secrets ( ) {
285
+ let mut backend = make_test_backend ( ExecVersion :: V1 ) ;
286
+ let ( _tx, mut rx) = broadcast:: channel ( 1 ) ;
287
+ let query_secrets: HashSet < String > =
288
+ [ "fake_secret_900" ] . into_iter ( ) . map ( String :: from) . collect ( ) ;
289
+ let fetched_keys = backend. retrieve ( query_secrets. clone ( ) , & mut rx) . await ;
290
+ assert_eq ! (
291
+ format!( "{}" , fetched_keys. unwrap_err( ) ) ,
292
+ "secret for key 'fake_secret_900' was not retrieved: backend does not provide secret key"
293
+ ) ;
294
+ }
295
+ }
0 commit comments