diff --git a/mlflow_export_import/bulk/export_experiments.py b/mlflow_export_import/bulk/export_experiments.py index 8b9712e2..4ca0cc5b 100644 --- a/mlflow_export_import/bulk/export_experiments.py +++ b/mlflow_export_import/bulk/export_experiments.py @@ -12,12 +12,14 @@ from mlflow_export_import.common.click_options import ( opt_experiments, + opt_import_model_artifacts, + opt_max_runs, opt_output_dir, opt_export_permissions, opt_notebook_formats, opt_run_start_time, opt_export_deleted_runs, - opt_use_threads + opt_use_threads, ) from mlflow_export_import.common import MlflowExportImportException from mlflow_export_import.common import utils, io_utils, mlflow_utils @@ -27,16 +29,19 @@ _logger = utils.getLogger(__name__) + def export_experiments( - experiments, - output_dir, - export_permissions = False, - run_start_time = None, - export_deleted_runs = False, - notebook_formats = None, - use_threads = False, - mlflow_client = None - ): + experiments, + output_dir, + import_model_artifacts=True, + max_runs=None, + export_permissions=False, + run_start_time=None, + export_deleted_runs=False, + notebook_formats=None, + use_threads=False, + mlflow_client=None, +): """ :param experiments: Can be either: - File (ending with '.txt') containing list of experiment names or IDS @@ -52,7 +57,7 @@ def export_experiments( max_workers = utils.get_threads(use_threads) experiments_arg = _convert_dict_keys_to_list(experiments) - if isinstance(experiments,str) and experiments.endswith(".txt"): + if isinstance(experiments, str) and experiments.endswith(".txt"): with open(experiments, "r", encoding="utf-8") as f: experiments = f.read().splitlines() table_data = experiments @@ -66,13 +71,15 @@ def export_experiments( columns = ["Experiment Name or ID"] experiments_dct = {} else: - experiments_dct = experiments # we passed in a dict + experiments_dct = experiments # we passed in a dict experiments = experiments.keys() - table_data = [ [exp_id,len(runs)] for exp_id,runs in experiments_dct.items() ] + table_data = [ + [exp_id, len(runs)] for exp_id, runs in experiments_dct.items() + ] num_runs = sum(x[1] for x in table_data) - table_data.append(["Total",num_runs]) + table_data.append(["Total", num_runs]) columns = ["Experiment ID", "# Runs"] - utils.show_table("Experiments",table_data,columns) + utils.show_table("Experiments", table_data, columns) _logger.info("") ok_runs = 0 @@ -82,16 +89,19 @@ def export_experiments( with ThreadPoolExecutor(max_workers=max_workers) as executor: for exp_id_or_name in experiments: run_ids = experiments_dct.get(exp_id_or_name, None) - future = executor.submit(_export_experiment, + future = executor.submit( + _export_experiment, mlflow_client, exp_id_or_name, output_dir, + import_model_artifacts, + max_runs, export_permissions, notebook_formats, export_results, run_start_time, export_deleted_runs, - run_ids + run_ids, ) futures.append(future) duration = round(time.time() - start_time, 1) @@ -116,42 +126,64 @@ def export_experiments( "run_start_time": run_start_time, "export_deleted_runs": export_deleted_runs, "notebook_formats": notebook_formats, - "use_threads": use_threads + "use_threads": use_threads, }, "status": { "duration": duration, "experiments": len(experiments), "total_runs": total_runs, "ok_runs": ok_runs, - "failed_runs": failed_runs - } + "failed_runs": failed_runs, + }, } - mlflow_attr = { "experiments": export_results } + mlflow_attr = {"experiments": export_results} # NOTE: Make sure we don't overwrite existing experiments.json generated by export_models when being called by export_all. # Merge this existing experiments.json with the new built by export_experiments. path = _fs.mk_local_path(os.path.join(output_dir, "experiments.json")) if os.path.exists(path): - from mlflow_export_import.bulk.experiments_merge_utils import merge_mlflow, merge_info + from mlflow_export_import.bulk.experiments_merge_utils import ( + merge_mlflow, + merge_info, + ) + root = io_utils.read_file(path) mlflow_attr = merge_mlflow(io_utils.get_mlflow(root), mlflow_attr) info_attr = merge_info(io_utils.get_info(root), info_attr) - info_attr["note"] = "Merged by export_all from export_models and export_experiments" + info_attr["note"] = ( + "Merged by export_all from export_models and export_experiments" + ) - io_utils.write_export_file(output_dir, "experiments.json", __file__, mlflow_attr, info_attr) + io_utils.write_export_file( + output_dir, "experiments.json", __file__, mlflow_attr, info_attr + ) _logger.info(f"{len(experiments)} experiments exported") _logger.info(f"{ok_runs}/{total_runs} runs succesfully exported") if failed_runs > 0: _logger.info(f"{failed_runs}/{total_runs} runs failed") - _logger.info(f"Duration for {len(experiments)} experiments export: {duration} seconds") + _logger.info( + f"Duration for {len(experiments)} experiments export: {duration} seconds" + ) return info_attr -def _export_experiment(mlflow_client, exp_id_or_name, output_dir, export_permissions, notebook_formats, export_results, - run_start_time, export_deleted_runs, run_ids): - ok_runs = -1; failed_runs = -1 +def _export_experiment( + mlflow_client, + exp_id_or_name, + output_dir, + import_model_artifacts, + max_runs, + export_permissions, + notebook_formats, + export_results, + run_start_time, + export_deleted_runs, + run_ids, +): + ok_runs = -1 + failed_runs = -1 exp_name = exp_id_or_name try: exp = mlflow_utils.get_experiment(mlflow_client, exp_id_or_name) @@ -159,42 +191,56 @@ def _export_experiment(mlflow_client, exp_id_or_name, output_dir, export_permiss exp_output_dir = os.path.join(output_dir, exp.experiment_id) start_time = time.time() ok_runs, failed_runs = export_experiment( - experiment_id_or_name = exp.experiment_id, - output_dir = exp_output_dir, - run_ids = run_ids, - export_permissions = export_permissions, - run_start_time = run_start_time, - export_deleted_runs = export_deleted_runs, - notebook_formats = notebook_formats, - mlflow_client = mlflow_client + experiment_id_or_name=exp.experiment_id, + output_dir=exp_output_dir, + run_ids=run_ids, + import_model_artifacts=import_model_artifacts, + max_runs=max_runs, + export_permissions=export_permissions, + run_start_time=run_start_time, + export_deleted_runs=export_deleted_runs, + notebook_formats=notebook_formats, + mlflow_client=mlflow_client, ) duration = round(time.time() - start_time, 1) result = { - "id" : exp.experiment_id, + "id": exp.experiment_id, "name": exp.name, "ok_runs": ok_runs, "failed_runs": failed_runs, - "duration": duration + "duration": duration, } export_results.append(result) _logger.info(f"Done exporting experiment: {result}") except RestException as e: mlflow_utils.dump_exception(e) - err_msg = { **{ "message": "Cannot export experiment", "experiment": exp_name }, ** mlflow_utils.mk_msg_RestException(e) } + err_msg = { + **{"message": "Cannot export experiment", "experiment": exp_name}, + **mlflow_utils.mk_msg_RestException(e), + } _logger.error(err_msg) except MlflowExportImportException as e: - err_msg = { "message": "Cannot export experiment", "experiment": exp_name, "MlflowExportImportException": e.kwargs } + err_msg = { + "message": "Cannot export experiment", + "experiment": exp_name, + "MlflowExportImportException": e.kwargs, + } _logger.error(err_msg) except Exception as e: - err_msg = { "message": "Cannot export experiment", "experiment": exp_name, "Exception": e } + err_msg = { + "message": "Cannot export experiment", + "experiment": exp_name, + "Exception": e, + } _logger.error(err_msg) return Result(exp_name, ok_runs, failed_runs) def _convert_dict_keys_to_list(obj): import collections - if isinstance(obj, collections.abc.KeysView): # class dict_keys + + if isinstance(obj, collections.abc.KeysView): # class dict_keys obj = list(obj) return obj @@ -209,24 +255,37 @@ class Result: @click.command() @opt_experiments @opt_output_dir +@opt_import_model_artifacts +@opt_max_runs @opt_export_permissions @opt_run_start_time @opt_export_deleted_runs @opt_notebook_formats @opt_use_threads - -def main(experiments, output_dir, export_permissions, run_start_time, export_deleted_runs, notebook_formats, use_threads): +def main( + experiments, + output_dir, + import_model_artifacts, + max_runs, + export_permissions, + run_start_time, + export_deleted_runs, + notebook_formats, + use_threads, +): _logger.info("Options:") - for k,v in locals().items(): + for k, v in locals().items(): _logger.info(f" {k}: {v}") export_experiments( - experiments = experiments, - output_dir = output_dir, - export_permissions = export_permissions, - run_start_time = run_start_time, - export_deleted_runs = export_deleted_runs, - notebook_formats = utils.string_to_list(notebook_formats), - use_threads = use_threads + experiments=experiments, + output_dir=output_dir, + import_model_artifacts=import_model_artifacts, + max_runs=max_runs, + export_permissions=export_permissions, + run_start_time=run_start_time, + export_deleted_runs=export_deleted_runs, + notebook_formats=utils.string_to_list(notebook_formats), + use_threads=use_threads, ) diff --git a/mlflow_export_import/common/click_options.py b/mlflow_export_import/common/click_options.py index 303a8c67..52d1854c 100644 --- a/mlflow_export_import/common/click_options.py +++ b/mlflow_export_import/common/click_options.py @@ -2,250 +2,315 @@ # == export + def opt_output_dir(function): - function = click.option("--output-dir", - help="Output directory.", - type=str, - required=True + function = click.option( + "--output-dir", help="Output directory.", type=str, required=True + )(function) + return function + + +def opt_max_runs(function): + function = click.option( + "--max-runs", + help="Number of runs that you want to export from each experiment", + type=int, + default=None, + required=False, + show_default=True, )(function) return function + +def opt_import_model_artifacts(function): + function = click.option( + "--import-model-artifacts", + help="If you don't want to import model artifacts, set this to False. Otherwise, set it to True.", + type=bool, + default=True, + show_default=True, + )(function) + return function + + def opt_notebook_formats(function): - function = click.option("--notebook-formats", + function = click.option( + "--notebook-formats", help="Databricks notebook formats. Values are SOURCE, HTML, JUPYTER or DBC (comma separated).", - type=str, - default="", - show_default=True + type=str, + default="", + show_default=True, )(function) return function + def opt_run_id(function): - function = click.option("--run-id", - help="Run ID.", - type=str, - required=True - )(function) + function = click.option("--run-id", help="Run ID.", type=str, required=True)( + function + ) return function + def opt_stages(function): - function = click.option("--stages", + function = click.option( + "--stages", help="Stages to export (comma separated). Default is all stages and all versions. Stages are Production, Staging, Archived and None. Mututally exclusive with option --versions.", type=str, - required=False + required=False, )(function) return function + def opt_export_latest_versions(function): - function = click.option("--export-latest-versions", + function = click.option( + "--export-latest-versions", help="Export latest registered model versions instead of all versions.", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + def opt_export_permissions(function): - function = click.option("--export-permissions", + function = click.option( + "--export-permissions", help="Export Databricks permissions.", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + def opt_get_model_version_download_uri(function): - function = click.option("--get-model-version-download-uri", + function = click.option( + "--get-model-version-download-uri", help="Call MLflowClient.get_model_version_download_uri() for version export.", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + def opt_run_start_time(function): - function = click.option("--run-start-time", + function = click.option( + "--run-start-time", help="Only export runs started after this UTC time (inclusive). Format: YYYY-MM-DD.", type=str, - required=False + required=False, )(function) return function + def opt_export_deleted_runs(function): - function = click.option("--export-deleted-runs", + function = click.option( + "--export-deleted-runs", help="Export deleted runs.", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + def opt_export_version_model(function): - function = click.option("--export-version-model", + function = click.option( + "--export-version-model", help="Export registered model version's 'cached' MLflow model.", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + def opt_run_ids(function): - function = click.option("--run-ids", + function = click.option( + "--run-ids", help="List of run IDs to export (comma seperated).", type=str, - required=False + required=False, )(function) return function + def opt_check_nested_runs(function): - function = click.option("--check-nested-runs", + function = click.option( + "--check-nested-runs", help="Check if run in the 'run-ids' option is a parent of nested runs and export all the nested runs.", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + # == import + def opt_input_dir(function): - function = click.option("--input-dir", - help="Input directory.", - type=str, - required=True + function = click.option( + "--input-dir", help="Input directory.", type=str, required=True )(function) return function + def opt_import_source_tags(function): - function = click.option("--import-source-tags", + function = click.option( + "--import-source-tags", help="Import source information for registered model and its versions and tags in destination object.", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + def opt_use_src_user_id(function): - function = click.option("--use-src-user-id", - help= """Set the destination user field to the source user field. + function = click.option( + "--use-src-user-id", + help="""Set the destination user field to the source user field. Only valid for open source MLflow. When importing into Databricks, the source user field is ignored since it is automatically picked up from your Databricks access token. There is no MLflow API endpoint to explicity set the user_id for Run and Registered Model.""", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + def opt_dst_notebook_dir(function): - function = click.option("--dst-notebook-dir", + function = click.option( + "--dst-notebook-dir", help="Databricks destination workpsace base directory for notebook. A run ID will be added to contain the run's notebook.", type=str, required=False, - show_default=True + show_default=True, )(function) return function + def opt_experiment_name(function): - function = click.option("--experiment-name", + function = click.option( + "--experiment-name", help="Destination experiment name.", type=str, - required=True + required=True, )(function) return function + def opt_experiment(function): - function = click.option("--experiment", - help="Experiment name or ID.", - type=str, - required=True - )(function) + function = click.option( + "--experiment", help="Experiment name or ID.", type=str, required=True + )(function) return function + def opt_versions(function): - function = click.option("--versions", + function = click.option( + "--versions", help="Export specified versions (comma separated). Mututally exclusive with option --stages.", type=str, - required=False)(function) + required=False, + )(function) return function + def opt_import_permissions(function): - function = click.option("--import-permissions", + function = click.option( + "--import-permissions", help="Import Databricks permissions using the HTTP PATCH method.", type=bool, default=False, - show_default=True)(function) + show_default=True, + )(function) return function # == bulk + def opt_use_threads(function): - click.option("--use-threads", + click.option( + "--use-threads", help="Process in parallel using threads.", type=bool, default=False, - show_default=True)(function) + show_default=True, + )(function) return function + def opt_delete_model(function): - function = click.option("--delete-model", + function = click.option( + "--delete-model", help="If the model exists, first delete the model and all its versions.", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + def opt_experiments(function): - function = click.option("--experiments", + function = click.option( + "--experiments", help="Experiment names or IDs (comma delimited) or filename ending with '.txt' containing them. \ For example, 'sklearn_wine,sklearn_iris' or '1,2'. 'all' will export all experiments. \ Or 'experiments.txt' will contain a list of experiment names or IDs.", type=str, - required=True + required=True, )(function) return function + def opt_export_all_runs(function): - function = click.option("--export-all-runs", + function = click.option( + "--export-all-runs", help="Export all runs of experiment or just runs associated with registered model versions.", type=bool, default=False, - show_default=True + show_default=True, )(function) return function + def opt_experiment_rename_file(function): - function = click.option("--experiment-rename-file", + function = click.option( + "--experiment-rename-file", help="File with experiment names replacements: comma-delimited line such as 'old_name,new_name'.", type=str, - required=False + required=False, )(function) return function + def opt_model_rename_file(function): - function = click.option("--model-rename-file", + function = click.option( + "--model-rename-file", help="File with registered model names replacements: comma-delimited line such as 'old_name,new_name'.", type=str, - required=False + required=False, )(function) return function + # == other + def opt_model(function): - function = click.option("--model", - help="Registered model name.", - type=str, - required=True + function = click.option( + "--model", help="Registered model name.", type=str, required=True )(function) return function + def opt_verbose(function): - function = click.option("--verbose", - type=bool, - help="Verbose.", - default=False, - show_default=True + function = click.option( + "--verbose", type=bool, help="Verbose.", default=False, show_default=True )(function) return function diff --git a/mlflow_export_import/experiment/export_experiment.py b/mlflow_export_import/experiment/export_experiment.py index ac1656e4..da82fd43 100644 --- a/mlflow_export_import/experiment/export_experiment.py +++ b/mlflow_export_import/experiment/export_experiment.py @@ -13,13 +13,16 @@ opt_export_permissions, opt_run_start_time, opt_export_deleted_runs, - opt_check_nested_runs + opt_check_nested_runs, ) from mlflow_export_import.common.iterators import SearchRunsIterator from mlflow_export_import.common import utils, io_utils, mlflow_utils from mlflow_export_import.common import ws_permissions_utils from mlflow_export_import.common.timestamp_utils import fmt_ts_millis, utc_str_to_millis -from mlflow_export_import.client.client_utils import create_mlflow_client, create_dbx_client +from mlflow_export_import.client.client_utils import ( + create_mlflow_client, + create_dbx_client, +) from mlflow_export_import.run.export_run import export_run from . import nested_runs_utils @@ -27,23 +30,25 @@ def export_experiment( - experiment_id_or_name, - output_dir, - run_ids = None, - export_permissions = False, - run_start_time = None, - export_deleted_runs = False, - check_nested_runs = False, - notebook_formats = None, - mlflow_client = None - ): + experiment_id_or_name, + output_dir, + import_model_artifacts=True, + max_runs=None, + run_ids=None, + export_permissions=False, + run_start_time=None, + export_deleted_runs=False, + check_nested_runs=False, + notebook_formats=None, + mlflow_client=None, +): """ :param: experiment_id_or_name: Experiment ID or name. :param: output_dir: Output directory. :param: run_ids: List of run IDs to export. If None then export all run IDs. :param: export_permissions - Export Databricks permissions. :param: export_deleted_runs - Export deleted runs. - :param: check_nested_runs - Check if run in the 'run-ids' option is a parent of nested runs and + :param: check_nested_runs - Check if run in the 'run-ids' option is a parent of nested runs and export all the nested runs. :param: run_start_time - Only export runs started after this UTC time (inclusive). Format: YYYY-MM-DD. :param: notebook_formats: List of notebook formats to export. Values are SOURCE, HTML, JUPYTER or DBC. @@ -58,80 +63,115 @@ def export_experiment( run_start_time = utc_str_to_millis(run_start_time) exp = mlflow_utils.get_experiment(mlflow_client, experiment_id_or_name) - msg = { "name": exp.name, "id": exp.experiment_id, + msg = { + "name": exp.name, + "id": exp.experiment_id, "mlflow.experimentType": exp.tags.get("mlflow.experimentType", None), - "lifecycle_stage": exp.lifecycle_stage + "lifecycle_stage": exp.lifecycle_stage, } _logger.info(f"Exporting experiment: {msg}") ok_run_ids = [] failed_run_ids = [] num_runs_exported = 0 + if run_ids: runs = _get_runs(mlflow_client, run_ids, exp, failed_run_ids) - if check_nested_runs: # ZZ - runs = nested_runs_utils.get_nested_runs(mlflow_client, runs) # + if check_nested_runs: # ZZ + runs = nested_runs_utils.get_nested_runs(mlflow_client, runs) # else: kwargs = {} if run_start_time: kwargs["filter"] = f"start_time > {run_start_time}" if export_deleted_runs: from mlflow.entities import ViewType + kwargs["view_type"] = ViewType.ALL - runs = SearchRunsIterator(mlflow_client, exp.experiment_id, **kwargs) + runs = SearchRunsIterator( + mlflow_client, exp.experiment_id, max_results=max_runs, **kwargs + ) for run in runs: - _export_run(mlflow_client, run, output_dir, ok_run_ids, failed_run_ids, - run_start_time, run_start_time_str, export_deleted_runs, notebook_formats) + if max_runs: + if num_runs_exported > max_runs - 1: + break + _export_run( + mlflow_client, + run, + output_dir, + import_model_artifacts, + ok_run_ids, + failed_run_ids, + run_start_time, + run_start_time_str, + export_deleted_runs, + notebook_formats, + ) + num_runs_exported += 1 info_attr = { "num_total_runs": (num_runs_exported), "num_ok_runs": len(ok_run_ids), "num_failed_runs": len(failed_run_ids), - "failed_runs": failed_run_ids + "failed_runs": failed_run_ids, } exp_dct = utils.strip_underscores(exp) exp_dct["_creation_time"] = fmt_ts_millis(exp.creation_time) exp_dct["_last_update_time"] = fmt_ts_millis(exp.last_update_time) exp_dct["tags"] = dict(sorted(exp_dct["tags"].items())) - mlflow_attr = { "experiment": exp_dct , "runs": ok_run_ids } + mlflow_attr = {"experiment": exp_dct, "runs": ok_run_ids} if export_permissions: - mlflow_attr["permissions"] = ws_permissions_utils.get_experiment_permissions(dbx_client, exp.experiment_id) - io_utils.write_export_file(output_dir, "experiment.json", __file__, mlflow_attr, info_attr) + mlflow_attr["permissions"] = ws_permissions_utils.get_experiment_permissions( + dbx_client, exp.experiment_id + ) + io_utils.write_export_file( + output_dir, "experiment.json", __file__, mlflow_attr, info_attr + ) msg = f"for experiment '{exp.name}' (ID: {exp.experiment_id})" - if num_runs_exported==0: + if num_runs_exported == 0: _logger.warning(f"No runs exported {msg}") elif len(failed_run_ids) == 0: _logger.info(f"{len(ok_run_ids)} runs succesfully exported {msg}") else: - _logger.info(f"{len(ok_run_ids)}/{num_runs_exported} runs succesfully exported {msg}") + _logger.info( + f"{len(ok_run_ids)}/{num_runs_exported} runs succesfully exported {msg}" + ) _logger.info(f"{len(failed_run_ids)}/{num_runs_exported} runs failed {msg}") return len(ok_run_ids), len(failed_run_ids) -def _export_run(mlflow_client, run, output_dir, - ok_run_ids, failed_run_ids, - run_start_time, run_start_time_str, - export_deleted_runs, notebook_formats - ): +def _export_run( + mlflow_client, + run, + output_dir, + import_model_artifacts, + ok_run_ids, + failed_run_ids, + run_start_time, + run_start_time_str, + export_deleted_runs, + notebook_formats, +): if run_start_time and run.info.start_time < run_start_time: msg = { "run_id": {run.info.run_id}, "experiment_id": {run.info.experiment_id}, "start_time": fmt_ts_millis(run.info.start_time), - "run_start_time": run_start_time_str + "run_start_time": run_start_time_str, } _logger.info(f"Not exporting run: {msg}") return is_success = export_run( - run_id = run.info.run_id, - output_dir = os.path.join(output_dir, run.info.run_id), - export_deleted_runs = export_deleted_runs, - notebook_formats = notebook_formats, - mlflow_client = mlflow_client + run_id=run.info.run_id, + output_dir=os.path.join(output_dir, run.info.run_id), + import_model_artifacts=import_model_artifacts, + export_deleted_runs=export_deleted_runs, + notebook_formats=notebook_formats, + mlflow_client=mlflow_client, ) + if is_success: ok_run_ids.append(run.info.run_id) else: @@ -147,14 +187,16 @@ def _get_runs(mlflow_client, run_ids, exp, failed_run_ids): runs.append(run) else: msg = { - "run_id": {run.info.run_id}, + "run_id": {run.info.run_id}, "run.experiment_id": {run.info.experiment_id}, - "experiment_id": {exp.experiment_id} + "experiment_id": {exp.experiment_id}, } - _logger.warning(f"Not exporting run since it doesn't belong to experiment: {msg}") + _logger.warning( + f"Not exporting run since it doesn't belong to experiment: {msg}" + ) failed_run_ids.append(run.info.run_id) except Exception as e: - msg = { "run_id": run_id, "experiment_id": exp.experiment_id, "Exception": e } + msg = {"run_id": run_id, "experiment_id": exp.experiment_id, "Exception": e} _logger.warning(f"Run export failed: {msg}") failed_run_ids.append(run_id) return runs @@ -169,24 +211,32 @@ def _get_runs(mlflow_client, run_ids, exp, failed_run_ids): @opt_export_deleted_runs @opt_check_nested_runs @opt_notebook_formats - -def main(experiment, output_dir, run_ids, export_permissions, run_start_time, export_deleted_runs, check_nested_runs, notebook_formats): +def main( + experiment, + output_dir, + run_ids, + export_permissions, + run_start_time, + export_deleted_runs, + check_nested_runs, + notebook_formats, +): _logger.info("Options:") - for k,v in locals().items(): + for k, v in locals().items(): _logger.info(f" {k}: {v}") if run_ids: run_ids = run_ids.split(",") export_experiment( - experiment_id_or_name = experiment, - output_dir = output_dir, - run_ids = run_ids, - export_permissions = export_permissions, - run_start_time = run_start_time, - export_deleted_runs = export_deleted_runs, - check_nested_runs = check_nested_runs, - notebook_formats = utils.string_to_list(notebook_formats) + experiment_id_or_name=experiment, + output_dir=output_dir, + run_ids=run_ids, + export_permissions=export_permissions, + run_start_time=run_start_time, + export_deleted_runs=export_deleted_runs, + check_nested_runs=check_nested_runs, + notebook_formats=utils.string_to_list(notebook_formats), ) diff --git a/mlflow_export_import/run/export_run.py b/mlflow_export_import/run/export_run.py index c0549888..a126d0a1 100644 --- a/mlflow_export_import/run/export_run.py +++ b/mlflow_export_import/run/export_run.py @@ -10,32 +10,43 @@ from mlflow_export_import.common import utils from mlflow_export_import.common.click_options import ( + opt_import_model_artifacts, opt_run_id, opt_output_dir, - opt_notebook_formats + opt_notebook_formats, ) from mlflow.exceptions import RestException from mlflow_export_import.common import filesystem as _fs from mlflow_export_import.common import io_utils -from mlflow_export_import.common.timestamp_utils import adjust_timestamps, format_seconds -from mlflow_export_import.client.client_utils import create_mlflow_client, create_dbx_client +from mlflow_export_import.common.timestamp_utils import ( + adjust_timestamps, + format_seconds, +) +from mlflow_export_import.client.client_utils import ( + create_mlflow_client, + create_dbx_client, +) from mlflow_export_import.notebook.download_notebook import download_notebook from mlflow.utils.mlflow_tags import MLFLOW_DATABRICKS_NOTEBOOK_PATH -MLFLOW_DATABRICKS_NOTEBOOK_REVISION_ID = "mlflow.databricks.notebookRevisionID" # NOTE: not in mlflow/utils/mlflow_tags.py + +MLFLOW_DATABRICKS_NOTEBOOK_REVISION_ID = ( + "mlflow.databricks.notebookRevisionID" # NOTE: not in mlflow/utils/mlflow_tags.py +) _logger = utils.getLogger(__name__) def export_run( - run_id, - output_dir, - export_deleted_runs = False, - skip_download_run_artifacts = False, - notebook_formats = None, - raise_exception = False, - mlflow_client = None - ): + run_id, + output_dir, + export_deleted_runs=False, + skip_download_run_artifacts=True, + notebook_formats=None, + raise_exception=False, + mlflow_client=None, + import_model_artifacts=True, +): """ :param run_id: Run ID. :param output_dir: Output directory. @@ -57,12 +68,18 @@ def export_run( try: run = mlflow_client.get_run(run_id) dst_path = os.path.join(output_dir, "artifacts") - msg = { "run_id": run.info.run_id, "dst_path": dst_path } + msg = {"run_id": run.info.run_id, "dst_path": dst_path} if run.info.lifecycle_stage == "deleted" and not export_deleted_runs: - _logger.warning(f"Not exporting run '{run.info.run_id} because its lifecycle_stage is '{run.info.lifecycle_stage}'") + _logger.warning( + f"Not exporting run '{run.info.run_id} because its lifecycle_stage is '{run.info.lifecycle_stage}'" + ) return None experiment_id = run.info.experiment_id - msg = { "run_id": run.info.run_id, "lifecycle_stage": run.info.lifecycle_stage, "experiment_id": run.info.experiment_id } + msg = { + "run_id": run.info.run_id, + "lifecycle_stage": run.info.lifecycle_stage, + "experiment_id": run.info.experiment_id, + } tags = run.data.tags tags = dict(sorted(tags.items())) @@ -73,7 +90,7 @@ def export_run( "params": run.data.params, "metrics": _get_metrics_with_steps(mlflow_client, run), "tags": tags, - "inputs": _inputs_to_dict(run.inputs) + "inputs": _inputs_to_dict(run.inputs), } io_utils.write_export_file(output_dir, "run.json", __file__, mlflow_attr) fs = _fs.get_filesystem(".") @@ -85,34 +102,45 @@ def export_run( if skip_download_run_artifacts: _logger.warning(f"Not downloading run artifacts for run {run.info.run_id}") else: - if len(artifacts) > 0: # Because of https://github.com/mlflow/mlflow/issues/2839 + if ( + import_model_artifacts + ): # Because of https://github.com/mlflow/mlflow/issues/2839 fs.mkdirs(dst_path) mlflow.artifacts.download_artifacts( - run_id = run.info.run_id, - dst_path = _fs.mk_local_path(dst_path), - tracking_uri = mlflow_client._tracking_client.tracking_uri) + run_id=run.info.run_id, + dst_path=_fs.mk_local_path(dst_path), + tracking_uri=mlflow_client._tracking_client.tracking_uri, + ) notebook = tags.get(MLFLOW_DATABRICKS_NOTEBOOK_PATH) # export notebook as artifact if notebook is not None: if len(notebook_formats) > 0: - _export_notebook(dbx_client, output_dir, notebook, notebook_formats, run, fs) + _export_notebook( + dbx_client, output_dir, notebook, notebook_formats, run, fs + ) elif len(notebook_formats) > 0: - _logger.warning(f"No notebooks to export for run '{run_id}' since tag '{MLFLOW_DATABRICKS_NOTEBOOK_PATH}' is not set.") - dur = format_seconds(time.time()-start_time) + _logger.warning( + f"No notebooks to export for run '{run_id}' since tag '{MLFLOW_DATABRICKS_NOTEBOOK_PATH}' is not set." + ) + dur = format_seconds(time.time() - start_time) _logger.info(f"Exported run in {dur}: {msg}") return run except RestException as e: if raise_exception: raise e - err_msg = { "run_id": run_id, "experiment_id": experiment_id, "RestException": e.json } + err_msg = { + "run_id": run_id, + "experiment_id": experiment_id, + "RestException": e.json, + } _logger.error(f"Run export failed (1): {err_msg}") return None except Exception as e: if raise_exception: raise e - err_msg = { "run_id": run_id, "experiment_id": experiment_id, "Exception": e } + err_msg = {"run_id": run_id, "experiment_id": experiment_id, "Exception": e} _logger.error(f"Run export failed (2): {err_msg}") traceback.print_exc() return None @@ -121,7 +149,7 @@ def export_run( def _get_metrics_with_steps(mlflow_client, run): metrics_with_steps = {} for metric in run.data.metrics.keys(): - metric_history = mlflow_client.get_metric_history(run.info.run_id,metric) + metric_history = mlflow_client.get_metric_history(run.info.run_id, metric) lst = [utils.strip_underscores(m) for m in metric_history] for x in lst: del x["key"] @@ -134,12 +162,14 @@ def _export_notebook(dbx_client, output_dir, notebook, notebook_formats, run, fs fs.mkdirs(notebook_dir) revision_id = run.data.tags.get(MLFLOW_DATABRICKS_NOTEBOOK_REVISION_ID) if not revision_id: - _logger.warning(f"Cannot download notebook '{notebook}' for run '{run.info.run_id}' since tag '{MLFLOW_DATABRICKS_NOTEBOOK_REVISION_ID}' does not exist. Notebook is probably a Git Repo notebook.") + _logger.warning( + f"Cannot download notebook '{notebook}' for run '{run.info.run_id}' since tag '{MLFLOW_DATABRICKS_NOTEBOOK_REVISION_ID}' does not exist. Notebook is probably a Git Repo notebook." + ) return manifest = { - MLFLOW_DATABRICKS_NOTEBOOK_PATH: run.data.tags[MLFLOW_DATABRICKS_NOTEBOOK_PATH], - MLFLOW_DATABRICKS_NOTEBOOK_REVISION_ID: revision_id, - "formats": notebook_formats + MLFLOW_DATABRICKS_NOTEBOOK_PATH: run.data.tags[MLFLOW_DATABRICKS_NOTEBOOK_PATH], + MLFLOW_DATABRICKS_NOTEBOOK_REVISION_ID: revision_id, + "formats": notebook_formats, } path = os.path.join(notebook_dir, "manifest.json") io_utils.write_file(path, manifest) @@ -150,24 +180,25 @@ def _inputs_to_dict(inputs): def to_dict(ds): return { "dataset": utils.strip_underscores(ds.dataset), - "tags": [ utils.strip_underscores(tag) for tag in ds.tags ] + "tags": [utils.strip_underscores(tag) for tag in ds.tags], } - return [ to_dict(x) for x in inputs.dataset_inputs ] + + return [to_dict(x) for x in inputs.dataset_inputs] @click.command() @opt_run_id @opt_output_dir +@opt_import_model_artifacts @opt_notebook_formats - -def main(run_id, output_dir, notebook_formats): +def main(run_id, output_dir, notebook_formats, import_model_artifacts): _logger.info("Options:") - for k,v in locals().items(): + for k, v in locals().items(): _logger.info(f" {k}: {v}") export_run( - run_id = run_id, - output_dir = output_dir, - notebook_formats = utils.string_to_list(notebook_formats) + run_id=run_id, + output_dir=output_dir, + notebook_formats=utils.string_to_list(notebook_formats), )