|
1 | 1 | from time import sleep |
2 | 2 |
|
| 3 | +from ddtrace.appsec._iast._iast_env import _get_iast_env |
3 | 4 | from ddtrace.appsec._iast._iast_request_context import get_iast_reporter |
| 5 | +from ddtrace.appsec._iast._iast_request_context_base import is_iast_request_enabled |
4 | 6 | from ddtrace.appsec._iast._taint_tracking._context import clear_all_request_context_slots |
5 | 7 | from ddtrace.appsec._iast._taint_tracking._context import debug_context_array_free_slots_number |
6 | 8 | from ddtrace.appsec._iast._taint_tracking._context import debug_context_array_size |
7 | 9 | from ddtrace.appsec._iast._taint_tracking._context import finish_request_context |
8 | 10 | from ddtrace.appsec._iast._taint_tracking._context import start_request_context |
9 | 11 | from ddtrace.appsec._iast.sampling.vulnerability_detection import reset_request_vulnerabilities |
| 12 | +from ddtrace.appsec._iast.taint_sinks.weak_hash import WeakHash |
10 | 13 | from ddtrace.settings.asm import config as asm_config |
11 | 14 |
|
12 | 15 |
|
@@ -46,30 +49,83 @@ def function_with_vulnerabilities_1(tracer): |
46 | 49 | def test_oce_max_vulnerabilities_per_request(iast_context_deduplication_enabled): |
47 | 50 | import hashlib |
48 | 51 |
|
| 52 | + # Reset deduplication cache to ensure clean state |
| 53 | + WeakHash._prepare_report._reset_cache() |
| 54 | + |
| 55 | + # Verify IAST context is enabled |
| 56 | + assert is_iast_request_enabled(), "IAST request context should be enabled" |
| 57 | + |
49 | 58 | m = hashlib.md5() |
50 | 59 | m.update(b"Nobody inspects") |
51 | | - m.digest() |
52 | | - m.digest() |
53 | | - m.digest() |
54 | | - m.digest() |
| 60 | + # Each digest() call must be on a different line to avoid deduplication |
| 61 | + result1 = m.digest() # vulnerability 1 |
| 62 | + result2 = m.digest() # vulnerability 2 |
| 63 | + result3 = m.digest() # This should not be reported (exceeds max) |
| 64 | + result4 = m.digest() # This should not be reported (exceeds max) |
| 65 | + |
| 66 | + # Ensure all digest calls completed |
| 67 | + assert result1 is not None and result2 is not None and result3 is not None and result4 is not None |
| 68 | + |
55 | 69 | span_report = get_iast_reporter() |
| 70 | + if span_report is None: |
| 71 | + # Debug: check if any vulnerabilities were attempted |
| 72 | + env = _get_iast_env() |
| 73 | + if env: |
| 74 | + print( |
| 75 | + f"DEBUG: vulnerability_budget={env.vulnerability_budget}, " |
| 76 | + f"vulnerabilities_request_limit={env.vulnerabilities_request_limit}" |
| 77 | + ) |
| 78 | + assert False, ( |
| 79 | + f"IAST reporter should be initialized after vulnerability detection. " |
| 80 | + f"IAST enabled: {is_iast_request_enabled()}, env: {env is not None}" |
| 81 | + ) |
56 | 82 |
|
57 | 83 | assert len(span_report.vulnerabilities) == asm_config._iast_max_vulnerabilities_per_requests |
58 | 84 |
|
59 | 85 |
|
60 | 86 | def test_oce_reset_vulnerabilities_report(iast_context_deduplication_enabled): |
61 | 87 | import hashlib |
62 | 88 |
|
| 89 | + # Reset deduplication cache to ensure clean state |
| 90 | + WeakHash._prepare_report._reset_cache() |
| 91 | + |
| 92 | + # Verify IAST context is enabled |
| 93 | + assert is_iast_request_enabled(), "IAST request context should be enabled" |
| 94 | + |
63 | 95 | m = hashlib.md5() |
64 | 96 | m.update(b"Nobody inspects") |
65 | | - m.digest() |
66 | | - m.digest() |
67 | | - m.digest() |
68 | | - reset_request_vulnerabilities() |
69 | | - m.digest() |
| 97 | + # Each digest() call must be on a different line to avoid deduplication |
| 98 | + result1 = m.digest() # vulnerability 1 |
| 99 | + result2 = m.digest() # vulnerability 2 |
| 100 | + result3 = m.digest() # This should not be reported (exceeds max) |
| 101 | + |
| 102 | + # Ensure all digest calls completed |
| 103 | + assert result1 is not None and result2 is not None and result3 is not None |
70 | 104 |
|
| 105 | + # Ensure reporter exists before reset |
71 | 106 | span_report = get_iast_reporter() |
| 107 | + if span_report is None: |
| 108 | + # Debug: check if any vulnerabilities were attempted |
| 109 | + env = _get_iast_env() |
| 110 | + if env: |
| 111 | + print( |
| 112 | + f"DEBUG: vulnerability_budget={env.vulnerability_budget}, " |
| 113 | + f"vulnerabilities_request_limit={env.vulnerabilities_request_limit}" |
| 114 | + ) |
| 115 | + assert ( |
| 116 | + False |
| 117 | + ), f"IAST reporter should exist before reset. IAST enabled: {is_iast_request_enabled()}, env: {env is not None}" |
| 118 | + |
| 119 | + initial_count = len(span_report.vulnerabilities) |
| 120 | + assert initial_count == asm_config._iast_max_vulnerabilities_per_requests |
72 | 121 |
|
| 122 | + reset_request_vulnerabilities() |
| 123 | + result4 = m.digest() # vulnerability 3 (after reset) |
| 124 | + assert result4 is not None |
| 125 | + |
| 126 | + span_report = get_iast_reporter() |
| 127 | + assert span_report is not None, "IAST reporter should still exist after reset" |
| 128 | + # After reset, we should have the original 2 vulnerabilities + 1 new one = 3 total |
73 | 129 | assert len(span_report.vulnerabilities) == asm_config._iast_max_vulnerabilities_per_requests + 1 |
74 | 130 |
|
75 | 131 |
|
|
0 commit comments