diff --git a/jobs/webcompat-kb/data/sql/webcompat_knowledge_base/tables/webcompat_topline_metric_daily/meta.toml b/jobs/webcompat-kb/data/sql/webcompat_knowledge_base/tables/webcompat_topline_metric_daily/meta.toml
index a7afa028..6e847029 100644
--- a/jobs/webcompat-kb/data/sql/webcompat_knowledge_base/tables/webcompat_topline_metric_daily/meta.toml
+++ b/jobs/webcompat-kb/data/sql/webcompat_knowledge_base/tables/webcompat_topline_metric_daily/meta.toml
@@ -1,2 +1,2 @@
 name = "webcompat_topline_metric_daily"
-etl = ["metric", "metric-backfill"]
\ No newline at end of file
+etl = ["metric"]
\ No newline at end of file
diff --git a/jobs/webcompat-kb/pyproject.toml b/jobs/webcompat-kb/pyproject.toml
index 2103b806..d56617c9 100644
--- a/jobs/webcompat-kb/pyproject.toml
+++ b/jobs/webcompat-kb/pyproject.toml
@@ -48,6 +48,7 @@ test = [
 [project.scripts]
 webcompat-etl = "webcompat_kb.main:main"
 webcompat-backfill-history = "webcompat_kb.utils:backfill_history"
+webcompat-backfill-metric = "webcompat_kb.commands.backfill_metric:main"
 webcompat-check-templates = "webcompat_kb.commands.checkdata:main"
 webcompat-render = "webcompat_kb.commands.render:main"
 webcompat-validate = "webcompat_kb.commands.validate:main"
diff --git a/jobs/webcompat-kb/test.sh b/jobs/webcompat-kb/test.sh
index 9f37b049..a295ad49 100755
--- a/jobs/webcompat-kb/test.sh
+++ b/jobs/webcompat-kb/test.sh
@@ -5,4 +5,4 @@ set -ex
 uv sync --extra=test
 uv run mypy webcompat_kb
 uv run pytest --ruff --ruff-format .
-uv run webcompat-check-templates --bq-project-id="moz-fx-dev-dschubert-wckb"
+uv run webcompat-check-templates --bq-project="moz-fx-dev-dschubert-wckb"
diff --git a/jobs/webcompat-kb/webcompat_kb/base.py b/jobs/webcompat-kb/webcompat_kb/base.py
index 479879fe..2210f213 100644
--- a/jobs/webcompat-kb/webcompat_kb/base.py
+++ b/jobs/webcompat-kb/webcompat_kb/base.py
@@ -1,10 +1,12 @@
 import argparse
+import logging
 import re
 import pathlib
 import os
+import sys
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
-from typing import Any, MutableMapping
+from typing import Any, MutableMapping, Optional
 
 from .bqhelpers import BigQuery, SchemaId
 from .config import Config
@@ -39,6 +41,80 @@ def dataset_arg(value: str) -> str:
     return value
 
 
+class Command(ABC):
+    def argument_parser(self) -> argparse.ArgumentParser:
+        parser = argparse.ArgumentParser()
+        parser.add_argument(
+            "--log-level",
+            choices=["debug", "info", "warn", "error"],
+            default="info",
+            help="Log level",
+        )
+
+        parser.add_argument(
+            "--bq-project",
+            dest="bq_project_id",
+            type=project_arg,
+            help="BigQuery project id",
+        )
+
+        parser.add_argument(
+            "--data-path",
+            action="store",
+            type=pathlib.Path,
+            default=DEFAULT_DATA_DIR,
+            help="Path to directory containing sql to deploy",
+        )
+
+        parser.add_argument(
+            "--stage",
+            action="store_true",
+            help="Write to staging location (currently same project with _test suffix on dataset names)",
+        )
+
+        parser.add_argument(
+            "--no-write",
+            dest="write",
+            action="store_false",
+            default=True,
+            help="Don't write updates to BigQuery",
+        )
+
+        parser.add_argument(
+            "--github-token",
+            default=os.environ.get("GH_TOKEN"),
+            help="GitHub token",
+        )
+
+        parser.add_argument(
+            "--pdb", action="store_true", help="Drop into debugger on exception"
+        )
+        return parser
+
+    @abstractmethod
+    def main(self, args: argparse.Namespace) -> Optional[int]: ...
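+    # main() returns an optional exit code, handed to sys.exit by __call__.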
+
+    def __call__(self) -> None:
+        parser = self.argument_parser()
+        args = parser.parse_args()
+
+        logging.basicConfig()
+        log_level = args.log_level.upper() if "log_level" in args else "INFO"
+        logging.getLogger().setLevel(logging.getLevelNamesMapping()[log_level])
+
+        try:
+            rv = self.main(args)
+        except Exception:
+            if "pdb" in args and args.pdb:
+                import pdb
+
+                pdb.post_mortem()
+            raise
+        if rv:
+            sys.exit(rv)
+
+
 @dataclass
 class Context:
     args: argparse.Namespace
diff --git a/jobs/webcompat-kb/webcompat_kb/commands/backfill_metric.py b/jobs/webcompat-kb/webcompat_kb/commands/backfill_metric.py
new file mode 100644
index 00000000..bdb83fdf
--- /dev/null
+++ b/jobs/webcompat-kb/webcompat_kb/commands/backfill_metric.py
@@ -0,0 +1,106 @@
+import argparse
+import logging
+import os
+from typing import Optional
+
+from .. import projectdata
+from ..base import Command
+from ..bqhelpers import BigQuery, DatasetId, get_client
+from ..config import Config
+from ..projectdata import Project
+
+
+def backfill_metric_daily(
+    project: Project,
+    client: BigQuery,
+    write: bool,
+    metric_name: str,
+) -> None:
+    metric_dfns, metric_types = project.data.metric_dfns, project.data.metric_types
+    daily_metric_types = [
+        metric_type for metric_type in metric_types if "daily" in metric_type.contexts
+    ]
+
+    metric = None
+    for metric in metric_dfns:
+        if metric.name == metric_name:
+            break
+    else:
+        raise ValueError(f"Metric named {metric_name} not found")
+
+    select_fields = []
+    field_names = []
+    conditions = []
+    for metric_type in daily_metric_types:
+        field_name = f"{metric_type.name}_{metric.name}"
+        field_names.append(field_name)
+        select_fields.append(
+            f"{metric_type.agg_function('bugs', metric)} AS {field_name}"
+        )
+        conditions.append(f"metric_daily.{field_name} IS NULL")
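+    # Backfill only dates whose columns for this metric are still NULL,
+    # counting the bugs open on each date.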
+    select_query = f"""
+SELECT
+  date,
+  {",\n  ".join(select_fields)}
+FROM
+  `{project["webcompat_knowledge_base"]["scored_site_reports"]}` AS bugs
+  JOIN `{project["webcompat_knowledge_base"]["webcompat_topline_metric_daily"]}` as metric_daily
+ON
+  DATE(bugs.creation_time) <= metric_daily.date
+  AND IF (bugs.resolved_time IS NOT NULL, DATE(bugs.resolved_time) >= date, TRUE)
+WHERE
+  {metric.condition("bugs")} AND {" AND ".join(conditions)}
+GROUP BY
+  date
+ORDER BY date"""
+
+    update_query = f"""
+UPDATE `{project["webcompat_knowledge_base"]["webcompat_topline_metric_daily"]}` AS metric_daily
+SET
+  {",\n  ".join(f"metric_daily.{field_name}=new_data.{field_name}" for field_name in field_names)}
+FROM ({select_query}) AS new_data
+WHERE new_data.date = metric_daily.date
+"""
+
+    if write:
+        result = client.query(update_query)
+    else:
+        logging.info(f"Would run query:\n{update_query}")
+        result = client.query(select_query)
+        logging.info(f"Would set {list(result)}")
+
+
+class BackfillMetric(Command):
+    def argument_parser(self) -> argparse.ArgumentParser:
+        parser = super().argument_parser()
+        parser.add_argument("metric", action="store", help="Metric name to update")
+        return parser
+
+    def main(self, args: argparse.Namespace) -> Optional[int]:
+        client = get_client(args.bq_project_id)
+        config = Config(write=args.write, stage=args.stage)
+        project = projectdata.load(
+            client, args.bq_project_id, os.path.normpath(args.data_path), set(), config
+        )
+        if args.metric not in {metric.name for metric in project.data.metric_dfns}:
+            raise ValueError(f"Unknown metric {args.metric}")
+
+        bq_client = BigQuery(
+            client,
+            DatasetId(args.bq_project_id, "webcompat_knowledge_base"),
+            args.write,
+            None,
+        )
+
+        backfill_metric_daily(
+            project,
+            bq_client,
+            config.write,
+            args.metric,
+        )
+        return None
+
+
+main = BackfillMetric()
diff --git a/jobs/webcompat-kb/webcompat_kb/commands/checkdata.py b/jobs/webcompat-kb/webcompat_kb/commands/checkdata.py
index 43505980..54b859e6 100644
--- a/jobs/webcompat-kb/webcompat_kb/commands/checkdata.py
+++ b/jobs/webcompat-kb/webcompat_kb/commands/checkdata.py
@@ -1,11 +1,10 @@
 import argparse
 import logging
 import os
-import pathlib
-import sys
+from typing import Optional
 
 from .. import projectdata
-from ..base import ALL_JOBS, DEFAULT_DATA_DIR
+from ..base import ALL_JOBS, Command
 from ..bqhelpers import get_client
 from ..config import Config
 from ..projectdata import lint_templates
@@ -15,26 +14,16 @@
 here = os.path.dirname(__file__)
 
 
-def main() -> None:
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--bq-project-id", action="store", help="BigQuery project ID")
-    parser.add_argument("--pdb", action="store_true", help="Run debugger on failure")
-    parser.add_argument(
-        "--path",
-        action="store",
-        type=pathlib.Path,
-        default=DEFAULT_DATA_DIR,
-        help="Path to directory containing data",
-    )
-    try:
+class CheckData(Command):
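+    # Lints the SQL templates and exercises schema creation with write disabled.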
+    def main(self, args: argparse.Namespace) -> Optional[int]:
         # This should be unused
         client = get_client("test")
-        args = parser.parse_args()
         project = projectdata.load(
             client,
             args.bq_project_id,
-            os.path.normpath(args.path),
+            os.path.normpath(args.data_path),
             set(),
             Config(write=False, stage=False),
         )
 
@@ -43,17 +32,15 @@ def main() -> None:
         if lint_templates(
             project.data.templates_by_dataset.values(),
         ):
             logging.error("Lint failed")
-            sys.exit(1)
+            return 1
         try:
             creator = SchemaCreator(project)
             creator.create()
-        except Exception:
-            logging.error("Creating schemas failed")
-            raise
-    except Exception:
-        if args.pdb:
-            import pdb
-
-            pdb.post_mortem()
-        raise
+        except Exception as e:
+            logging.error(f"Creating schemas failed: {e}")
+            return 1
+        return None
+
+
+main = CheckData()
diff --git a/jobs/webcompat-kb/webcompat_kb/commands/validate.py b/jobs/webcompat-kb/webcompat_kb/commands/validate.py
index 4bf47c89..8a5b8f25 100644
--- a/jobs/webcompat-kb/webcompat_kb/commands/validate.py
+++ b/jobs/webcompat-kb/webcompat_kb/commands/validate.py
@@ -1,43 +1,36 @@
 import argparse
 import os
-import pathlib
-import sys
+from typing import Optional
 
 from google.auth.exceptions import RefreshError
+
 from .. import projectdata
-from ..base import DEFAULT_DATA_DIR
+from ..base import Command
 from ..config import Config
 from ..bqhelpers import BigQuery, DatasetId, SchemaId, SchemaType, get_client
 from ..update_schema import render_schemas
 
 
-def main() -> None:
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--bq-project-id", action="store", help="BigQuery project ID")
-    parser.add_argument(
-        "--default-dataset",
-        action="store",
-        default="webcompat_knowledge_base",
-        help="Default dataset name",
-    )
-    parser.add_argument("--pdb", action="store_true", help="Run debugger on failure")
-    parser.add_argument(
-        "--data-path",
-        action="store",
-        type=pathlib.Path,
-        default=DEFAULT_DATA_DIR,
-        help="Path to directory containing data",
-    )
-    parser.add_argument(
-        "schema_ids",
-        action="store",
-        nargs="*",
-        help="Schemas to render e.g. dataset.view_name",
-    )
-    try:
-        args = parser.parse_args()
+class Validate(Command):
+    def argument_parser(self) -> argparse.ArgumentParser:
+        parser = super().argument_parser()
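+        # Extend the shared Command arguments with validate-specific ones.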
+        parser.add_argument(
+            "--default-dataset",
+            action="store",
+            default="webcompat_knowledge_base",
+            help="Default dataset name",
+        )
+        parser.add_argument(
+            "schema_ids",
+            action="store",
+            nargs="*",
+            help="Schemas to render e.g. dataset.view_name",
+        )
+        return parser
+
+    def main(self, args: argparse.Namespace) -> Optional[int]:
         client = get_client(args.bq_project_id)
         project = projectdata.load(
             client,
@@ -91,10 +84,9 @@ def main() -> None:
         else:
             print("  Validation succeeded")
         if not success:
-            sys.exit(1)
-    except Exception:
-        if args.pdb:
-            import pdb
+            return 1
+
+        return None
+
 
-            pdb.post_mortem()
-            raise
+main = Validate()
diff --git a/jobs/webcompat-kb/webcompat_kb/main.py b/jobs/webcompat-kb/webcompat_kb/main.py
index 8ca7df3d..4c4d7855 100644
--- a/jobs/webcompat-kb/webcompat_kb/main.py
+++ b/jobs/webcompat-kb/webcompat_kb/main.py
@@ -1,9 +1,7 @@
 import argparse
 import logging
 import os
-import pathlib
-import sys
-from typing import Iterable
+from typing import Iterable, Optional
 
 # These imports are required to populate ALL_JOBS
 # Unhappily the ordering here is significant
@@ -21,12 +19,11 @@
 )
 from .base import (
     ALL_JOBS,
+    Command,
     Context,
     Config,
-    DEFAULT_DATA_DIR,
     EtlJob,
     dataset_arg,
-    project_arg,
 )
 from .bqhelpers import get_client, BigQuery, DatasetId
 from . import projectdata
@@ -35,111 +32,68 @@
 here = os.path.dirname(__file__)
 
 
-def get_parser() -> argparse.ArgumentParser:
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--log-level",
-        choices=["debug", "info", "warn", "error"],
-        default="info",
-        help="Log level",
-    )
-
-    # Legacy argument names
-    parser.add_argument("--bq_project_id", help=argparse.SUPPRESS)
-    parser.add_argument("--bq_dataset_id", help=argparse.SUPPRESS)
-
-    parser.add_argument(
-        "--bq-project",
-        dest="bq_project_id",
-        type=project_arg,
-        help="BigQuery project id",
-    )
-    parser.add_argument(
-        "--bq-kb-dataset", type=dataset_arg, help="BigQuery knowledge base dataset id"
-    )
-    parser.add_argument(
-        "--data-path",
-        action="store",
-        type=pathlib.Path,
-        default=DEFAULT_DATA_DIR,
-        help="Path to directory containing sql to deploy",
-    )
-
-    parser.add_argument(
-        "--stage",
-        action="store_true",
-        help="Write to staging location (currently same project with _test suffix on dataset names)",
-    )
-
-    parser.add_argument(
-        "--no-write",
-        dest="write",
-        action="store_false",
-        default=True,
-        help="Don't write updates to BigQuery",
-    )
-
-    parser.add_argument(
-        "--pdb", action="store_true", help="Drop into debugger on execption"
-    )
-    parser.add_argument(
-        "--fail-on-error", action="store_true", help="Fail immediately if any job fails"
-    )
-
-    for job_cls in ALL_JOBS.values():
-        job_cls.add_arguments(parser)
-
-    parser.add_argument(
-        "jobs",
-        nargs="*",
-        choices=list(ALL_JOBS.keys()),
-        help=f"Jobs to run (defaults to {' '.join(name for name, cls in ALL_JOBS.items() if cls.default)})",
-    )
-
-    return parser
-
-
-def validate_args(
-    parser: argparse.ArgumentParser, args: argparse.Namespace, jobs: Iterable[EtlJob]
-) -> None:
-    required_args: set[str | tuple[str, str]] = {("bq_project_id", "--bq-project")}
-    for job in jobs:
-        required_args |= job.required_args()
-
-    missing = []
-    for arg in required_args:
-        if isinstance(arg, tuple):
-            prop_name, arg_name = arg
-        else:
-            prop_name = arg
-            arg_name = f"--{arg.replace('_', '-')}"
-
-        if getattr(args, prop_name) is None:
-            missing.append(arg_name)
-
-    if missing:
-        parser.print_usage()
-        logging.error(f"The following arguments are required {' '.join(missing)}")
-        sys.exit(1)
-
-
-def main() -> None:
-    logging.basicConfig()
-
-    parser = get_parser()
-    args = parser.parse_args()
-    failed = []
-    try:
-        logging.getLogger().setLevel(
-            logging.getLevelNamesMapping()[args.log_level.upper()]
+class EtlCommand(Command):
+    def argument_parser(self) -> argparse.ArgumentParser:
+        parser = super().argument_parser()
+        parser.add_argument(
+            "--fail-on-error",
+            action="store_true",
+            help="Fail immediately if any job fails",
         )
+
+        # Legacy: BigQuery knowledge base dataset id
+        parser.add_argument("--bq-kb-dataset", type=dataset_arg, help=argparse.SUPPRESS)
+
+        # Legacy argument names
+        parser.add_argument("--bq_project_id", help=argparse.SUPPRESS)
+        parser.add_argument("--bq_dataset_id", help=argparse.SUPPRESS)
+
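+        # Each registered ETL job contributes its own argument group.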
+        for job_cls in ALL_JOBS.values():
+            job_cls.add_arguments(parser)
+
+        parser.add_argument(
+            "jobs",
+            nargs="*",
+            choices=list(ALL_JOBS.keys()),
+            help=f"Jobs to run (defaults to {' '.join(name for name, cls in ALL_JOBS.items() if cls.default)})",
+        )
+
+        return parser
+
+    def validate_args(self, args: argparse.Namespace, jobs: Iterable[EtlJob]) -> bool:
+        required_args: set[str | tuple[str, str]] = {("bq_project_id", "--bq-project")}
+        for job in jobs:
+            required_args |= job.required_args()
+
+        missing = []
+        for arg in required_args:
+            if isinstance(arg, tuple):
+                prop_name, arg_name = arg
+            else:
+                prop_name = arg
+                arg_name = f"--{arg.replace('_', '-')}"
+
+            if getattr(args, prop_name) is None:
+                missing.append(arg_name)
+
+        if missing:
+            self.argument_parser().print_usage()
+            logging.error(f"The following arguments are required: {' '.join(missing)}")
+            return False
+
+        return True
+
+    def main(self, args: argparse.Namespace) -> Optional[int]:
+        failed = []
         jobs = (
             {job_name: ALL_JOBS[job_name]() for job_name in args.jobs}
             if args.jobs
             else {name: cls() for name, cls in ALL_JOBS.items() if cls.default}
        )
-        validate_args(parser, args, jobs.values())
+        if not self.validate_args(args, jobs.values()):
+            return 1
 
         config = Config(write=args.write, stage=args.stage)
@@ -174,19 +128,15 @@ def main() -> None:
                 raise
             failed.append(job_name)
             logging.error(e)
-    except Exception:
-        if args.pdb:
-            import pdb
-            import traceback
-
-            traceback.print_exc()
-            pdb.post_mortem()
-        else:
-            raise
-
-    if failed:
-        logging.error(f"{len(failed)} jobs failed: {', '.join(failed)}")
-        sys.exit(1)
+
+        if failed:
+            logging.error(f"{len(failed)} jobs failed: {', '.join(failed)}")
+            return 1
+
+        return 0
+
+
+main = EtlCommand()
 
 
 if __name__ == "__main__":
diff --git a/jobs/webcompat-kb/webcompat_kb/metric.py b/jobs/webcompat-kb/webcompat_kb/metric.py
index 70da362a..f4378eea 100644
--- a/jobs/webcompat-kb/webcompat_kb/metric.py
+++ b/jobs/webcompat-kb/webcompat_kb/metric.py
@@ -1,4 +1,3 @@
-import argparse
 import logging
 
 from datetime import date
@@ -109,66 +108,6 @@ def update_metric_daily(project: Project, client: BigQuery) -> None:
     )
-
-
-def backfill_metric_daily(
-    project: Project,
-    client: BigQuery,
-    write: bool,
-    metric_name: str,
-) -> None:
-    metric_dfns, metric_types = project.data.metric_dfns, project.data.metric_types
-    daily_metric_types = [
-        metric_type for metric_type in metric_types if "daily" in metric_type.contexts
-    ]
-
-    metric = None
-    for metric in metric_dfns:
-        if metric.name == metric_name:
-            break
-    else:
-        raise ValueError(f"Metric named {metric_name} not found")
-
-    select_fields = []
-    field_names = []
-    conditions = []
-    for metric_type in daily_metric_types:
-        field_name = f"{metric_type.name}_{metric.name}"
-        field_names.append(field_name)
-        select_fields.append(
-            f"{metric_type.agg_function('bugs', metric)} AS {field_name}"
-        )
-        conditions.append(f"metric_daily.{field_name} IS NULL")
-    select_query = f"""
-SELECT
-  date,
-  {",\n  ".join(select_fields)}
-FROM
-  `{project["webcompat_knowledge_base"]["scored_site_reports"]}` AS bugs
-  JOIN `{project["webcompat_knowledge_base"]["webcompat_topline_metric_daily"]}` as metric_daily
-ON
-  DATE(bugs.creation_time) <= metric_daily.date
-  AND IF (bugs.resolved_time IS NOT NULL, DATE(bugs.resolved_time) >= date, TRUE)
-WHERE
-  {metric.condition("bugs")} AND {" AND ".join(conditions)}
-GROUP BY
-  date
-ORDER BY date"""
-
-    update_query = f"""
-UPDATE `{project["webcompat_knowledge_base"]["webcompat_topline_metric_daily"]}` AS metric_daily
-SET
-  {",\n  ".join(f"metric_daily.{field_name}=new_data.{field_name}" for field_name in field_names)}
-FROM ({select_query}) AS new_data
-WHERE new_data.date = metric_daily.date
-"""
-
-    if write:
-        result = client.query(update_query)
-    else:
-        logging.info(f"Would run query:\n{update_query}")
-        result = client.query(select_query)
-        logging.info(f"Would set {list(result)}")
 
 
 class MetricJob(EtlJob):
     name = "metric"
 
@@ -184,31 +123,3 @@ def main(self, context: Context) -> None:
         context.project,
         context.bq_client,
     )
-
-
-class MetricBackfillJob(EtlJob):
-    name = "metric-backfill"
-    default = False
-
-    @classmethod
-    def add_arguments(cls, parser: argparse.ArgumentParser) -> None:
-        group = parser.add_argument_group(
-            title="Metric Backfill", description="metric-backfill arguments"
-        )
-        group.add_argument(
-            "--metric-backfill-metric", help="Name of the metric to backfill"
-        )
-
-    def required_args(self) -> set[str | tuple[str, str]]:
-        return {"metric_backfill_metric"}
-
-    def default_dataset(self, context: Context) -> str:
-        return "webcompat_knowledge_base"
-
-    def main(self, context: Context) -> None:
-        backfill_metric_daily(
-            context.project,
-            context.bq_client,
-            context.config.write,
-            context.args.metric_backfill_metric,
-        )