diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/storage.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/storage.ts new file mode 100644 index 0000000000000..220ba6208e0a0 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/storage.ts @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { types, type IndexStorageSettings, type IStorageClient } from '@kbn/storage-adapter'; +import type { PersistedTask } from './types'; + +export const taskStorageSettings = { + name: '.kibana_streams_tasks', + schema: { + properties: { + id: types.keyword(), + type: types.keyword(), + status: types.keyword(), + stream: types.keyword(), + space: types.keyword(), + created_at: types.date(), + // Workaround for https://github.com/elastic/kibana/issues/245974 + task: types.object({ enabled: false }), + }, + }, +} satisfies IndexStorageSettings; +export type TaskStorageSettings = typeof taskStorageSettings; + +export type TaskStorageClient = IStorageClient; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_client.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_client.ts new file mode 100644 index 0000000000000..0cea36bbe8544 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_client.ts @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { KibanaRequest, Logger } from '@kbn/core/server'; +import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; +import { isNotFoundError, isResponseError } from '@kbn/es-errors'; +import type { TaskStorageClient } from './storage'; +import type { PersistedTask, TaskParams } from './types'; + +interface TaskRequest { + task: Omit; + params: TParams; + request: KibanaRequest; +} + +export class TaskClient { + constructor( + private readonly taskManagerStart: TaskManagerStartContract, + private readonly storageClient: TaskStorageClient, + private readonly logger: Logger + ) {} + + public async get(id: string): Promise> { + try { + this.logger.debug(`Getting task ${id}`); + + const response = await this.storageClient.get({ + id, + }); + + if (!response._source) { + // Should not happen + throw new Error(`Task ${id} has no source`); + } + + return response._source as PersistedTask; + } catch (error) { + if (isNotFoundError(error)) { + return { + id, + status: 'not_started', + created_at: '', + space: '', + stream: '', + type: '', + }; + } + + throw error; + } + } + + public async schedule({ + task, + params, + request, + }: TaskRequest): Promise { + const taskDoc: PersistedTask = { + ...task, + status: 'in_progress', + created_at: new Date().toISOString(), + }; + + try { + await this.taskManagerStart.schedule( + { + id: task.id, + taskType: task.type, + params: { + ...params, + _task: { + ...taskDoc, + }, + } satisfies TaskParams, + state: {}, + scope: ['streams'], + priority: 1, + }, + { + request, + } + ); + + this.logger.debug(`Scheduled ${task.type} task (${task.id})`); + await this.update(taskDoc); + } catch (error) { + const isVersionConflict = + isResponseError(error) && error.message.includes('version conflict'); + if (!isVersionConflict) { + throw error; + } + } + + return taskDoc; + } + + public async update( + task: PersistedTask + ): Promise> { + this.logger.debug(`Updating task ${task.id}`); + + await this.storageClient.index({ + id: task.id, + document: task, + // This might cause issues if there are many updates in a short time from multiple tasks running concurrently + refresh: true, + }); + + return task; + } +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts new file mode 100644 index 0000000000000..4388c0cb0a4d4 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { TaskDefinitionRegistry } from '@kbn/task-manager-plugin/server'; +import type { GetScopedClients } from '../../../routes/types'; + +export interface TaskContext { + getScopedClients: GetScopedClients; +} + +export function createTaskDefinitions(taskContext: TaskContext) { + return {} satisfies TaskDefinitionRegistry; +} + +export type StreamsTaskType = keyof ReturnType; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_service.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_service.ts new file mode 100644 index 0000000000000..dc45dcc1f8e64 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_service.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { + TaskManagerSetupContract, + TaskManagerStartContract, +} from '@kbn/task-manager-plugin/server'; +import type { CoreStart, Logger } from '@kbn/core/server'; +import { StorageIndexAdapter } from '@kbn/storage-adapter'; +import { TaskClient } from './task_client'; +import type { TaskStorageSettings } from './storage'; +import { taskStorageSettings } from './storage'; +import type { PersistedTask } from './types'; +import type { TaskContext } from './task_definitions'; +import { createTaskDefinitions, type StreamsTaskType } from './task_definitions'; + +export class TaskService { + constructor(private readonly taskManagerSetup: TaskManagerSetupContract) {} + + public registerTasks(taskContext: TaskContext) { + this.taskManagerSetup.registerTaskDefinitions(createTaskDefinitions(taskContext)); + } + + public async getClient( + coreStart: CoreStart, + taskManagerStart: TaskManagerStartContract, + logger: Logger + ) { + const storageAdapter = new StorageIndexAdapter( + coreStart.elasticsearch.client.asInternalUser, + logger.get('task_client', 'storage'), + taskStorageSettings + ); + + return new TaskClient( + taskManagerStart, + storageAdapter.getClient(), + logger.get('task_client') + ); + } +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/types.ts new file mode 100644 index 0000000000000..dd48d53d0abff --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/types.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +interface PersistedTaskBase { + id: string; + type: string; + status: 'not_started' | 'in_progress' | 'completed' | 'failed'; + stream: string; + space: string; + created_at: string; +} + +interface NotStartedTask extends PersistedTaskBase { + status: 'not_started'; +} +interface InProgressTask extends PersistedTaskBase { + status: 'in_progress'; +} +interface CompletedTask extends PersistedTaskBase { + status: 'completed'; + task: { + payload: TPayload; + }; +} +interface FailedTask extends PersistedTaskBase { + status: 'failed'; + task: { + error: string; + }; +} + +export type PersistedTask = + | NotStartedTask + | InProgressTask + | CompletedTask + | FailedTask; + +export type TaskParams = TParams & { + _task: PersistedTask; +}; diff --git a/x-pack/platform/plugins/shared/streams/server/plugin.ts b/x-pack/platform/plugins/shared/streams/server/plugin.ts index d36898b146297..c5ae96f0d03e2 100644 --- a/x-pack/platform/plugins/shared/streams/server/plugin.ts +++ b/x-pack/platform/plugins/shared/streams/server/plugin.ts @@ -47,6 +47,7 @@ import { FeatureService } from './lib/streams/feature/feature_service'; import { ProcessorSuggestionsService } from './lib/streams/ingest_pipelines/processor_suggestions_service'; import { getDefaultFeatureRegistry } from './lib/streams/feature/feature_type_registry'; import { registerStreamsSavedObjects } from './lib/saved_objects/register_saved_objects'; +import { TaskService } from './lib/tasks/task_service'; // eslint-disable-next-line @typescript-eslint/no-empty-interface export interface StreamsPluginSetup {} @@ -112,6 +113,72 @@ export class StreamsPlugin const featureService = new FeatureService(core, this.logger, getDefaultFeatureRegistry()); const contentService = new ContentService(core, this.logger); const queryService = new QueryService(core, this.logger); + const taskService = new TaskService(plugins.taskManager); + + const getScopedClients = async ({ + request, + }: { + request: KibanaRequest; + }): Promise => { + const [ + [coreStart, pluginsStart], + assetClient, + attachmentClient, + featureClient, + contentClient, + ] = await Promise.all([ + core.getStartServices(), + assetService.getClientWithRequest({ request }), + attachmentService.getClientWithRequest({ request }), + featureService.getClientWithRequest({ request }), + contentService.getClient(), + ]); + + const [queryClient, uiSettingsClient] = await Promise.all([ + queryService.getClientWithRequest({ + request, + assetClient, + }), + coreStart.uiSettings.asScopedToClient(coreStart.savedObjects.getScopedClient(request)), + ]); + + const streamsClient = await streamsService.getClientWithRequest({ + request, + assetClient, + attachmentClient, + queryClient, + featureClient, + }); + + const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request); + const soClient = coreStart.savedObjects.getScopedClient(request); + const inferenceClient = pluginsStart.inference.getClient({ request }); + const licensing = pluginsStart.licensing; + const fieldsMetadataClient = await pluginsStart.fieldsMetadata.getClient(request); + const taskClient = await taskService.getClient( + coreStart, + pluginsStart.taskManager, + this.logger + ); + + return { + scopedClusterClient, + soClient, + assetClient, + attachmentClient, + streamsClient, + featureClient, + inferenceClient, + contentClient, + queryClient, + fieldsMetadataClient, + licensing, + uiSettingsClient, + taskClient, + }; + }; + + taskService.registerTasks({ getScopedClients }); plugins.features.registerKibanaFeature({ id: STREAMS_FEATURE_ID, @@ -170,62 +237,7 @@ export class StreamsPlugin server: this.server, telemetry: this.ebtTelemetryService.getClient(), processorSuggestions: this.processorSuggestionsService, - getScopedClients: async ({ - request, - }: { - request: KibanaRequest; - }): Promise => { - const [ - [coreStart, pluginsStart], - assetClient, - attachmentClient, - featureClient, - contentClient, - ] = await Promise.all([ - core.getStartServices(), - assetService.getClientWithRequest({ request }), - attachmentService.getClientWithRequest({ request }), - featureService.getClientWithRequest({ request }), - contentService.getClient(), - ]); - - const [queryClient, uiSettingsClient] = await Promise.all([ - queryService.getClientWithRequest({ - request, - assetClient, - }), - coreStart.uiSettings.asScopedToClient(coreStart.savedObjects.getScopedClient(request)), - ]); - - const streamsClient = await streamsService.getClientWithRequest({ - request, - assetClient, - attachmentClient, - queryClient, - featureClient, - }); - - const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request); - const soClient = coreStart.savedObjects.getScopedClient(request); - const inferenceClient = pluginsStart.inference.getClient({ request }); - const licensing = pluginsStart.licensing; - const fieldsMetadataClient = await pluginsStart.fieldsMetadata.getClient(request); - - return { - scopedClusterClient, - soClient, - assetClient, - attachmentClient, - streamsClient, - featureClient, - inferenceClient, - contentClient, - queryClient, - fieldsMetadataClient, - licensing, - uiSettingsClient, - }; - }, + getScopedClients, }, core, logger: this.logger, diff --git a/x-pack/platform/plugins/shared/streams/server/routes/types.ts b/x-pack/platform/plugins/shared/streams/server/routes/types.ts index fc2ec1d63cc3e..d746bfd271975 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/types.ts @@ -23,8 +23,10 @@ import type { EbtTelemetryClient } from '../lib/telemetry'; import type { StreamsServer } from '../types'; import type { FeatureClient } from '../lib/streams/feature/feature_client'; import type { ProcessorSuggestionsService } from '../lib/streams/ingest_pipelines/processor_suggestions_service'; +import type { TaskClient } from '../lib/tasks/task_client'; +import type { StreamsTaskType } from '../lib/tasks/task_definitions'; -type GetScopedClients = ({ +export type GetScopedClients = ({ request, }: { request: KibanaRequest; @@ -43,6 +45,7 @@ export interface RouteHandlerScopedClients { licensing: LicensingPluginStart; uiSettingsClient: IUiSettingsClient; fieldsMetadataClient: IFieldsMetadataClient; + taskClient: TaskClient; } export interface RouteDependencies {