Skip to content

Commit 2e14055

Browse files
committed
zoomeye
Signed-off-by: pranjalg1331 <[email protected]>
1 parent d4ddeb4 commit 2e14055

19 files changed

+550
-45
lines changed

tests/api_app/analyzers_manager/unit_tests/observable_analyzers/base_test_class.py

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -157,48 +157,3 @@ def test_analyzer_on_supported_observables(self):
157157
self.fail(
158158
f"Unexpected exception for {observable_type}: {type(e).__name__}: {str(e)}"
159159
)
160-
161-
def test_analyzer_error_handling(self):
162-
if self.analyzer_class is None:
163-
self.skipTest("analyzer_class is not set")
164-
165-
configs = AnalyzerConfig.objects.filter(
166-
python_module=self.analyzer_class.python_module
167-
)
168-
169-
if not configs.exists():
170-
self.skipTest(
171-
f"No AnalyzerConfig found for {self.analyzer_class.python_module}"
172-
)
173-
174-
config = configs.first()
175-
176-
invalid_observables = {
177-
"domain": "invalid..domain",
178-
"ip": "999.999.999.999",
179-
"url": "not-a-url",
180-
"hash": "tooshort",
181-
}
182-
183-
for observable_type in config.observable_supported:
184-
if observable_type in invalid_observables:
185-
with self.subTest(observable_type=f"{observable_type}_invalid"):
186-
patches = self.get_mocked_response()
187-
with self._apply_patches(patches):
188-
invalid_value = invalid_observables[observable_type]
189-
analyzer = self._setup_analyzer(
190-
config, observable_type, invalid_value
191-
)
192-
193-
try:
194-
response = analyzer.run()
195-
logger.warning(
196-
"Analyzer ran with invalid input: %s = %s",
197-
observable_type,
198-
invalid_value,
199-
)
200-
self.assertTrue(response)
201-
except (AnalyzerRunException, ValueError) as e:
202-
logger.info(
203-
"Expected failure for %s: %s", observable_type, e
204-
)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from unittest.mock import mock_open, patch
2+
3+
from api_app.analyzers_manager.observable_analyzers.tor import Tor
4+
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
5+
BaseAnalyzerTest,
6+
)
7+
from tests.mock_utils import MockUpResponse
8+
9+
10+
class TorTestCase(BaseAnalyzerTest):
11+
analyzer_class = Tor
12+
13+
@staticmethod
14+
def get_mocked_response():
15+
tor_db_content = "93.95.230.253\n1.2.3.4\n8.8.8.8\n"
16+
file_mock = mock_open(read_data=tor_db_content)
17+
18+
return [
19+
patch(
20+
"requests.get",
21+
return_value=MockUpResponse(
22+
{},
23+
200,
24+
content=b"""ExitNode D2A4BEE6754A9711EB0FAC47F3059BE6FC0D72C7
25+
Published 2022-08-17 18:11:11
26+
LastStatus 2022-08-18 14:00:00
27+
ExitAddress 93.95.230.253 2022-08-18 14:44:33""",
28+
),
29+
),
30+
patch("builtins.open", file_mock),
31+
patch("os.path.exists", return_value=True),
32+
patch("os.path.isfile", return_value=True),
33+
]
34+
35+
@classmethod
36+
def get_extra_config(cls) -> dict:
37+
return {}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from unittest.mock import mock_open, patch
2+
3+
from api_app.analyzers_manager.observable_analyzers.tor_nodes_danmeuk import (
4+
TorNodesDanMeUK,
5+
)
6+
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
7+
BaseAnalyzerTest,
8+
)
9+
10+
11+
class TorNodesDanMeUKTestCase(BaseAnalyzerTest):
12+
analyzer_class = TorNodesDanMeUK
13+
14+
@staticmethod
15+
def get_mocked_response():
16+
mock_db_content = (
17+
"100.10.37.131\n" "100.14.156.183\n" "8.8.8.8\n" "100.16.153.149\n"
18+
)
19+
20+
return [
21+
patch(
22+
"api_app.analyzers_manager.observable_analyzers.tor_nodes_danmeuk.os.path.isfile",
23+
return_value=True,
24+
),
25+
patch(
26+
"api_app.analyzers_manager.observable_analyzers.tor_nodes_danmeuk.os.path.exists",
27+
return_value=True,
28+
),
29+
patch("builtins.open", mock_open(read_data=mock_db_content)),
30+
]
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from unittest.mock import patch
2+
3+
from api_app.analyzers_manager.observable_analyzers.tranco import Tranco
4+
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
5+
BaseAnalyzerTest,
6+
)
7+
from tests.mock_utils import MockUpResponse
8+
9+
10+
class TrancoTestCase(BaseAnalyzerTest):
11+
analyzer_class = Tranco
12+
13+
@staticmethod
14+
def get_mocked_response():
15+
mock_data = {"rank": 12345, "domain": "example.com", "date": "2025-07-18"}
16+
17+
return patch("requests.get", return_value=MockUpResponse(mock_data, 200))
18+
19+
@classmethod
20+
def get_extra_config(cls) -> dict:
21+
return {}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import json
2+
from unittest.mock import mock_open, patch
3+
4+
from api_app.analyzers_manager.observable_analyzers.tweetfeeds import TweetFeeds
5+
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
6+
BaseAnalyzerTest,
7+
)
8+
from tests.mock_utils import MockUpResponse
9+
10+
11+
class TweetFeedsTestCase(BaseAnalyzerTest):
12+
analyzer_class = TweetFeeds
13+
14+
@staticmethod
15+
def get_mocked_response():
16+
mock_data = [
17+
{
18+
"date": "2024-03-19 00:31:36",
19+
"user": "Metemcyber",
20+
"type": "url",
21+
"value": "http://210.56.49.214",
22+
"tags": ["#phishing"],
23+
"tweet": "https://twitter.com/Metemcyber/status/1769884392477077774",
24+
}
25+
]
26+
27+
return [
28+
patch("builtins.open", mock_open(read_data=json.dumps(mock_data))),
29+
patch("os.path.exists", return_value=True),
30+
patch("requests.get", return_value=MockUpResponse(mock_data, 200)),
31+
]
32+
33+
@classmethod
34+
def get_extra_config(cls) -> dict:
35+
return {"filter1": "Metemcyber", "time": "month"}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from unittest.mock import patch
2+
3+
from api_app.analyzers_manager.observable_analyzers.urlhaus import URLHaus
4+
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
5+
BaseAnalyzerTest,
6+
)
7+
from tests.mock_utils import MockUpResponse
8+
9+
10+
class URLHausTestCase(BaseAnalyzerTest):
11+
analyzer_class = URLHaus
12+
13+
@staticmethod
14+
def get_mocked_response():
15+
return patch(
16+
"requests.post",
17+
return_value=MockUpResponse({"query_status": "ok", "data": []}, 200),
18+
)
19+
20+
@classmethod
21+
def get_extra_config(cls):
22+
return {"_api_key_name": "dummy_key"} # Used in authentication_header
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from unittest.mock import patch
2+
3+
from api_app.analyzers_manager.observable_analyzers.urldna import UrlDNA
4+
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
5+
BaseAnalyzerTest,
6+
)
7+
from tests.mock_utils import MockUpResponse
8+
9+
10+
class UrlDNATestCase(BaseAnalyzerTest):
11+
analyzer_class = UrlDNA
12+
13+
@staticmethod
14+
def get_mocked_response():
15+
post_scan_response = {"id": "scan123"}
16+
get_result_response = {"scan": {"status": "DONE"}, "result": {"score": "clean"}}
17+
search_response = {"results": [{"domain": "example.com"}]}
18+
19+
return [
20+
patch(
21+
"requests.Session.post",
22+
side_effect=[
23+
MockUpResponse(search_response, 200), # For SEARCH
24+
MockUpResponse(
25+
post_scan_response, 200
26+
), # For NEW_SCAN - scan request
27+
],
28+
),
29+
patch(
30+
"requests.Session.get",
31+
return_value=MockUpResponse(get_result_response, 200),
32+
),
33+
]
34+
35+
@classmethod
36+
def get_extra_config(cls):
37+
return {
38+
"urldna_analysis": "SEARCH", # change to "NEW_SCAN" if testing that mode
39+
"_api_key_name": "Bearer dummykey",
40+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from unittest.mock import patch
2+
3+
from api_app.analyzers_manager.observable_analyzers.urlscan import UrlScan
4+
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
5+
BaseAnalyzerTest,
6+
)
7+
from tests.mock_utils import MockUpResponse
8+
9+
10+
class UrlScanTestCase(BaseAnalyzerTest):
11+
analyzer_class = UrlScan
12+
13+
@staticmethod
14+
def get_mocked_response():
15+
return [
16+
patch(
17+
"requests.Session.post",
18+
return_value=MockUpResponse(
19+
{"api": "https://urlscan.io/result/abc"}, 200
20+
),
21+
),
22+
patch(
23+
"requests.Session.get",
24+
return_value=MockUpResponse({"results": [{"task": "completed"}]}, 200),
25+
),
26+
]
27+
28+
@classmethod
29+
def get_extra_config(cls):
30+
return {
31+
"urlscan_analysis": "search", # or "submit_result"
32+
"visibility": "public",
33+
"search_size": 10,
34+
"_api_key_name": "dummy_api_key",
35+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from unittest.mock import patch
2+
3+
from api_app.analyzers_manager.observable_analyzers.validin import Validin
4+
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
5+
BaseAnalyzerTest,
6+
)
7+
from tests.mock_utils import MockUpResponse
8+
9+
10+
class ValidinTestCase(BaseAnalyzerTest):
11+
analyzer_class = Validin
12+
13+
@staticmethod
14+
def get_mocked_response():
15+
response = {
16+
"key": "191.121.10.0",
17+
"effective_opts": {"type": "ip4", "limit": 100, "wildcard": False},
18+
"status": "finished",
19+
"query_key": "191.121.10.0",
20+
"records": {},
21+
"records_returned": 0,
22+
"limited": False,
23+
"error": None,
24+
}
25+
return patch("requests.get", return_value=MockUpResponse(response, 200))
26+
27+
@classmethod
28+
def get_extra_config(cls):
29+
return {
30+
"scan_choice": "default", # or a specific key like "a_records" if needed
31+
"_api_key_name": "dummy_validin_token",
32+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from unittest.mock import patch
2+
3+
from api_app.analyzers_manager.observable_analyzers.virushee import VirusheeCheckHash
4+
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
5+
BaseAnalyzerTest,
6+
)
7+
from tests.mock_utils import MockUpResponse
8+
9+
10+
class VirusheeCheckHashTestCase(BaseAnalyzerTest):
11+
analyzer_class = VirusheeCheckHash
12+
13+
@staticmethod
14+
def get_mocked_response():
15+
return patch(
16+
"requests.Session.get", return_value=MockUpResponse({"success": True}, 200)
17+
)
18+
19+
@classmethod
20+
def get_extra_config(cls):
21+
return {"_api_key_name": "dummy_api_key"}

0 commit comments

Comments
 (0)