12 changes: 11 additions & 1 deletion databricks_notebooks/bulk/Common.py
@@ -1,5 +1,11 @@
# Databricks notebook source
# MAGIC %pip install -U mlflow-skinny
# %pip install -U mlflow-skinny
# %pip install -U git+https://github.com/mlflow/mlflow-export-import/#egg=mlflow-export-import
# dbutils.library.restartPython()

# COMMAND ----------

# MAGIC %pip install -U mlflow==2.19.0
# MAGIC %pip install -U git+https://github.com/mlflow/mlflow-export-import/#egg=mlflow-export-import
# MAGIC dbutils.library.restartPython()

@@ -25,3 +31,7 @@ def get_notebook_formats(num):
notebook_formats = notebook_formats.split(",")
if "" in notebook_formats: notebook_formats.remove("")
return notebook_formats

# COMMAND ----------


142 changes: 114 additions & 28 deletions databricks_notebooks/bulk/Export_All.py
@@ -1,7 +1,8 @@
# Databricks notebook source
# MAGIC %md ## Export All
# MAGIC %md
# MAGIC ## Export All
# MAGIC
# MAGIC Export all the MLflow registered models and all experiments of a tracking server.
# MAGIC ## Export all the MLflow registered models and all experiments of a tracking server.
# MAGIC
# MAGIC **Widgets**
# MAGIC * `1. Output directory` - shared directory between source and destination workspaces.
@@ -20,32 +21,49 @@

# COMMAND ----------

dbutils.widgets.text("1. Output directory", "")
output_dir = dbutils.widgets.get("1. Output directory")
from mlflow_export_import.bulk import config
import time
import os
from datetime import datetime

# COMMAND ----------

dbutils.widgets.text("output_dir","")
output_dir = dbutils.widgets.get("output_dir")
output_dir = output_dir.replace("dbfs:","/dbfs")

dbutils.widgets.multiselect("2. Stages", "Production", ["Production","Staging","Archived","None"])
stages = dbutils.widgets.get("2. Stages")
dbutils.widgets.multiselect("stages", "Production", ["Production","Staging","Archived","None"])
stages = dbutils.widgets.get("stages")

dbutils.widgets.dropdown("export_latest_versions","false",["true","false"])
export_latest_versions = dbutils.widgets.get("export_latest_versions") == "true"

dbutils.widgets.text("run_start_date", "")
run_start_date = dbutils.widgets.get("run_start_date")

dbutils.widgets.dropdown("3. Export latest versions","no",["yes","no"])
export_latest_versions = dbutils.widgets.get("3. Export latest versions") == "yes"
dbutils.widgets.dropdown("export_permissions","false",["true","false"])
export_permissions = dbutils.widgets.get("export_permissions") == "true"

dbutils.widgets.text("4. Run start date", "")
run_start_date = dbutils.widgets.get("4. Run start date")
dbutils.widgets.text("task_index", "1")
task_index = int(dbutils.widgets.get("task_index"))

dbutils.widgets.dropdown("5. Export permissions","no",["yes","no"])
export_permissions = dbutils.widgets.get("5. Export permissions") == "yes"
dbutils.widgets.text("num_tasks", "1")
num_tasks = int(dbutils.widgets.get("num_tasks"))
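# Assumption: task_index / num_tasks shard the export across parallel job tasks,
# with each task handling roughly 1/num_tasks of the registered models; both are
# forwarded to the bulk export call below.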

dbutils.widgets.dropdown("6. Export deleted runs","no",["yes","no"])
export_deleted_runs = dbutils.widgets.get("6. Export deleted runs") == "yes"
dbutils.widgets.text("run_timestamp", "")
run_timestamp = dbutils.widgets.get("run_timestamp")

dbutils.widgets.dropdown("7. Export version MLflow model","no",["yes","no"]) # TODO
export_version_model = dbutils.widgets.get("7. Export version MLflow model") == "yes"
dbutils.widgets.text("jobrunid", "")
jobrunid = dbutils.widgets.get("jobrunid")

notebook_formats = get_notebook_formats(8)
dbutils.widgets.text("model_file_name", "")
model_file_name = dbutils.widgets.get("model_file_name")
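# model_file_name is assumed to reference a file listing the model names this task
# should export; it is passed as model_names to the bulk export call below.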

dbutils.widgets.dropdown("9. Use threads","no",["yes","no"])
use_threads = dbutils.widgets.get("9. Use threads") == "yes"
dbutils.widgets.dropdown("source_model_registry","unity_catalog",["unity_catalog","workspace_registry"])
source_model_registry = dbutils.widgets.get("source_model_registry")

dbutils.widgets.dropdown("Cloud","azure",["azure","aws","gcp"])
cloud = dbutils.widgets.get("Cloud")

if run_start_date=="": run_start_date = None

@@ -54,14 +72,52 @@
print("export_latest_versions:", export_latest_versions)
print("run_start_date:", run_start_date)
print("export_permissions:", export_permissions)
print("export_deleted_runs:", export_deleted_runs)
print("export_version_model:", export_version_model)
print("notebook_formats:", notebook_formats)
print("use_threads:", use_threads)
print("task_index:", task_index)
print("num_tasks:", num_tasks)
print("run_timestamp:", run_timestamp)
print("jobrunid:", jobrunid)
print("model_file_name:", model_file_name)
print("source_model_registry:", source_model_registry)

# COMMAND ----------

assert_widget(output_dir, "output_dir")
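# The checkpoint directories below are shared across tasks; the bulk export is assumed
# to record completed experiments and models there so that reruns can skip them.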
checkpoint_dir_experiment = os.path.join(output_dir, run_timestamp, "checkpoint", "experiments")
try:
    if not os.path.exists(checkpoint_dir_experiment):
        os.makedirs(checkpoint_dir_experiment, exist_ok=True)
        print(f"checkpoint_dir_experiment: created {checkpoint_dir_experiment}")
except Exception as e:
    raise Exception(f"Failed to create directory {checkpoint_dir_experiment}: {e}")

# COMMAND ----------

checkpoint_dir_model = os.path.join(output_dir, run_timestamp, "checkpoint", "models")
try:
    if not os.path.exists(checkpoint_dir_model):
        os.makedirs(checkpoint_dir_model, exist_ok=True)
        print(f"checkpoint_dir_model: created {checkpoint_dir_model}")
except Exception as e:
    raise Exception(f"Failed to create directory {checkpoint_dir_model}: {e}")

# COMMAND ----------

output_dir = os.path.join(output_dir, run_timestamp, jobrunid, str(task_index))
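# Each task writes to its own subdirectory: <output_dir>/<run_timestamp>/<jobrunid>/<task_index>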
output_dir

# COMMAND ----------

log_path = f"/tmp/exportall_{task_index}.log"
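# Each task logs locally under /tmp; the file is copied to the shared output
# directory at the end of this notebook (see the dbutils.fs.cp cell below).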
log_path

# COMMAND ----------

# curr_timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
# log_path = f"{output_dir}/export_all_{task_index}_{curr_timestamp}.log"

# COMMAND ----------

config.log_path = log_path
config.target_model_registry = source_model_registry
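# Assumption: these module-level settings are read by the mlflow_export_import bulk
# helpers: log_path redirects their logging to the per-task file, and
# target_model_registry controls which model registry (Unity Catalog or workspace) is used.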

# COMMAND ----------

@@ -73,8 +129,38 @@
export_latest_versions = export_latest_versions,
run_start_time = run_start_date,
export_permissions = export_permissions,
export_deleted_runs = export_deleted_runs,
export_version_model = export_version_model,
notebook_formats = notebook_formats,
use_threads = use_threads
export_deleted_runs = False,
export_version_model = False,
notebook_formats = ['SOURCE'],
use_threads = True,
task_index = task_index,
num_tasks = num_tasks,
checkpoint_dir_experiment = checkpoint_dir_experiment,
checkpoint_dir_model = checkpoint_dir_model,
model_names = model_file_name
)

# COMMAND ----------

time.sleep(10)
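# Assumption: the short sleep lets the export's log handlers flush before the
# log file is copied to the shared location below.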

# COMMAND ----------

# MAGIC %sh cat /tmp/exportall_*.log

# COMMAND ----------

dbfs_log_path = f"{output_dir}/export_all_{task_index}.log"
if dbfs_log_path.startswith("/Workspace"):
    dbfs_log_path = dbfs_log_path.replace("/Workspace", "file:/Workspace")
dbfs_log_path = dbfs_log_path.replace("/dbfs", "dbfs:")
dbfs_log_path
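# The prefix rewrites above map workspace paths (/Workspace -> file:/Workspace) and
# DBFS mount paths (/dbfs -> dbfs:) to URIs that dbutils.fs.cp can address.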

# COMMAND ----------


dbutils.fs.cp(f"file:{log_path}", dbfs_log_path)

# COMMAND ----------

print(dbutils.fs.head(dbfs_log_path))
61 changes: 61 additions & 0 deletions databricks_notebooks/bulk/Export_All_log_parsing.py
@@ -0,0 +1,61 @@
# Databricks notebook source
spark.read.parquet("<volume path till jobid>/checkpoint/models/*.parquet").createOrReplaceTempView("models")
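# <volume path till jobid> is a placeholder for the shared output directory used by
# Export_All; each parquet file is assumed to hold the checkpoint rows written by one task.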


# COMMAND ----------

# MAGIC %sql
# MAGIC select * from models
# MAGIC -- select count(distinct(model)) from models -- model=1202
# MAGIC -- select count(distinct(experiment_id)) from models -- experiment=771

# COMMAND ----------

spark.read.parquet("<volume path till jobid>/checkpoint/experiments").createOrReplaceTempView("experiments")

# COMMAND ----------

# MAGIC %sql
# MAGIC select * from experiments
# MAGIC -- select count(distinct(experiment_id)) from experiments --1774

# COMMAND ----------



# COMMAND ----------

from pyspark.sql.functions import regexp_extract, col

log_df = spark.read.text("dbfs:/mnt/modelnonuc/2025-06-17-Export-jobid-34179827290231/jobrunid-548033559076165/*/export_all_*.log")

# Define regex pattern
pattern = r"^(\d{2}-\w{3}-\d{2} \d{2}:\d{2}:\d{2}) - (\w+) - \[([^\]:]+):(\d+)\] - (.*)$"
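# Illustrative line the pattern is expected to match (not taken from a real log):
#   17-Jun-25 10:15:30 - ERROR - [model_exporter.py:78] - Failed to export model "my_model"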

# Parse fields using regex
parsed_df = log_df.select(
    regexp_extract('value', pattern, 1).alias('timestamp'),
    regexp_extract('value', pattern, 2).alias('level'),
    regexp_extract('value', pattern, 3).alias('module'),
    regexp_extract('value', pattern, 4).alias('line_no'),
    regexp_extract('value', pattern, 5).alias('message')
)

parsed_df.createOrReplaceTempView("df")
display(parsed_df)

# COMMAND ----------

# MAGIC %sql
# MAGIC
# MAGIC select line_no, count(*), first(module) from df where level = "ERROR" group by line_no

# COMMAND ----------

# MAGIC %sql
# MAGIC
# MAGIC select * from df where level="ERROR" and line_no=78

# COMMAND ----------

