14
14
"""Compliance credentials configuration."""
15
15
16
16
import logging
17
+ import os
18
+ import shlex
19
+ import subprocess # nosec B404
17
20
from collections import OrderedDict , namedtuple
18
21
from configparser import RawConfigParser
19
22
from os import environ
27
30
28
31
29
32
class OnePasswordBackend :
30
- pass
33
+ def __init__ (self , ** kwargs ):
34
+ self ._url = kwargs .get ("url" )
35
+ self ._vault = kwargs .get ("vault" , "auditree" )
36
+
37
+ def get_section (self , section ):
38
+ cmd = f"op item get --vault { self ._vault } --format json { section } "
39
+ subprocess .run ( # nosec B603
40
+ shlex .split (cmd ),
41
+ env = os .environ ,
42
+ stdout = subprocess .PIPE ,
43
+ stderr = subprocess .PIPE ,
44
+ check = True ,
45
+ )
46
+
47
+ def attribute_error_msg (self , section , attr ):
48
+ return (
49
+ f'Unable to locate field "{ attr } " '
50
+ f'in secure note "{ section } " '
51
+ f'at 1Password vault "{ self ._vault } "'
52
+ )
31
53
32
54
33
55
class ConfigParserBackend :
@@ -36,6 +58,21 @@ def __init__(self, **kwargs):
36
58
self ._cfg_file = kwargs .get ("cfg_file" )
37
59
self ._cfg .read (str (Path (self ._cfg_file ).expanduser ()))
38
60
61
+ def get_section (self , section ):
62
+ params = []
63
+ values = []
64
+ if self ._cfg .has_section (section ):
65
+ params = self ._cfg .options (section )
66
+ values = [self ._cfg .get (section , x ) for x in self ._cfg .options (section )]
67
+ return OrderedDict (zip (params , values ))
68
+
69
+ def attribute_error_msg (self , section , attr ):
70
+ return (
71
+ f'Unable to locate attribute "{ attr } " '
72
+ f'in section "{ section } " '
73
+ f'at config file "{ self ._cfg_file } "'
74
+ )
75
+
39
76
40
77
class Config :
41
78
"""Handle credentials configuration."""
@@ -46,14 +83,20 @@ def __init__(self, cfg_file="~/.credentials", backend_cfg=None):
46
83
"""
47
84
Create an instance of a dictionary-like configuration object.
48
85
49
- :param cfg_file: The path to the RawConfigParser compatible config file
86
+ :param cfg_file: The path to a config file for building a ConfigParserBackend.
87
+ :param backend_cfg: A dictionary with the backend config
50
88
"""
51
89
52
90
if backend_cfg is None :
53
91
backend_cfg = {"name" : "configparser" , "cfg_file" : cfg_file }
54
92
self ._init_backend (backend_cfg )
55
93
56
94
def _init_backend (self , backend_cfg ):
95
+ """
96
+ Create an instance of a dictionary-like configuration object.
97
+
98
+ :param cfg_file: The path to the RawConfigParser compatible config file
99
+ """
57
100
name = backend_cfg .get ("name" )
58
101
if backend_cfg .get ("name" ) not in Config .BACKENDS :
59
102
raise ValueError (f"Invalid credentials backend name: { name } " )
@@ -84,26 +127,16 @@ def _getattr_wrapper(t, attr):
84
127
try :
85
128
return t .__getattribute__ (attr )
86
129
except AttributeError as exc :
87
- exc .args = (
88
- (
89
- f'Unable to locate attribute "{ attr } " '
90
- f'in section "{ type (t ).__name__ } " '
91
- f'at config file "{ self ._cfg_file } "'
92
- ),
93
- )
130
+ exc .args = (self ._backend .attribute_error_msg (type (t ).__name__ , attr ),)
94
131
raise exc
95
132
96
133
env_vars = [k for k in environ .keys () if k .startswith (f"{ section .upper ()} _" )]
97
134
env_keys = [k .split (section .upper ())[1 ].lstrip ("_" ).lower () for k in env_vars ]
98
135
env_values = [environ [e ] for e in env_vars ]
99
136
if env_vars :
100
137
logger .debug (f'Loading credentials from ENV vars: { ", " .join (env_vars )} ' )
101
- params = []
102
- if self ._cfg .has_section (section ):
103
- params = self ._cfg .options (section )
104
- values = [self ._cfg .get (section , x ) for x in params ]
105
138
106
- d = OrderedDict ( zip ( params , values ) )
139
+ d = self . _backend . get_section ( section )
107
140
108
141
if env_vars :
109
142
d .update (zip (env_keys , env_values ))
0 commit comments