11import json
22from datetime import datetime , timezone
3- from typing import Dict , List , Optional , Any
3+ from typing import Dict , List , Optional , Any , Literal
44
5+ from .identity_map_v3_input import IdentityMapV3Input
56from .unmapped_identity_reason import UnmappedIdentityReason
67
78
89class IdentityMapV3Response :
9- def __init__ (self , response : str , identity_map_input ):
10+ def __init__ (self , response : str , identity_map_input : IdentityMapV3Input ):
1011 self ._mapped_identities : Dict [str , MappedIdentity ] = {}
1112 self ._unmapped_identities : Dict [str , UnmappedIdentity ] = {}
13+
1214 response_json = json .loads (response )
13- self ._status = response_json ["status" ]
15+ api_response = ApiResponse .from_json (response_json )
16+ self ._status = api_response .status
1417
1518 if not self .is_success ():
1619 raise ValueError ("Got unexpected identity map status: " + self ._status )
1720
18- body = response_json ["body" ]
19- self ._populate_identities (body , identity_map_input )
21+ self ._populate_identities (api_response .body , identity_map_input )
2022
21- def _populate_identities (self , api_response : Dict [str , List [Dict ]], identity_map_input ) :
23+ def _populate_identities (self , api_response : Dict [Literal [ 'email_hash' , 'phone_hash' ], List ['ApiIdentity' ]], identity_map_input : IdentityMapV3Input ) -> None :
2224 for identity_type , identities in api_response .items ():
2325 self ._populate_identities_for_type (identity_map_input , identity_type , identities )
2426
25- def _populate_identities_for_type (self , identity_map_input , identity_type : str , identities : List [Dict ]):
26- for i , api_identity_data in enumerate (identities ):
27- api_identity = ApiIdentity .from_json (api_identity_data )
27+ def _populate_identities_for_type (self , identity_map_input : IdentityMapV3Input , identity_type : Literal ['email_hash' , 'phone_hash' ], identities : List ['ApiIdentity' ]) -> None :
28+ for i , api_identity in enumerate (identities ):
2829 input_diis = identity_map_input .get_input_diis (identity_type , i )
2930
3031 for input_dii in input_diis :
@@ -49,6 +50,22 @@ def status(self) -> str:
4950 return self ._status
5051
5152
53+ class ApiResponse :
54+ def __init__ (self , status : str , body : Dict [Literal ['email_hash' , 'phone_hash' ], List ['ApiIdentity' ]]):
55+ self .status = status
56+ self .body = body
57+
58+ @classmethod
59+ def from_json (cls , data ) -> 'ApiResponse' :
60+ api_body : Dict [Literal ['email_hash' , 'phone_hash' ], List ['ApiIdentity' ]] = {
61+ 'email_hash' : [ApiIdentity .from_json (item ) for item in data .get ('body' ).get ('email_hash' , [])] if data .get ('body' ).get ('email_hash' ) else [],
62+ 'phone_hash' : [ApiIdentity .from_json (item ) for item in data .get ('body' ).get ('phone_hash' , [])] if data .get ('body' ).get ('phone_hash' ) else [],
63+ }
64+ return cls (
65+ status = data .get ("status" ),
66+ body = api_body
67+ )
68+
5269class ApiIdentity :
5370 def __init__ (self , current_uid : Optional [str ], previous_uid : Optional [str ],
5471 refresh_from_seconds : Optional [int ], error : Optional [str ]):
@@ -74,7 +91,9 @@ def __init__(self, current_uid: str, previous_uid: Optional[str], refresh_from_s
7491 self ._refresh_from = refresh_from_seconds
7592
7693 @classmethod
77- def from_api_identity (cls , api_identity : ApiIdentity ):
94+ def from_api_identity (cls , api_identity : ApiIdentity ) -> 'MappedIdentity' :
95+ if api_identity .current_uid is None or api_identity .refresh_from_seconds is None :
96+ raise ValueError ("Mapped identity cannot be created from API identity with missing current_uid or refresh_from_seconds" )
7897 return cls (api_identity .current_uid ,
7998 api_identity .previous_uid ,
8099 datetime .fromtimestamp (api_identity .refresh_from_seconds , tz = timezone .utc ))
0 commit comments