From c7c6fa5dcadcb75f5f2f30dcc05d61ec2fff2302 Mon Sep 17 00:00:00 2001 From: Philip Lee Date: Thu, 21 Aug 2025 10:06:13 -0400 Subject: [PATCH] test credentials --- jobs/revenue-forecasting/.dockerignore | 7 + jobs/revenue-forecasting/.flake8 | 2 + jobs/revenue-forecasting/.gitignore | 4 + .../03_compare_with_previous_period.py | 311 ++++++++++++++++++ jobs/revenue-forecasting/03_test.py | 218 ++++++++++++ jobs/revenue-forecasting/05_upload_data.py | 36 ++ ...b_subscription_services_v3_forecasting.sql | 48 +++ ...bscription_services_v3_forecasting_ltv.sql | 16 + jobs/revenue-forecasting/Dockerfile | 26 ++ jobs/revenue-forecasting/README.md | 45 +++ jobs/revenue-forecasting/ci_job.yaml | 15 + jobs/revenue-forecasting/ci_workflow.yaml | 13 + jobs/revenue-forecasting/docker-compose.yml | 17 + jobs/revenue-forecasting/pytest.ini | 3 + jobs/revenue-forecasting/requirements.txt | 6 + .../revenue_forecasting/main.py | 11 + jobs/revenue-forecasting/setup.py | 15 + jobs/revenue-forecasting/tests/__init__.py | 0 jobs/revenue-forecasting/tests/test_main.py | 11 + 19 files changed, 804 insertions(+) create mode 100644 jobs/revenue-forecasting/.dockerignore create mode 100644 jobs/revenue-forecasting/.flake8 create mode 100644 jobs/revenue-forecasting/.gitignore create mode 100644 jobs/revenue-forecasting/03_compare_with_previous_period.py create mode 100644 jobs/revenue-forecasting/03_test.py create mode 100644 jobs/revenue-forecasting/05_upload_data.py create mode 100644 jobs/revenue-forecasting/05b_subscription_services_v3_forecasting.sql create mode 100644 jobs/revenue-forecasting/05c_subscription_services_v3_forecasting_ltv.sql create mode 100644 jobs/revenue-forecasting/Dockerfile create mode 100644 jobs/revenue-forecasting/README.md create mode 100644 jobs/revenue-forecasting/ci_job.yaml create mode 100644 jobs/revenue-forecasting/ci_workflow.yaml create mode 100644 jobs/revenue-forecasting/docker-compose.yml create mode 100644 jobs/revenue-forecasting/pytest.ini create mode 100644 jobs/revenue-forecasting/requirements.txt create mode 100644 jobs/revenue-forecasting/revenue_forecasting/main.py create mode 100644 jobs/revenue-forecasting/setup.py create mode 100644 jobs/revenue-forecasting/tests/__init__.py create mode 100644 jobs/revenue-forecasting/tests/test_main.py diff --git a/jobs/revenue-forecasting/.dockerignore b/jobs/revenue-forecasting/.dockerignore new file mode 100644 index 00000000..cff5d6ab --- /dev/null +++ b/jobs/revenue-forecasting/.dockerignore @@ -0,0 +1,7 @@ +.ci_job.yaml +.ci_workflow.yaml +.DS_Store +*.pyc +.pytest_cache/ +__pycache__/ +venv/ diff --git a/jobs/revenue-forecasting/.flake8 b/jobs/revenue-forecasting/.flake8 new file mode 100644 index 00000000..2bcd70e3 --- /dev/null +++ b/jobs/revenue-forecasting/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max-line-length = 88 diff --git a/jobs/revenue-forecasting/.gitignore b/jobs/revenue-forecasting/.gitignore new file mode 100644 index 00000000..2e9942c0 --- /dev/null +++ b/jobs/revenue-forecasting/.gitignore @@ -0,0 +1,4 @@ +.DS_Store +*.pyc +__pycache__/ +venv/ diff --git a/jobs/revenue-forecasting/03_compare_with_previous_period.py b/jobs/revenue-forecasting/03_compare_with_previous_period.py new file mode 100644 index 00000000..39d93b5e --- /dev/null +++ b/jobs/revenue-forecasting/03_compare_with_previous_period.py @@ -0,0 +1,311 @@ +import marimo + +__generated_with = "0.13.14" +app = marimo.App(width="medium") + + + +def _(): + from google.cloud import bigquery + import marimo as mo + import numpy as np + import pandas as pd + + return bigquery, pd + + + +def _(): + SQL_TABLE = 'mozdata.revenue_cat2_analysis.daily_active_logical_subscriptions_rebuilt_20250611' + + YEARLY_PLAN = '12' + MONTHLY_PLAN = '1' + PLAN_TYPES = [ + YEARLY_PLAN, + MONTHLY_PLAN + ] + + MOZILLA_VPN = 'Mozilla VPN' + MOZILLA_MONITOR = 'Mozilla Monitor Plus' + PRODUCTS = [ + (MOZILLA_VPN, YEARLY_PLAN), + (MOZILLA_VPN, MONTHLY_PLAN), + (MOZILLA_MONITOR, YEARLY_PLAN), + (MOZILLA_MONITOR, MONTHLY_PLAN), + ('Relay Premium', YEARLY_PLAN), + ('Relay Premium', MONTHLY_PLAN), + ('Mozilla VPN & Firefox Relay', YEARLY_PLAN), + ('MDN Plus', YEARLY_PLAN), + ('MDN Plus', MONTHLY_PLAN), + ] + + DB_START_DATE = '2022-01-01' + DB_END_DATE = '2025-06-01' + OUTPUT_TSV = '03_compare_with_previous_period.tsv' + return ( + DB_END_DATE, + DB_START_DATE, + MONTHLY_PLAN, + MOZILLA_VPN, + OUTPUT_TSV, + PLAN_TYPES, + PRODUCTS, + SQL_TABLE, + YEARLY_PLAN, + ) + + + +def _(client, pd): + def run(sql_statement: str) -> pd.DataFrame: + query = client.query(sql_statement) + return query.result().to_dataframe() + + return (run,) + + + +def _(bigquery): + client = bigquery.Client('mozdata') + return (client,) + + + +def _(DB_START_DATE, SQL_TABLE, pd, run): + def load_subscription_data() -> pd.DataFrame: + df = run(f""" + SELECT + product_name AS product_name, + plan_interval_months AS plan_type, + active_date AS t, + is_renewed_subscription AS is_renewal, + SUM(total_period_amount_usd) AS revenue, + COUNT(*) AS accounts + FROM `{SQL_TABLE}` + WHERE + DATE('{DB_START_DATE}') <= active_date AND is_first_day_of_period + GROUP BY 1, 2, 3, 4 + ORDER BY 1, 2, 3, 4 + """) + df['t'] = pd.to_datetime(df['t']) + df['plan_type'] = df['plan_type'].astype(str) + return df + + df_raw = load_subscription_data() + + return (df_raw,) + + + +def _(df_raw): + df_raw + return + + + +def _(df_raw): + # Sanity check + # Should be around $25m for all services from March 2023 onward + # Should be around $37m for all services from January 2022 onward + + print('Total revenue = ${:,.2f}'.format(sum(df_raw['revenue']))) + return + + + +def _(df_raw): + # List of products + + for product_name in df_raw['product_name'].unique(): + print(product_name) + return + + + +def _(df_raw): + df_raw.groupby(['product_name', 'plan_type']).aggregate({ + 'revenue': 'sum', + 'accounts': 'sum', + }).sort_values(by=['revenue'], ascending=False) + return + + + +def _(MOZILLA_VPN, df_raw): + df_service = df_raw[(df_raw['product_name'] == MOZILLA_VPN) & (df_raw['plan_type'] == '12')].reset_index(drop=True) + df_service['t'].min() + return + + + +def _( + DB_END_DATE, + MONTHLY_PLAN, + MOZILLA_VPN, + PLAN_TYPES, + YEARLY_PLAN, + df_raw, + pd, +): + # Align two years of data to compute year-of-year ratio + def compare_current_vs_previous_periods(df_raw: pd.DataFrame, product_name: str, plan_type: int) -> pd.DataFrame: + print('product_name = {}'.format(product_name)) + print('plan_type = {}'.format(plan_type)) + assert(plan_type in PLAN_TYPES) + df_service = df_raw[(df_raw['product_name'] == product_name) & (df_raw['plan_type'] == plan_type)] + + # Baseline period + current_end_date = pd.to_datetime(DB_END_DATE) - pd.Timedelta(days=1) + + # Reference or "previous" period + if plan_type == YEARLY_PLAN: + current_start_date = df_service['t'].min() + pd.DateOffset(years=1) + previous_start_date = current_start_date - pd.DateOffset(years=1) + previous_end_date = current_end_date - pd.DateOffset(years=1) + else: + current_start_date = df_service['t'].min() + pd.DateOffset(months=1) + previous_start_date = current_start_date - pd.DateOffset(months=1) + previous_end_date = current_end_date - pd.DateOffset(months=1) + + # Extract data for current and previous periods + df_current = df_service[df_service['t'].between(current_start_date, current_end_date)].reset_index(drop=True) + df_previous = df_service[df_service['t'].between(previous_start_date, previous_end_date)].reset_index(drop=True) + + # Split current period into new subscriptions and renewals + df_current_new = df_current[df_current['is_renewal'] == False].drop(columns='is_renewal').reset_index(drop=True) + df_current_renewal = df_current[df_current['is_renewal'] == True].drop(columns='is_renewal').reset_index(drop=True) + print('Current period new subscription = ${:>14,.2f} in {} days from {} to {}'.format( + sum(df_current_new['revenue']), + df_current_new['t'].nunique(), + min(df_current_new['t']), + max(df_current_new['t']) + )) + print(' Current period renewals = ${:>14,.2f} in {} days from {} to {}'.format( + sum(df_current_renewal['revenue']), + df_current_renewal['t'].nunique(), + min(df_current_renewal['t']), + max(df_current_renewal['t']) + )) + + # Count all previous period + df_previous_all = df_previous.groupby(['t']).agg({ + 'revenue': 'sum', + 'accounts': 'sum', + 'product_name': 'first', + 'plan_type': 'first', + }).reset_index() + print(' Previous period = ${:>14,.2f} in {} days from {} to {}'.format( + sum(df_previous_all['revenue']), + df_previous_all['t'].nunique(), + min(df_previous_all['t']), + max(df_previous_all['t']) + )) + + # Shift previous period onto the current period + if plan_type == YEARLY_PLAN: + df_previous_all['t'] = df_previous_all['t'].apply(lambda t: t + pd.DateOffset(years=1)) + else: + df_previous_all['t'] = df_previous_all['t'].apply(lambda t: t + pd.DateOffset(months=1)) + df_previous_all = df_previous_all.groupby(['t']).agg({ + 'revenue': 'sum', + 'accounts': 'sum', + 'product_name': 'first', + 'plan_type': 'first', + }).reset_index() + print(' Previous period shifted = ${:>14,.2f} in {} days from {} to {}'.format( + sum(df_previous_all['revenue']), + df_previous_all['t'].nunique(), + min(df_previous_all['t']), + max(df_previous_all['t']) + )) + + # Merge into a single table + df_current_new.rename(columns={ + 'revenue': 'current_new_revenue', + 'accounts': 'current_new_accounts', + }, inplace=True) + df_current_renewal.rename(columns={ + 'revenue': 'current_renewal_revenue', + 'accounts': 'current_renewal_accounts', + }, inplace=True) + df_previous_all.rename(columns={ + 'revenue': 'previous_all_revenue', + 'accounts': 'previous_all_accounts', + }, inplace=True) + df = pd.merge(df_current_renewal, df_current_new, on=['t', 'product_name', 'plan_type'], how='inner') + df = pd.merge(df, df_previous_all, on=['t', 'product_name', 'plan_type'], how='inner') + + # Fill in missing values + all_dates = pd.date_range(start=current_start_date, end=current_end_date) + all_date_indexes = pd.Index(all_dates, name='t') + df = df.set_index('t').reindex(all_date_indexes).fillna({ + 'product_name': product_name, + 'plan_type': plan_type, + 'current_new_revenue': 0.0, + 'current_new_accounts': 0.0, + 'current_renewal_revenue': 0.0, + 'current_renewal_accounts': 0.0, + 'previous_all_revenue': 0.0, + 'previous_all_accounts': 0.0, + }).reset_index() + df = df.sort_values(by='t') + + # Calculate % renewal rate + df['renewal_weight'] = 365 * (df['current_renewal_accounts'] + df['previous_all_accounts']) / (sum(df['current_renewal_accounts']) + sum(df['previous_all_accounts'])) + df['renewal_ratio'] = df['current_renewal_accounts'] / df['previous_all_accounts'] + print('Renewal rate: {:.4f}%'.format(100 / 365 * sum(df['renewal_ratio'] * df['renewal_weight']))) + return df + + df_compare_vpn_yearly = compare_current_vs_previous_periods(df_raw, MOZILLA_VPN, YEARLY_PLAN) + df_compare_vpn_monthly = compare_current_vs_previous_periods(df_raw, MOZILLA_VPN, MONTHLY_PLAN) + return ( + compare_current_vs_previous_periods, + df_compare_vpn_monthly, + df_compare_vpn_yearly, + ) + + + +def _(df_compare_vpn_yearly): + df_compare_vpn_yearly + return + + + +def _(df_compare_vpn_monthly): + df_compare_vpn_monthly + return + + + +def _(PRODUCTS, compare_current_vs_previous_periods, df_raw, pd): + def compare_all_services(df_raw: pd.DataFrame) -> pd.DataFrame: + df_all = pd.DataFrame() + for (product_name, plan_type) in PRODUCTS: + df = compare_current_vs_previous_periods(df_raw, product_name, plan_type) + df_all = pd.concat([df_all, df], ignore_index=True) + return df_all + + df_compare_all = compare_all_services(df_raw) + return (df_compare_all,) + + + +def _(df_compare_all): + df_compare_all + return + + + +def _(OUTPUT_TSV, df_compare_all): + df_compare_all.to_csv(OUTPUT_TSV, sep='\t', index=False) + return + + + +def _(): + return + + +if __name__ == "__main__": + app.run() \ No newline at end of file diff --git a/jobs/revenue-forecasting/03_test.py b/jobs/revenue-forecasting/03_test.py new file mode 100644 index 00000000..210d82f1 --- /dev/null +++ b/jobs/revenue-forecasting/03_test.py @@ -0,0 +1,218 @@ +from google.cloud import bigquery +import numpy as np +import pandas as pd + +def main(): + SQL_TABLE = 'mozdata.revenue_cat2_analysis.daily_active_logical_subscriptions_rebuilt_20250611' + YEARLY_PLAN = '12' + MONTHLY_PLAN = '1' + PLAN_TYPES = [YEARLY_PLAN, MONTHLY_PLAN] + + MOZILLA_VPN = 'Mozilla VPN' + MOZILLA_MONITOR = 'Mozilla Monitor Plus' + PRODUCTS = [ + (MOZILLA_VPN, YEARLY_PLAN), + (MOZILLA_VPN, MONTHLY_PLAN), + (MOZILLA_MONITOR, YEARLY_PLAN), + (MOZILLA_MONITOR, MONTHLY_PLAN), + ('Relay Premium', YEARLY_PLAN), + ('Relay Premium', MONTHLY_PLAN), + ('Mozilla VPN & Firefox Relay', YEARLY_PLAN), + ('MDN Plus', YEARLY_PLAN), + ('MDN Plus', MONTHLY_PLAN), + ] + + DB_START_DATE = '2022-01-01' + DB_END_DATE = '2025-06-01' + OUTPUT_TSV = '03_compare_with_previous_period.tsv' + + client = bigquery.Client('mozdata') + + def run(sql_statement: str) -> pd.DataFrame: + query = client.query(sql_statement) + return query.result().to_dataframe() + + # Load subscription data + def load_subscription_data() -> pd.DataFrame: + df = run(f""" + SELECT + product_name AS product_name, + plan_interval_months AS plan_type, + active_date AS t, + is_renewed_subscription AS is_renewal, + SUM(total_period_amount_usd) AS revenue, + COUNT(*) AS accounts + FROM `{SQL_TABLE}` + WHERE + DATE('{DB_START_DATE}') <= active_date AND is_first_day_of_period + GROUP BY 1, 2, 3, 4 + ORDER BY 1, 2, 3, 4 + """) + df['t'] = pd.to_datetime(df['t']) + df['plan_type'] = df['plan_type'].astype(str) + return df + + df_raw = load_subscription_data() + + # # Print summary information + # print('Total revenue = ${:,.2f}'.format(sum(df_raw['revenue']))) + + # print("\nList of products:") + # for product_name in df_raw['product_name'].unique(): + # print(product_name) + + # print("\nRevenue by product and plan type:") + # print(df_raw.groupby(['product_name', 'plan_type']).aggregate({ + # 'revenue': 'sum', + # 'accounts': 'sum', + # }).sort_values(by=['revenue'], ascending=False)) + + # Align two years of data to compute year-of-year ratio + def compare_current_vs_previous_periods(df_raw: pd.DataFrame, product_name: str, plan_type: int) -> pd.DataFrame: + print(f'\nproduct_name = {product_name}') + print(f'plan_type = {plan_type}') + assert plan_type in PLAN_TYPES + df_service = df_raw[(df_raw['product_name'] == product_name) & (df_raw['plan_type'] == plan_type)] + + if df_service.empty: + print(f"No data found for {product_name} with plan type {plan_type}") + return pd.DataFrame() + + # Baseline period + current_end_date = pd.to_datetime(DB_END_DATE) - pd.Timedelta(days=1) + + # Reference or "previous" period + if plan_type == YEARLY_PLAN: + current_start_date = df_service['t'].min() + pd.DateOffset(years=1) + previous_start_date = current_start_date - pd.DateOffset(years=1) + previous_end_date = current_end_date - pd.DateOffset(years=1) + else: + current_start_date = df_service['t'].min() + pd.DateOffset(months=1) + previous_start_date = current_start_date - pd.DateOffset(months=1) + previous_end_date = current_end_date - pd.DateOffset(months=1) + + # Extract data for current and previous periods + df_current = df_service[df_service['t'].between(current_start_date, current_end_date)].reset_index(drop=True) + df_previous = df_service[df_service['t'].between(previous_start_date, previous_end_date)].reset_index(drop=True) + + # Split current period into new subscriptions and renewals + df_current_new = df_current[df_current['is_renewal'] == False].drop(columns='is_renewal').reset_index(drop=True) + df_current_renewal = df_current[df_current['is_renewal'] == True].drop(columns='is_renewal').reset_index(drop=True) + + # print('Current period new subscription = ${:>14,.2f} in {} days from {} to {}'.format( + # sum(df_current_new['revenue']), + # df_current_new['t'].nunique(), + # min(df_current_new['t']), + # max(df_current_new['t']) + # )) + # print(' Current period renewals = ${:>14,.2f} in {} days from {} to {}'.format( + # sum(df_current_renewal['revenue']), + # df_current_renewal['t'].nunique(), + # min(df_current_renewal['t']), + # max(df_current_renewal['t']) + # )) + + # Count all previous period + df_previous_all = df_previous.groupby(['t']).agg({ + 'revenue': 'sum', + 'accounts': 'sum', + 'product_name': 'first', + 'plan_type': 'first', + }).reset_index() + print(' Previous period = ${:>14,.2f} in {} days from {} to {}'.format( + sum(df_previous_all['revenue']), + df_previous_all['t'].nunique(), + min(df_previous_all['t']), + max(df_previous_all['t']) + )) + + # Shift previous period onto the current period + if plan_type == YEARLY_PLAN: + df_previous_all['t'] = df_previous_all['t'].apply(lambda t: t + pd.DateOffset(years=1)) + else: + df_previous_all['t'] = df_previous_all['t'].apply(lambda t: t + pd.DateOffset(months=1)) + df_previous_all = df_previous_all.groupby(['t']).agg({ + 'revenue': 'sum', + 'accounts': 'sum', + 'product_name': 'first', + 'plan_type': 'first', + }).reset_index() + print(' Previous period shifted = ${:>14,.2f} in {} days from {} to {}'.format( + sum(df_previous_all['revenue']), + df_previous_all['t'].nunique(), + min(df_previous_all['t']), + max(df_previous_all['t']) + )) + + # Merge into a single table + df_current_new.rename(columns={ + 'revenue': 'current_new_revenue', + 'accounts': 'current_new_accounts', + }, inplace=True) + df_current_renewal.rename(columns={ + 'revenue': 'current_renewal_revenue', + 'accounts': 'current_renewal_accounts', + }, inplace=True) + df_previous_all.rename(columns={ + 'revenue': 'previous_all_revenue', + 'accounts': 'previous_all_accounts', + }, inplace=True) + + df = pd.merge(df_current_renewal, df_current_new, on=['t', 'product_name', 'plan_type'], how='inner') + df = pd.merge(df, df_previous_all, on=['t', 'product_name', 'plan_type'], how='inner') + + # Fill in missing values + all_dates = pd.date_range(start=current_start_date, end=current_end_date) + all_date_indexes = pd.Index(all_dates, name='t') + df = df.set_index('t').reindex(all_date_indexes).fillna({ + 'product_name': product_name, + 'plan_type': plan_type, + 'current_new_revenue': 0.0, + 'current_new_accounts': 0.0, + 'current_renewal_revenue': 0.0, + 'current_renewal_accounts': 0.0, + 'previous_all_revenue': 0.0, + 'previous_all_accounts': 0.0, + }).reset_index() + df = df.sort_values(by='t') + + # Calculate % renewal rate + df['renewal_weight'] = 365 * (df['current_renewal_accounts'] + df['previous_all_accounts']) / (sum(df['current_renewal_accounts']) + sum(df['previous_all_accounts'])) + df['renewal_ratio'] = df['current_renewal_accounts'] / df['previous_all_accounts'] + print('Renewal rate: {:.4f}%'.format(100 / 365 * sum(df['renewal_ratio'] * df['renewal_weight']))) + + return df + + # Compare VPN yearly and monthly + df_compare_vpn_yearly = compare_current_vs_previous_periods(df_raw, MOZILLA_VPN, YEARLY_PLAN) + df_compare_vpn_monthly = compare_current_vs_previous_periods(df_raw, MOZILLA_VPN, MONTHLY_PLAN) + + # # Display results + # if not df_compare_vpn_yearly.empty: + # print("\nVPN Yearly Comparison:") + # print(df_compare_vpn_yearly.head()) + + # if not df_compare_vpn_monthly.empty: + # print("\nVPN Monthly Comparison:") + # print(df_compare_vpn_monthly.head()) + + # Compare all services + def compare_all_services(df_raw: pd.DataFrame) -> pd.DataFrame: + df_all = pd.DataFrame() + for (product_name, plan_type) in PRODUCTS: + df = compare_current_vs_previous_periods(df_raw, product_name, plan_type) + if not df.empty: + df_all = pd.concat([df_all, df], ignore_index=True) + return df_all + + df_compare_all = compare_all_services(df_raw) + + # Save results to TSV + if not df_compare_all.empty: + df_compare_all.to_csv(OUTPUT_TSV, sep='\t', index=False) + print(f"\nResults saved to {OUTPUT_TSV}") + else: + print("\nNo comparison data was generated") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/jobs/revenue-forecasting/05_upload_data.py b/jobs/revenue-forecasting/05_upload_data.py new file mode 100644 index 00000000..55baf456 --- /dev/null +++ b/jobs/revenue-forecasting/05_upload_data.py @@ -0,0 +1,36 @@ +# from google.cloud import bigquery +# import pandas as pd +import os +print(os.getenv('GOOGLE_APPLICATION_CREDENTIALS')) +# client = bigquery.Client('mozdata') + +# def run(sql_statement: str) -> pd.DataFrame: +# query = client.query(sql_statement) +# return query.result().to_dataframe() + +# # Load data from disk +# df_all_models = pd.read_csv('04_all_models.tsv', sep='\t') +# df_all_models['plan_type'] = df_all_models['plan_type'].astype(str) + +# df = df_all_models.copy() +# df['new_revenue_today'] = df['total_revenue'].combine_first(df['total_revenue_truth']) +# df['new_accounts_today'] = df['total_accounts'].combine_first(df['total_accounts_truth']) +# #df = df[df['model_version'] == '2025-05-01'] + +# # Retain only relevant fields +# df = df[['model_version', 't', 'product_name', 'plan_type', 'new_revenue_today', 'new_accounts_today']].reset_index(drop=True) + + +# job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE) +# job = client.load_table_from_dataframe(df, 'mozdata.revenue_cat2_analysis.subscription_services_v3_models', job_config=job_config) +# job.result() + +# with open('05b_subscription_services_v3_forecasting.sql') as f: +# sql2 = f.read() + +# run(sql2) + +# with open('05c_subscription_services_v3_forecasting_ltv.sql') as f: +# sql3 = f.read() + +# run(sql3) \ No newline at end of file diff --git a/jobs/revenue-forecasting/05b_subscription_services_v3_forecasting.sql b/jobs/revenue-forecasting/05b_subscription_services_v3_forecasting.sql new file mode 100644 index 00000000..637c5b06 --- /dev/null +++ b/jobs/revenue-forecasting/05b_subscription_services_v3_forecasting.sql @@ -0,0 +1,48 @@ +CREATE OR REPLACE TABLE mozdata.revenue_cat2_analysis.subscription_services_v3_forecasting AS ( + + WITH expanded AS ( + SELECT + DATE(model_version) AS model_version, + product_name, + DATE(t) AS original_date, + DATE_ADD(DATE(t), INTERVAL OFFSET DAY) AS t, + new_revenue_today / 365.0 AS daily_amortized_revenue, + new_accounts_today AS daily_amortized_accounts + FROM mozdata.revenue_cat2_analysis.subscription_services_v3_models, UNNEST(GENERATE_ARRAY(0, 364)) AS offset + WHERE plan_type = '12' + + UNION ALL + + SELECT + DATE(model_version) AS model_version, + product_name, + DATE(t) AS original_date, + DATE_ADD(DATE(t), INTERVAL OFFSET DAY) AS t, + new_revenue_today / 30.0 AS daily_amortized_revenue, + new_accounts_today AS daily_amortized_accounts + FROM mozdata.revenue_cat2_analysis.subscription_services_v3_models, UNNEST(GENERATE_ARRAY(0, 30)) AS offset + WHERE plan_type = '1' + + ), + + amortized AS ( + SELECT + model_version, + product_name, + t, + SUM(daily_amortized_revenue) AS amortized_revenue, + SUM(daily_amortized_accounts) AS amortized_accounts + FROM expanded + GROUP BY product_name, model_version, t + ) + + SELECT + model_version, + product_name, + t, + amortized_revenue, + amortized_accounts + FROM amortized + WHERE t >= model_version + ORDER BY model_version, product_name, t +); diff --git a/jobs/revenue-forecasting/05c_subscription_services_v3_forecasting_ltv.sql b/jobs/revenue-forecasting/05c_subscription_services_v3_forecasting_ltv.sql new file mode 100644 index 00000000..a7c43c52 --- /dev/null +++ b/jobs/revenue-forecasting/05c_subscription_services_v3_forecasting_ltv.sql @@ -0,0 +1,16 @@ +CREATE OR REPLACE TABLE mozdata.revenue_cat2_analysis.subscription_services_v3_forecasting_ltv AS ( + SELECT + model_version, + product_name, + t, + amortized_revenue, + amortized_accounts, + SUM(amortized_revenue) OVER ( + PARTITION BY model_version, product_name + ORDER BY t + ROWS BETWEEN CURRENT ROW AND 729 FOLLOWING + ) AS ltv_revenue + + FROM mozdata.revenue_cat2_analysis.subscription_services_v3_forecasting + ORDER BY model_version, product_name, t +); \ No newline at end of file diff --git a/jobs/revenue-forecasting/Dockerfile b/jobs/revenue-forecasting/Dockerfile new file mode 100644 index 00000000..d64fc969 --- /dev/null +++ b/jobs/revenue-forecasting/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.8 +LABEL maintainer="Jason Chuang " + +# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md +ARG USER_ID="10001" +ARG GROUP_ID="app" +ARG HOME="/app" + +ENV HOME=${HOME} +RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \ + useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID} + +WORKDIR ${HOME} + +RUN pip install --upgrade pip + +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + +COPY . . + +RUN pip install . + +# Drop root and change ownership of the application folder to the user +RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME} +USER ${USER_ID} diff --git a/jobs/revenue-forecasting/README.md b/jobs/revenue-forecasting/README.md new file mode 100644 index 00000000..0688fb07 --- /dev/null +++ b/jobs/revenue-forecasting/README.md @@ -0,0 +1,45 @@ +# Python Template Job + +This is an example of a dockerized Python job. + +## Usage + +This script is intended to be run in a docker container. +Build the docker image with: + +```sh +docker build -t python-template-job . +``` + +To run locally, install dependencies with: + +```sh +pip install -r requirements.txt +``` + +Run the script with + +```sh +python3 -m python_template_job.main +``` + +## Development + +Run tests with: + +```sh +pytest +``` + +`flake8` and `black` are included for code linting and formatting: + +```sh +pytest --black --flake8 +``` + +or + +```sh +flake8 python_template_job/ tests/ +black --diff python_template_job/ tests/ +``` diff --git a/jobs/revenue-forecasting/ci_job.yaml b/jobs/revenue-forecasting/ci_job.yaml new file mode 100644 index 00000000..441e8c93 --- /dev/null +++ b/jobs/revenue-forecasting/ci_job.yaml @@ -0,0 +1,15 @@ +build-job-revenue-forecasting: + docker: + - image: << pipeline.parameters.git-image >> + steps: + - checkout + - compare-branch: + pattern: ^jobs/revenue-forecasting/ + - setup_remote_docker: + version: << pipeline.parameters.docker-version >> + - run: + name: Build Docker image + command: docker build -t app:build jobs/revenue-forecasting/ + - run: + name: Test Code + command: docker run app:build pytest --flake8 --black \ No newline at end of file diff --git a/jobs/revenue-forecasting/ci_workflow.yaml b/jobs/revenue-forecasting/ci_workflow.yaml new file mode 100644 index 00000000..55440878 --- /dev/null +++ b/jobs/revenue-forecasting/ci_workflow.yaml @@ -0,0 +1,13 @@ +job-revenue-forecasting: + jobs: + - build-job-revenue-forecasting + - gcp-gcr/build-and-push-image: + context: data-eng-airflow-gcr + docker-context: jobs/revenue-forecasting/ + path: jobs/revenue-forecasting/ + image: revenue-forecasting_docker_etl + requires: + - build-job-revenue-forecasting + filters: + branches: + only: main \ No newline at end of file diff --git a/jobs/revenue-forecasting/docker-compose.yml b/jobs/revenue-forecasting/docker-compose.yml new file mode 100644 index 00000000..693b8f52 --- /dev/null +++ b/jobs/revenue-forecasting/docker-compose.yml @@ -0,0 +1,17 @@ +version: "3.4" + +services: + app: + build: + context: . + volumes: + - ./:/app + # Mount the local gcloud sdk configuration when developing. Note that this + # will modify the contents on the host. + - /Users/plee/.config/gcloud/application_default_credentials.json:/gcp/creds.json + environment: + - CLOUDSDK_CONFIG=/tmp/.config/gcloud + - CLOUDSDK_CORE_PROJECT + - GOOGLE_APPLICATION_CREDENTIALS=/gcp/creds.json + command: > + sh -c "python 05_upload_data.py" diff --git a/jobs/revenue-forecasting/pytest.ini b/jobs/revenue-forecasting/pytest.ini new file mode 100644 index 00000000..e618d7a5 --- /dev/null +++ b/jobs/revenue-forecasting/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +testpaths = + tests diff --git a/jobs/revenue-forecasting/requirements.txt b/jobs/revenue-forecasting/requirements.txt new file mode 100644 index 00000000..bc7600a7 --- /dev/null +++ b/jobs/revenue-forecasting/requirements.txt @@ -0,0 +1,6 @@ +click==8.1.3 +google-api-core==2.25.1 +google-auth==2.40.3 +google-cloud-bigquery>=3.0.0 +google-cloud-core==2.4.3 +pandas>=2.0.0 diff --git a/jobs/revenue-forecasting/revenue_forecasting/main.py b/jobs/revenue-forecasting/revenue_forecasting/main.py new file mode 100644 index 00000000..65826be7 --- /dev/null +++ b/jobs/revenue-forecasting/revenue_forecasting/main.py @@ -0,0 +1,11 @@ +import click + + +@click.command() +@click.option("--arg", default="test") +def main(arg): + print(arg) + + +if __name__ == "__main__": + main() diff --git a/jobs/revenue-forecasting/setup.py b/jobs/revenue-forecasting/setup.py new file mode 100644 index 00000000..0557ae75 --- /dev/null +++ b/jobs/revenue-forecasting/setup.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python + +from setuptools import setup, find_packages + +readme = open("README.md").read() + +setup( + name="docker-etl-job", # TODO: change placeholder name + version="0.1.0", + author="Mozilla Corporation", + packages=find_packages(include=["docker_etl"]), # TODO: change placeholder name + long_description=readme, + include_package_data=True, + license="MPL 2.0", +) diff --git a/jobs/revenue-forecasting/tests/__init__.py b/jobs/revenue-forecasting/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/jobs/revenue-forecasting/tests/test_main.py b/jobs/revenue-forecasting/tests/test_main.py new file mode 100644 index 00000000..55ee4239 --- /dev/null +++ b/jobs/revenue-forecasting/tests/test_main.py @@ -0,0 +1,11 @@ +import pytest + + +@pytest.fixture +def example_dependency(): + return "test" + + +class TestMain: + def test_something(self, example_dependency): + assert example_dependency == "test"