Skip to content

CollierKing/sqlalchemy-cloudflare-d1

Repository files navigation

SQLAlchemy      Cloudflare D1

SQLAlchemy Cloudflare D1 Dialect

A SQLAlchemy dialect for Cloudflare's D1 Serverless SQLite Database supporting both the REST API and Python Workers.

Features

  • Full SQLAlchemy ORM and Core support
  • Sync and async engines via D1 REST API (create_engine and create_async_engine)
  • Python Workers support with direct D1 binding (create_engine_from_binding)
  • SQLite/D1 compatible SQL compilation
  • Prepared statement support with parameter binding
  • pandas DataFrame.to_sql() with upsert support
  • JSON column filtering with json_each()
  • Type mapping for D1/SQLite data types

Installation

pip install sqlalchemy-cloudflare-d1

For async SQLAlchemy engine support (create_async_engine):

pip install sqlalchemy-cloudflare-d1[async]

Why two install options? SQLAlchemy's async engine requires greenlet, a C extension that doesn't work in Cloudflare Workers (Pyodide). The base install works everywhere, including Workers. The [async] extra adds greenlet for server-side async engine usage.

Or install from source:

git clone https://github.com/collierking/sqlalchemy-cloudflare-d1.git
cd sqlalchemy-cloudflare-d1
pip install -e ".[async]"

Prerequisites

  1. A Cloudflare account with D1 enabled
  2. A D1 database created via the Cloudflare dashboard or CLI
  3. A Cloudflare API token with D1 permissions

Creating a D1 Database

Using the Cloudflare CLI:

wrangler d1 create my-database

Or via the Cloudflare dashboard.

Creating an API Token

  1. Go to Cloudflare API Tokens
  2. Click "Create Token"
  3. Use the "Custom token" template
  4. Add permissions: Account:D1:Edit
  5. Add your account in "Account Resources"
  6. Click "Continue to summary" and "Create Token"

Usage

Connection String Format

from sqlalchemy import create_engine

# Format: cloudflare_d1://account_id:api_token@database_id
engine = create_engine(
    "cloudflare_d1://your_account_id:your_api_token@your_database_id"
)

Basic Example

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Create engine
engine = create_engine(
    "cloudflare_d1://account_id:api_token@database_id"
)

# Create base and define model
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))

# Create tables
Base.metadata.create_all(engine)

# Create session and add data
Session = sessionmaker(bind=engine)
session = Session()

# Add a user
user = User(name="Alice", email="[email protected]")
session.add(user)
session.commit()

# Query users
users = session.query(User).all()
for user in users:
    print(f"{user.name}: {user.email}")

session.close()

Core API Example

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

metadata = MetaData()
users = Table('users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('email', String(100))
)

# Create table
metadata.create_all(engine)

with engine.connect() as conn:
    # Insert data
    conn.execute(users.insert().values(name="Bob", email="[email protected]"))

    # Query data
    result = conn.execute(select(users))
    for row in result:
        print(row)

Raw SQL Example

from sqlalchemy import create_engine, text

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

with engine.connect() as conn:
    # Execute raw SQL
    result = conn.execute(text("SELECT * FROM sqlite_master WHERE type='table'"))

    for row in result:
        print(row)

Async Engine Example

For async applications, use create_async_engine (requires pip install sqlalchemy-cloudflare-d1[async]):

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import select, Column, Integer, String, MetaData, Table

# Note the +async suffix in the URL
engine = create_async_engine(
    "cloudflare_d1+async://account_id:api_token@database_id"
)

metadata = MetaData()
users = Table('users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50))
)

async def main():
    # Create tables
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)

    # Query data
    async with engine.connect() as conn:
        result = await conn.execute(select(users))
        rows = result.fetchall()
        for row in rows:
            print(row)

    await engine.dispose()

Async Connection (Without SQLAlchemy)

For direct async access without SQLAlchemy overhead:

from sqlalchemy_cloudflare_d1 import AsyncConnection

async with AsyncConnection(
    account_id="your_account_id",
    database_id="your_database_id",
    api_token="your_api_token",
) as conn:
    cursor = await conn.cursor()
    await cursor.execute("SELECT * FROM users WHERE name = ?", ("Alice",))
    rows = await cursor.fetchall()
    print(rows)

Python Workers Example

Inside Cloudflare Python Workers, use create_engine_from_binding() for direct D1 binding access (no REST API calls):

from workers import WorkerEntrypoint
from sqlalchemy import MetaData, Table, select
from sqlalchemy_cloudflare_d1 import create_engine_from_binding

class MyWorker(WorkerEntrypoint):
    async def fetch(self, request):
        # Create engine from D1 binding (defined in wrangler.toml)
        engine = create_engine_from_binding(self.env.DB)

        # Use SQLAlchemy Core as normal
        metadata = MetaData()
        users = Table('users', metadata, autoload_with=engine)

        with engine.connect() as conn:
            result = conn.execute(select(users).limit(10))
            rows = result.fetchall()

        return Response.json({"users": [dict(row) for row in rows]})

For raw cursor access in Workers:

from sqlalchemy_cloudflare_d1 import WorkerConnection

class MyWorker(WorkerEntrypoint):
    async def fetch(self, request):
        conn = WorkerConnection(self.env.DB)
        cursor = conn.cursor()
        await cursor.execute_async("SELECT * FROM users")
        rows = cursor.fetchall()
        conn.close()
        return Response.json({"users": rows})

pandas DataFrame.to_sql()

Insert DataFrames with conflict handling:

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.sqlite import insert as sqlite_insert

engine = create_engine("cloudflare_d1://account_id:api_token@database_id")

df = pd.DataFrame({
    "name": ["Alice", "Bob", "Charlie"],
    "score": [85, 92, 78]
})

# Basic insert
df.to_sql("scores", con=engine, if_exists="append", index=False)

# Upsert with OR REPLACE (updates existing rows on conflict)
def sqlite_upsert(table, conn, keys, data_iter):
    from sqlalchemy.dialects.sqlite import insert as sqlite_insert
    sa_table = getattr(table, "table", table)
    rows = [dict(zip(keys, row)) for row in data_iter]
    if rows:
        stmt = sqlite_insert(sa_table).values(rows).prefix_with("OR REPLACE")
        conn.execute(stmt)

df.to_sql("scores", con=engine, if_exists="append", index=False, method=sqlite_upsert)

JSON Column Filtering

Query JSON arrays stored in TEXT columns using SQLite's json_each():

import json
from sqlalchemy import MetaData, Table, Column, Integer, String, select, exists, func

metadata = MetaData()
posts = Table('posts', metadata,
    Column('id', Integer, primary_key=True),
    Column('title', String(100)),
    Column('tags', String),  # JSON array stored as TEXT: '["python", "sql"]'
)

with engine.connect() as conn:
    # Find posts tagged with "python"
    je = func.json_each(posts.c.tags).table_valued("value").alias("je")
    stmt = (
        select(posts.c.title)
        .where(exists(
            select(1).select_from(je).where(je.c.value == "python")
        ))
    )
    result = conn.execute(stmt)
    for row in result:
        print(row.title)

Configuration

Connection Parameters

You can pass additional parameters via the connection string or engine creation:

from sqlalchemy import create_engine

# Via connection string query parameters
engine = create_engine(
    "cloudflare_d1://account_id:api_token@database_id?timeout=60"
)

# Via connect_args
engine = create_engine(
    "cloudflare_d1://account_id:api_token@database_id",
    connect_args={
        "timeout": 60,
    }
)

Environment Variables

You can also use environment variables:

import os
from sqlalchemy import create_engine

engine = create_engine(
    f"cloudflare_d1://{os.getenv('CF_ACCOUNT_ID')}:"
    f"{os.getenv('CF_API_TOKEN')}@{os.getenv('CF_DATABASE_ID')}"
)

Limitations

This dialect has some limitations due to D1's REST API nature:

  1. No transactions: D1 REST API doesn't support explicit transactions. Each query is auto-committed.
  2. No isolation levels: Connection isolation levels are not supported.
  3. Limited concurrency: Connections are HTTP-based, not persistent database connections.
  4. No stored procedures: D1 doesn't support stored procedures or custom functions.
  5. Rate limiting: Subject to Cloudflare API rate limits.

Known Issues Fixed

  • Empty result sets (v0.3.0+): cursor.description is now correctly populated even when queries return zero rows, fixing NoSuchColumnError exceptions in SQLAlchemy.

Type Mapping

SQLAlchemy Type D1/SQLite Type Notes
Integer INTEGER
String(n) VARCHAR(n)
Text TEXT
Float REAL
Numeric NUMERIC
Boolean INTEGER Stored as 0/1
DateTime TEXT ISO format string
Date TEXT ISO format string
Time TEXT ISO format string

Error Handling

The dialect will raise appropriate SQLAlchemy exceptions:

from sqlalchemy.exc import SQLAlchemyError, OperationalError

try:
    result = conn.execute("SELECT * FROM nonexistent_table")
except OperationalError as e:
    print(f"Database error: {e}")
except SQLAlchemyError as e:
    print(f"SQLAlchemy error: {e}")

Development

For detailed development instructions, see .github/DEVELOPMENT.md.

Quick Start

git clone https://github.com/collierking/sqlalchemy-cloudflare-d1.git
cd sqlalchemy-cloudflare-d1

# Install dependencies and setup pre-commit hooks
make install
make setup_hooks

# Run tests and linting
make check

# Build package
make build

Development Tools

  • Ruff: Fast Python linter and formatter
  • mypy: Static type checking
  • codespell: Spell checking
  • pre-commit: Automated pre-commit checks
  • pytest: Testing framework with socket control

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for your changes
  5. Run the test suite
  6. Submit a pull request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Related Projects

Support

About

A SQLAlchemy dialect for Cloudflare's D1 Severless Sqlite Database

Resources

License

Stars

Watchers

Forks

Packages

No packages published