1
1
import logging
2
2
import sys
3
3
import typing
4
- from datetime import timezone
4
+ from datetime import datetime
5
+ from urllib .parse import urljoin
5
6
6
- import pydantic
7
7
import requests
8
8
from flag_engine import engine
9
- from flag_engine .context .mappers import map_environment_identity_to_context
10
- from flag_engine .environments .models import EnvironmentModel
11
- from flag_engine .identities .models import IdentityModel
12
- from flag_engine .identities .traits .models import TraitModel
13
- from flag_engine .identities .traits .types import TraitValue
14
9
from requests .adapters import HTTPAdapter
15
10
from requests .utils import default_user_agent
16
11
from urllib3 import Retry
17
12
18
13
from flagsmith .analytics import AnalyticsProcessor
19
14
from flagsmith .exceptions import FlagsmithAPIError , FlagsmithClientError
15
+ from flagsmith .mappers import (
16
+ map_context_and_identity_data_to_context ,
17
+ map_environment_document_to_context ,
18
+ map_environment_document_to_environment_updated_at ,
19
+ )
20
20
from flagsmith .models import DefaultFlag , Flags , Segment
21
- from flagsmith .offline_handlers import BaseOfflineHandler
21
+ from flagsmith .offline_handlers import OfflineHandler
22
22
from flagsmith .polling_manager import EnvironmentDataPollingManager
23
- from flagsmith .streaming_manager import EventStreamManager , StreamEvent
23
+ from flagsmith .streaming_manager import EventStreamManager
24
24
from flagsmith .types import (
25
25
ApplicationMetadata ,
26
26
JsonType ,
27
- TraitConfig ,
27
+ StreamEvent ,
28
28
TraitMapping ,
29
29
)
30
30
from flagsmith .utils .identities import generate_identity_data
@@ -72,7 +72,7 @@ def __init__(
72
72
] = None ,
73
73
proxies : typing .Optional [typing .Dict [str , str ]] = None ,
74
74
offline_mode : bool = False ,
75
- offline_handler : typing .Optional [BaseOfflineHandler ] = None ,
75
+ offline_handler : typing .Optional [OfflineHandler ] = None ,
76
76
enable_realtime_updates : bool = False ,
77
77
application_metadata : typing .Optional [ApplicationMetadata ] = None ,
78
78
):
@@ -112,8 +112,8 @@ def __init__(
112
112
self .default_flag_handler = default_flag_handler
113
113
self .enable_realtime_updates = enable_realtime_updates
114
114
self ._analytics_processor : typing .Optional [AnalyticsProcessor ] = None
115
- self ._environment : typing .Optional [EnvironmentModel ] = None
116
- self ._identity_overrides_by_identifier : typing .Dict [ str , IdentityModel ] = {}
115
+ self ._evaluation_context : typing .Optional [engine . EvaluationContext ] = None
116
+ self ._environment_updated_at : typing .Optional [ datetime ] = None
117
117
118
118
# argument validation
119
119
if offline_mode and not offline_handler :
@@ -129,7 +129,7 @@ def __init__(
129
129
)
130
130
131
131
if self .offline_handler :
132
- self ._environment = self .offline_handler .get_environment ()
132
+ self ._evaluation_context = self .offline_handler .get_evaluation_context ()
133
133
134
134
if not self .offline_mode :
135
135
if not environment_key :
@@ -159,9 +159,9 @@ def __init__(
159
159
self .request_timeout_seconds = request_timeout_seconds
160
160
self .session .mount (self .api_url , HTTPAdapter (max_retries = retries ))
161
161
162
- self .environment_flags_url = f" { self .api_url } flags/"
163
- self .identities_url = f" { self .api_url } identities/"
164
- self .environment_url = f" { self .api_url } environment-document/"
162
+ self .environment_flags_url = urljoin ( self .api_url , " flags/")
163
+ self .identities_url = urljoin ( self .api_url , " identities/")
164
+ self .environment_url = urljoin ( self .api_url , " environment-document/")
165
165
166
166
if self .enable_local_evaluation :
167
167
if not environment_key .startswith ("ser." ):
@@ -182,10 +182,13 @@ def _initialise_local_evaluation(self) -> None:
182
182
# method calls, update the environment manually.
183
183
self .update_environment ()
184
184
if self .enable_realtime_updates :
185
- if not self ._environment :
185
+ if not self ._evaluation_context :
186
186
raise ValueError ("Unable to get environment from API key" )
187
187
188
- stream_url = f"{ self .realtime_api_url } sse/environments/{ self ._environment .api_key } /stream"
188
+ stream_url = urljoin (
189
+ self .realtime_api_url ,
190
+ f"sse/environments/{ self ._evaluation_context ['environment' ]['key' ]} /stream" ,
191
+ )
189
192
190
193
self .event_stream_thread = EventStreamManager (
191
194
stream_url = stream_url ,
@@ -207,15 +210,11 @@ def _initialise_local_evaluation(self) -> None:
207
210
self .environment_data_polling_manager_thread .start ()
208
211
209
212
def handle_stream_event (self , event : StreamEvent ) -> None :
210
- if not self ._environment :
213
+ if not ( environment_updated_at := self ._environment_updated_at ) :
211
214
raise ValueError (
212
- "Unable to access environment. Environment should not be null "
215
+ "Cannot handle stream events before retrieving initial environment "
213
216
)
214
- environment_updated_at = self ._environment .updated_at
215
- if environment_updated_at .tzinfo is None :
216
- environment_updated_at = environment_updated_at .astimezone (timezone .utc )
217
-
218
- if event .updated_at > environment_updated_at :
217
+ if event ["updated_at" ] > environment_updated_at :
219
218
self .update_environment ()
220
219
221
220
def get_environment_flags (self ) -> Flags :
@@ -224,7 +223,9 @@ def get_environment_flags(self) -> Flags:
224
223
225
224
:return: Flags object holding all the flags for the current environment.
226
225
"""
227
- if (self .offline_mode or self .enable_local_evaluation ) and self ._environment :
226
+ if (
227
+ self .offline_mode or self .enable_local_evaluation
228
+ ) and self ._evaluation_context :
228
229
return self ._get_environment_flags_from_document ()
229
230
return self ._get_environment_flags_from_api ()
230
231
@@ -250,7 +251,9 @@ def get_identity_flags(
250
251
:return: Flags object holding all the flags for the given identity.
251
252
"""
252
253
traits = traits or {}
253
- if (self .offline_mode or self .enable_local_evaluation ) and self ._environment :
254
+ if (
255
+ self .offline_mode or self .enable_local_evaluation
256
+ ) and self ._evaluation_context :
254
257
return self ._get_identity_flags_from_document (identifier , traits )
255
258
return self ._get_identity_flags_from_api (
256
259
identifier ,
@@ -261,7 +264,7 @@ def get_identity_flags(
261
264
def get_identity_segments (
262
265
self ,
263
266
identifier : str ,
264
- traits : typing .Optional [typing .Mapping [str , TraitValue ]] = None ,
267
+ traits : typing .Optional [typing .Mapping [str , engine . ContextValue ]] = None ,
265
268
) -> typing .List [Segment ]:
266
269
"""
267
270
Get a list of segments that the given identity is in.
@@ -272,37 +275,44 @@ def get_identity_segments(
272
275
Flagsmith, e.g. {"num_orders": 10}
273
276
:return: list of Segment objects that the identity is part of.
274
277
"""
275
-
276
- if not self ._environment :
278
+ if not self ._evaluation_context :
277
279
raise FlagsmithClientError (
278
280
"Local evaluation required to obtain identity segments."
279
281
)
280
282
281
- traits = traits or {}
282
- identity_model = self ._get_identity_model (identifier , ** traits )
283
- context = map_environment_identity_to_context (
284
- environment = self ._environment ,
285
- identity = identity_model ,
286
- override_traits = None ,
283
+ context = map_context_and_identity_data_to_context (
284
+ context = self ._evaluation_context ,
285
+ identifier = identifier ,
286
+ traits = traits ,
287
287
)
288
+
288
289
evaluation_result = engine .get_evaluation_result (
289
290
context = context ,
290
291
)
291
292
return [
292
- Segment (id = int (sm ["key" ]), name = sm ["name" ])
293
- for sm in evaluation_result . get ( "segments" , [])
293
+ Segment (id = int (segment_result ["key" ]), name = segment_result ["name" ])
294
+ for segment_result in evaluation_result [ "segments" ]
294
295
]
295
296
296
297
def update_environment (self ) -> None :
297
298
try :
298
- self ._environment = self ._get_environment_from_api ()
299
- except (FlagsmithAPIError , pydantic .ValidationError ):
300
- logger .exception ("Error updating environment" )
299
+ environment_data = self ._get_json_response (
300
+ self .environment_url , method = "GET"
301
+ )
302
+ except FlagsmithAPIError :
303
+ logger .exception ("Error retrieving environment document from API" )
301
304
else :
302
- if overrides := self ._environment .identity_overrides :
303
- self ._identity_overrides_by_identifier = {
304
- identity .identifier : identity for identity in overrides
305
- }
305
+ try :
306
+ self ._evaluation_context = map_environment_document_to_context (
307
+ environment_data ,
308
+ )
309
+ self ._environment_updated_at = (
310
+ map_environment_document_to_environment_updated_at (
311
+ environment_data ,
312
+ )
313
+ )
314
+ except (KeyError , TypeError , ValueError ):
315
+ logger .exception ("Error parsing environment document" )
306
316
307
317
def _get_headers (
308
318
self ,
@@ -322,22 +332,11 @@ def _get_headers(
322
332
headers .update (custom_headers or {})
323
333
return headers
324
334
325
- def _get_environment_from_api (self ) -> EnvironmentModel :
326
- environment_data = self ._get_json_response (self .environment_url , method = "GET" )
327
- return EnvironmentModel .model_validate (environment_data )
328
-
329
335
def _get_environment_flags_from_document (self ) -> Flags :
330
- if self ._environment is None :
336
+ if self ._evaluation_context is None :
331
337
raise TypeError ("No environment present" )
332
- identity = self ._get_identity_model (identifier = "" , traits = None )
333
-
334
- context = map_environment_identity_to_context (
335
- environment = self ._environment ,
336
- identity = identity ,
337
- override_traits = None ,
338
- )
339
338
340
- evaluation_result = engine .get_evaluation_result (context = context )
339
+ evaluation_result = engine .get_evaluation_result (self . _evaluation_context )
341
340
342
341
return Flags .from_evaluation_result (
343
342
evaluation_result = evaluation_result ,
@@ -346,18 +345,18 @@ def _get_environment_flags_from_document(self) -> Flags:
346
345
)
347
346
348
347
def _get_identity_flags_from_document (
349
- self , identifier : str , traits : TraitMapping
348
+ self ,
349
+ identifier : str ,
350
+ traits : TraitMapping ,
350
351
) -> Flags :
351
- identity_model = self ._get_identity_model (identifier , ** traits )
352
- if self ._environment is None :
352
+ if self ._evaluation_context is None :
353
353
raise TypeError ("No environment present" )
354
354
355
- context = map_environment_identity_to_context (
356
- environment = self ._environment ,
357
- identity = identity_model ,
358
- override_traits = None ,
355
+ context = map_context_and_identity_data_to_context (
356
+ context = self ._evaluation_context ,
357
+ identifier = identifier ,
358
+ traits = traits ,
359
359
)
360
-
361
360
evaluation_result = engine .get_evaluation_result (
362
361
context = context ,
363
362
)
@@ -435,34 +434,6 @@ def _get_json_response(
435
434
"Unable to get valid response from Flagsmith API."
436
435
) from e
437
436
438
- def _get_identity_model (
439
- self ,
440
- identifier : str ,
441
- ** traits : typing .Union [TraitValue , TraitConfig ],
442
- ) -> IdentityModel :
443
- if not self ._environment :
444
- raise FlagsmithClientError (
445
- "Unable to build identity model when no local environment present."
446
- )
447
-
448
- trait_models = [
449
- TraitModel (
450
- trait_key = key ,
451
- trait_value = value ["value" ] if isinstance (value , dict ) else value ,
452
- )
453
- for key , value in traits .items ()
454
- ]
455
-
456
- if identity := self ._identity_overrides_by_identifier .get (identifier ):
457
- identity .update_traits (trait_models )
458
- return identity
459
-
460
- return IdentityModel (
461
- identifier = identifier ,
462
- environment_api_key = self ._environment .api_key ,
463
- identity_traits = trait_models ,
464
- )
465
-
466
437
def __del__ (self ) -> None :
467
438
if hasattr (self , "environment_data_polling_manager_thread" ):
468
439
self .environment_data_polling_manager_thread .stop ()
0 commit comments