Skip to content

Commit c3aad42

Browse files
authored
Merge pull request #54 from TogetherCrew/feat/mediawiki-workflows
feat: converted mediawiki activities to workflows!
2 parents e62400f + a38dd77 commit c3aad42

File tree

1 file changed

+89
-21
lines changed

1 file changed

+89
-21
lines changed

hivemind_etl/mediawiki/workflows.py

Lines changed: 89 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -7,12 +7,89 @@
77
with workflow.unsafe.imports_passed_through():
88
from hivemind_etl.mediawiki.activities import (
99
get_hivemind_mediawiki_platforms,
10-
extract_mediawiki,
11-
transform_mediawiki_data,
12-
load_mediawiki_data,
1310
)
1411

1512

13+
@workflow.defn
class ExtractMediaWikiWorkflow:
    @workflow.run
    async def run(self, mediawiki_platform: dict) -> None:
        """
        Run the MediaWiki extraction activity for one platform.

        The extraction step pulls data from MediaWiki and stores it in S3.
        This workflow only schedules the `extract_mediawiki` activity and
        waits for it to finish; nothing is returned.

        Parameters
        -----------
        mediawiki_platform : dict
            Platform information, forwarded unchanged to the activity
        """
        # The activity is imported at call time, inside the workflow run.
        from hivemind_etl.mediawiki.activities import extract_mediawiki

        # At most 3 attempts, with a 1-minute initial retry interval.
        extract_retry = RetryPolicy(
            initial_interval=timedelta(minutes=1),
            maximum_attempts=3,
        )

        # A single attempt is allowed to run for up to 5 days.
        await workflow.execute_activity(
            extract_mediawiki,
            mediawiki_platform,
            start_to_close_timeout=timedelta(days=5),
            retry_policy=extract_retry,
        )
36+
37+
38+
@workflow.defn
class TransformMediaWikiWorkflow:
    @workflow.run
    async def run(self, mediawiki_platform: dict) -> str:
        """
        Run the MediaWiki transformation activity for one platform.

        The transformation step processes the previously extracted data and
        stores the result in S3. This workflow schedules the
        `transform_mediawiki_data` activity and hands back whatever that
        activity returns.

        Parameters
        -----------
        mediawiki_platform : dict
            Platform information, forwarded unchanged to the activity

        Returns
        --------
        str
            The key where the transformed data is stored in S3
        """
        # The activity is imported at call time, inside the workflow run.
        from hivemind_etl.mediawiki.activities import transform_mediawiki_data

        # At most 3 attempts, with a 5-minute initial retry interval.
        transform_retry = RetryPolicy(
            initial_interval=timedelta(minutes=5),
            maximum_attempts=3,
        )

        # A single attempt is allowed to run for up to 6 hours.
        transformed_key = await workflow.execute_activity(
            transform_mediawiki_data,
            mediawiki_platform,
            start_to_close_timeout=timedelta(hours=6),
            retry_policy=transform_retry,
        )
        return transformed_key
66+
67+
68+
@workflow.defn
class LoadMediaWikiWorkflow:
    @workflow.run
    async def run(self, mediawiki_platform: dict) -> None:
        """
        Run the MediaWiki load activity for one platform.

        The load step reads the transformed data back from S3. This workflow
        only schedules the `load_mediawiki_data` activity and waits for it
        to finish; nothing is returned.

        Parameters
        -----------
        mediawiki_platform : dict
            Platform information together with the transformed data key,
            forwarded unchanged to the activity
        """
        # The activity is imported at call time, inside the workflow run.
        from hivemind_etl.mediawiki.activities import load_mediawiki_data

        # At most 3 attempts, with a 1-minute initial retry interval.
        load_retry = RetryPolicy(
            initial_interval=timedelta(minutes=1),
            maximum_attempts=3,
        )

        # A single attempt is allowed to run for up to 3 hours.
        await workflow.execute_activity(
            load_mediawiki_data,
            mediawiki_platform,
            start_to_close_timeout=timedelta(hours=3),
            retry_policy=load_retry,
        )
91+
92+
1693
@workflow.defn
1794
class MediaWikiETLWorkflow:
1895
@workflow.run
@@ -48,37 +125,28 @@ async def run(self, platform_id: str | None = None) -> None:
48125
}
49126

50127
# Extract data from MediaWiki and store in S3
51-
await workflow.execute_activity(
52-
extract_mediawiki,
128+
await workflow.execute_child_workflow(
129+
ExtractMediaWikiWorkflow.run,
53130
mediawiki_platform,
131+
id=f"mediawiki:extract:{platform['community_id']}",
54132
start_to_close_timeout=timedelta(days=5),
55-
retry_policy=RetryPolicy(
56-
initial_interval=timedelta(minutes=1),
57-
maximum_attempts=3,
58-
),
59133
)
60134

61135
# Transform the extracted data and store in S3
62-
transformed_data_key = await workflow.execute_activity(
63-
transform_mediawiki_data,
136+
transformed_data_key = await workflow.execute_child_workflow(
137+
TransformMediaWikiWorkflow.run,
64138
mediawiki_platform,
139+
id=f"mediawiki:transform:{platform['community_id']}",
65140
start_to_close_timeout=timedelta(hours=6),
66-
retry_policy=RetryPolicy(
67-
initial_interval=timedelta(minutes=5),
68-
maximum_attempts=3,
69-
),
70141
)
71142

72143
mediawiki_platform["transformed_data_key"] = transformed_data_key
73144
# Load the transformed data from S3
74-
await workflow.execute_activity(
75-
load_mediawiki_data,
145+
await workflow.execute_child_workflow(
146+
LoadMediaWikiWorkflow.run,
76147
mediawiki_platform,
148+
id=f"mediawiki:load:{platform['community_id']}",
77149
start_to_close_timeout=timedelta(hours=3),
78-
retry_policy=RetryPolicy(
79-
initial_interval=timedelta(minutes=1),
80-
maximum_attempts=3,
81-
),
82150
)
83151

84152
logging.info(

0 commit comments

Comments
 (0)