|
| 1 | +""" |
| 2 | +Example written by Aaron Weaver <[email protected]> |
| 3 | +as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects |
| 4 | +
|
| 5 | +Description: CI/CD example for DefectDojo |
| 6 | +""" |
| 7 | +from defectdojo_api import defectdojo |
| 8 | +from datetime import datetime, timedelta |
| 9 | +import os |
| 10 | +import argparse |
| 11 | + |
| 12 | +# Setup DefectDojo connection information |
| 13 | +host = 'http://localhost:8000' |
| 14 | +api_key = os.environ['DOJO_API_KEY'] |
| 15 | +user = 'admin' |
| 16 | + |
| 17 | +#Optionally, specify a proxy |
| 18 | +proxies = { |
| 19 | + 'http': 'http://localhost:8080', |
| 20 | + 'https': 'http://localhost:8080', |
| 21 | +} |
| 22 | +""" |
| 23 | +proxies=proxies |
| 24 | +""" |
| 25 | + |
| 26 | +def sum_severity(findings): |
| 27 | + severity = [0,0,0,0,0] |
| 28 | + for finding in findings.data["objects"]: |
| 29 | + if finding["severity"] == "Critical": |
| 30 | + severity[4] = severity[4] + 1 |
| 31 | + if finding["severity"] == "High": |
| 32 | + severity[3] = severity[3] + 1 |
| 33 | + if finding["severity"] == "Medium": |
| 34 | + severity[2] = severity[2] + 1 |
| 35 | + if finding["severity"] == "Info": |
| 36 | + severity[1] = severity[1] + 1 |
| 37 | + |
| 38 | + return severity |
| 39 | + |
| 40 | +def print_findings(findings): |
| 41 | + print "Critical: " + str(findings[4]) |
| 42 | + print "High: " + str(findings[3]) |
| 43 | + print "Medium: " + str(findings[2]) |
| 44 | + print "Low: " + str(findings[1]) |
| 45 | + print "Info: " + str(findings[0]) |
| 46 | + |
| 47 | +def create_findings(product_id, user_id, file, scanner, engagement_id=None, max_critical=0, max_high=0, max_medium=0): |
| 48 | + # Instantiate the DefectDojo api wrapper |
| 49 | + dd = defectdojo.DefectDojoAPI(host, api_key, user, proxies=proxies, timeout=90, debug=False) |
| 50 | + |
| 51 | + # Workflow as follows: |
| 52 | + # 1. Scan tool is run against build |
| 53 | + # 2. Reports is saved from scan tool |
| 54 | + # 3. Call this script to load scan data, specifying scanner type |
| 55 | + # 4. Script returns along with a pass or fail results: Example: 2 new critical vulns, 1 low out of 10 vulnerabilities |
| 56 | + |
| 57 | + #Specify the product id |
| 58 | + product_id = product_id |
| 59 | + engagement_id = None |
| 60 | + user_id = 1 |
| 61 | + |
| 62 | + # Check for a CI/CD engagement_id |
| 63 | + engagements = dd.list_engagements(product_in=product_id, status="In Progress") |
| 64 | + if engagements.success: |
| 65 | + for engagement in engagements.data["objects"]: |
| 66 | + if "Recurring CI/CD Integration" == engagement['name']: |
| 67 | + engagement_id = engagement['id'] |
| 68 | + |
| 69 | + # Engagement doesn't exist, create it |
| 70 | + if engagement_id == None: |
| 71 | + start_date = datetime.now() |
| 72 | + end_date = start_date+timedelta(days=180) |
| 73 | + |
| 74 | + engagement_id = dd.create_engagement("Recurring CI/CD Integration", product_id, user_id, |
| 75 | + "In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")) |
| 76 | + |
| 77 | + # Upload the scanner export |
| 78 | + dir_path = os.path.dirname(os.path.realpath(__file__)) |
| 79 | + |
| 80 | + print "Uploading scanner data." |
| 81 | + date = datetime.now() |
| 82 | + upload_scan = dd.upload_scan(engagement_id, scanner, dir_path + file, "true", date.strftime("%Y/%m/%d"), "API") |
| 83 | + |
| 84 | + if upload_scan.success: |
| 85 | + test_id = upload_scan.id() |
| 86 | + else: |
| 87 | + print upload_scan.message |
| 88 | + |
| 89 | + findings = dd.list_findings(engagement_id_in=engagement_id, duplicate="false", active="true", verified="true") |
| 90 | + print"==============================================" |
| 91 | + print "Total Number of Vulnerabilities: " + str(findings.data["meta"]["total_count"]) |
| 92 | + print"==============================================" |
| 93 | + print_findings(sum_severity(findings)) |
| 94 | + print |
| 95 | + findings = dd.list_findings(test_id_in=test_id, duplicate="true") |
| 96 | + print"==============================================" |
| 97 | + print "Total Number of Duplicate Findings: " + str(findings.data["meta"]["total_count"]) |
| 98 | + print"==============================================" |
| 99 | + print_findings(sum_severity(findings)) |
| 100 | + print |
| 101 | + findings = dd.list_findings(test_id_in=test_id, duplicate="false") |
| 102 | + print"==============================================" |
| 103 | + print "Total Number of New Findings: " + str(findings.data["meta"]["total_count"]) |
| 104 | + print"==============================================" |
| 105 | + sum_new_findings = sum_severity(findings) |
| 106 | + print_findings(sum_new_findings) |
| 107 | + print |
| 108 | + print"==============================================" |
| 109 | + |
| 110 | + if sum_new_findings[4] > max_critical: |
| 111 | + print "Build Failed: Max Critical" |
| 112 | + elif sum_new_findings[3] > max_high: |
| 113 | + print "Build Failed: Max High" |
| 114 | + elif sum_new_findings[2] > max_medium: |
| 115 | + print "Build Failed: Max Medium" |
| 116 | + else: |
| 117 | + print "Build Passed!" |
| 118 | + print"==============================================" |
| 119 | + |
| 120 | +class Main: |
| 121 | + if __name__ == "__main__": |
| 122 | + parser = argparse.ArgumentParser(description='CI/CD integration for DefectDojo') |
| 123 | + parser.add_argument('--user', help="Dojo Product ID", required=True) |
| 124 | + parser.add_argument('--product', help="Dojo Product ID", required=True) |
| 125 | + parser.add_argument('--file', help="Scanner file", required=True) |
| 126 | + parser.add_argument('--scanner', help="Type of scanner", required=True) |
| 127 | + parser.add_argument('--engagement', help="Engagement ID (optional)", required=False) |
| 128 | + parser.add_argument('--critical', default=0, help="Maximum new critical vulns to pass the build.", required=False) |
| 129 | + parser.add_argument('--high', default=0, help="Maximum new high vulns to pass the build.", required=False) |
| 130 | + parser.add_argument('--medium', default=0, help="Maximum new medium vulns to pass the build.", required=False) |
| 131 | + |
| 132 | + #Parse out arguments |
| 133 | + args = vars(parser.parse_args()) |
| 134 | + user_id = args["user"] |
| 135 | + product_id = args["product"] |
| 136 | + file = args["file"] |
| 137 | + scanner = args["scanner"] |
| 138 | + engagement_id = args["engagement"] |
| 139 | + max_critical = args["critical"] |
| 140 | + max_high = args["high"] |
| 141 | + max_medium = args["medium"] |
| 142 | + |
| 143 | + create_findings(product_id, user_id, file, scanner, engagement_id, max_critical, max_high, max_medium) |
0 commit comments