5858from  .exception  import  GracefulShutdownInterrupt , IgnorableVisit , InvalidVisitError , \
5959    NonRetriableError , RetriableError 
6060from  .middleware_interface  import  get_central_butler , \
61-     make_local_repo , make_local_cache , MiddlewareInterface 
61+     make_local_repo , make_local_cache , MiddlewareInterface , ButlerWriter , DirectButlerWriter 
62+ from  .kafka_butler_writer  import  KafkaButlerWriter 
6263from  .repo_tracker  import  LocalRepoTracker 
6364
6465# Platform that prompt processing will run on 
# The number of seconds to delay retrying connections to the Redis stream.
# Read from REDIS_RETRY_DELAY; falls back to 30 when the variable is unset.
redis_retry = float(os.environ.get("REDIS_RETRY_DELAY", 30))
9899
# If '1', sends outputs to a service for transfer into the central Butler
# repository instead of writing to the database directly.
# Any other value (or an unset variable) leaves the Kafka writer disabled.
use_kafka_butler_writer = os.environ.get("USE_KAFKA_BUTLER_WRITER", "0") == "1"
if use_kafka_butler_writer:
    # All of the settings below are required once the Kafka writer is enabled:
    # a missing variable raises KeyError while this module is being imported.
    # None of these names exist when the Kafka writer is disabled.

    # Hostname of the Kafka cluster used by the Butler writer.
    butler_writer_kafka_cluster = os.environ["BUTLER_WRITER_KAFKA_CLUSTER"]
    # Username for authentication to BUTLER_WRITER_KAFKA_CLUSTER.
    butler_writer_kafka_username = os.environ["BUTLER_WRITER_KAFKA_USERNAME"]
    # Password for authentication to BUTLER_WRITER_KAFKA_CLUSTER.
    butler_writer_kafka_password = os.environ["BUTLER_WRITER_KAFKA_PASSWORD"]
    # Topic used to transfer output datasets to the central repository.
    butler_writer_kafka_topic = os.environ["BUTLER_WRITER_KAFKA_TOPIC"]
    # URI to the path where output datasets will be written when using the Kafka
    # writer to transfer outputs to the central Butler repository.
    # This will generally be in the same S3 bucket used by the central Butler.
    butler_writer_file_output_path = os.environ["BUTLER_WRITER_FILE_OUTPUT_PATH"]
116+ 
99117# Conditionally load keda environment variables 
100118if  platform  ==  "keda" :
101119    # Time to wait for fanned out messages before spawning new pod. 
@@ -163,6 +181,18 @@ def _get_consumer():
163181    })
164182
165183
@functools.cache
def _get_producer():
    """Lazy initialization of Kafka Producer for Butler writer.

    The producer is built on the first call and cached by `functools.cache`,
    so later calls return the same object.

    Returns
    -------
    producer : `kafka.Producer`
        A producer configured from the ``butler_writer_kafka_*`` module
        settings.

    Notes
    -----
    The ``butler_writer_kafka_*`` settings are only defined when
    ``use_kafka_butler_writer`` is true, so this function must not be called
    when the Kafka writer is disabled.
    """
    # NOTE(review): "sasl_plaintext" selects SASL authentication without TLS;
    # confirm that unencrypted transport is intended for this deployment.
    producer_config = {
        "bootstrap.servers": butler_writer_kafka_cluster,
        "security.protocol": "sasl_plaintext",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": butler_writer_kafka_username,
        "sasl.password": butler_writer_kafka_password,
    }
    return kafka.Producer(producer_config)
194+ 
195+ 
166196@functools .cache  
167197def  _get_storage_client ():
168198    """Lazy initialization of cloud storage reader.""" 
@@ -189,6 +219,19 @@ def _get_read_butler():
189219        return  _get_write_butler ()
190220
191221
@functools.cache
def _get_butler_writer() -> ButlerWriter:
    """Lazy initialization of Butler writer.

    The writer is built on the first call and cached by `functools.cache`,
    so later calls return the same object.

    Returns
    -------
    writer : `ButlerWriter`
        A `KafkaButlerWriter` built from the cached Kafka producer and the
        ``butler_writer_*`` module settings when ``use_kafka_butler_writer``
        is true; otherwise a `DirectButlerWriter` wrapping the object
        returned by ``_get_write_butler()``.
    """
    if not use_kafka_butler_writer:
        return DirectButlerWriter(_get_write_butler())

    return KafkaButlerWriter(
        _get_producer(),
        output_topic=butler_writer_kafka_topic,
        file_output_path=butler_writer_file_output_path,
    )
233+ 
234+ 
192235@functools .cache  
193236def  _get_local_repo ():
194237    """Lazy initialization of local repo. 
@@ -461,7 +504,7 @@ def create_app():
461504        _get_consumer ()
462505        _get_storage_client ()
463506        _get_read_butler ()
464-         _get_write_butler ()
507+         _get_butler_writer ()
465508        _get_local_repo ()
466509
467510        app  =  flask .Flask (__name__ )
@@ -510,7 +553,7 @@ def keda_start():
510553        _get_consumer ()
511554        _get_storage_client ()
512555        _get_read_butler ()
513-         _get_write_butler ()
556+         _get_butler_writer ()
514557        _get_local_repo ()
515558
516559        redis_session  =  RedisStreamSession (
@@ -1002,7 +1045,7 @@ def process_visit(expected_visit: FannedOutVisit):
10021045                # Create a fresh MiddlewareInterface object to avoid accidental 
10031046                # "cross-talk" between different visits. 
10041047                mwi  =  MiddlewareInterface (_get_read_butler (),
1005-                                           _get_write_butler (),
1048+                                           _get_butler_writer (),
10061049                                          image_bucket ,
10071050                                          expected_visit ,
10081051                                          pre_pipelines ,
0 commit comments