Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ lazy_static.workspace = true
once_cell.workspace = true
prometheus = { workspace = true, optional = true }
uuid.workspace = true
indexmap.workspace = true
gethostname.workspace = true
serde_json.workspace = true
serde_yml.workspace = true
Expand Down
8 changes: 8 additions & 0 deletions backend/shard-migrations/20250922155947_v2_job.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Add down migration script here

DROP TYPE IF EXISTS job_status CASCADE;
DROP TYPE IF EXISTS job_kind CASCADE;
DROP TYPE IF EXISTS job_trigger_kind CASCADE;
DROP TYPE IF EXISTS script_lang CASCADE;
DROP TABLE IF EXISTS v2_job;

79 changes: 79 additions & 0 deletions backend/shard-migrations/20250922155947_v2_job.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
-- Add up migration script here

CREATE TYPE job_kind AS ENUM (
'script', 'preview', 'flow', 'dependencies', 'flowpreview', 'script_hub',
'identity', 'flowdependencies', 'http', 'graphql', 'postgresql', 'noop',
'appdependencies', 'deploymentcallback', 'singlescriptflow', 'flowscript',
'flownode', 'appscript'
);


CREATE TYPE job_status AS ENUM ('success', 'failure', 'canceled', 'skipped');



CREATE TYPE job_trigger_kind AS ENUM (
'webhook', 'http', 'websocket', 'kafka', 'email', 'nats', 'schedule',
'app', 'ui', 'postgres', 'sqs', 'gcp', 'mqtt'
);

CREATE TYPE script_lang AS ENUM (
'python3', 'deno', 'go', 'bash', 'postgresql', 'nativets', 'bun', 'mysql',
'bigquery', 'snowflake', 'graphql', 'powershell', 'mssql', 'php', 'bunnative',
'rust', 'ansible', 'csharp', 'oracledb', 'nu', 'java', 'duckdb'
);


CREATE TABLE v2_job (
id UUID PRIMARY KEY,
raw_code TEXT,
raw_lock TEXT,
raw_flow JSONB,
tag VARCHAR(255),
workspace_id VARCHAR(100) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
created_by VARCHAR(100) NOT NULL,
permissioned_as VARCHAR(100) NOT NULL,
permissioned_as_email VARCHAR(500),
kind job_kind NOT NULL,
runnable_id BIGINT,
runnable_path VARCHAR(500),
parent_job UUID,
root_job UUID,
script_lang script_lang,
script_entrypoint_override VARCHAR(500),
flow_step INTEGER,
flow_step_id VARCHAR(100),
flow_innermost_root_job UUID,
trigger VARCHAR(500),
trigger_kind job_trigger_kind,
same_worker BOOLEAN NOT NULL DEFAULT FALSE,
visible_to_owner BOOLEAN NOT NULL DEFAULT TRUE,
concurrent_limit INTEGER,
concurrency_time_window_s INTEGER,
cache_ttl INTEGER,
timeout INTEGER,
priority SMALLINT,
preprocessed BOOLEAN NOT NULL DEFAULT FALSE,
args JSONB,
labels TEXT[],
pre_run_error TEXT
);

-- Create indices for v2_job
CREATE INDEX IF NOT EXISTS ix_job_created_at ON v2_job (created_at DESC);
CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_3 ON v2_job (workspace_id, created_at DESC);
CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_5 ON v2_job (workspace_id, created_at DESC)
WHERE ((kind = ANY (ARRAY['preview'::job_kind, 'flowpreview'::job_kind]))
AND (parent_job IS NULL));
CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_8 ON v2_job (workspace_id, created_at DESC)
WHERE ((kind = 'deploymentcallback'::job_kind) AND (parent_job IS NULL));
CREATE INDEX IF NOT EXISTS ix_job_workspace_id_created_at_new_9 ON v2_job (workspace_id, created_at DESC)
WHERE ((kind = ANY (ARRAY['dependencies'::job_kind, 'flowdependencies'::job_kind, 'appdependencies'::job_kind]))
AND (parent_job IS NULL));
CREATE INDEX IF NOT EXISTS ix_v2_job_labels ON v2_job USING gin (labels) WHERE (labels IS NOT NULL);
CREATE INDEX IF NOT EXISTS ix_v2_job_workspace_id_created_at ON v2_job (workspace_id, created_at DESC)
WHERE ((kind = ANY (ARRAY['script'::job_kind, 'flow'::job_kind, 'singlescriptflow'::job_kind]))
AND (parent_job IS NULL));
CREATE INDEX IF NOT EXISTS ix_job_root_job_index_by_path_2 ON v2_job (workspace_id, runnable_path, created_at DESC)
WHERE (parent_job IS NULL);
2 changes: 2 additions & 0 deletions backend/shard-migrations/20250922160402_v2_queue.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS v2_job_queue;
25 changes: 25 additions & 0 deletions backend/shard-migrations/20250922160402_v2_queue.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS v2_job_queue (
id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE,
workspace_id VARCHAR(100) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
started_at TIMESTAMP WITH TIME ZONE,
scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
running BOOLEAN NOT NULL DEFAULT FALSE,
canceled_by VARCHAR(100),
canceled_reason TEXT,
suspend INTEGER,
suspend_until TIMESTAMP WITH TIME ZONE,
tag VARCHAR(255),
priority SMALLINT,
worker VARCHAR(100),
extras JSONB
);

-- Create indices for v2_job_queue
CREATE INDEX IF NOT EXISTS queue_sort_v2 ON v2_job_queue (priority DESC NULLS LAST, scheduled_for, tag)
WHERE (running = false);
CREATE INDEX IF NOT EXISTS queue_suspended ON v2_job_queue (priority DESC NULLS LAST, created_at, suspend_until, suspend, tag)
WHERE (suspend_until IS NOT NULL);
CREATE INDEX IF NOT EXISTS root_queue_index_by_path ON v2_job_queue (workspace_id, created_at);
CREATE INDEX IF NOT EXISTS v2_job_queue_suspend ON v2_job_queue (workspace_id, suspend) WHERE (suspend > 0);
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Add down migration script here

DROP TABLE IF EXISTS v2_job_completed CASCADE;
26 changes: 26 additions & 0 deletions backend/shard-migrations/20250922160457_v2_job_completed.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS v2_job_completed (
id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE,
workspace_id VARCHAR(100) NOT NULL,
duration_ms BIGINT,
result JSONB,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
canceled_by VARCHAR(100),
canceled_reason TEXT,
flow_status JSONB,
started_at TIMESTAMP WITH TIME ZONE,
memory_peak INTEGER,
status job_status NOT NULL,
completed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
worker VARCHAR(100),
workflow_as_code_status JSONB,
result_columns TEXT[],
retries UUID[],
extras JSONB
);

-- Create indices for v2_job_completed
CREATE INDEX IF NOT EXISTS ix_completed_job_workspace_id_started_at_new_2 ON v2_job_completed (workspace_id, started_at DESC);
CREATE INDEX IF NOT EXISTS ix_job_completed_completed_at ON v2_job_completed (completed_at DESC);
CREATE INDEX IF NOT EXISTS labeled_jobs_on_jobs ON v2_job_completed USING gin (((result -> 'wm_labels'::text)))
WHERE (result ? 'wm_labels'::text);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS v2_job_runtime CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS v2_job_runtime (
id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE,
ping TIMESTAMP WITH TIME ZONE,
memory_peak INTEGER
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS v2_job_status CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS v2_job_status (
id UUID PRIMARY KEY REFERENCES v2_job(id) ON DELETE CASCADE,
flow_status JSONB,
flow_leaf_jobs JSONB,
workflow_as_code_status JSONB
);
2 changes: 2 additions & 0 deletions backend/shard-migrations/20250922160649_job_logs.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS job_logs CASCADE;
14 changes: 14 additions & 0 deletions backend/shard-migrations/20250922160649_job_logs.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS job_logs (
job_id UUID NOT NULL,
workspace_id VARCHAR(100) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
logs TEXT,
log_offset INTEGER,
log_file_index TEXT[],
PRIMARY KEY (job_id, log_offset)
);

-- Create indices for job_logs
CREATE INDEX IF NOT EXISTS job_logs_job_id_idx ON job_logs (job_id);
CREATE INDEX IF NOT EXISTS job_logs_workspace_id_created_at_idx ON job_logs (workspace_id, created_at DESC);
2 changes: 2 additions & 0 deletions backend/shard-migrations/20250922161225_job_perms.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS job_perms CASCADE;
13 changes: 13 additions & 0 deletions backend/shard-migrations/20250922161225_job_perms.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS job_perms (
job_id UUID NOT NULL,
email VARCHAR(255) NOT NULL,
username VARCHAR(50) NOT NULL,
is_admin BOOLEAN NOT NULL,
is_operator BOOLEAN NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
workspace_id VARCHAR(50) NOT NULL,
groups TEXT[] NOT NULL,
folders JSONB[] NOT NULL,
CONSTRAINT job_perms_pk PRIMARY KEY (job_id)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS outstanding_wait_time CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS outstanding_wait_time (
job_id UUID PRIMARY KEY,
self_wait_time_ms BIGINT DEFAULT NULL,
aggregate_wait_time_ms BIGINT DEFAULT NULL
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP VIEW v2_as_queue;
53 changes: 53 additions & 0 deletions backend/shard-migrations/20250924160628_v2_as_queue.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
-- Add up migration script here
-- Add up migration script here
CREATE OR REPLACE VIEW v2_as_queue AS
SELECT
j.id,
j.workspace_id,
j.parent_job,
j.created_by,
j.created_at,
q.started_at,
q.scheduled_for,
q.running,
j.runnable_id AS script_hash,
j.runnable_path AS script_path,
j.args,
j.raw_code,
q.canceled_by IS NOT NULL AS canceled,
q.canceled_by,
q.canceled_reason,
r.ping AS last_ping,
j.kind AS job_kind,
CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END
AS schedule_path,
j.permissioned_as,
COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status,
j.raw_flow,
j.flow_step_id IS NOT NULL AS is_flow_step,
j.script_lang AS language,
q.suspend,
q.suspend_until,
j.same_worker,
j.raw_lock,
j.pre_run_error,
j.permissioned_as_email AS email,
j.visible_to_owner,
r.memory_peak AS mem_peak,
j.flow_innermost_root_job AS root_job,
s.flow_leaf_jobs AS leaf_jobs,
j.tag,
j.concurrent_limit,
j.concurrency_time_window_s,
j.timeout,
j.flow_step_id,
j.cache_ttl,
j.priority,
NULL::TEXT AS logs,
j.script_entrypoint_override,
j.preprocessed
FROM v2_job_queue q
JOIN v2_job j USING (id)
LEFT JOIN v2_job_runtime r USING (id)
LEFT JOIN v2_job_status s USING (id)
;
Loading
Loading