From 1cb638b9f34937e5ce2160e167585ecb367c615f Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 13 Jan 2025 15:04:51 -0600 Subject: [PATCH] update ingest fn --- CHANGELOG.md | 2 ++ neo4j_parallel_spark_loader/utils/ingest.py | 11 ++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b691cb..43adbce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### Changed +* Make `num_groups` an optional parameter of the ingest function. Passing it explicitly avoids a distinct count over the `group` column, which improves performance. + ### Added ## 0.3.0 diff --git a/neo4j_parallel_spark_loader/utils/ingest.py b/neo4j_parallel_spark_loader/utils/ingest.py index 991d27a..4333a28 100644 --- a/neo4j_parallel_spark_loader/utils/ingest.py +++ b/neo4j_parallel_spark_loader/utils/ingest.py @@ -1,14 +1,14 @@ -from typing import Any, Dict, Literal +from typing import Any, Dict, Literal, Optional from pyspark.sql import DataFrame from pyspark.sql.functions import col, collect_set -from pyspark.sql.functions import max as spark_max def ingest_spark_dataframe( spark_dataframe: DataFrame, save_mode: Literal["Overwrite", "Append"], options: Dict[str, Any], + num_groups: Optional[int] = None ) -> None: """ Saves a Spark DataFrame in multiple batches based on the 'batch' column values. @@ -26,6 +26,10 @@ def ingest_spark_dataframe( options : Dict[str, Any] Dictionary of options to configure the DataFrame writer. Refer to example for more information. + num_groups : Optional[int], optional + The number of partitions to split the Spark DataFrame into. + If not provided, it is calculated as the number of distinct 'group' values. + It is more efficient to pass this parameter explicitly. 
By default None Example ------- @@ -67,9 +71,10 @@ def ingest_spark_dataframe( for batch_value in batch_list ] + num_groups = num_groups or spark_dataframe.select("group").distinct().count() + # write batches serially to Neo4j database for batch in batches: - num_groups = batch.select("group").distinct().count() ( batch.repartition(num_groups, "group") # define parallel groups for ingest .write.mode(save_mode)