Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 111 additions & 52 deletions mlflow_export_import/bulk/export_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@

from mlflow_export_import.common.click_options import (
opt_experiments,
opt_import_model_artifacts,
opt_max_runs,
opt_output_dir,
opt_export_permissions,
opt_notebook_formats,
opt_run_start_time,
opt_export_deleted_runs,
opt_use_threads
opt_use_threads,
)
from mlflow_export_import.common import MlflowExportImportException
from mlflow_export_import.common import utils, io_utils, mlflow_utils
Expand All @@ -27,16 +29,19 @@

_logger = utils.getLogger(__name__)


def export_experiments(
experiments,
output_dir,
export_permissions = False,
run_start_time = None,
export_deleted_runs = False,
notebook_formats = None,
use_threads = False,
mlflow_client = None
):
experiments,
output_dir,
import_model_artifacts=True,
max_runs=None,
export_permissions=False,
run_start_time=None,
export_deleted_runs=False,
notebook_formats=None,
use_threads=False,
mlflow_client=None,
):
"""
:param experiments: Can be either:
- File (ending with '.txt') containing list of experiment names or IDS
Expand All @@ -52,7 +57,7 @@ def export_experiments(
max_workers = utils.get_threads(use_threads)
experiments_arg = _convert_dict_keys_to_list(experiments)

if isinstance(experiments,str) and experiments.endswith(".txt"):
if isinstance(experiments, str) and experiments.endswith(".txt"):
with open(experiments, "r", encoding="utf-8") as f:
experiments = f.read().splitlines()
table_data = experiments
Expand All @@ -66,13 +71,15 @@ def export_experiments(
columns = ["Experiment Name or ID"]
experiments_dct = {}
else:
experiments_dct = experiments # we passed in a dict
experiments_dct = experiments # we passed in a dict
experiments = experiments.keys()
table_data = [ [exp_id,len(runs)] for exp_id,runs in experiments_dct.items() ]
table_data = [
[exp_id, len(runs)] for exp_id, runs in experiments_dct.items()
]
num_runs = sum(x[1] for x in table_data)
table_data.append(["Total",num_runs])
table_data.append(["Total", num_runs])
columns = ["Experiment ID", "# Runs"]
utils.show_table("Experiments",table_data,columns)
utils.show_table("Experiments", table_data, columns)
_logger.info("")

ok_runs = 0
Expand All @@ -82,16 +89,19 @@ def export_experiments(
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for exp_id_or_name in experiments:
run_ids = experiments_dct.get(exp_id_or_name, None)
future = executor.submit(_export_experiment,
future = executor.submit(
_export_experiment,
mlflow_client,
exp_id_or_name,
output_dir,
import_model_artifacts,
max_runs,
export_permissions,
notebook_formats,
export_results,
run_start_time,
export_deleted_runs,
run_ids
run_ids,
)
futures.append(future)
duration = round(time.time() - start_time, 1)
Expand All @@ -116,85 +126,121 @@ def export_experiments(
"run_start_time": run_start_time,
"export_deleted_runs": export_deleted_runs,
"notebook_formats": notebook_formats,
"use_threads": use_threads
"use_threads": use_threads,
},
"status": {
"duration": duration,
"experiments": len(experiments),
"total_runs": total_runs,
"ok_runs": ok_runs,
"failed_runs": failed_runs
}
"failed_runs": failed_runs,
},
}
mlflow_attr = { "experiments": export_results }
mlflow_attr = {"experiments": export_results}

# NOTE: Make sure we don't overwrite existing experiments.json generated by export_models when being called by export_all.
# Merge this existing experiments.json with the new built by export_experiments.
path = _fs.mk_local_path(os.path.join(output_dir, "experiments.json"))
if os.path.exists(path):
from mlflow_export_import.bulk.experiments_merge_utils import merge_mlflow, merge_info
from mlflow_export_import.bulk.experiments_merge_utils import (
merge_mlflow,
merge_info,
)

root = io_utils.read_file(path)
mlflow_attr = merge_mlflow(io_utils.get_mlflow(root), mlflow_attr)
info_attr = merge_info(io_utils.get_info(root), info_attr)
info_attr["note"] = "Merged by export_all from export_models and export_experiments"
info_attr["note"] = (
"Merged by export_all from export_models and export_experiments"
)

io_utils.write_export_file(output_dir, "experiments.json", __file__, mlflow_attr, info_attr)
io_utils.write_export_file(
output_dir, "experiments.json", __file__, mlflow_attr, info_attr
)

_logger.info(f"{len(experiments)} experiments exported")
_logger.info(f"{ok_runs}/{total_runs} runs succesfully exported")
if failed_runs > 0:
_logger.info(f"{failed_runs}/{total_runs} runs failed")
_logger.info(f"Duration for {len(experiments)} experiments export: {duration} seconds")
_logger.info(
f"Duration for {len(experiments)} experiments export: {duration} seconds"
)

return info_attr


def _export_experiment(
    mlflow_client,
    exp_id_or_name,
    output_dir,
    import_model_artifacts,
    max_runs,
    export_permissions,
    notebook_formats,
    export_results,
    run_start_time,
    export_deleted_runs,
    run_ids,
):
    """
    Export one experiment into its own sub-directory of ``output_dir``.

    Failures are logged rather than raised, so one bad experiment does not
    abort the export of the others.

    :param mlflow_client: MLflow client used to look up and export the experiment.
    :param exp_id_or_name: Experiment ID or name to resolve.
    :param output_dir: Parent directory; the experiment is written to
        ``<output_dir>/<experiment_id>``.
    :param import_model_artifacts: Passed through to ``export_experiment``.
    :param max_runs: Passed through to ``export_experiment``.
    :param export_permissions: Passed through to ``export_experiment``.
    :param notebook_formats: Passed through to ``export_experiment``.
    :param export_results: List supplied by the caller; on success a summary
        dict (id, name, ok_runs, failed_runs, duration) is appended to it.
    :param run_start_time: Passed through to ``export_experiment``.
    :param export_deleted_runs: Passed through to ``export_experiment``.
    :param run_ids: Passed through to ``export_experiment``.
    :return: ``Result(exp_name, ok_runs, failed_runs)``. Both counts are -1
        when the export failed before ``export_experiment`` returned.
    """
    # -1 marks "unknown": these are only overwritten once the export succeeds.
    ok_runs = -1
    failed_runs = -1
    # Fall back to the caller's identifier until the real name is resolved,
    # so error messages can always name the experiment.
    exp_name = exp_id_or_name
    try:
        exp = mlflow_utils.get_experiment(mlflow_client, exp_id_or_name)
        exp_name = exp.name
        exp_output_dir = os.path.join(output_dir, exp.experiment_id)
        start_time = time.time()
        ok_runs, failed_runs = export_experiment(
            experiment_id_or_name=exp.experiment_id,
            output_dir=exp_output_dir,
            run_ids=run_ids,
            import_model_artifacts=import_model_artifacts,
            max_runs=max_runs,
            export_permissions=export_permissions,
            run_start_time=run_start_time,
            export_deleted_runs=export_deleted_runs,
            notebook_formats=notebook_formats,
            mlflow_client=mlflow_client,
        )
        duration = round(time.time() - start_time, 1)
        result = {
            "id": exp.experiment_id,
            "name": exp.name,
            "ok_runs": ok_runs,
            "failed_runs": failed_runs,
            "duration": duration,
        }
        export_results.append(result)
        _logger.info(f"Done exporting experiment: {result}")

    except RestException as e:
        mlflow_utils.dump_exception(e)
        err_msg = {
            **{"message": "Cannot export experiment", "experiment": exp_name},
            **mlflow_utils.mk_msg_RestException(e),
        }
        _logger.error(err_msg)
    except MlflowExportImportException as e:
        err_msg = {
            "message": "Cannot export experiment",
            "experiment": exp_name,
            "MlflowExportImportException": e.kwargs,
        }
        _logger.error(err_msg)
    except Exception as e:
        # Deliberate catch-all: log and continue so the remaining
        # experiments are still exported.
        err_msg = {
            "message": "Cannot export experiment",
            "experiment": exp_name,
            "Exception": e,
        }
        _logger.error(err_msg)
    return Result(exp_name, ok_runs, failed_runs)


def _convert_dict_keys_to_list(obj):
import collections
if isinstance(obj, collections.abc.KeysView): # class dict_keys

if isinstance(obj, collections.abc.KeysView): # class dict_keys
obj = list(obj)
return obj

Expand All @@ -209,24 +255,37 @@ class Result:
@click.command()
@opt_experiments
@opt_output_dir
@opt_import_model_artifacts
@opt_max_runs
@opt_export_permissions
@opt_run_start_time
@opt_export_deleted_runs
@opt_notebook_formats
@opt_use_threads
def main(
    experiments,
    output_dir,
    import_model_artifacts,
    max_runs,
    export_permissions,
    run_start_time,
    export_deleted_runs,
    notebook_formats,
    use_threads,
):
    """
    Command-line entry point: log the received options, then run
    ``export_experiments`` with them.

    ``notebook_formats`` is converted with ``utils.string_to_list`` before
    being passed on; every other option is forwarded unchanged.
    """
    _logger.info("Options:")
    # Must stay the first statements of the body: locals() then holds
    # exactly the option parameters and nothing else.
    for k, v in locals().items():
        _logger.info(f" {k}: {v}")
    export_experiments(
        experiments=experiments,
        output_dir=output_dir,
        import_model_artifacts=import_model_artifacts,
        max_runs=max_runs,
        export_permissions=export_permissions,
        run_start_time=run_start_time,
        export_deleted_runs=export_deleted_runs,
        notebook_formats=utils.string_to_list(notebook_formats),
        use_threads=use_threads,
    )


Expand Down
Loading