-
Notifications
You must be signed in to change notification settings - Fork 8.5k
🌊 Add task service and client to manage background tasks #245725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8171ee6
608534b
8a5dd5e
52e562c
65816b4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| import { types, type IndexStorageSettings, type IStorageClient } from '@kbn/storage-adapter'; | ||
| import type { PersistedTask } from './types'; | ||
|
|
||
| export const taskStorageSettings = { | ||
| name: '.kibana_streams_tasks', | ||
| schema: { | ||
| properties: { | ||
| id: types.keyword(), | ||
| type: types.keyword(), | ||
| status: types.keyword(), | ||
| stream: types.keyword(), | ||
| space: types.keyword(), | ||
| created_at: types.date(), | ||
| // Workaround for https://github.com/elastic/kibana/issues/245974 | ||
| task: types.object({ enabled: false }), | ||
| }, | ||
| }, | ||
| } satisfies IndexStorageSettings; | ||
| export type TaskStorageSettings = typeof taskStorageSettings; | ||
|
|
||
| export type TaskStorageClient = IStorageClient<TaskStorageSettings, PersistedTask>; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| import type { KibanaRequest, Logger } from '@kbn/core/server'; | ||
| import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; | ||
| import { isNotFoundError, isResponseError } from '@kbn/es-errors'; | ||
| import type { TaskStorageClient } from './storage'; | ||
| import type { PersistedTask, TaskParams } from './types'; | ||
|
|
||
| interface TaskRequest<TaskType, TParams extends {}> { | ||
| task: Omit<PersistedTask & { type: TaskType }, 'status' | 'created_at' | 'task'>; | ||
| params: TParams; | ||
| request: KibanaRequest; | ||
| } | ||
|
|
||
| export class TaskClient<TaskType extends string> { | ||
miltonhultgren marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| constructor( | ||
| private readonly taskManagerStart: TaskManagerStartContract, | ||
| private readonly storageClient: TaskStorageClient, | ||
| private readonly logger: Logger | ||
| ) {} | ||
|
|
||
| public async get<TPayload extends {} = {}>(id: string): Promise<PersistedTask<TPayload>> { | ||
| try { | ||
| this.logger.debug(`Getting task ${id}`); | ||
|
|
||
| const response = await this.storageClient.get({ | ||
| id, | ||
| }); | ||
miltonhultgren marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if (!response._source) { | ||
miltonhultgren marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Should not happen | ||
| throw new Error(`Task ${id} has no source`); | ||
| } | ||
|
|
||
| return response._source as PersistedTask<TPayload>; | ||
| } catch (error) { | ||
| if (isNotFoundError(error)) { | ||
| return { | ||
| id, | ||
| status: 'not_started', | ||
| created_at: '', | ||
| space: '', | ||
| stream: '', | ||
| type: '', | ||
| }; | ||
| } | ||
|
|
||
| throw error; | ||
| } | ||
| } | ||
|
|
||
| public async schedule<TParams extends {} = {}>({ | ||
| task, | ||
| params, | ||
| request, | ||
| }: TaskRequest<TaskType, TParams>): Promise<PersistedTask> { | ||
| const taskDoc: PersistedTask = { | ||
| ...task, | ||
| status: 'in_progress', | ||
| created_at: new Date().toISOString(), | ||
| }; | ||
|
|
||
| try { | ||
| await this.taskManagerStart.schedule( | ||
| { | ||
| id: task.id, | ||
| taskType: task.type, | ||
| params: { | ||
| ...params, | ||
| _task: { | ||
| ...taskDoc, | ||
| }, | ||
| } satisfies TaskParams<TParams>, | ||
| state: {}, | ||
| scope: ['streams'], | ||
| priority: 1, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit but we could use Should we define a default priority and timeout at the TaskDefinition level, and allow this schedule function to override based on the payload type ? Alternatively should we create a task definition per payload type ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call out, I wanted to ping @elastic/response-ops on this. 👋🏼 We had some concerns here about how to set the priority for these tasks. But we're not sure how to set the priority so that they still get picked up in a reasonable time but don't take space for other more critical things. Is |
||
| }, | ||
| { | ||
| request, | ||
| } | ||
| ); | ||
|
|
||
| this.logger.debug(`Scheduled ${task.type} task (${task.id})`); | ||
| await this.update(taskDoc); | ||
| } catch (error) { | ||
| const isVersionConflict = | ||
| isResponseError(error) && error.message.includes('version conflict'); | ||
| if (!isVersionConflict) { | ||
| throw error; | ||
| } | ||
| } | ||
|
|
||
| return taskDoc; | ||
| } | ||
|
|
||
| public async update<TPayload extends {} = {}>( | ||
| task: PersistedTask<TPayload> | ||
| ): Promise<PersistedTask<TPayload>> { | ||
| this.logger.debug(`Updating task ${task.id}`); | ||
|
|
||
| await this.storageClient.index({ | ||
| id: task.id, | ||
| document: task, | ||
| // This might cause issues if there are many updates in a short time from multiple tasks running concurrently | ||
| refresh: true, | ||
| }); | ||
|
|
||
| return task; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| import type { TaskDefinitionRegistry } from '@kbn/task-manager-plugin/server'; | ||
| import type { GetScopedClients } from '../../../routes/types'; | ||
|
|
||
| export interface TaskContext { | ||
| getScopedClients: GetScopedClients; | ||
| } | ||
|
|
||
| export function createTaskDefinitions(taskContext: TaskContext) { | ||
| return {} satisfies TaskDefinitionRegistry; | ||
| } | ||
|
|
||
| export type StreamsTaskType = keyof ReturnType<typeof createTaskDefinitions>; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| import type { | ||
| TaskManagerSetupContract, | ||
| TaskManagerStartContract, | ||
| } from '@kbn/task-manager-plugin/server'; | ||
| import type { CoreStart, Logger } from '@kbn/core/server'; | ||
| import { StorageIndexAdapter } from '@kbn/storage-adapter'; | ||
| import { TaskClient } from './task_client'; | ||
| import type { TaskStorageSettings } from './storage'; | ||
| import { taskStorageSettings } from './storage'; | ||
| import type { PersistedTask } from './types'; | ||
| import type { TaskContext } from './task_definitions'; | ||
| import { createTaskDefinitions, type StreamsTaskType } from './task_definitions'; | ||
|
|
||
| export class TaskService { | ||
| constructor(private readonly taskManagerSetup: TaskManagerSetupContract) {} | ||
|
|
||
| public registerTasks(taskContext: TaskContext) { | ||
| this.taskManagerSetup.registerTaskDefinitions(createTaskDefinitions(taskContext)); | ||
| } | ||
|
|
||
| public async getClient( | ||
| coreStart: CoreStart, | ||
| taskManagerStart: TaskManagerStartContract, | ||
| logger: Logger | ||
| ) { | ||
| const storageAdapter = new StorageIndexAdapter<TaskStorageSettings, PersistedTask>( | ||
| coreStart.elasticsearch.client.asInternalUser, | ||
| logger.get('task_client', 'storage'), | ||
| taskStorageSettings | ||
| ); | ||
|
|
||
| return new TaskClient<StreamsTaskType>( | ||
| taskManagerStart, | ||
| storageAdapter.getClient(), | ||
| logger.get('task_client') | ||
| ); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| interface PersistedTaskBase { | ||
| id: string; | ||
| type: string; | ||
| status: 'not_started' | 'in_progress' | 'completed' | 'failed'; | ||
| stream: string; | ||
| space: string; | ||
| created_at: string; | ||
| } | ||
|
|
||
| interface NotStartedTask extends PersistedTaskBase { | ||
| status: 'not_started'; | ||
| } | ||
| interface InProgressTask extends PersistedTaskBase { | ||
| status: 'in_progress'; | ||
| } | ||
| interface CompletedTask<TPayload extends {}> extends PersistedTaskBase { | ||
| status: 'completed'; | ||
| task: { | ||
| payload: TPayload; | ||
| }; | ||
| } | ||
| interface FailedTask extends PersistedTaskBase { | ||
| status: 'failed'; | ||
| task: { | ||
| error: string; | ||
| }; | ||
| } | ||
|
|
||
| export type PersistedTask<TPayload extends {} = {}> = | ||
| | NotStartedTask | ||
| | InProgressTask | ||
| | CompletedTask<TPayload> | ||
| | FailedTask; | ||
|
|
||
| export type TaskParams<TParams extends {} = {}> = TParams & { | ||
| _task: PersistedTask; | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the
typeof workload, for example sig event/feature/description generation ? Should we create a union type for this ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is and I do create a type for it when registering the tasks but that happens after this type is created and happens dynamically. I'd say we can revisit this later