diff --git a/backend/.sqlx/query-20888e946a5597989488e2a54e354e4ce2872f2bc3af7b26fcaafe8703ba70bf.json b/backend/.sqlx/query-20888e946a5597989488e2a54e354e4ce2872f2bc3af7b26fcaafe8703ba70bf.json deleted file mode 100644 index 1f1790fb167de..0000000000000 --- a/backend/.sqlx/query-20888e946a5597989488e2a54e354e4ce2872f2bc3af7b26fcaafe8703ba70bf.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE flow_conversation_message\n SET content = $1\n WHERE job_id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Text", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "20888e946a5597989488e2a54e354e4ce2872f2bc3af7b26fcaafe8703ba70bf" -} diff --git a/backend/.sqlx/query-255415e4b1e891ce43b92ebde534f116c4ea41ccd1bc994d59c1cd56aae71334.json b/backend/.sqlx/query-255415e4b1e891ce43b92ebde534f116c4ea41ccd1bc994d59c1cd56aae71334.json new file mode 100644 index 0000000000000..3daa40769da25 --- /dev/null +++ b/backend/.sqlx/query-255415e4b1e891ce43b92ebde534f116c4ea41ccd1bc994d59c1cd56aae71334.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT f.value as \"value: Json\" FROM v2_job j JOIN flow_version f ON f.id = j.runnable_id WHERE j.id = $1 AND j.workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "value: Json", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "255415e4b1e891ce43b92ebde534f116c4ea41ccd1bc994d59c1cd56aae71334" +} diff --git a/backend/.sqlx/query-e2f4eeb896c22e3215370ed30ed5cc890503e4197b0fdb3a906504eba166aa97.json b/backend/.sqlx/query-2709e8113527d4cb331c72009e95e2efe4d1b57d5da6051acfb23f89b66434fb.json similarity index 54% rename from backend/.sqlx/query-e2f4eeb896c22e3215370ed30ed5cc890503e4197b0fdb3a906504eba166aa97.json rename to backend/.sqlx/query-2709e8113527d4cb331c72009e95e2efe4d1b57d5da6051acfb23f89b66434fb.json index 46da76022759a..0d2140ee9ad2a 100644 --- a/backend/.sqlx/query-e2f4eeb896c22e3215370ed30ed5cc890503e4197b0fdb3a906504eba166aa97.json +++ b/backend/.sqlx/query-2709e8113527d4cb331c72009e95e2efe4d1b57d5da6051acfb23f89b66434fb.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT EXISTS(SELECT 1 FROM flow_conversation_message WHERE job_id = $1 AND message_type = 'assistant')", + "query": "SELECT EXISTS(SELECT 1 FROM instance_group WHERE name = $1)", "describe": { "columns": [ { @@ -11,12 +11,12 @@ ], "parameters": { "Left": [ - "Uuid" + "Text" ] }, "nullable": [ null ] }, - "hash": "e2f4eeb896c22e3215370ed30ed5cc890503e4197b0fdb3a906504eba166aa97" + "hash": "2709e8113527d4cb331c72009e95e2efe4d1b57d5da6051acfb23f89b66434fb" } diff --git a/backend/.sqlx/query-298f8609319a2928257fd5be60bb37f292c786d2348efe11d19868e5dc8fba11.json b/backend/.sqlx/query-298f8609319a2928257fd5be60bb37f292c786d2348efe11d19868e5dc8fba11.json deleted file mode 100644 index 605025b9978d3..0000000000000 --- a/backend/.sqlx/query-298f8609319a2928257fd5be60bb37f292c786d2348efe11d19868e5dc8fba11.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT elem FROM (SELECT unnest($1::TEXT[]) AS elem) AS e\n WHERE elem NOT IN (SELECT datname FROM pg_catalog.pg_database);", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "elem", - "type_info": "Text" - } - ], - "parameters": { - "Left": [ - "TextArray" - ] - }, - "nullable": [ - null - ] - }, - "hash": "298f8609319a2928257fd5be60bb37f292c786d2348efe11d19868e5dc8fba11" -} diff --git a/backend/.sqlx/query-7628e7690aaf6da3dba3cf78dc9e78fe32c3a001c08920e02fb850695afdb7ec.json b/backend/.sqlx/query-7628e7690aaf6da3dba3cf78dc9e78fe32c3a001c08920e02fb850695afdb7ec.json deleted file mode 100644 index 5d67229f62761..0000000000000 --- a/backend/.sqlx/query-7628e7690aaf6da3dba3cf78dc9e78fe32c3a001c08920e02fb850695afdb7ec.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT (flow_status->>'memory_id')::uuid as memory_id \n FROM v2_job_status \n WHERE id = $1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "memory_id", - "type_info": "Uuid" - } - ], - "parameters": { - "Left": [ - "Uuid" - ] - }, - "nullable": [ - null - ] - }, - "hash": "7628e7690aaf6da3dba3cf78dc9e78fe32c3a001c08920e02fb850695afdb7ec" -} diff --git a/backend/.sqlx/query-80858777f29eacf087a399eca67c398e598763051b286edcbe05112ae70521c9.json b/backend/.sqlx/query-80858777f29eacf087a399eca67c398e598763051b286edcbe05112ae70521c9.json new file mode 100644 index 0000000000000..8ad744a0dbf40 --- /dev/null +++ b/backend/.sqlx/query-80858777f29eacf087a399eca67c398e598763051b286edcbe05112ae70521c9.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n (flow_status->>'memory_id')::uuid as memory_id,\n (flow_status->>'chat_input_enabled')::boolean as chat_input_enabled\n FROM v2_job_status\n WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "memory_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "chat_input_enabled", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "80858777f29eacf087a399eca67c398e598763051b286edcbe05112ae70521c9" +} diff --git a/backend/.sqlx/query-a22e166746ee10943c737668aafd548e3adefe5847d2e35331edb61442d0dd92.json b/backend/.sqlx/query-a22e166746ee10943c737668aafd548e3adefe5847d2e35331edb61442d0dd92.json deleted file mode 100644 index 33005ff7a624a..0000000000000 --- a/backend/.sqlx/query-a22e166746ee10943c737668aafd548e3adefe5847d2e35331edb61442d0dd92.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT trim(both '\"' from value::text) FROM global_settings WHERE name = 'ducklake_user_pg_pwd';", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "btrim", - "type_info": "Text" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - null - ] - }, - "hash": "a22e166746ee10943c737668aafd548e3adefe5847d2e35331edb61442d0dd92" -} diff --git a/backend/.sqlx/query-5f57ec3330a6576f4017fd469848a1c214896dc5abbf2b898ac9a68810fd89dd.json b/backend/.sqlx/query-a739af2f72e117acc58374f6ed44f8223efa4826d6c10639f98e624474b247a3.json similarity index 56% rename from backend/.sqlx/query-5f57ec3330a6576f4017fd469848a1c214896dc5abbf2b898ac9a68810fd89dd.json rename to backend/.sqlx/query-a739af2f72e117acc58374f6ed44f8223efa4826d6c10639f98e624474b247a3.json index aee3eb4fb5674..8a64d7c0a6d68 100644 --- a/backend/.sqlx/query-5f57ec3330a6576f4017fd469848a1c214896dc5abbf2b898ac9a68810fd89dd.json +++ b/backend/.sqlx/query-a739af2f72e117acc58374f6ed44f8223efa4826d6c10639f98e624474b247a3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT id, conversation_id, message_type as \"message_type: MessageType\", content, job_id, created_at\n FROM (\n SELECT id, conversation_id, message_type, content, job_id, created_at\n FROM flow_conversation_message\n WHERE conversation_id = $1\n ORDER BY created_at DESC, CASE WHEN message_type = 'user' THEN 0 ELSE 1 END\n LIMIT $2 OFFSET $3\n ) AS messages\n ORDER BY created_at ASC, CASE WHEN message_type = 'user' THEN 0 ELSE 1 END\n ", + "query": "SELECT id, conversation_id, message_type as \"message_type: MessageType\", content, job_id, created_at, step_name, success\n FROM (\n SELECT id, conversation_id, message_type, content, job_id, created_at, step_name, success\n FROM flow_conversation_message\n WHERE conversation_id = $1\n ORDER BY created_at DESC, CASE WHEN message_type = 'user' THEN 0 ELSE 1 END\n LIMIT $2 OFFSET $3\n ) AS messages\n ORDER BY created_at ASC, CASE WHEN message_type = 'user' THEN 0 ELSE 1 END\n ", "describe": { "columns": [ { @@ -22,7 +22,8 @@ "kind": { "Enum": [ "user", - "assistant" + "assistant", + "tool" ] } } @@ -42,6 +43,16 @@ "ordinal": 5, "name": "created_at", "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "step_name", + "type_info": "Varchar" + }, + { + "ordinal": 7, + "name": "success", + "type_info": "Bool" } ], "parameters": { @@ -57,8 +68,10 @@ false, false, true, + false, + true, false ] }, - "hash": "5f57ec3330a6576f4017fd469848a1c214896dc5abbf2b898ac9a68810fd89dd" + "hash": "a739af2f72e117acc58374f6ed44f8223efa4826d6c10639f98e624474b247a3" } diff --git a/backend/.sqlx/query-69606859fe08d24f0306b866f9f50ad766d56a0d5aa8f0784f79aa5e211b00e4.json b/backend/.sqlx/query-b1a9a433e577133869c067b2ce383fc6ce4e9df307feb5fd3edc0d1276d61ff1.json similarity index 60% rename from backend/.sqlx/query-69606859fe08d24f0306b866f9f50ad766d56a0d5aa8f0784f79aa5e211b00e4.json rename to backend/.sqlx/query-b1a9a433e577133869c067b2ce383fc6ce4e9df307feb5fd3edc0d1276d61ff1.json index 58c80e8b971a2..1818efc0c0d97 100644 --- a/backend/.sqlx/query-69606859fe08d24f0306b866f9f50ad766d56a0d5aa8f0784f79aa5e211b00e4.json +++ b/backend/.sqlx/query-b1a9a433e577133869c067b2ce383fc6ce4e9df307feb5fd3edc0d1276d61ff1.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO flow_conversation_message (conversation_id, message_type, content, job_id)\n VALUES ($1, $2, $3, $4)", + "query": "INSERT INTO flow_conversation_message (conversation_id, message_type, content, job_id, step_name, success)\n VALUES ($1, $2, $3, $4, $5, $6)", "describe": { "columns": [], "parameters": { @@ -12,16 +12,19 @@ "kind": { "Enum": [ "user", - "assistant" + "assistant", + "tool" ] } } }, "Text", - "Uuid" + "Uuid", + "Varchar", + "Bool" ] }, "nullable": [] }, - "hash": "69606859fe08d24f0306b866f9f50ad766d56a0d5aa8f0784f79aa5e211b00e4" + "hash": "b1a9a433e577133869c067b2ce383fc6ce4e9df307feb5fd3edc0d1276d61ff1" } diff --git a/backend/.sqlx/query-c67e81985093ff976f1326ff2254585e850f61a787a5b8f4a8d88f88016f1f2b.json b/backend/.sqlx/query-c67e81985093ff976f1326ff2254585e850f61a787a5b8f4a8d88f88016f1f2b.json new file mode 100644 index 0000000000000..04e253ce5498e --- /dev/null +++ b/backend/.sqlx/query-c67e81985093ff976f1326ff2254585e850f61a787a5b8f4a8d88f88016f1f2b.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT created_at FROM flow_conversation_message WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "created_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false + ] + }, + "hash": "c67e81985093ff976f1326ff2254585e850f61a787a5b8f4a8d88f88016f1f2b" +} diff --git a/backend/migrations/20251007123506_update_conversation_message_types.down.sql b/backend/migrations/20251007123506_update_conversation_message_types.down.sql new file mode 100644 index 0000000000000..1b6940791d8b0 --- /dev/null +++ b/backend/migrations/20251007123506_update_conversation_message_types.down.sql @@ -0,0 +1,3 @@ +-- Remove step_name and success columns +ALTER TABLE flow_conversation_message DROP COLUMN IF EXISTS step_name; +ALTER TABLE flow_conversation_message DROP COLUMN IF EXISTS success; \ No newline at end of file diff --git a/backend/migrations/20251007123506_update_conversation_message_types.up.sql b/backend/migrations/20251007123506_update_conversation_message_types.up.sql new file mode 100644 index 0000000000000..a261e83df30db --- /dev/null +++ b/backend/migrations/20251007123506_update_conversation_message_types.up.sql @@ -0,0 +1,8 @@ +-- Add up migration script here + +-- Extend MESSAGE_TYPE enum to include 'tool' +ALTER TYPE MESSAGE_TYPE ADD VALUE 'tool'; + +-- Add step_name and success columns to flow_conversation_message table +ALTER TABLE flow_conversation_message ADD COLUMN step_name VARCHAR(255); +ALTER TABLE flow_conversation_message ADD COLUMN success BOOLEAN DEFAULT TRUE NOT NULL; \ No newline at end of file diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index a4dd3d2f2e20c..51a816df3868a 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -6529,6 +6529,8 @@ paths: - flow_conversation parameters: - $ref: "#/components/parameters/WorkspaceId" + - $ref: "#/components/parameters/Page" + - $ref: "#/components/parameters/PerPage" - name: conversation_id description: conversation id in: path @@ -6536,8 +6538,13 @@ paths: schema: type: string format: uuid - - $ref: "#/components/parameters/Page" - - $ref: "#/components/parameters/PerPage" + - name: after_id + description: id to fetch only the messages after that id + in: query + required: false + schema: + type: string + format: uuid responses: "200": description: conversation messages @@ -14805,7 +14812,7 @@ components: description: The conversation this message belongs to message_type: type: string - enum: [user, assistant, system] + enum: [user, assistant, system, tool] description: Type of the message content: type: string @@ -14819,6 +14826,12 @@ components: type: string format: date-time description: When the message was created + step_name: + type: string + description: The step name that produced that message + success: + type: boolean + description: Whether the message is a success EndpointTool: type: object @@ -17332,7 +17345,7 @@ components: - OK - SKIP - FAIL - + DucklakeInstanceCatalogDbStatusLogs: type: object properties: @@ -17368,7 +17381,6 @@ components: description: Error message if the operation failed example: "Connection timeout" - NewSqsTrigger: type: object properties: diff --git a/backend/windmill-api/src/flow_conversations.rs b/backend/windmill-api/src/flow_conversations.rs index efe11c2045eca..6f046c47380eb 100644 --- a/backend/windmill-api/src/flow_conversations.rs +++ b/backend/windmill-api/src/flow_conversations.rs @@ -13,17 +13,10 @@ use crate::db::ApiAuthed; use windmill_common::{ db::UserDB, error::{JsonResult, Result}, + flow_conversations::MessageType, utils::{not_found_if_none, paginate, Pagination}, }; -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, sqlx::Type)] -#[sqlx(type_name = "MESSAGE_TYPE", rename_all = "lowercase")] -#[serde(rename_all = "lowercase")] -pub enum MessageType { - User, - Assistant, -} - pub fn workspaced_service() -> Router { Router::new() .route("/list", get(list_conversations)) @@ -50,11 +43,14 @@ pub struct FlowConversationMessage { pub content: String, pub job_id: Option, pub created_at: DateTime, + pub step_name: Option, + pub success: bool, } #[derive(Deserialize)] pub struct ListConversationsQuery { pub flow_path: Option, + pub after_id: Option, } async fn list_conversations( @@ -82,6 +78,15 @@ async fn list_conversations( if let Some(flow_path) = &query.flow_path { sqlb.and_where_eq("flow_path", "?".bind(flow_path)); } + if let Some(after_id) = &query.after_id { + let message_id_created_at = sqlx::query_scalar!( + "SELECT created_at FROM flow_conversation_message WHERE id = $1", + after_id + ) + .fetch_one(&mut *tx) + .await?; + sqlb.and_where_gt("created_at", "?".bind(&message_id_created_at.to_rfc3339())); + } sqlb.order_by("updated_at", true) .limit(per_page as i64) @@ -226,9 +231,9 @@ async fn list_messages( // Fetch messages for this conversation, oldest first, but reverse the order of the messages for easy rendering on the frontend let messages = sqlx::query_as!( FlowConversationMessage, - r#"SELECT id, conversation_id, message_type as "message_type: MessageType", content, job_id, created_at + r#"SELECT id, conversation_id, message_type as "message_type: MessageType", content, job_id, created_at, step_name, success FROM ( - SELECT id, conversation_id, message_type, content, job_id, created_at + SELECT id, conversation_id, message_type, content, job_id, created_at, step_name, success FROM flow_conversation_message WHERE conversation_id = $1 ORDER BY created_at DESC, CASE WHEN message_type = 'user' THEN 0 ELSE 1 END @@ -246,52 +251,3 @@ async fn list_messages( tx.commit().await?; Ok(Json(messages)) } - -// Helper function to create a message using an existing transaction -pub async fn create_message( - tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, - conversation_id: Uuid, - message_type: MessageType, - content: &str, - job_id: Option, - workspace_id: &str, -) -> windmill_common::error::Result<()> { - // Verify the conversation exists and belongs to the user - let conversation_exists = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM flow_conversation WHERE id = $1 AND workspace_id = $2)", - conversation_id, - workspace_id - ) - .fetch_one(&mut **tx) - .await? - .unwrap_or(false); - - if !conversation_exists { - return Err(windmill_common::error::Error::NotFound(format!( - "Conversation not found or access denied: {}", - conversation_id - ))); - } - - // Insert the message - sqlx::query!( - "INSERT INTO flow_conversation_message (conversation_id, message_type, content, job_id) - VALUES ($1, $2, $3, $4)", - conversation_id, - message_type as MessageType, - content, - job_id - ) - .execute(&mut **tx) - .await?; - - // Update conversation updated_at timestamp - sqlx::query!( - "UPDATE flow_conversation SET updated_at = NOW() WHERE id = $1", - conversation_id - ) - .execute(&mut **tx) - .await?; - - Ok(()) -} diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 813e912d6e0ca..b2fa6978febd2 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -30,6 +30,7 @@ use windmill_common::auth::is_super_admin_email; use windmill_common::auth::TOKEN_PREFIX_LEN; use windmill_common::db::UserDbWithAuthed; use windmill_common::error::JsonResult; +use windmill_common::flow_conversations::add_message_to_conversation_tx; use windmill_common::flow_status::{JobResult, RestartedFrom}; use windmill_common::jobs::{ check_tag_available_for_workspace_internal, format_completed_job_result, format_result, @@ -102,7 +103,8 @@ use windmill_queue::{ PushArgsOwned, PushIsolationLevel, }; -use crate::flow_conversations::{self, MessageType}; +use crate::flow_conversations; +use windmill_common::flow_conversations::MessageType; pub fn workspaced_service() -> Router { let cors = CorsLayer::new() @@ -3943,7 +3945,6 @@ async fn handle_chat_conversation_messages( flow_path: &str, run_query: &RunJobQuery, user_message_raw: Option<&Box>, - uuid: Uuid, ) -> error::Result<()> { let memory_id = run_query.memory_id.ok_or_else(|| { windmill_common::error::Error::BadRequest( @@ -3958,10 +3959,12 @@ async fn handle_chat_conversation_messages( })?; // Deserialize the RawValue to get the actual string without quotes - let user_message: String = serde_json::from_str(user_message_raw.get()) - .map_err(|e| windmill_common::error::Error::BadRequest( - format!("Failed to deserialize user_message: {}", e) - ))?; + let user_message: String = serde_json::from_str(user_message_raw.get()).map_err(|e| { + windmill_common::error::Error::BadRequest(format!( + "Failed to deserialize user_message: {}", + e + )) + })?; // Create conversation with provided ID (or get existing one) flow_conversations::get_or_create_conversation_with_id( @@ -3975,24 +3978,14 @@ async fn handle_chat_conversation_messages( .await?; // Create user message - flow_conversations::create_message( + add_message_to_conversation_tx( tx, memory_id, - MessageType::User, + None, &user_message, - None, // No job_id for user message - w_id, - ) - .await?; - - // Create placeholder assistant message in the same transaction as the job - flow_conversations::create_message( - tx, - memory_id, - MessageType::Assistant, - "", // Empty content, will be updated when job completes - Some(uuid), // Associate with the job - w_id, + MessageType::User, + None, + true, ) .await?; @@ -4125,7 +4118,6 @@ pub async fn run_flow_by_path_inner( &flow_path.to_string(), &run_query, args.args.get("user_message"), - uuid, ) .await?; } @@ -5624,7 +5616,6 @@ pub async fn run_wait_result_flow_by_path_internal( &flow_path.to_string(), &run_query, args.args.get("user_message"), - uuid, ) .await?; } diff --git a/backend/windmill-common/src/flow_conversations.rs b/backend/windmill-common/src/flow_conversations.rs new file mode 100644 index 0000000000000..8826f85b94a2a --- /dev/null +++ b/backend/windmill-common/src/flow_conversations.rs @@ -0,0 +1,50 @@ +use serde::{Deserialize, Serialize}; +use sqlx; +use uuid::Uuid; + +use crate::error::Result; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, sqlx::Type)] +#[sqlx(type_name = "MESSAGE_TYPE", rename_all = "lowercase")] +#[serde(rename_all = "lowercase")] +pub enum MessageType { + User, + Assistant, + System, + Tool, +} + +/// Add a message to a conversation using an existing transaction +pub async fn add_message_to_conversation_tx( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + conversation_id: Uuid, + job_id: Option, + content: &str, + message_type: MessageType, + step_name: Option<&str>, + success: bool, +) -> Result<()> { + // Insert the message + sqlx::query!( + "INSERT INTO flow_conversation_message (conversation_id, message_type, content, job_id, step_name, success) + VALUES ($1, $2, $3, $4, $5, $6)", + conversation_id, + message_type as MessageType, + content, + job_id, + step_name, + success + ) + .execute(&mut **tx) + .await?; + + // Update conversation updated_at timestamp + sqlx::query!( + "UPDATE flow_conversation SET updated_at = NOW() WHERE id = $1", + conversation_id + ) + .execute(&mut **tx) + .await?; + + Ok(()) +} diff --git a/backend/windmill-common/src/lib.rs b/backend/windmill-common/src/lib.rs index 3f2212d4dd1a3..c17ebc3503970 100644 --- a/backend/windmill-common/src/lib.rs +++ b/backend/windmill-common/src/lib.rs @@ -43,6 +43,7 @@ pub mod email_ee; pub mod email_oss; pub mod error; pub mod external_ip; +pub mod flow_conversations; pub mod flow_status; pub mod flows; pub mod global_settings; diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index d0055bbe3ac69..f83c067450c3a 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -33,6 +33,7 @@ use windmill_common::add_time; use windmill_common::auth::JobPerms; #[cfg(feature = "benchmark")] use windmill_common::bench::BenchmarkIter; +use windmill_common::flow_conversations::{add_message_to_conversation_tx, MessageType}; use windmill_common::jobs::{JobTriggerKind, EMAIL_ERROR_HANDLER_USER_EMAIL}; use windmill_common::utils::now_from_db; use windmill_common::worker::{Connection, SCRIPT_TOKEN_EXPIRY}; @@ -823,44 +824,79 @@ pub async fn add_completed_job( restart_job_if_perpetual(db, queued_job, &canceled_by).await?; - // Update conversation message if it's a flow and it's done (both success and error cases) + // Create assistant message if it's a flow and it's done, but only if last module is not an AI agent if !skipped && flow_is_done { let chat_input_enabled = queued_job.parse_chat_input_enabled(); - let value = serde_json::to_value(result.0) - .map_err(|e| Error::internal_err(format!("Failed to serialize result: {e}")))?; if chat_input_enabled.unwrap_or(false) { - let content = match value { - // If it's an Object with "output" key AND the output is a String, return it - serde_json::Value::Object(mut map) - if map.contains_key("output") - && matches!(map.get("output"), Some(serde_json::Value::String(_))) => - { - if let Some(serde_json::Value::String(s)) = map.remove("output") { - s - } else { - // prettify the whole result - serde_json::to_string_pretty(&map) - .unwrap_or_else(|e| format!("Failed to serialize result: {e}")) - } - } - // Otherwise, if the whole value is a String, return it - serde_json::Value::String(s) => s, - // Otherwise, prettify the whole result - v => serde_json::to_string_pretty(&v) - .unwrap_or_else(|e| format!("Failed to serialize result: {e}")), - }; + // Get conversation_id from flow_status.memory_id + let flow_status = queued_job.parse_flow_status(); + let conversation_id = flow_status.and_then(|fs| fs.memory_id); + + if let Some(conversation_id) = conversation_id { + // get flow value + let parent_job = queued_job.parent_job.unwrap_or(queued_job.id); + let flow_value = sqlx::query!( + "SELECT f.value as \"value: Json\" FROM v2_job j JOIN flow_version f ON f.id = j.runnable_id WHERE j.id = $1 AND j.workspace_id = $2", + parent_job, + &queued_job.workspace_id + ) + .fetch_optional(db) + .await? + .map(|row| row.value.0); - // Update the assistant message - let _ = sqlx::query!( - "UPDATE flow_conversation_message - SET content = $1 - WHERE job_id = $2 - ", - content, - queued_job.id, - ) - .execute(db) - .await; + let last_module_is_ai_agent = flow_value + .as_ref() + .and_then(|flow_value| { + flow_value.modules.last().and_then(|m| m.get_value().ok()) + }) + .map(|v| matches!(v, FlowModuleValue::AIAgent { .. })) + .unwrap_or(false); + + // Only create assistant message if last module is NOT an AI agent, or there was an error + if !last_module_is_ai_agent || success == false { + let value = serde_json::to_value(result.0).map_err(|e| { + Error::internal_err(format!("Failed to serialize result: {e}")) + })?; + + let content = match value { + // If it's an Object with "output" key AND the output is a String, return it + serde_json::Value::Object(mut map) + if map.contains_key("output") + && matches!( + map.get("output"), + Some(serde_json::Value::String(_)) + ) => + { + if let Some(serde_json::Value::String(s)) = map.remove("output") { + s + } else { + // prettify the whole result + serde_json::to_string_pretty(&map) + .unwrap_or_else(|e| format!("Failed to serialize result: {e}")) + } + } + // Otherwise, if the whole value is a String, return it + serde_json::Value::String(s) => s, + // Otherwise, prettify the whole result + v => serde_json::to_string_pretty(&v) + .unwrap_or_else(|e| format!("Failed to serialize result: {e}")), + }; + + // Insert new assistant message + let mut tx = db.begin().await?; + add_message_to_conversation_tx( + &mut tx, + conversation_id, + Some(queued_job.id), + &content, + MessageType::Assistant, + None, + success, + ) + .await?; + tx.commit().await?; + } + } } } diff --git a/backend/windmill-worker/src/ai_executor.rs b/backend/windmill-worker/src/ai_executor.rs index 06d265e435ec3..0371497ce064d 100644 --- a/backend/windmill-worker/src/ai_executor.rs +++ b/backend/windmill-worker/src/ai_executor.rs @@ -12,8 +12,9 @@ use windmill_common::{ client::AuthedClient, db::DB, error::{self, to_anyhow, Error}, + flow_conversations::{add_message_to_conversation_tx, MessageType}, flow_status::AgentAction, - flows::{FlowModuleValue, Step}, + flows::{FlowModuleValue, FlowValue, Step}, get_latest_hash_for_path, jobs::JobKind, scripts::{get_full_hub_script_by_path, ScriptHash, ScriptLang}, @@ -99,18 +100,73 @@ pub async fn get_flow_job_runnable_and_raw_flow( Ok(job) } -/// Get memory_id from parent flow's flow_status -async fn get_memory_id_from_flow_status(db: &DB, parent_job: &Uuid) -> Result, Error> { - let result = sqlx::query_scalar!( - "SELECT (flow_status->>'memory_id')::uuid as memory_id - FROM v2_job_status +#[derive(Debug, Clone, Default)] +struct FlowChatSettings { + memory_id: Option, + chat_input_enabled: bool, +} + +/// Get chat settings (memory_id and chat_input_enabled) from root flow's flow_status +async fn get_flow_chat_settings(db: &DB, job: &MiniPulledJob) -> FlowChatSettings { + let root_job_id = job + .root_job + .or(job.flow_innermost_root_job) + .or(job.parent_job); + + let Some(root_job_id) = root_job_id else { + return FlowChatSettings::default(); + }; + + match sqlx::query!( + "SELECT + (flow_status->>'memory_id')::uuid as memory_id, + (flow_status->>'chat_input_enabled')::boolean as chat_input_enabled + FROM v2_job_status WHERE id = $1", - parent_job + root_job_id ) .fetch_optional(db) - .await?; + .await + { + Ok(Some(row)) => FlowChatSettings { + memory_id: row.memory_id, + chat_input_enabled: row.chat_input_enabled.unwrap_or(false), + }, + Ok(None) => FlowChatSettings::default(), + Err(e) => { + tracing::warn!( + "Failed to get chat settings from flow status for job {}: {}", + job.id, + e + ); + FlowChatSettings::default() + } + } +} - Ok(result.flatten()) +// Add message to conversation +async fn add_message_to_conversation( + db: &DB, + conversation_id: &Uuid, + job_id: &Uuid, + message_content: &str, + message_type: MessageType, + step_name: &Option, + success: bool, +) -> Result<(), Error> { + let mut tx = db.begin().await?; + add_message_to_conversation_tx( + &mut tx, + *conversation_id, + Some(*job_id), + &message_content, + message_type, + step_name.as_deref(), + success, + ) + .await?; + tx.commit().await?; + Ok(()) } pub async fn handle_ai_agent_job( @@ -296,6 +352,7 @@ pub async fn handle_ai_agent_job( parent_job, &args, &tools, + value, client, &mut inner_occupancy_metrics, job_completed_tx, @@ -409,6 +466,18 @@ async fn update_flow_status_module_with_actions_success( Ok(()) } +/// Get step name from the flow module (summary if exists, else id) +fn get_step_name_from_flow(flow_value: &FlowValue, flow_step_id: Option<&str>) -> Option { + let flow_step_id = flow_step_id?; + let module = flow_value.modules.iter().find(|m| m.id == flow_step_id)?; + Some( + module + .summary + .clone() + .unwrap_or_else(|| format!("AI Agent Step {}", module.id)), + ) +} + /// Check if the provider is Anthropic (either direct or through OpenRouter) fn is_anthropic_provider(provider: &ProviderWithResource) -> bool { let provider_is_anthropic = provider.kind.is_anthropic(); @@ -428,6 +497,7 @@ pub async fn run_agent( parent_job: &Uuid, args: &AIAgentArgs, tools: &[Tool], + flow_value: &FlowValue, // job execution context client: &AuthedClient, @@ -459,12 +529,15 @@ pub async fn run_agent( vec![] }; + let mut chat_settings: Option = None; + // Load previous messages from memory for text output mode (only if context length is set) if matches!(output_type, OutputType::Text) { if let Some(context_length) = args.messages_context_length.filter(|&n| n > 0) { if let Some(step_id) = job.flow_step_id.as_deref() { - // Get memory_id from flow_status - if let Ok(Some(memory_id)) = get_memory_id_from_flow_status(db, parent_job).await { + // Fetch chat settings from root flow + chat_settings = Some(get_flow_chat_settings(db, job).await); + if let Some(memory_id) = chat_settings.as_ref().and_then(|s| s.memory_id) { // Read messages from memory match read_from_memory(&job.workspace_id, memory_id, step_id).await { Ok(Some(loaded_messages)) => { @@ -659,6 +732,45 @@ pub async fn run_agent( .await?; content = Some(OpenAIContent::Text(response_content.clone())); + + // Add assistant message to conversation if chat_input_enabled + if chat_settings.is_none() { + chat_settings = Some(get_flow_chat_settings(db, job).await); + } + let chat_enabled = chat_settings + .as_ref() + .map(|s| s.chat_input_enabled) + .unwrap_or(false); + + if chat_enabled && !response_content.is_empty() { + if let Some(mid) = chat_settings.as_ref().and_then(|s| s.memory_id) + { + let agent_job_id = job.id; + let db_clone = db.clone(); + let message_content = response_content.clone(); + let step_name = get_step_name_from_flow( + flow_value, + job.flow_step_id.as_deref(), + ); + + // Spawn task because we do not need to wait for the result + tokio::spawn(async move { + if let Err(e) = add_message_to_conversation( + &db_clone, + &mid, + &agent_job_id, + &message_content, + MessageType::Assistant, + &step_name, + true, + ) + .await + { + tracing::warn!("Failed to add assistant message to conversation {}: {}", mid, e); + } + }); + } + } } if tool_calls.is_empty() { @@ -977,7 +1089,7 @@ pub async fn run_agent( let tool_result_event = StreamingEvent::ToolResult { call_id: tool_call.id.clone(), function_name: tool_call.function.name.clone(), - result: error_message, + result: error_message.clone(), success: false, }; stream_event_processor @@ -989,6 +1101,46 @@ pub async fn run_agent( db, parent_job, false, ) .await?; + + // Add tool message to conversation if chat_input_enabled (error case) + if chat_settings.is_none() { + chat_settings = + Some(get_flow_chat_settings(db, job).await); + } + let chat_enabled = chat_settings + .as_ref() + .map(|s| s.chat_input_enabled) + .unwrap_or(false); + + if chat_enabled { + if let Some(mid) = + chat_settings.as_ref().and_then(|s| s.memory_id) + { + let tool_job_id = job_id; + let db_clone = db.clone(); + let step_name = get_step_name_from_flow( + flow_value, + job.flow_step_id.as_deref(), + ); + + // Spawn task because we do not need to wait for the result + tokio::spawn(async move { + if let Err(e) = add_message_to_conversation( + &db_clone, + &mid, + &tool_job_id, + &error_message, + MessageType::Tool, + &step_name, + false, + ) + .await + { + tracing::warn!("Failed to add tool error message to conversation {}: {}", mid, e); + } + }); + } + } } Ok(success) => { let send_result = @@ -1055,6 +1207,52 @@ pub async fn run_agent( db, parent_job, success, ) .await?; + + // Add tool message to conversation if chat_input_enabled + if chat_settings.is_none() { + chat_settings = + Some(get_flow_chat_settings(db, job).await); + } + let chat_enabled = chat_settings + .as_ref() + .map(|s| s.chat_input_enabled) + .unwrap_or(false); + + if chat_enabled { + if let Some(mid) = + chat_settings.as_ref().and_then(|s| s.memory_id) + { + let tool_job_id = job_id; + let db_clone = db.clone(); + let tool_name = tool_call.function.name.clone(); + let step_name = get_step_name_from_flow( + flow_value, + job.flow_step_id.as_deref(), + ); + let content = if success { + format!("Used {} tool", tool_name) + } else { + format!("Error executing {}", tool_name) + }; + + // Spawn task because we do not need to wait for the result + tokio::spawn(async move { + if let Err(e) = add_message_to_conversation( + &db_clone, + &mid, + &tool_job_id, + &content, + MessageType::Tool, + &step_name, + success, + ) + .await + { + tracing::warn!("Failed to add tool message to conversation {}: {}", mid, e); + } + }); + } + } } } } else { @@ -1135,10 +1333,7 @@ pub async fn run_agent( let start_idx = all_messages.len().saturating_sub(context_length); let messages_to_persist = all_messages[start_idx..].to_vec(); - // Get memory_id from flow_status - if let Ok(Some(memory_id)) = - get_memory_id_from_flow_status(db, parent_job).await - { + if let Some(memory_id) = chat_settings.as_ref().and_then(|s| s.memory_id) { if let Err(e) = write_to_memory( &job.workspace_id, memory_id, diff --git a/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte b/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte index 7218f3caff3e0..7b81e74604f08 100644 --- a/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte +++ b/frontend/src/lib/components/flows/conversations/FlowChatInterface.svelte @@ -1,13 +1,10 @@ @@ -418,18 +76,18 @@
{#if deploymentInProgress} {/if} - {#if isLoadingMessages} + {#if manager.isLoadingMessages}
- {:else if messages.length === 0} + {:else if manager.messages.length === 0}

Start a conversation

@@ -437,9 +95,15 @@
{:else}
- {#each messages as message (message.id)} + {#each manager.messages as message (message.id)} {/each} + {#if manager.isWaitingForResponse} +
+ + Processing... +
+ {/if}
{/if}
@@ -451,15 +115,12 @@ class:opacity-50={deploymentInProgress} >
@@ -468,8 +129,8 @@ size="xs2" btnClasses="!rounded-full !p-1.5" startIcon={{ icon: ArrowUp }} - disabled={!inputMessage?.trim() || isLoading || deploymentInProgress} - on:click={sendMessage} + disabled={!manager.inputMessage?.trim() || manager.isLoading || deploymentInProgress} + on:click={() => manager.sendMessage()} iconOnly title={deploymentInProgress ? 'Deployment in progress' : 'Send message (Enter)'} /> diff --git a/frontend/src/lib/components/flows/conversations/FlowChatManager.svelte.ts b/frontend/src/lib/components/flows/conversations/FlowChatManager.svelte.ts new file mode 100644 index 0000000000000..6efb469ceb506 --- /dev/null +++ b/frontend/src/lib/components/flows/conversations/FlowChatManager.svelte.ts @@ -0,0 +1,499 @@ +import type { FlowConversationMessage } from '$lib/gen/types.gen' +import { FlowConversationService } from '$lib/gen' +import { sendUserToast } from '$lib/toast' +import { waitJob } from '$lib/components/waitJob' +import { tick } from 'svelte' + +export interface ChatMessage extends FlowConversationMessage { + loading?: boolean + streaming?: boolean +} + +export interface FlowChatManagerOptions { + onRunFlow: (userMessage: string, conversationId: string) => Promise + createConversation: (options: { clearMessages?: boolean }) => Promise + refreshConversations?: () => Promise + conversationId?: string + useStreaming?: boolean + path?: string +} + +class FlowChatManager { + // State + messages = $state([]) + inputMessage = $state('') + isLoading = $state(false) + isLoadingMessages = $state(false) + isWaitingForResponse = $state(false) + messagesContainer = $state(undefined) + inputElement = $state(undefined) + page = $state(1) + hasMoreMessages = $state(false) + loadingMoreMessages = $state(false) + currentEventSource = $state(undefined) + pollingInterval = $state | undefined>(undefined) + + // Private state + #conversationsCache = $state>({}) + #scrollTimeout: ReturnType | undefined = undefined + #perPage = 50 + #workspace = $state(undefined) + + // Options + #onRunFlow?: FlowChatManagerOptions['onRunFlow'] + #createConversation?: FlowChatManagerOptions['createConversation'] + #refreshConversations?: FlowChatManagerOptions['refreshConversations'] + #conversationId = $state(undefined) + #useStreaming = $state(false) + #path = $state(undefined) + + initialize(options: FlowChatManagerOptions, workspace: string) { + this.#onRunFlow = options.onRunFlow + this.#createConversation = options.createConversation + this.#refreshConversations = options.refreshConversations + this.#conversationId = options.conversationId + this.#useStreaming = options.useStreaming ?? false + this.#path = options.path + this.#workspace = workspace + } + + updateConversationId(conversationId: string | undefined) { + this.#conversationId = conversationId + } + + cleanup() { + if (this.currentEventSource) { + this.currentEventSource.close() + this.currentEventSource = undefined + } + this.stopPolling() + this.isLoading = false + this.isWaitingForResponse = false + } + + // Public methods for component to call + fillInputMessage(message: string) { + this.inputMessage = message + } + + focusInput() { + this.inputElement?.focus() + } + + clearMessages() { + this.messages = [] + this.inputMessage = '' + this.page = 1 + } + + async loadConversationMessages(conversationId?: string) { + this.page = 1 + await this.loadMessages(true, conversationId) + } + + // Message loading + private async loadMessages(reset: boolean, conversationId?: string) { + let conversationIdToUse = conversationId ?? this.#conversationId + if (!this.#workspace || !conversationIdToUse) return + + if (reset) { + if (this.#conversationsCache[conversationIdToUse]) { + this.messages = this.#conversationsCache[conversationIdToUse] + return + } + this.isLoadingMessages = true + } else { + this.loadingMoreMessages = true + } + + const pageToFetch = reset ? 1 : this.page + 1 + + try { + const previousScrollHeight = this.messagesContainer?.scrollHeight || 0 + + const response = await FlowConversationService.listConversationMessages({ + workspace: this.#workspace, + conversationId: conversationIdToUse, + page: pageToFetch, + perPage: this.#perPage + }) + + if (reset) { + this.#conversationsCache[conversationIdToUse] = response + this.messages = response + this.isLoadingMessages = false + await new Promise((resolve) => setTimeout(resolve, 100)) + this.scrollToBottom() + } else { + this.messages = [...response, ...this.messages] + this.page = pageToFetch + // Restore scroll position + await new Promise((resolve) => setTimeout(resolve, 50)) + if (this.messagesContainer) { + this.messagesContainer.scrollTop = + this.messagesContainer.scrollHeight - previousScrollHeight + } + } + + this.hasMoreMessages = response.length === this.#perPage + } catch (error) { + console.error('Failed to load messages:', error) + sendUserToast('Failed to load messages: ' + error) + } finally { + this.isLoadingMessages = false + this.loadingMoreMessages = false + } + } + + handleScroll = () => { + if (this.#scrollTimeout) clearTimeout(this.#scrollTimeout) + + this.#scrollTimeout = setTimeout(() => { + if (!this.messagesContainer || !this.hasMoreMessages || this.loadingMoreMessages) return + + if (this.messagesContainer.scrollTop <= 10) { + this.loadMessages(false) + } + }, 200) + } + + scrollToBottom() { + if (this.messagesContainer) { + this.messagesContainer.scrollTop = this.messagesContainer.scrollHeight + } + } + + private scrollToUserMessage(messageId: string) { + if (!this.messagesContainer) return + const messageElement = this.messagesContainer.querySelector(`[data-message-id="${messageId}"]`) + if (messageElement) { + messageElement.scrollIntoView({ behavior: 'smooth', block: 'start' }) + } + } + + // Polling + private async pollJobResult(jobId: string) { + try { + await waitJob(jobId) + } catch (error) { + console.error('Error polling job result:', error) + } finally { + // Do a final poll to get all messages from database + try { + if (this.#conversationId) { + await this.pollConversationMessages(this.#conversationId) + } + } catch {} + this.cleanup() + } + } + + private parseStreamDeltas(streamData: string): { + type: string + content: string + success: boolean + } { + let type = 'message' + const lines = streamData.trim().split('\n') + let content = '' + let success = true + for (const line of lines) { + if (!line.trim()) continue + try { + const parsed = JSON.parse(line) + if (parsed.type === 'tool_result') { + type = 'tool_result' + const toolName = parsed.function_name + success = parsed.success + content = success ? `Used ${toolName} tool` : `Failed to use ${toolName} tool` + } + if (parsed.type === 'token_delta' && parsed.content) { + type = 'message' + content += parsed.content + } + } catch (e) { + console.error('Failed to parse stream line:', line, e) + } + } + return { type, content, success } + } + + private async pollConversationMessages(conversationId: string, isNewConversation?: boolean) { + if (!this.#workspace) return + + try { + const lastId = this.messages[this.messages.length - 1].id + const response = await FlowConversationService.listConversationMessages({ + workspace: this.#workspace, + conversationId: conversationId, + page: 1, + perPage: 50, + afterId: lastId + }) + + if (isNewConversation) { + await this.#refreshConversations?.() + } + + const filteredResponse = response.filter((msg) => msg.message_type !== 'user') + + // Add any new intermediate messages not already present + for (const msg of filteredResponse) { + if (!this.messages.find((m) => m.id === msg.id)) { + this.messages = [...this.messages, msg] + } + } + + // Remove temporary messages + this.messages = this.messages.filter((msg) => !msg.id.startsWith('temp-')) + } catch (error) { + console.error('Polling error:', error) + } + } + + private startPolling(conversationId: string, isNewConversation?: boolean) { + if (this.pollingInterval) return + this.pollingInterval = setInterval(() => { + this.pollConversationMessages(conversationId, isNewConversation) + }, 500) // Poll every 0.5 seconds + setTimeout( + () => { + this.stopPolling() + }, + 2 * 60 * 1000 + ) // Stop polling after 2 minutes + } + + private stopPolling() { + if (this.pollingInterval) { + clearInterval(this.pollingInterval) + this.pollingInterval = undefined + } + } + + // Message sending + async sendMessage() { + if (!this.inputMessage.trim() || this.isLoading) return + + const isNewConversation = this.messages.length === 0 + + // Reset state for new message + this.stopPolling() + + // Generate a new conversation ID if we don't have one + let currentConversationId = this.#conversationId + if (!this.#conversationId && this.#createConversation) { + const newConversationId = await this.#createConversation({ clearMessages: false }) + currentConversationId = newConversationId + } + + if (!currentConversationId) { + console.error('No conversation ID found') + return + } + + // Invalidate the conversation cache + delete this.#conversationsCache[currentConversationId] + + const userMessage: ChatMessage = { + id: crypto.randomUUID(), + content: this.inputMessage.trim(), + created_at: new Date().toISOString(), + message_type: 'user', + conversation_id: currentConversationId + } + + this.messages = [...this.messages, userMessage] + const messageContent = this.inputMessage.trim() + this.inputMessage = '' + this.isLoading = true + this.isWaitingForResponse = true + + try { + await tick() + this.scrollToUserMessage(userMessage.id) + + if (this.#useStreaming && this.#path) { + await this.handleStreamingMessage(messageContent, currentConversationId, isNewConversation) + } else { + await this.handlePollingMessage(messageContent, currentConversationId, isNewConversation) + } + } catch (error) { + console.error('Error running flow:', error) + sendUserToast('Failed to run flow: ' + error, true) + } finally { + if (!this.#useStreaming) { + this.isLoading = false + } + } + + await tick() + this.focusInput() + } + + private async handleStreamingMessage( + messageContent: string, + currentConversationId: string, + isNewConversation: boolean + ) { + // Close any existing EventSource + if (this.currentEventSource) { + this.currentEventSource.close() + } + + // Track stream state for this message + let accumulatedContent = '' + let assistantMessageId = '' + let isCompleted = false + + try { + // Encode the payload as base64 + const payload = { user_message: messageContent } + const payloadBase64 = btoa(JSON.stringify(payload)) + + // Build the EventSource URL + const streamUrl = `/api/w/${this.#workspace}/jobs/run_and_stream/f/${this.#path}` + const url = new URL(streamUrl, window.location.origin) + url.searchParams.set('payload', payloadBase64) + url.searchParams.set('memory_id', currentConversationId) + url.searchParams.set('poll_delay_ms', '50') + + // Create EventSource connection + const eventSource = new EventSource(url.toString()) + this.currentEventSource = eventSource + + // start polling + this.startPolling(currentConversationId, isNewConversation) + + eventSource.onmessage = async (event) => { + try { + const data = JSON.parse(event.data) + + if (data.type === 'update') { + // Process new stream content + if (data.new_result_stream) { + // Stop polling since we are receiving last step streaming + this.stopPolling() + const { + type, + content: newContent, + success + } = this.parseStreamDeltas(data.new_result_stream) + accumulatedContent += newContent + if (accumulatedContent.length > 0 || type === 'tool_result') { + this.isWaitingForResponse = false + } + + // Create tool message if type is tool_result + if (type === 'tool_result') { + // set last message streaming to false + this.messages = this.messages.map((msg) => + msg.id === this.messages[this.messages.length - 1].id + ? { ...msg, streaming: false } + : msg + ) + + this.messages = [ + ...this.messages, + { + id: 'temp-' + crypto.randomUUID(), + content: newContent, + created_at: new Date().toISOString(), + message_type: 'tool', + conversation_id: currentConversationId, + job_id: '', + loading: false, + streaming: false, + success + } + ] + // Reset assistant message ID since we are creating a tool message + assistantMessageId = '' + accumulatedContent = '' + } + + // Create message on first content + else if ( + type === 'message' && + assistantMessageId.length === 0 && + accumulatedContent.length > 0 + ) { + assistantMessageId = 'temp-' + crypto.randomUUID() + this.messages = [ + ...this.messages, + { + id: assistantMessageId, + content: accumulatedContent, + created_at: new Date().toISOString(), + message_type: 'assistant', + conversation_id: currentConversationId, + job_id: '', + loading: false, + streaming: true + } + ] + } else { + // Update existing message + this.messages = this.messages.map((msg) => + msg.id === assistantMessageId ? { ...msg, content: accumulatedContent } : msg + ) + } + } + + // Handle completion + if (data.completed) { + isCompleted = true + // Do a final poll to get all messages from database + if (this.#conversationId) { + await this.pollConversationMessages(this.#conversationId) + } + this.cleanup() + } + } + } catch (error) { + console.error('Error processing stream event:', error) + } + } + + eventSource.onerror = (error) => { + if (isCompleted) return + console.error('EventSource error:', error) + sendUserToast('Stream error occurred', true) + this.cleanup() + } + } catch (error) { + console.error('Stream connection error:', error) + sendUserToast('Failed to connect to stream', true) + this.cleanup() + } + } + + private async handlePollingMessage( + messageContent: string, + currentConversationId: string, + isNewConversation: boolean + ) { + const jobId = await this.#onRunFlow?.(messageContent, currentConversationId) + if (!jobId) { + console.error('No jobId returned from onRunFlow') + return + } + + if (isNewConversation) { + await this.#refreshConversations?.() + } + + // Start polling for intermediate messages in non-streaming mode too + this.startPolling(currentConversationId) + this.pollJobResult(jobId) + } + + handleKeyDown = (event: KeyboardEvent) => { + if (event.key === 'Enter' && !event.shiftKey) { + event.preventDefault() + this.sendMessage() + } + } +} + +export const createFlowChatManager = () => new FlowChatManager() diff --git a/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte b/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte index 9bb8172e2cf54..9a143de1c0792 100644 --- a/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte +++ b/frontend/src/lib/components/flows/conversations/FlowChatMessage.svelte @@ -1,13 +1,13 @@