Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 89 additions & 21 deletions hivemind_etl/mediawiki/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,89 @@
with workflow.unsafe.imports_passed_through():
from hivemind_etl.mediawiki.activities import (
get_hivemind_mediawiki_platforms,
extract_mediawiki,
transform_mediawiki_data,
load_mediawiki_data,
)


@workflow.defn
class ExtractMediaWikiWorkflow:
    @workflow.run
    async def run(self, mediawiki_platform: dict) -> None:
        """
        Run the MediaWiki extraction step as a single activity.

        The original description of this step is "extract data from
        MediaWiki and store in S3"; the storage itself happens inside the
        ``extract_mediawiki`` activity, not in this workflow.

        Parameters
        -----------
        mediawiki_platform : dict
            Dictionary containing platform information; it is forwarded
            unchanged to the activity.
        """
        from hivemind_etl.mediawiki.activities import extract_mediawiki

        # At most three attempts, with the first retry one minute after a failure.
        retry = RetryPolicy(
            initial_interval=timedelta(minutes=1),
            maximum_attempts=3,
        )

        # Each attempt of the activity may run for up to five days.
        await workflow.execute_activity(
            extract_mediawiki,
            mediawiki_platform,
            start_to_close_timeout=timedelta(days=5),
            retry_policy=retry,
        )


@workflow.defn
class TransformMediaWikiWorkflow:
    @workflow.run
    async def run(self, mediawiki_platform: dict) -> str:
        """
        Run the MediaWiki transformation step as a single activity.

        The original description of this step is "transform the extracted
        data and store in S3"; that work happens inside the
        ``transform_mediawiki_data`` activity, whose result is returned
        as-is.

        Parameters
        -----------
        mediawiki_platform : dict
            Dictionary containing platform information; it is forwarded
            unchanged to the activity.

        Returns
        --------
        str
            The key where transformed data is stored in S3
        """
        from hivemind_etl.mediawiki.activities import transform_mediawiki_data

        # At most three attempts, with the first retry five minutes after a failure.
        retry = RetryPolicy(
            initial_interval=timedelta(minutes=5),
            maximum_attempts=3,
        )

        # Each attempt of the activity may run for up to six hours.
        transformed_key = await workflow.execute_activity(
            transform_mediawiki_data,
            mediawiki_platform,
            start_to_close_timeout=timedelta(hours=6),
            retry_policy=retry,
        )
        return transformed_key


@workflow.defn
class LoadMediaWikiWorkflow:
    @workflow.run
    async def run(self, mediawiki_platform: dict) -> None:
        """
        Run the MediaWiki load step as a single activity.

        The original description of this step is "load the transformed data
        from S3"; the loading itself happens inside the
        ``load_mediawiki_data`` activity.

        Parameters
        -----------
        mediawiki_platform : dict
            Dictionary containing platform information and transformed data
            key; it is forwarded unchanged to the activity.
        """
        from hivemind_etl.mediawiki.activities import load_mediawiki_data

        # At most three attempts, with the first retry one minute after a failure.
        retry = RetryPolicy(
            initial_interval=timedelta(minutes=1),
            maximum_attempts=3,
        )

        # Each attempt of the activity may run for up to three hours.
        await workflow.execute_activity(
            load_mediawiki_data,
            mediawiki_platform,
            start_to_close_timeout=timedelta(hours=3),
            retry_policy=retry,
        )


@workflow.defn
class MediaWikiETLWorkflow:
@workflow.run
Expand Down Expand Up @@ -48,37 +125,28 @@ async def run(self, platform_id: str | None = None) -> None:
}

# Extract data from MediaWiki and store in S3
await workflow.execute_activity(
extract_mediawiki,
await workflow.execute_child_workflow(
ExtractMediaWikiWorkflow.run,
mediawiki_platform,
id=f"mediawiki:extract:{platform['community_id']}",
start_to_close_timeout=timedelta(days=5),
retry_policy=RetryPolicy(
initial_interval=timedelta(minutes=1),
maximum_attempts=3,
),
)

# Transform the extracted data and store in S3
transformed_data_key = await workflow.execute_activity(
transform_mediawiki_data,
transformed_data_key = await workflow.execute_child_workflow(
TransformMediaWikiWorkflow.run,
mediawiki_platform,
id=f"mediawiki:transform:{platform['community_id']}",
start_to_close_timeout=timedelta(hours=6),
retry_policy=RetryPolicy(
initial_interval=timedelta(minutes=5),
maximum_attempts=3,
),
)

mediawiki_platform["transformed_data_key"] = transformed_data_key
# Load the transformed data from S3
await workflow.execute_activity(
load_mediawiki_data,
await workflow.execute_child_workflow(
LoadMediaWikiWorkflow.run,
mediawiki_platform,
id=f"mediawiki:load:{platform['community_id']}",
start_to_close_timeout=timedelta(hours=3),
retry_policy=RetryPolicy(
initial_interval=timedelta(minutes=1),
maximum_attempts=3,
),
)

logging.info(
Expand Down