From a38dd775fafb71914fd94469a1d4ae15a31d7c8f Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Wed, 4 Jun 2025 16:48:01 +0330 Subject: [PATCH] feat: converted mediawiki activities to workflows! --- hivemind_etl/mediawiki/workflows.py | 110 ++++++++++++++++++++++------ 1 file changed, 89 insertions(+), 21 deletions(-) diff --git a/hivemind_etl/mediawiki/workflows.py b/hivemind_etl/mediawiki/workflows.py index c23856b..ff5bf97 100644 --- a/hivemind_etl/mediawiki/workflows.py +++ b/hivemind_etl/mediawiki/workflows.py @@ -7,12 +7,89 @@ with workflow.unsafe.imports_passed_through(): from hivemind_etl.mediawiki.activities import ( get_hivemind_mediawiki_platforms, - extract_mediawiki, - transform_mediawiki_data, - load_mediawiki_data, ) +@workflow.defn +class ExtractMediaWikiWorkflow: + @workflow.run + async def run(self, mediawiki_platform: dict) -> None: + """ + Extract data from MediaWiki and store in S3. + + Parameters + ----------- + mediawiki_platform : dict + Dictionary containing platform information + """ + from hivemind_etl.mediawiki.activities import extract_mediawiki + + await workflow.execute_activity( + extract_mediawiki, + mediawiki_platform, + start_to_close_timeout=timedelta(days=5), + retry_policy=RetryPolicy( + initial_interval=timedelta(minutes=1), + maximum_attempts=3, + ), + ) + + +@workflow.defn +class TransformMediaWikiWorkflow: + @workflow.run + async def run(self, mediawiki_platform: dict) -> str: + """ + Transform the extracted data and store in S3. + + Parameters + ----------- + mediawiki_platform : dict + Dictionary containing platform information + + Returns + -------- + str + The key where transformed data is stored in S3 + """ + from hivemind_etl.mediawiki.activities import transform_mediawiki_data + + return await workflow.execute_activity( + transform_mediawiki_data, + mediawiki_platform, + start_to_close_timeout=timedelta(hours=6), + retry_policy=RetryPolicy( + initial_interval=timedelta(minutes=5), + maximum_attempts=3, + ), + ) + + +@workflow.defn +class LoadMediaWikiWorkflow: + @workflow.run + async def run(self, mediawiki_platform: dict) -> None: + """ + Load the transformed data from S3. + + Parameters + ----------- + mediawiki_platform : dict + Dictionary containing platform information and transformed data key + """ + from hivemind_etl.mediawiki.activities import load_mediawiki_data + + await workflow.execute_activity( + load_mediawiki_data, + mediawiki_platform, + start_to_close_timeout=timedelta(hours=3), + retry_policy=RetryPolicy( + initial_interval=timedelta(minutes=1), + maximum_attempts=3, + ), + ) + + @workflow.defn class MediaWikiETLWorkflow: @workflow.run @@ -48,37 +125,28 @@ async def run(self, platform_id: str | None = None) -> None: } # Extract data from MediaWiki and store in S3 - await workflow.execute_activity( - extract_mediawiki, + await workflow.execute_child_workflow( + ExtractMediaWikiWorkflow.run, mediawiki_platform, + id=f"mediawiki:extract:{platform['community_id']}", start_to_close_timeout=timedelta(days=5), - retry_policy=RetryPolicy( - initial_interval=timedelta(minutes=1), - maximum_attempts=3, - ), ) # Transform the extracted data and store in S3 - transformed_data_key = await workflow.execute_activity( - transform_mediawiki_data, + transformed_data_key = await workflow.execute_child_workflow( + TransformMediaWikiWorkflow.run, mediawiki_platform, + id=f"mediawiki:transform:{platform['community_id']}", start_to_close_timeout=timedelta(hours=6), - retry_policy=RetryPolicy( - initial_interval=timedelta(minutes=5), - maximum_attempts=3, - ), ) mediawiki_platform["transformed_data_key"] = transformed_data_key # Load the transformed data from S3 - await workflow.execute_activity( - load_mediawiki_data, + await workflow.execute_child_workflow( + LoadMediaWikiWorkflow.run, mediawiki_platform, + id=f"mediawiki:load:{platform['community_id']}", start_to_close_timeout=timedelta(hours=3), - retry_policy=RetryPolicy( - initial_interval=timedelta(minutes=1), - maximum_attempts=3, - ), ) logging.info(