13
13
from typing import Iterable
14
14
from typing import List
15
15
from typing import Tuple
16
+ from urllib .parse import urljoin
16
17
17
18
import pytz
18
19
import saneyaml
31
32
from vulnerabilities .utils import build_description
32
33
from vulnerabilities .utils import get_advisory_url
33
34
from vulnerabilities .utils import get_cwe_id
35
+ from vulntotal .datasources .gitlab import get_casesensitive_slug
36
+ from vulntotal .datasources .gitlab_api import fetch_gitlab_advisories_for_purl
37
+ from vulntotal .datasources .gitlab_api import get_estimated_advisories_count
34
38
35
39
36
40
class GitLabImporterPipeline (VulnerableCodeBaseImporterPipeline ):
@@ -42,9 +46,16 @@ class GitLabImporterPipeline(VulnerableCodeBaseImporterPipeline):
42
46
license_url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/LICENSE"
43
47
importer_name = "GitLab Importer"
44
48
repo_url = "git+https://gitlab.com/gitlab-org/advisories-community/"
49
+ is_batch_run = True
45
50
46
51
@classmethod
47
52
def steps (cls ):
53
+ if not cls .is_batch_run :
54
+ return (
55
+ cls .collect_and_store_advisories ,
56
+ cls .import_new_advisories ,
57
+ )
58
+
48
59
return (
49
60
cls .clone ,
50
61
cls .collect_and_store_advisories ,
@@ -66,15 +77,57 @@ def steps(cls):
66
77
67
78
gitlab_scheme_by_purl_type = {v : k for k , v in purl_type_by_gitlab_scheme .items ()}
68
79
80
def __init__(self, *args, purl=None, **kwargs):
    """
    Initialise the pipeline, optionally scoped to a single package.

    ``purl`` is stored on the instance. All other positional and keyword
    arguments are forwarded unchanged to the parent ``__init__``.
    """
    super().__init__(*args, **kwargs)
    self.purl = purl
    # A purl means package-first mode; no purl means a full batch run.
    # ``steps`` is a classmethod and reads the flag from the class, so the
    # flag has to live on the class. Recompute it in both directions: only
    # ever setting it to False would leave every later batch instance in
    # this process stuck in package-first mode.
    # NOTE(review): the flag is still shared process-wide, so instances
    # created concurrently with different modes can still interfere.
    GitLabImporterPipeline.is_batch_run = not self.purl
86
+
69
87
def clone(self):
    """
    Log the repository URL, fetch it with ``fetch_via_vcs`` and keep the
    returned response on ``self.vcs_response``.
    """
    repo_url = self.repo_url
    self.log(f"Cloning `{ repo_url } `")
    self.vcs_response = fetch_via_vcs(repo_url)
72
90
73
91
def advisories_count(self):
    """
    Return the number of advisories this run is expected to process.

    Outside batch mode the value comes from
    ``get_estimated_advisories_count`` for ``self.purl``. In batch mode it
    is the number of ``*.yml`` files found under the fetched checkout.
    """
    if not GitLabImporterPipeline.is_batch_run:
        return get_estimated_advisories_count(
            self.purl, self.purl_type_by_gitlab_scheme, get_casesensitive_slug
        )

    checkout_dir = Path(self.vcs_response.dest_dir)
    return len(list(checkout_dir.rglob("*.yml")))
76
99
77
100
def collect_advisories (self ) -> Iterable [AdvisoryData ]:
101
+ if not self .is_batch_run :
102
+ advisories = fetch_gitlab_advisories_for_purl (
103
+ self .purl , self .purl_type_by_gitlab_scheme , get_casesensitive_slug
104
+ )
105
+
106
+ input_version = self .purl .version
107
+ vrc = RANGE_CLASS_BY_SCHEMES [self .purl .type ]
108
+ version_obj = vrc .version_class (input_version ) if input_version else None
109
+
110
+ for advisory in advisories :
111
+ advisory_data = self ._advisory_dict_to_advisory_data (advisory )
112
+ # If purl has version, we need to check if advisory affects the version
113
+ if input_version :
114
+ affected = False
115
+ for affected_package in advisory_data .affected_packages :
116
+ vrange = affected_package .affected_version_range
117
+ fixed_version = affected_package .fixed_version
118
+ if vrange and version_obj in vrange :
119
+ if fixed_version :
120
+ fixed_version_obj = vrc .version_class (str (fixed_version ))
121
+ if version_obj >= fixed_version_obj :
122
+ continue
123
+ affected = True
124
+ break
125
+ if affected :
126
+ yield advisory_data
127
+ else :
128
+ yield advisory_data
129
+ return
130
+
78
131
base_path = Path (self .vcs_response .dest_dir )
79
132
80
133
for file_path in base_path .rglob ("*.yml" ):
@@ -109,6 +162,135 @@ def clean_downloads(self):
109
162
def on_failure(self):
    """
    Delegate to ``clean_downloads``.

    NOTE(review): presumably invoked by the pipeline runner when a step
    fails -- confirm against the base class.
    """
    self.clean_downloads()
111
164
165
def _advisory_dict_to_advisory_data(self, advisory):
    """
    Convert ``advisory`` with the module-level
    ``advisory_dict_to_advisory_data``, supplying this pipeline's scheme
    mappings, its ``log`` method as the logger and its ``purl``.
    """
    conversion_kwargs = {
        "advisory": advisory,
        "purl_type_by_gitlab_scheme": self.purl_type_by_gitlab_scheme,
        "gitlab_scheme_by_purl_type": self.gitlab_scheme_by_purl_type,
        "logger": self.log,
        "purl": self.purl,
    }
    return advisory_dict_to_advisory_data(**conversion_kwargs)
173
+
174
+
175
def advisory_dict_to_advisory_data(
    advisory: dict,
    purl_type_by_gitlab_scheme,
    gitlab_scheme_by_purl_type,
    logger,
    purl=None,
    advisory_url=None,
):
    """
    Convert a GitLab advisory dict to AdvisoryData.

    ``advisory`` is read through the keys ``identifiers``, ``identifier``,
    ``title``, ``description``, ``urls``, ``cwe_ids``, ``pubdate``,
    ``package_slug``, ``fixed_versions`` and ``affected_range``.

    ``purl`` is derived from ``package_slug`` when not supplied. If no purl
    can be determined, an error is logged and an AdvisoryData without
    affected packages or weaknesses is returned.

    ``advisory_url`` is built from ``package_slug`` and ``identifier`` when
    not supplied and both are present.

    ``logger`` is called as ``logger(message, level=...)``.
    """
    # ``dict.get(key, default)`` only applies the default when the key is
    # missing; an explicit null would still come back as None. Use
    # ``or []`` like ``cwe_ids`` and ``fixed_versions`` below.
    aliases = advisory.get("identifiers") or []
    identifier = advisory.get("identifier", "")
    summary = build_description(advisory.get("title"), advisory.get("description"))
    urls = advisory.get("urls") or []
    references = [Reference.from_url(u) for u in urls]

    cwe_ids = advisory.get("cwe_ids") or []
    cwe_list = list(map(get_cwe_id, cwe_ids))

    # NOTE(review): a missing ``pubdate`` reaches the parser as None and
    # presumably raises here -- confirm every advisory source sets it.
    date_published = dateparser.parse(advisory.get("pubdate"))
    date_published = date_published.replace(tzinfo=pytz.UTC)

    package_slug = advisory.get("package_slug")

    # Determine purl if not provided
    if not purl:
        purl = get_purl(
            package_slug=package_slug,
            purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
            logger=logger,
        )

    if not purl:
        logger(
            f"advisory_dict_to_advisory_data: purl is not valid: { package_slug !r} ",
            level=logging.ERROR,
        )
        return AdvisoryData(
            aliases=aliases,
            summary=summary,
            references=references,
            date_published=date_published,
            url=advisory_url,
        )

    affected_version_range = None
    fixed_versions = advisory.get("fixed_versions") or []
    affected_range = advisory.get("affected_range")
    gitlab_native_schemes = {"pypi", "gem", "npm", "go", "packagist", "conan"}
    vrc: VersionRange = RANGE_CLASS_BY_SCHEMES[purl.type]
    gitlab_scheme = gitlab_scheme_by_purl_type[purl.type]
    try:
        if affected_range:
            if gitlab_scheme in gitlab_native_schemes:
                affected_version_range = from_gitlab_native(
                    gitlab_scheme=gitlab_scheme, string=affected_range
                )
            else:
                affected_version_range = vrc.from_native(affected_range)
    except Exception as e:
        # Best effort: an unparsable range is logged and the advisory is
        # still returned without it.
        logger(
            f"advisory_dict_to_advisory_data: affected_range is not parsable: { affected_range !r} for: { purl !s} error: { e !r} \n { traceback .format_exc ()} ",
            level=logging.ERROR,
        )

    parsed_fixed_versions = []
    for fixed_version in fixed_versions:
        try:
            fixed_version = vrc.version_class(fixed_version)
            parsed_fixed_versions.append(fixed_version)
        except Exception as e:
            # Skip only the offending version; the others are kept.
            logger(
                f"advisory_dict_to_advisory_data: fixed_version is not parsable`: { fixed_version !r} error: { e !r} \n { traceback .format_exc ()} ",
                level=logging.ERROR,
            )

    # The affected packages use a purl rebuilt from the slug rather than
    # the ``purl`` argument.
    # NOTE(review): presumably because a caller-supplied purl may carry a
    # version. If ``get_purl`` cannot resolve the slug while ``purl`` was
    # supplied by the caller, this is falsy -- confirm that is acceptable.
    purl_without_version = get_purl(
        package_slug=package_slug,
        purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
        logger=logger,
    )

    if parsed_fixed_versions:
        affected_packages = list(
            extract_affected_packages(
                affected_version_range=affected_version_range,
                fixed_versions=parsed_fixed_versions,
                purl=purl_without_version,
            )
        )
    else:
        if not affected_version_range:
            affected_packages = []
        else:
            affected_packages = [
                AffectedPackage(
                    package=purl_without_version,
                    affected_version_range=affected_version_range,
                )
            ]

    # Determine advisory_url if not provided
    if not advisory_url and package_slug and identifier:
        advisory_url = urljoin(
            "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/",
            package_slug + "/" + identifier + ".yml",
        )

    return AdvisoryData(
        aliases=aliases,
        summary=summary,
        references=references,
        date_published=date_published,
        affected_packages=affected_packages,
        weaknesses=cwe_list,
        url=advisory_url,
    )
293
+
112
294
113
295
def parse_advisory_path (base_path : Path , file_path : Path ) -> Tuple [str , str , str ]:
114
296
"""
@@ -219,94 +401,16 @@ def parse_gitlab_advisory(
219
401
)
220
402
return
221
403
222
- # refer to schema here https://gitlab.com/gitlab-org/advisories-community/-/blob/main/ci/schema/schema.json
223
- aliases = gitlab_advisory .get ("identifiers" )
224
- summary = build_description (gitlab_advisory .get ("title" ), gitlab_advisory .get ("description" ))
225
- urls = gitlab_advisory .get ("urls" )
226
- references = [Reference .from_url (u ) for u in urls ]
227
-
228
- cwe_ids = gitlab_advisory .get ("cwe_ids" ) or []
229
- cwe_list = list (map (get_cwe_id , cwe_ids ))
230
-
231
- date_published = dateparser .parse (gitlab_advisory .get ("pubdate" ))
232
- date_published = date_published .replace (tzinfo = pytz .UTC )
233
- package_slug = gitlab_advisory .get ("package_slug" )
234
404
advisory_url = get_advisory_url (
235
405
file = file ,
236
406
base_path = base_path ,
237
407
url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/" ,
238
408
)
239
- purl : PackageURL = get_purl (
240
- package_slug = package_slug ,
409
+
410
+ return advisory_dict_to_advisory_data (
411
+ advisory = gitlab_advisory ,
241
412
purl_type_by_gitlab_scheme = purl_type_by_gitlab_scheme ,
413
+ gitlab_scheme_by_purl_type = gitlab_scheme_by_purl_type ,
242
414
logger = logger ,
243
- )
244
- if not purl :
245
- logger (
246
- f"parse_yaml_file: purl is not valid: { file !r} { package_slug !r} " , level = logging .ERROR
247
- )
248
- return AdvisoryData (
249
- aliases = aliases ,
250
- summary = summary ,
251
- references = references ,
252
- date_published = date_published ,
253
- url = advisory_url ,
254
- )
255
- affected_version_range = None
256
- fixed_versions = gitlab_advisory .get ("fixed_versions" ) or []
257
- affected_range = gitlab_advisory .get ("affected_range" )
258
- gitlab_native_schemes = set (["pypi" , "gem" , "npm" , "go" , "packagist" , "conan" ])
259
- vrc : VersionRange = RANGE_CLASS_BY_SCHEMES [purl .type ]
260
- gitlab_scheme = gitlab_scheme_by_purl_type [purl .type ]
261
- try :
262
- if affected_range :
263
- if gitlab_scheme in gitlab_native_schemes :
264
- affected_version_range = from_gitlab_native (
265
- gitlab_scheme = gitlab_scheme , string = affected_range
266
- )
267
- else :
268
- affected_version_range = vrc .from_native (affected_range )
269
- except Exception as e :
270
- logger (
271
- f"parse_yaml_file: affected_range is not parsable: { affected_range !r} for: { purl !s} error: { e !r} \n { traceback .format_exc ()} " ,
272
- level = logging .ERROR ,
273
- )
274
-
275
- parsed_fixed_versions = []
276
- for fixed_version in fixed_versions :
277
- try :
278
- fixed_version = vrc .version_class (fixed_version )
279
- parsed_fixed_versions .append (fixed_version )
280
- except Exception as e :
281
- logger (
282
- f"parse_yaml_file: fixed_version is not parsable`: { fixed_version !r} error: { e !r} \n { traceback .format_exc ()} " ,
283
- level = logging .ERROR ,
284
- )
285
-
286
- if parsed_fixed_versions :
287
- affected_packages = list (
288
- extract_affected_packages (
289
- affected_version_range = affected_version_range ,
290
- fixed_versions = parsed_fixed_versions ,
291
- purl = purl ,
292
- )
293
- )
294
- else :
295
- if not affected_version_range :
296
- affected_packages = []
297
- else :
298
- affected_packages = [
299
- AffectedPackage (
300
- package = purl ,
301
- affected_version_range = affected_version_range ,
302
- )
303
- ]
304
- return AdvisoryData (
305
- aliases = aliases ,
306
- summary = summary ,
307
- references = references ,
308
- date_published = date_published ,
309
- affected_packages = affected_packages ,
310
- weaknesses = cwe_list ,
311
- url = advisory_url ,
415
+ advisory_url = advisory_url ,
312
416
)
0 commit comments