Feat: Add Doris adapter #5033
base: main
Conversation
Hey @xinge-ji, let us know when you'd like us to review. We typically ignore a PR still in 'Draft' state, and also PRs with failing tests and merge conflicts with main (with the assumption that the requester is still working on the PR).
There are still some CI tests failing at the moment, and it seems to be due to
        IntegrationTestEngine,
    )

    pytestmark = [pytest.mark.doris, pytest.mark.engine, pytest.mark.slow]
I think this pytestmark is why the "style_and_cicd_tests" task is failing - because it's causing the tests in this file to get included when they shouldn't be; they should only be included in the "engine_doris" task.
If you remove it, "style_and_cicd_tests" should stop failing on this test. See the other test_integration_<engine>.py files for an example.
Thank you! This is now ready for review.
You can adjust the CI environment by editing
Thanks for this very complete PR @xinge-ji , nice work!
I've completed a first pass reviewing it, let me know if anything seems off.
I've never used Doris before, so I'm approaching it from the perspective of how the other engine adapters tend to work and the general concepts established within the SQLMesh codebase.
docs/integrations/engines/doris.md
Outdated
    ## Table Properties

    The Doris adapter supports a comprehensive set of table properties that can be configured in the `physical_properties` section of your model. These properties are processed by the `_build_table_properties_exp` method.
_build_table_properties_exp is an internal method of EngineAdapter and doesn't really belong in user-facing docs
I've removed the reference to _build_table_properties_exp from the doc.
docs/integrations/engines/doris.md
Outdated
    kind INCREMENTAL_BY_TIME_RANGE(time_column (event_date, '%Y-%m-%d')),
    partitioned_by event_date
    physical_properties (
      partitioned_by_expr = 'FROM ("2000-11-14") TO ("2099-11-14") INTERVAL 2 YEAR',
SQLMesh does support expressions in the existing partitioned_by property, eg:

    partitioned_by date_trunc(event_timestamp)

or eg for Iceberg transforms on Athena/Trino:

    partitioned_by (day(cola), truncate(colb, 8))

Is there a reason why we couldn't make this more ergonomic for Doris? eg something like:

    MODEL (
      name my_partitioned_model,
      ...
      partitioned_by RANGE(event_date), -- or LIST(event_date) for list partitioning
      physical_properties (
        partitions = (
          `default` FROM ("2000-11-14") TO ("2099-11-14") INTERVAL 2 YEAR,
          `p2025` VALUES [("2025-01-01"), ("2026-01-01")),
          `other` VALUES LESS THAN MAXVALUE
        )
      )
    )

to define the partition type (RANGE, LIST, etc.) in partitioned_by and then the partitions themselves in physical_properties.
I appreciate this might require some tweaks to the parser to achieve, let me know what you think
I have adjusted the parser to support partitioned_by RANGE(event_date) / LIST(values) syntax and defined the partition ranges/bounds in physical_properties as partitions, as you suggested.
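The split the reviewer describes can be sketched in plain Python. `parse_partitioned_by` below is a hypothetical helper for illustration, not the actual SQLMesh parser change; it separates a spec like `RANGE(event_date)` into the Doris partition type and its columns:

```python
import re

def parse_partitioned_by(spec: str):
    # Hypothetical helper, not the real SQLMesh parser change: split a
    # `partitioned_by` spec into (partition_type, columns).
    m = re.fullmatch(r"(RANGE|LIST)\s*\((.*)\)", spec.strip(), re.IGNORECASE)
    if m:
        kind = m.group(1).upper()
        columns = [c.strip() for c in m.group(2).split(",")]
        return kind, columns
    # Plain column expression, e.g. `partitioned_by event_date`
    return "COLUMN", [spec.strip()]

print(parse_partitioned_by("RANGE(event_date)"))   # ('RANGE', ['event_date'])
print(parse_partitioned_by("LIST(region, city)"))  # ('LIST', ['region', 'city'])
```

The partition bounds themselves then stay in `physical_properties` as opaque values, so only the type and key columns need parser support.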
    @@ -50,6 +50,8 @@
         "trino",
         # Nullable types are problematic
         "clickhouse",
    +    # Do not support table name starts with "_"
Wow, that's quite a random limitation :)
sqlmesh/core/engine_adapter/doris.py
Outdated
    df = self.fetchdf(query)

    result = []
    for row in df.itertuples(index=False, name=None):
self.fetchall() might be better here because it already returns a list of tuples. Something like:

    for schema_name, table_name, table_type in self.fetchall(query):
        ... rest of logic ...
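The difference can be shown with plain tuples. `classify_rows` is a hypothetical stand-in for the adapter loop, illustrating that `fetchall`-style rows unpack directly without a DataFrame round-trip:

```python
def classify_rows(rows):
    # Hypothetical stand-in for the adapter logic: rows from fetchall() are
    # already tuples, so they unpack directly without building a DataFrame.
    result = []
    for schema_name, table_name, table_type in rows:
        kind = "VIEW" if table_type == "VIEW" else "TABLE"
        result.append((schema_name, table_name, kind))
    return result

rows = [("db1", "orders", "BASE TABLE"), ("db1", "orders_v", "VIEW")]
print(classify_rows(rows))
# [('db1', 'orders', 'TABLE'), ('db1', 'orders_v', 'VIEW')]
```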
Thanks for the suggestion, I have updated the code.
sqlmesh/core/engine_adapter/doris.py
Outdated
    for k, v in properties.items():
        v_value = v.this if isinstance(v, exp.Literal) else str(v)
        props.append(f"'{k}'='{v_value}'")
    doris_clauses.append(f"PROPERTIES ({', '.join(props)})")
This is getting into the realm of raw string concatenation, which we generally try to avoid. SQLGlot has exp.Properties for this, eg:

    >>> exp.Properties(expressions=[
    ...     exp.Property(this="foo", value=exp.Literal.string("bar")),
    ...     exp.Property(this="baz", value=exp.Literal.string("bing")),
    ... ]).sql(dialect="doris")
    "PROPERTIES ('foo'='bar', 'baz'='bing')"
I've refactored the implementation to avoid raw string concatenation.
    @@ -493,6 +494,18 @@ def get_table_comment(
             """
         elif self.dialect == "clickhouse":
             query = f"SELECT name, comment FROM system.tables WHERE database = '{schema_name}' AND name = '{table_name}'"
    +    elif self.dialect == "doris":
    +        # Doris uses MySQL-compatible information_schema
If that's true, should doris be added to the elif self.dialect in ["mysql", "snowflake"] branch instead?
Good catch, I have moved it to the mysql branch.
    @@ -588,6 +601,18 @@ def get_column_comments(
         elif self.dialect in ["spark", "databricks", "clickhouse"]:
             query = f"DESCRIBE TABLE {schema_name}.{table_name}"
             comment_index = 2 if self.dialect in ["spark", "databricks"] else 4
    +    elif self.dialect == "doris":
    +        # Doris uses MySQL-compatible information_schema
Same for this, can it be added to the MySQL branch?
I have moved it to the mysql branch
    container_name: doris-fe-01
    hostname: fe-01
    environment:
      - FE_SERVERS=fe1:172.20.80.2:9030
FMI: why do we need to hardcode IP addresses like this? Does Doris not support DNS lookup?
The official Docker example uses hardcoded IPs. When I was setting up the development environment, I attempted to replace the IPs with localhost for simplicity, but the internal container networking did not resolve it correctly, and the setup failed.
    # Wait for the materialized view to be created
    import time

    time.sleep(1)
time.sleep()'s like this tend to lead to brittle tests. Are you able to use the tenacity module used elsewhere in the codebase instead? It can retry on a cadence and give up after a certain limit has passed.
Sure, I switched to tenacity.
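The retry pattern the reviewer asks for can be sketched with the stdlib alone (standing in for tenacity): poll a predicate on a cadence and give up after a deadline, instead of a single fixed sleep:

```python
import time

def wait_until(predicate, timeout: float = 30.0, interval: float = 0.5) -> bool:
    # Poll `predicate` on a cadence; give up once `timeout` seconds elapse.
    # A stdlib sketch of what tenacity's retry/stop/wait policies provide.
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)

# e.g. poll for the materialized view instead of a blind time.sleep(1)
print(wait_until(lambda: True, timeout=1.0, interval=0.01))    # True
print(wait_until(lambda: False, timeout=0.05, interval=0.01))  # False
```

The test then fails with a clear timeout after the limit rather than flaking when 1 second happens to be too short.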
    # Convert dates based on ds_type
    if ctx.dialect == "doris":
        # For Doris with DATE type, use pandas date objects
Why is this necessary vs what this test was doing before?
The old test used hardcoded dates from 2022-01-01. In a Doris table with dynamic partitioning enabled, there's a default limit (max_dynamic_partition_num) of 500 partitions. If the date in the test is too far in the past (like 2022), it can fall outside the range of partitions that Doris automatically manages, causing the insert to fail.
Engine Adapter Implementation
- Doris Adapter: Implemented DorisEngineAdapter to support Doris-specific SQL behavior (sqlmesh/core/engine_adapter/doris.py)

Connection Configuration
- Doris Connection Config: Added a new DorisConnectionConfig, including support for basic authentication and a configurable HTTP port (sqlmesh/core/config/connection.py)

Documentation Updates
- Doris Guide: Added a detailed guide for Doris integration, covering setup, model support, connection configuration, and limitations (docs/integrations/engines/doris.md)
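A connection config like the one summarized above might look roughly as follows. This is a hypothetical shape for illustration, not the actual DorisConnectionConfig; only the basic-auth and configurable-HTTP-port features come from the PR summary, and the default port numbers are Doris's documented defaults:

```python
from dataclasses import dataclass

@dataclass
class DorisConnectionSketch:
    # Hypothetical shape, not the real DorisConnectionConfig: Doris serves
    # queries over the MySQL protocol and bulk loads over a separate HTTP port.
    host: str
    user: str
    password: str = ""     # basic authentication
    port: int = 9030       # MySQL-protocol query port (Doris default)
    http_port: int = 8030  # HTTP port, e.g. for stream load (Doris default)

cfg = DorisConnectionSketch(host="localhost", user="root")
print(cfg.port, cfg.http_port)  # 9030 8030
```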