
Commit 77b3cad

Merge branch 'apache:master' into SPARK-28098
2 parents: 8aee5de + 0e10341

57 files changed: +2975 additions, −407 deletions


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 12 additions & 0 deletions
@@ -583,6 +583,12 @@
     ],
     "sqlState" : "22KD3"
   },
+  "CANNOT_USE_MULTI_ALIASES_IN_WATERMARK_CLAUSE" : {
+    "message" : [
+      "Multiple aliases are not supported in watermark clause."
+    ],
+    "sqlState" : "42000"
+  },
   "CANNOT_WRITE_STATE_STORE" : {
     "message" : [
       "Error writing state store files for provider <providerClass>."
@@ -4985,6 +4991,12 @@
     ],
     "sqlState" : "4274K"
   },
+  "REQUIRES_EXPLICIT_NAME_IN_WATERMARK_CLAUSE" : {
+    "message" : [
+      "The watermark clause requires an explicit name if expression is specified, but got <sqlExpr>."
+    ],
+    "sqlState" : "42000"
+  },
   "REQUIRES_SINGLE_PART_NAMESPACE" : {
     "message" : [
       "<sessionCatalog> requires a single-part namespace, but got <namespace>."

docs/sql-ref-ansi-compliance.md

Lines changed: 2 additions & 0 deletions
@@ -497,6 +497,7 @@ Below is a list of all the keywords in Spark SQL.
 |DEFAULT|non-reserved|non-reserved|non-reserved|
 |DEFINED|non-reserved|non-reserved|non-reserved|
 |DEFINER|non-reserved|non-reserved|non-reserved|
+|DELAY|non-reserved|non-reserved|non-reserved|
 |DELETE|non-reserved|non-reserved|reserved|
 |DELIMITED|non-reserved|non-reserved|non-reserved|
 |DESC|non-reserved|non-reserved|non-reserved|
@@ -793,6 +794,7 @@ Below is a list of all the keywords in Spark SQL.
 |VIEW|non-reserved|non-reserved|non-reserved|
 |VIEWS|non-reserved|non-reserved|non-reserved|
 |VOID|non-reserved|non-reserved|non-reserved|
+|WATERMARK|non-reserved|non-reserved|non-reserved|
 |WEEK|non-reserved|non-reserved|non-reserved|
 |WEEKS|non-reserved|non-reserved|non-reserved|
 |WHEN|reserved|non-reserved|reserved|
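
Both new keywords stay non-reserved in every parsing mode, so they should remain usable as ordinary identifiers outside the watermark syntax. A hedged illustration, assuming an active SparkSession named spark (table and column names are hypothetical):

    # Non-reserved keywords may still name tables and columns.
    spark.sql("CREATE TABLE t (watermark TIMESTAMP, delay INT) USING parquet")
    spark.sql("SELECT watermark, delay FROM t")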

python/pyspark/sql/connect/client/artifact.py

Lines changed: 55 additions & 0 deletions
@@ -427,6 +427,30 @@ def is_cached_artifact(self, hash: str) -> bool:
         status = resp.statuses.get(artifactName)
         return status.exists if status is not None else False
 
+    def get_cached_artifacts(self, hashes: list[str]) -> set[str]:
+        """
+        Batch check which artifacts are already cached on the server.
+        Returns a set of hashes that are already cached.
+        """
+        if not hashes:
+            return set()
+
+        artifact_names = [f"{CACHE_PREFIX}/{hash}" for hash in hashes]
+        request = proto.ArtifactStatusesRequest(
+            user_context=self._user_context, session_id=self._session_id, names=artifact_names
+        )
+        resp: proto.ArtifactStatusesResponse = self._stub.ArtifactStatus(
+            request, metadata=self._metadata
+        )
+
+        cached = set()
+        for hash in hashes:
+            artifact_name = f"{CACHE_PREFIX}/{hash}"
+            status = resp.statuses.get(artifact_name)
+            if status is not None and status.exists:
+                cached.add(hash)
+        return cached
+
     def cache_artifact(self, blob: bytes) -> str:
         """
         Cache the give blob at the session.
@@ -442,3 +466,34 @@ def cache_artifact(self, blob: bytes) -> str:
         # TODO(SPARK-42658): Handle responses containing CRC failures.
 
         return hash
+
+    def cache_artifacts(self, blobs: list[bytes]) -> list[str]:
+        """
+        Cache the given blobs at the session.
+
+        This method batches artifact status checks and uploads to minimize RPC overhead.
+        """
+        # Compute hashes for all blobs upfront
+        hashes = [hashlib.sha256(blob).hexdigest() for blob in blobs]
+        unique_hashes = list(set(hashes))
+
+        # Batch check which artifacts are already cached
+        cached_hashes = self.get_cached_artifacts(unique_hashes)
+
+        # Collect unique artifacts that need to be uploaded
+        seen_hashes = set()
+        artifacts_to_add = []
+        for blob, hash in zip(blobs, hashes):
+            if hash not in cached_hashes and hash not in seen_hashes:
+                artifacts_to_add.append(new_cache_artifact(hash, InMemory(blob)))
+                seen_hashes.add(hash)
+
+        # Batch upload all missing artifacts in a single RPC call
+        if artifacts_to_add:
+            requests = self._add_artifacts(artifacts_to_add)
+            response: proto.AddArtifactsResponse = self._retrieve_responses(requests)
+            summaries: List[proto.AddArtifactsResponse.ArtifactSummary] = []
+            for summary in response.artifacts:
+                summaries.append(summary)
+            # TODO(SPARK-42658): Handle responses containing CRC failures.
+        return hashes
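
A hedged usage sketch of the new batched method; client here stands in for a constructed ArtifactManager, so the wiring is assumed. Because blobs are deduplicated by SHA-256 digest before upload, a repeated payload costs at most one upload, while the returned list stays positionally aligned with the input:

    blobs = [b"udf-payload", b"broadcast-payload", b"udf-payload"]  # one duplicate
    hashes = client.cache_artifacts(blobs)

    assert len(hashes) == len(blobs)  # one hash per input blob
    assert hashes[0] == hashes[2]     # duplicates share a digest and a single upload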

python/pyspark/sql/connect/client/core.py

Lines changed: 6 additions & 0 deletions
@@ -2003,6 +2003,12 @@ def cache_artifact(self, blob: bytes) -> str:
                 return self._artifact_manager.cache_artifact(blob)
         raise SparkConnectException("Invalid state during retry exception handling.")
 
+    def cache_artifacts(self, blobs: list[bytes]) -> list[str]:
+        for attempt in self._retrying():
+            with attempt:
+                return self._artifact_manager.cache_artifacts(blobs)
+        raise SparkConnectException("Invalid state during retry exception handling.")
+
     def _verify_response_integrity(
         self,
         response: Union[
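
The wrapper reuses the retry idiom of cache_artifact above: each loop iteration yields an attempt context manager, a clean return from the body ends the loop, and a retryable error is swallowed so the next iteration can try again. Below is a self-contained sketch of that idiom; it is an illustrative stand-in, not Spark Connect's actual retry machinery, and here only ConnectionError counts as retryable:

    class _Attempt:
        # One try: record success on a clean exit, suppress retryable errors.
        def __init__(self, state):
            self._state = state

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            self._state["succeeded"] = exc_type is None
            # Returning True suppresses the exception, allowing a retry.
            return exc_type is not None and issubclass(exc_type, ConnectionError)

    def retrying(max_attempts: int = 3):
        state = {"succeeded": False}
        for _ in range(max_attempts):
            yield _Attempt(state)
            if state["succeeded"]:
                return  # body completed without raising

    def call_with_retry(fn):
        for attempt in retrying():
            with attempt:
                return fn()  # a successful return short-circuits the loop
        raise RuntimeError("retries exhausted")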

python/pyspark/sql/connect/functions/builtin.py

Lines changed: 164 additions & 14 deletions
@@ -3945,45 +3945,195 @@ def make_time(hour: "ColumnOrName", minute: "ColumnOrName", second: "ColumnOrNam
 make_time.__doc__ = pysparkfuncs.make_time.__doc__
 
 
+@overload
 def make_timestamp(
     years: "ColumnOrName",
     months: "ColumnOrName",
     days: "ColumnOrName",
     hours: "ColumnOrName",
     mins: "ColumnOrName",
     secs: "ColumnOrName",
+) -> Column:
+    ...
+
+
+@overload
+def make_timestamp(
+    years: "ColumnOrName",
+    months: "ColumnOrName",
+    days: "ColumnOrName",
+    hours: "ColumnOrName",
+    mins: "ColumnOrName",
+    secs: "ColumnOrName",
+    timezone: "ColumnOrName",
+) -> Column:
+    ...
+
+
+@overload
+def make_timestamp(*, date: "ColumnOrName", time: "ColumnOrName") -> Column:
+    ...
+
+
+@overload
+def make_timestamp(
+    *, date: "ColumnOrName", time: "ColumnOrName", timezone: "ColumnOrName"
+) -> Column:
+    ...
+
+
+def make_timestamp(
+    years: Optional["ColumnOrName"] = None,
+    months: Optional["ColumnOrName"] = None,
+    days: Optional["ColumnOrName"] = None,
+    hours: Optional["ColumnOrName"] = None,
+    mins: Optional["ColumnOrName"] = None,
+    secs: Optional["ColumnOrName"] = None,
     timezone: Optional["ColumnOrName"] = None,
+    date: Optional["ColumnOrName"] = None,
+    time: Optional["ColumnOrName"] = None,
 ) -> Column:
-    if timezone is not None:
-        return _invoke_function_over_columns(
-            "make_timestamp", years, months, days, hours, mins, secs, timezone
-        )
+    if years is not None:
+        if any(arg is not None for arg in [date, time]):
+            raise PySparkValueError(
+                errorClass="CANNOT_SET_TOGETHER",
+                messageParameters={"arg_list": "years|months|days|hours|mins|secs and date|time"},
+            )
+        if timezone is not None:
+            return _invoke_function_over_columns(
+                "make_timestamp",
+                cast("ColumnOrName", years),
+                cast("ColumnOrName", months),
+                cast("ColumnOrName", days),
+                cast("ColumnOrName", hours),
+                cast("ColumnOrName", mins),
+                cast("ColumnOrName", secs),
+                cast("ColumnOrName", timezone),
+            )
+        else:
+            return _invoke_function_over_columns(
+                "make_timestamp",
+                cast("ColumnOrName", years),
+                cast("ColumnOrName", months),
+                cast("ColumnOrName", days),
+                cast("ColumnOrName", hours),
+                cast("ColumnOrName", mins),
+                cast("ColumnOrName", secs),
+            )
     else:
-        return _invoke_function_over_columns(
-            "make_timestamp", years, months, days, hours, mins, secs
-        )
+        if any(arg is not None for arg in [years, months, days, hours, mins, secs]):
+            raise PySparkValueError(
+                errorClass="CANNOT_SET_TOGETHER",
+                messageParameters={"arg_list": "years|months|days|hours|mins|secs and date|time"},
+            )
+        if timezone is not None:
+            return _invoke_function_over_columns(
+                "make_timestamp",
+                cast("ColumnOrName", date),
+                cast("ColumnOrName", time),
+                cast("ColumnOrName", timezone),
+            )
+        else:
+            return _invoke_function_over_columns(
+                "make_timestamp", cast("ColumnOrName", date), cast("ColumnOrName", time)
+            )
 
 
 make_timestamp.__doc__ = pysparkfuncs.make_timestamp.__doc__
 
 
+@overload
 def try_make_timestamp(
     years: "ColumnOrName",
     months: "ColumnOrName",
     days: "ColumnOrName",
     hours: "ColumnOrName",
     mins: "ColumnOrName",
     secs: "ColumnOrName",
+) -> Column:
+    ...
+
+
+@overload
+def try_make_timestamp(
+    years: "ColumnOrName",
+    months: "ColumnOrName",
+    days: "ColumnOrName",
+    hours: "ColumnOrName",
+    mins: "ColumnOrName",
+    secs: "ColumnOrName",
+    timezone: "ColumnOrName",
+) -> Column:
+    ...
+
+
+@overload
+def try_make_timestamp(*, date: "ColumnOrName", time: "ColumnOrName") -> Column:
+    ...
+
+
+@overload
+def try_make_timestamp(
+    *, date: "ColumnOrName", time: "ColumnOrName", timezone: "ColumnOrName"
+) -> Column:
+    ...
+
+
+def try_make_timestamp(
+    years: Optional["ColumnOrName"] = None,
+    months: Optional["ColumnOrName"] = None,
+    days: Optional["ColumnOrName"] = None,
+    hours: Optional["ColumnOrName"] = None,
+    mins: Optional["ColumnOrName"] = None,
+    secs: Optional["ColumnOrName"] = None,
     timezone: Optional["ColumnOrName"] = None,
+    date: Optional["ColumnOrName"] = None,
+    time: Optional["ColumnOrName"] = None,
 ) -> Column:
-    if timezone is not None:
-        return _invoke_function_over_columns(
-            "try_make_timestamp", years, months, days, hours, mins, secs, timezone
-        )
+    if years is not None:
+        if any(arg is not None for arg in [date, time]):
+            raise PySparkValueError(
+                errorClass="CANNOT_SET_TOGETHER",
+                messageParameters={"arg_list": "years|months|days|hours|mins|secs and date|time"},
+            )
+        if timezone is not None:
+            return _invoke_function_over_columns(
+                "try_make_timestamp",
+                cast("ColumnOrName", years),
+                cast("ColumnOrName", months),
+                cast("ColumnOrName", days),
+                cast("ColumnOrName", hours),
+                cast("ColumnOrName", mins),
+                cast("ColumnOrName", secs),
+                cast("ColumnOrName", timezone),
+            )
+        else:
+            return _invoke_function_over_columns(
+                "try_make_timestamp",
+                cast("ColumnOrName", years),
+                cast("ColumnOrName", months),
+                cast("ColumnOrName", days),
+                cast("ColumnOrName", hours),
+                cast("ColumnOrName", mins),
+                cast("ColumnOrName", secs),
+            )
     else:
-        return _invoke_function_over_columns(
-            "try_make_timestamp", years, months, days, hours, mins, secs
-        )
+        if any(arg is not None for arg in [years, months, days, hours, mins, secs]):
+            raise PySparkValueError(
+                errorClass="CANNOT_SET_TOGETHER",
+                messageParameters={"arg_list": "years|months|days|hours|mins|secs and date|time"},
+            )
+        if timezone is not None:
+            return _invoke_function_over_columns(
+                "try_make_timestamp",
+                cast("ColumnOrName", date),
+                cast("ColumnOrName", time),
+                cast("ColumnOrName", timezone),
+            )
+        else:
+            return _invoke_function_over_columns(
+                "try_make_timestamp", cast("ColumnOrName", date), cast("ColumnOrName", time)
+            )
 
 
 try_make_timestamp.__doc__ = pysparkfuncs.try_make_timestamp.__doc__
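
A hedged usage sketch of the two calling conventions (column names are hypothetical, and this assumes the classic pyspark.sql.functions surface mirrors these Connect signatures, as the __doc__ reassignment suggests):

    from pyspark.sql import functions as sf

    # Field-based form: positional years..secs, with an optional timezone.
    df.select(sf.make_timestamp("y", "mo", "d", "h", "mi", "s"))
    df.select(sf.make_timestamp("y", "mo", "d", "h", "mi", "s", "tz"))

    # Date/time form: keyword-only, so it cannot collide with the positional
    # fields. Mixing the two groups raises PySparkValueError (CANNOT_SET_TOGETHER).
    df.select(sf.make_timestamp(date="d_col", time="t_col"))
    df.select(sf.try_make_timestamp(date="d_col", time="t_col", timezone="tz"))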
