Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/sample_generate_identity_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from uid2_client import IdentityMapClient, IdentityMapInput


# !! Note: This is for an older version of identity map. For the latest version, see sample_generate_identity_map_v3.py
# this sample client takes email addresses as input and generates an IdentityMapResponse object which contains raw uid
# or the reason why it is unmapped

Expand Down
42 changes: 42 additions & 0 deletions examples/sample_generate_identity_map_v3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import sys

from uid2_client import IdentityMapV3Client, IdentityMapV3Input

# !! Note: This is for the newest version of identity map. For the previous version, see sample_generate_identity_map.py
# this sample client takes email addresses as input and generates an IdentityMapV3Response object which contains raw uid
# or the reason why it is unmapped

def _usage():
print('Usage: python3 sample_generate_identity_map_v3.py <base_url> <api_key> <client_secret> <email_1> <email_2> ... <email_n>'
, file=sys.stderr)
sys.exit(1)


if len(sys.argv) <= 4:
_usage()

base_url = sys.argv[1]
api_key = sys.argv[2]
client_secret = sys.argv[3]
email_list = sys.argv[4:]
first_email = sys.argv[4]

client = IdentityMapV3Client(base_url, api_key, client_secret)

identity_map_response = client.generate_identity_map(IdentityMapV3Input.from_emails(email_list))

mapped_identities = identity_map_response.mapped_identities
unmapped_identities = identity_map_response.unmapped_identities

mapped_identity = mapped_identities.get(first_email)
if mapped_identity is not None:
current_uid = mapped_identity.current_raw_uid
previous_uid = mapped_identity.previous_raw_uid
refresh_from = mapped_identity.refresh_from
print('current_uid =', current_uid)
print('previous_uid =', str(previous_uid))
print('refresh_from =', str(refresh_from))
else:
unmapped_identity = unmapped_identities.get(first_email)
reason = unmapped_identity.reason
print('reason =', reason)
318 changes: 318 additions & 0 deletions tests/test_doc_sample_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
import unittest
import os
from datetime import datetime, timezone, timedelta
from unittest.mock import Mock, patch

# Import all the classes we'll be testing from the documentation samples
from uid2_client import (
Uid2PublisherClient, IdentityMapV3Client, IdentityMapV3Input, IdentityMapV3Response,
IdentityMapClient, IdentityMapInput, BidstreamClient, SharingClient,
TokenGenerateInput, IdentityTokens, UnmappedIdentityReason, EncryptionStatus
)

# !!!!! Do not refactor this code if you're not intending to change the SDK docs samples !!!!!

# Tests for sample code as used in https://unifiedid.com/docs/sdks/sdk-ref-python
# The tests are designed to have sections of almost exactly copy/pasted code samples so there are
# unused variables, unnecessary comments, redundant repetition... since those are used in docs for illustration.
# If a test breaks in this file, likely the change breaks one of the samples on the docs site


@unittest.skipIf(
os.getenv("UID2_BASE_URL") is None or
os.getenv("UID2_API_KEY") is None or
os.getenv("UID2_SECRET_KEY") is None,
"Environment variables UID2_BASE_URL, UID2_API_KEY, and UID2_SECRET_KEY must be set"
)
class TestDocSampleCode(unittest.TestCase):

# Test data constants
UID2_BASE_URL = os.getenv("UID2_BASE_URL", "")
UID2_API_KEY = os.getenv("UID2_API_KEY", "")
UID2_SECRET_KEY = os.getenv("UID2_SECRET_KEY", "")

# Test email addresses - these should be configured in your test environment
mapped_email = "[email protected]"
mapped_email2 = "[email protected]"
optout_email = "[email protected]"
mapped_phone = "+12345678901"
mapped_phone2 = "+12345678902"

def setUp(self):
# Setup clients used across multiple tests
self.identity_map_v3_client = IdentityMapV3Client(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)
self.publisher_client = Uid2PublisherClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)
self.identity_map_client = IdentityMapClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)

def test_publisher_basic_usage_example(self):
# Documentation sdk-ref-python.md Line 142: Create an instance of Uid2PublisherClient
client = Uid2PublisherClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)

# Documentation sdk-ref-python.md Line 147: Generate token from email
token_generate_response = client.generate_token(TokenGenerateInput.from_email("[email protected]").do_not_generate_tokens_for_opted_out())

self.assertIsNotNone(token_generate_response)

def test_publisher_client_server_integration_example(self):
"""Test Publisher client-server integration from documentation"""
client = Uid2PublisherClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)
token_generate_response = client.generate_token(TokenGenerateInput.from_email("[email protected]").do_not_generate_tokens_for_opted_out())

# Documentation sdk-ref-python.md Line 165: Get identity JSON string
identity_json_string = token_generate_response.get_identity_json_string()

self.assertIsNotNone(identity_json_string)

def test_publisher_server_side_integration_example(self):
"""Test Publisher server-side integration from documentation"""
client = Uid2PublisherClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)
token_generate_response = client.generate_token(TokenGenerateInput.from_email(self.mapped_email).do_not_generate_tokens_for_opted_out())

# Documentation sdk-ref-python.md Line 176: Store identity JSON string
identity_json_string = token_generate_response.get_identity_json_string()

# Documentation sdk-ref-python.md Line 182: Get identity and advertising token
identity = token_generate_response.get_identity()
if identity:
advertising_token = identity.get_advertising_token()
self.assertIsNotNone(advertising_token)

# Documentation sdk-ref-python.md Line 193: Create IdentityTokens from JSON string
identity = IdentityTokens.from_json_string(identity_json_string)

# Documentation sdk-ref-python.md Line 198: Check if identity can be refreshed
if not identity or not identity.is_refreshable():
pass

# Documentation sdk-ref-python.md Line 203: Check if refresh is needed
if identity and identity.is_due_for_refresh():
# Documentation sdk-ref-python.md Line 208: Refresh the token
token_refresh_response = client.refresh_token(identity)

# Documentation sdk-ref-python.md Line 212: Store new identity JSON string
new_identity_json_string = token_refresh_response.get_identity_json_string()
if new_identity_json_string is None:
# User has opted out - documentation sdk-ref-python.md Line 214
is_optout = token_refresh_response.is_optout()
self.assertTrue(is_optout)

def test_identity_map_v3_basic_usage_example(self):
"""Test IdentityMapV3Client basic usage from documentation sdk-ref-python.md Map DII to Raw UID2s section"""
# Documentation sdk-ref-python.md Line 226: Create IdentityMapV3Client
identity_map_v3_client = IdentityMapV3Client(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)

# Documentation sdk-ref-python.md Line 231: Create IdentityMapV3Input with emails
input = IdentityMapV3Input.from_emails(["[email protected]", "[email protected]"])

# Documentation sdk-ref-python.md Line 245: Generate identity map
identity_map_response = identity_map_v3_client.generate_identity_map(input)

# Documentation sdk-ref-python.md Line 249: Get mapped and unmapped results
mapped_identities = identity_map_response.mapped_identities
unmapped_identities = identity_map_response.unmapped_identities

# Verify basic structure
self.assertIsNotNone(mapped_identities)
self.assertIsNotNone(unmapped_identities)
self.assertTrue(len(mapped_identities) + len(unmapped_identities) == 2)

def test_identity_map_v3_multi_identity_type_example(self):
"""Test IdentityMapV3Client with multiple identity types from documentation"""
# Documentation sdk-ref-python.md Line 235: Multi-identity type input
input = IdentityMapV3Input() \
.with_email("[email protected]") \
.with_phone("+12345678901") \
.with_hashed_email("pre_hashed_email") \
.with_hashed_phone("pre_hashed_phone")

response = self.identity_map_v3_client.generate_identity_map(input)

# Verify multi-identity type response
self.assertIsNotNone(response)
self.assertIsNotNone(response.mapped_identities)
self.assertIsNotNone(response.unmapped_identities)

def test_identity_map_v3_response_handling_example(self):
"""Test IdentityMapV3Response handling from documentation"""
input = IdentityMapV3Input.from_emails([self.mapped_email])
response = self.identity_map_v3_client.generate_identity_map(input)

# Documentation sdk-ref-python.md Line 254: Process mapped identity results
mapped_identity = response.mapped_identities.get("[email protected]")
if mapped_identity is not None:
current_uid = mapped_identity.current_raw_uid # Current raw UID2
previous_uid = mapped_identity.previous_raw_uid # Previous raw UID2 (Optional, only available for 90 days after rotation)
refresh_from = mapped_identity.refresh_from # When to refresh this identity

self.assertIsNotNone(current_uid)
self.assertIsNotNone(refresh_from)
else:
unmapped_identity = response.unmapped_identities.get("[email protected]")
if unmapped_identity:
reason = unmapped_identity.reason # OPTOUT, INVALID_IDENTIFIER, or UNKNOWN
self.assertIsNotNone(reason)

def test_identity_map_v3_complete_usage_example(self):
"""Test complete usage example from documentation sdk-ref-python.md Usage Example section"""

# Documentation sdk-ref-python.md Line 272: Example 1: Single identity type
email_input = IdentityMapV3Input.from_emails(["[email protected]", "[email protected]"])
email_response = self.identity_map_v3_client.generate_identity_map(email_input)

# Documentation sdk-ref-python.md Line 276: Process email results
for email, identity in email_response.mapped_identities.items():
print("Email: " + email)
print("Current UID: " + identity.current_raw_uid)
print("Previous UID: " + str(identity.previous_raw_uid))
print("Refresh from: " + str(identity.refresh_from))

for email, identity in email_response.unmapped_identities.items():
unmapped_output = "Unmapped email: " + email + " - Reason: " + str(identity.reason)
self.assertIsNotNone(unmapped_output)

# Documentation sdk-ref-python.md Line 285: Example 2: Mixed identity types
mixed_input = IdentityMapV3Input() \
.with_email("[email protected]") \
.with_phone("+12345678901") \
.with_hashed_email("pre_hashed_email_value") \
.with_hashed_phone("pre_hashed_phone_value")

# Documentation sdk-ref-python.md Line 291: Generate identity map
mixed_response = self.identity_map_v3_client.generate_identity_map(mixed_input)
self.assertIsNotNone(mixed_response)

def test_migration_examples(self):
"""Test migration examples from documentation sdk-ref-python.md Required Changes section"""

# Documentation sdk-ref-python.md Line 322: Change client class
client = IdentityMapV3Client(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)
# Documentation sdk-ref-python.md Line 334: Update input construction
input = IdentityMapV3Input.from_emails(["[email protected]"])

# Documentation sdk-ref-python.md Line 337: Mix identity types (new capability)
input = IdentityMapV3Input() \
.with_email("[email protected]") \
.with_phone("+12345678901")

# Documentation sdk-ref-python.md Line 346: Update response handling
response = client.generate_identity_map(input)
mapped = response.mapped_identities.get("[email protected]")
current_uid = mapped.current_raw_uid
previous_uid = mapped.previous_raw_uid
refresh_from = mapped.refresh_from

self.assertIsNotNone(current_uid)
self.assertIsNotNone(refresh_from)

input = IdentityMapV3Input.from_emails([self.optout_email])
response = self.identity_map_v3_client.generate_identity_map(input)

# Documentation sdk-ref-python.md Line 358: Update error handling
unmapped = response.unmapped_identities.get("[email protected]")
if unmapped:
reason = unmapped.reason # Enum - OPTOUT, INVALID_IDENTIFIER, UNKNOWN
raw_reason = unmapped.raw_reason # String version

self.assertIsNotNone(reason)
self.assertIsNotNone(raw_reason)

def test_v2_legacy_identity_map_example(self):
"""Test V2 Identity Map legacy usage from documentation sdk-ref-python.md Previous Version section"""
# Documentation sdk-ref-python.md Line 379: Create V2 IdentityMapClient
client = IdentityMapClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)

# Documentation sdk-ref-python.md Line 383: Generate identity map with V2 client
identity_map_response = client.generate_identity_map(IdentityMapInput.from_emails(["[email protected]", "[email protected]"]))

# Documentation sdk-ref-python.md Line 390: Get V2 mapped and unmapped results
mapped_identities = identity_map_response.mapped_identities
unmapped_identities = identity_map_response.unmapped_identities

# Documentation sdk-ref-python.md Line 396: V2 response processing
mapped_identity = mapped_identities.get("[email protected]")
if mapped_identity is not None:
raw_uid = mapped_identity.get_raw_uid()
self.assertIsNotNone(raw_uid)
else:
unmapped_identity = unmapped_identities.get("[email protected]")
reason = unmapped_identity.get_reason()
self.assertIsNotNone(reason)

def test_v2_salt_bucket_monitoring_example(self):
"""Test V2 salt bucket monitoring from documentation sdk-ref-python.md Monitor Rotated Salt Buckets section"""
# Documentation sdk-ref-python.md Line 410: Create or reuse IdentityMapClient
client = IdentityMapClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)

# Documentation sdk-ref-python.md Line 418: Get identity buckets
since_timestamp = '2024-08-18T14:30:15+00:00'
identity_buckets_response = client.get_identity_buckets(datetime.fromisoformat(since_timestamp))

# Documentation sdk-ref-python.md Line 424: Process bucket results
if identity_buckets_response.buckets:
for bucket in identity_buckets_response.buckets:
bucket_id = bucket.get_bucket_id() # example "bucket_id": "a30od4mNRd"
last_updated = bucket.get_last_updated() # example "last_updated" "2024-08-19T22:52:03.109"
self.assertIsNotNone(bucket_id)
self.assertIsNotNone(last_updated)
else:
print("No bucket was returned")

def test_dsp_usage_example(self):
"""Test DSP client usage from documentation sdk-ref-python.md Usage for DSPs section"""
# Documentation sdk-ref-python.md Line 451: Create BidstreamClient
client = BidstreamClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)

# Documentation sdk-ref-python.md Line 455: Refresh client
client.refresh()

uid_token = "mock_token"
domainOrAppName = "example.com"

# Documentation sdk-ref-python.md Line 464: Decrypt token
decrypted = client.decrypt_token_into_raw_uid(uid_token, domainOrAppName)
# If decryption succeeded, use the raw UID2.
if decrypted.success:
# Use decrypted.uid
used_uid = decrypted.uid
self.assertIsNotNone(used_uid)
else:
# Check decrypted.status for the failure reason.
self.assertIsNotNone(decrypted.status)

def test_sharing_client_usage_example(self):
"""Test Sharing client usage from documentation sdk-ref-python.md Usage for UID2 Sharers section"""
# Documentation sdk-ref-python.md Line 491: Create SharingClient
client = SharingClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)

# Documentation sdk-ref-python.md Line 495: Refresh client
client.refresh()

raw_uid = "mock_raw_uid"

# Documentation sdk-ref-python.md Line 499: Encrypt raw UID (sender)
encrypted = client.encrypt_raw_uid_into_token(raw_uid)
# If encryption succeeded, send the UID2 token to the receiver.
if encrypted.success:
# Send encrypted.encrypted_data to receiver
sent_data = encrypted.encrypted_data
self.assertIsNotNone(sent_data)
else:
# Check encrypted.status for the failure reason.
self.assertIsNotNone(encrypted.status)

uid_token = "mock_token" # Mock token for testing

# Documentation sdk-ref-python.md Line 508: Decrypt token (receiver)
decrypted = client.decrypt_token_into_raw_uid(uid_token)
# If decryption succeeded, use the raw UID2.
if decrypted.success:
# Use decrypted.uid
used_uid = decrypted.uid
self.assertIsNotNone(used_uid)
else:
# Check decrypted.status for the failure reason.
self.assertIsNotNone(decrypted.status)


if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion uid2_client/encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def encrypt(uid2, identity_scope, keys, keyset_id=None, **kwargs):
Keyword Args:
now (Datetime): the datettime to use for now. Defaults to utc now

Returns (str): Sharing Token
Returns (EncryptionDataResponse): Sharing Token

"""
now = kwargs.get("now")
Expand Down
2 changes: 1 addition & 1 deletion uid2_client/encryption_data_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def make_error(encryption_status):

@property
def success(self):
return self._encryption_status == EncryptionStatus.SUCCESS
return self.status == EncryptionStatus.SUCCESS

@property
def encrypted_data(self):
Expand Down
3 changes: 2 additions & 1 deletion uid2_client/sharing_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def encrypt_raw_uid_into_token(self, uid2, keyset_id=None):
uid2: the UID2 or EUID to be encrypted
keyset_id (int) : An optional keyset id to use for the encryption. Will use default keyset if left blank

Returns (str): Sharing Token
Returns:
EncryptionDataResponse: Sharing Token
"""
return self._encrypt_raw_uid_into_token(uid2, keyset_id, dt.datetime.now(tz=dt.timezone.utc))

Expand Down