Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions tests/test_identity_map_client_unit_tests.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import base64
import json
import unittest
import datetime as dt
from unittest.mock import patch, MagicMock

from uid2_client import IdentityMapClient, get_datetime_utc_iso_format
from uid2_client import IdentityMapClient, get_datetime_utc_iso_format, Uid2Response, Envelope


class IdentityMapUnitTests(unittest.TestCase):
Expand Down Expand Up @@ -34,23 +33,22 @@ def test_get_datetime_utc_iso_format_timestamp(self):
iso_format_timestamp = get_datetime_utc_iso_format(timestamp)
self.assertEqual(expected_timestamp, iso_format_timestamp)

@patch('uid2_client.identity_map_client.make_v2_request')
@patch('uid2_client.identity_map_client.post')
@patch('uid2_client.identity_map_client.parse_v2_response')
def test_identity_buckets_request(self, mock_parse_v2_response, mock_post, mock_make_v2_request):
@patch('uid2_client.identity_map_client.create_envelope')
@patch('uid2_client.identity_map_client.make_request')
@patch('uid2_client.identity_map_client.parse_response')
def test_identity_buckets_request(self, mock_parse_response, mock_make_request, mock_create_envelope):
expected_req = b'{"since_timestamp": "2024-07-02T14:30:15.123456"}'
test_cases = ["2024-07-02T14:30:15.123456+00:00", "2024-07-02 09:30:15.123456-05:00",
"2024-07-02T08:30:15.123456-06:00", "2024-07-02T10:30:15.123456-04:00",
"2024-07-02T06:30:15.123456-08:00", "2024-07-02T23:30:15.123456+09:00",
"2024-07-03T00:30:15.123456+10:00", "2024-07-02T20:00:15.123456+05:30"]
mock_req = b'mocked_request_data'
mock_nonce = 'mocked_nonce'
mock_make_v2_request.return_value = (mock_req, mock_nonce)
mock_response = MagicMock()
mock_response.read.return_value = b'{"mocked": "response"}'
mock_post.return_value = mock_response
mock_parse_v2_response.return_value = b'{"body":[],"status":"success"}'
mock_create_envelope.return_value = Envelope(mock_req, mock_nonce)
mock_response = '{"mocked": "response"}'
mock_make_request.return_value = Uid2Response.from_string(mock_response)
mock_parse_response.return_value = b'{"body":[],"status":"success"}'
for timestamp in test_cases:
self.identity_map_client.get_identity_buckets(dt.datetime.fromisoformat(timestamp))
called_args, called_kwargs = mock_make_v2_request.call_args
called_args, called_kwargs = mock_create_envelope.call_args
self.assertEqual(expected_req, called_args[2])
235 changes: 235 additions & 0 deletions tests/test_identity_map_v3_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
import os
import unittest

from datetime import datetime, timedelta, timezone
from urllib.error import URLError, HTTPError

from uid2_client import IdentityMapV3Client, IdentityMapV3Input, IdentityMapV3Response, normalize_and_hash_email, normalize_and_hash_phone
from uid2_client.unmapped_identity_reason import UnmappedIdentityReason


@unittest.skipIf(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a good idea, we should break if those are missing instead. Otherwise we may have those tests skipped permanently in the pipeline without anyone noticing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are integration tests, are they running in the pipeline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it doesn't look like they've been running in the pipeline

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, what prevents them from running in the pipeline? (if not this skipIf ) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed all the @unittest.skipIf(), now the tests require the env variables to set

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, what prevents them from running in the pipeline? (if not this skipIf ) ?

Ah its because we've set the 'vulnerability_scan_only' param to true in our workflow so it will only run the vulnerability scan. I think if we want to turn this off and run the tests in the pipeline, we'll need to keep the unittest.skipIf or set the env variables in the workflow

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try to turn off vulnerability_scan_only ? (and keep skipIf)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just tried it but it didn't work because the shared testing pipeline only works for java tests. I'm thinking to keep the vulnerability_scan_only on but then write a custom testing job in this repo

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah right I found UID2-3648 for that so we can leave it for now (skipIf should remain though)

os.getenv("UID2_BASE_URL") == None
or os.getenv("UID2_API_KEY") == None
or os.getenv("UID2_SECRET_KEY") == None,
reason="Environment variables UID2_BASE_URL, UID2_API_KEY, and UID2_SECRET_KEY must be set",
)
class IdentityMapV3IntegrationTests(unittest.TestCase):
UID2_BASE_URL = None
UID2_API_KEY = None
UID2_SECRET_KEY = None

identity_map_client = None

@classmethod
def setUpClass(cls):
cls.UID2_BASE_URL = os.getenv("UID2_BASE_URL")
cls.UID2_API_KEY = os.getenv("UID2_API_KEY")
cls.UID2_SECRET_KEY = os.getenv("UID2_SECRET_KEY")

if cls.UID2_BASE_URL and cls.UID2_API_KEY and cls.UID2_SECRET_KEY:
cls.identity_map_client = IdentityMapV3Client(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY)
else:
raise Exception("set the required UID2_BASE_URL/UID2_API_KEY/UID2_SECRET_KEY environment variables first")

def test_identity_map_emails(self):
identity_map_input = IdentityMapV3Input.from_emails(
["[email protected]", "[email protected]", "[email protected]"])
response = self.identity_map_client.generate_identity_map(identity_map_input)
self.assert_mapped(response, "[email protected]")
self.assert_mapped(response, "[email protected]")

self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, "[email protected]")

def test_identity_map_nothing_unmapped(self):
identity_map_input = IdentityMapV3Input.from_emails(
["[email protected]", "[email protected]"])
response = self.identity_map_client.generate_identity_map(identity_map_input)
self.assert_mapped(response, "[email protected]")
self.assert_mapped(response, "[email protected]")

def test_identity_map_nothing_mapped(self):
identity_map_input = IdentityMapV3Input.from_emails(["[email protected]"])
response = self.identity_map_client.generate_identity_map(identity_map_input)
self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, "[email protected]")

def test_identity_map_invalid_email(self):
self.assertRaises(ValueError, IdentityMapV3Input.from_emails,
["[email protected]", "this is not an email"])

def test_identity_map_invalid_phone(self):
self.assertRaises(ValueError, IdentityMapV3Input.from_phones,
["+12345678901", "this is not a phone number"])

def test_identity_map_invalid_hashed_email(self):
identity_map_input = IdentityMapV3Input.from_hashed_emails(["this is not a hashed email"])
response = self.identity_map_client.generate_identity_map(identity_map_input)
self.assert_unmapped(response, UnmappedIdentityReason.INVALID_IDENTIFIER, "this is not a hashed email")

def test_identity_map_invalid_hashed_phone(self):
identity_map_input = IdentityMapV3Input.from_hashed_phones(["this is not a hashed phone"])
response = self.identity_map_client.generate_identity_map(identity_map_input)
self.assert_unmapped(response, UnmappedIdentityReason.INVALID_IDENTIFIER, "this is not a hashed phone")

def test_identity_map_hashed_emails(self):
hashed_email1 = normalize_and_hash_email("[email protected]")
hashed_email2 = normalize_and_hash_email("[email protected]")
hashed_opted_out_email = normalize_and_hash_email("[email protected]")
identity_map_input = IdentityMapV3Input.from_hashed_emails([hashed_email1, hashed_email2, hashed_opted_out_email])

response = self.identity_map_client.generate_identity_map(identity_map_input)

self.assert_mapped(response, hashed_email1)
self.assert_mapped(response, hashed_email2)

self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, hashed_opted_out_email)

def test_identity_map_duplicate_emails(self):
identity_map_input = IdentityMapV3Input.from_emails(
["[email protected]", "[email protected]", "[email protected]", "[email protected]",
"[email protected]"])
response = self.identity_map_client.generate_identity_map(identity_map_input)

mapped_identities = response.mapped_identities
self.assertEqual(4, len(mapped_identities))

raw_uid = mapped_identities.get("[email protected]").current_raw_uid
self.assertEqual(raw_uid, mapped_identities.get("[email protected]").current_raw_uid)
self.assertEqual(raw_uid, mapped_identities.get("[email protected]").current_raw_uid)
self.assertEqual(raw_uid, mapped_identities.get("[email protected]").current_raw_uid)

def test_identity_map_duplicate_hashed_emails(self):
hashed_email = normalize_and_hash_email("[email protected]")
duplicate_hashed_email = hashed_email
hashed_opted_out_email = normalize_and_hash_email("[email protected]")
duplicate_hashed_opted_out_email = hashed_opted_out_email

identity_map_input = IdentityMapV3Input.from_hashed_emails(
[hashed_email, duplicate_hashed_email, hashed_opted_out_email, duplicate_hashed_opted_out_email])
response = self.identity_map_client.generate_identity_map(identity_map_input)

self.assert_mapped(response, hashed_email)
self.assert_mapped(response, duplicate_hashed_email)

self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, hashed_opted_out_email)
self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, duplicate_hashed_opted_out_email)

def test_identity_map_empty_input(self):
identity_map_input = IdentityMapV3Input.from_emails([])
response = self.identity_map_client.generate_identity_map(identity_map_input)
self.assertTrue(len(response.mapped_identities) == 0)
self.assertTrue(len(response.unmapped_identities) == 0)

def test_identity_map_phones(self):
identity_map_input = IdentityMapV3Input.from_phones(["+12345678901", "+98765432109", "+00000000000"])
response = self.identity_map_client.generate_identity_map(identity_map_input)
self.assert_mapped(response, "+12345678901")
self.assert_mapped(response, "+98765432109")

self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, "+00000000000")

def test_identity_map_hashed_phones(self):
hashed_phone1 = normalize_and_hash_phone("+12345678901")
hashed_phone2 = normalize_and_hash_phone("+98765432109")
hashed_opted_out_phone = normalize_and_hash_phone("+00000000000")
identity_map_input = IdentityMapV3Input.from_hashed_phones([hashed_phone1, hashed_phone2, hashed_opted_out_phone])
response = self.identity_map_client.generate_identity_map(identity_map_input)
self.assert_mapped(response, hashed_phone1)
self.assert_mapped(response, hashed_phone2)

self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, hashed_opted_out_phone)

def test_identity_map_all_identity_types_in_one_request(self):
mapped_email = "[email protected]"
optout_email = "[email protected]"
mapped_phone = "+12345678901"
optout_phone = "+00000000000"

mapped_email_hash = normalize_and_hash_email("[email protected]")
optout_email_hash = normalize_and_hash_email(optout_email)
mapped_phone_hash = normalize_and_hash_phone(mapped_phone)
optout_phone_hash = normalize_and_hash_phone(optout_phone)

identity_map_input = (IdentityMapV3Input.from_emails([mapped_email, optout_email])
.with_hashed_emails([mapped_email_hash, optout_email_hash])
.with_phones([mapped_phone, optout_phone])
.with_hashed_phones([mapped_phone_hash, optout_phone_hash]))

response = self.identity_map_client.generate_identity_map(identity_map_input)

# Test mapped identities
self.assert_mapped(response, mapped_email)
self.assert_mapped(response, mapped_email_hash)
self.assert_mapped(response, mapped_phone)
self.assert_mapped(response, mapped_phone_hash)

# Test unmapped identities
self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_email)
self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_email_hash)
self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_phone)
self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_phone_hash)

def test_identity_map_all_identity_types_added_one_by_one(self):
mapped_email = "[email protected]"
optout_phone = "+00000000000"
mapped_phone_hash = normalize_and_hash_phone("+12345678901")
optout_email_hash = normalize_and_hash_email("[email protected]")

identity_map_input = IdentityMapV3Input()
identity_map_input.with_email(mapped_email)
identity_map_input.with_phone(optout_phone)
identity_map_input.with_hashed_phone(mapped_phone_hash)
identity_map_input.with_hashed_email(optout_email_hash)

response = self.identity_map_client.generate_identity_map(identity_map_input)

# Test mapped identities
self.assert_mapped(response, mapped_email)
self.assert_mapped(response, mapped_phone_hash)

# Test unmapped identities
self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_phone)
self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_email_hash)

def test_identity_map_client_bad_url(self):
identity_map_input = IdentityMapV3Input.from_emails(
["[email protected]", "[email protected]", "[email protected]"])
client = IdentityMapV3Client("https://operator-bad-url.uidapi.com", os.getenv("UID2_API_KEY"), os.getenv("UID2_SECRET_KEY"))
self.assertRaises(URLError, client.generate_identity_map, identity_map_input)

def test_identity_map_client_bad_api_key(self):
identity_map_input = IdentityMapV3Input.from_emails(
["[email protected]", "[email protected]", "[email protected]"])
client = IdentityMapV3Client(os.getenv("UID2_BASE_URL"), "bad-api-key", os.getenv("UID2_SECRET_KEY"))
self.assertRaises(HTTPError, client.generate_identity_map, identity_map_input)

def test_identity_map_client_bad_secret(self):
identity_map_input = IdentityMapV3Input.from_emails(
["[email protected]", "[email protected]", "[email protected]"])

client = IdentityMapV3Client(os.getenv("UID2_BASE_URL"), os.getenv("UID2_API_KEY"), "wJ0hP19QU4hmpB64Y3fV2dAed8t/mupw3sjN5jNRFzg=")
self.assertRaises(HTTPError, client.generate_identity_map, identity_map_input)

def assert_mapped(self, response: IdentityMapV3Response, dii):
mapped_identity = response.mapped_identities.get(dii)
self.assertIsNotNone(mapped_identity)
self.assertIsNotNone(mapped_identity.current_raw_uid)

# Refresh from should be now or in the future, allow some slack for time between request and this assertion
one_minute_ago = datetime.now(timezone.utc) - timedelta(seconds=60)
self.assertTrue(mapped_identity.refresh_from > one_minute_ago)

unmapped_identity = response.unmapped_identities.get(dii)
self.assertIsNone(unmapped_identity)

def assert_unmapped(self, response, reason, dii):
unmapped_identity = response.unmapped_identities.get(dii)
self.assertEqual(reason, unmapped_identity.reason)

mapped_identity = response.mapped_identities.get(dii)
self.assertIsNone(mapped_identity)



if __name__ == '__main__':
unittest.main()
Loading