Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 1 addition & 27 deletions core/src/datayoga_core/blocks/relational/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import logging
from enum import Enum, unique
from typing import Callable, Tuple
from typing import Tuple

import sqlalchemy as sa
from datayoga_core.connection import Connection
from datayoga_core.context import Context
from sqlalchemy.engine import Engine
from sqlalchemy.engine.interfaces import DBAPIConnection
from sqlalchemy.pool import ConnectionPoolEntry

Expand Down Expand Up @@ -78,34 +77,9 @@ def get_engine(connection_name: str, context: Context, autocommit: bool = True)
# add NLS_DATE_FORMAT for Oracle to explicitly set the date format to ISO
sa.event.listen(engine, "connect", alter_session_on_oracle_connect)

if engine.driver == "pymysql":
# add a listener to configure PyMySQL dialect settings
sa.event.listen(engine, "connect", create_pymysql_dialect_configuration(engine))
return engine, db_type


def create_pymysql_dialect_configuration(engine: Engine) -> Callable[[DBAPIConnection, ConnectionPoolEntry], None]:
"""Creates a function to configure the PyMySQL dialect.

Args:
engine (Engine): The SQLAlchemy engine to configure.

Returns:
Callable[[DBAPIConnection, ConnectionPoolEntry], None]: A function that configures PyMySQL dialect settings.
"""
def configure_pymysql_dialect(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry):
"""Configures settings specific to PyMySQL on each connection.

Args:
dbapi_connection (DBAPIConnection): DB-API connection object from PyMySQL.
connection_record (ConnectionPoolEntry): Connection record managed by SQLAlchemy.
"""
# Disable PyMySQL's handling of ON DUPLICATE KEY UPDATE alias requirement
engine.dialect._requires_alias_for_on_duplicate_key = False

return configure_pymysql_dialect


def alter_session_on_oracle_connect(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry):
"""SQLAlchemy event listener to alter the Oracle session settings upon connection.

Expand Down