@@ -7,7 +7,7 @@ import FunctionDescription from '@site/src/components/FunctionDescription';
7
7
8
8
<FunctionDescription description =" Introduced or updated: v1.2.738 " />
9
9
10
- CREATE TASK 语句用于定义新任务,该任务可按计划或基于 DAG(有向无环图)任务图执行指定 SQL 语句。
10
+ CREATE TASK 语句用于定义一个新任务(Task),该任务可按计划或基于有向无环图(DAG)的任务图执行指定的 SQL 语句。
11
11
12
12
** 注意:** 此功能仅在 Databend Cloud 中开箱即用。
13
13
27
27
< sql>
28
28
```
29
29
30
- | 参数 | 描述 |
31
- | -------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
32
- | IF NOT EXISTS | 可选。如果指定,仅当同名任务不存在时才创建任务 |
33
- | name | 任务名称(必填) |
34
- | WAREHOUSE | 必需。指定任务使用的虚拟计算集群(Warehouse) |
35
- | SCHEDULE | 必需。定义任务运行计划,可按分钟/秒指定或使用 CRON 表达式及时区 |
36
- | SUSPEND_TASK_AFTER_NUM_FAILURES | 可选。连续失败指定次数后自动挂起任务 |
37
- | AFTER | 列出当前任务启动前必须完成的任务 |
38
- | WHEN boolean_expr | 任务运行必须满足的条件 |
39
- | [ ERROR_INTEGRATION] ( ../16-notification/index.md ) | 可选。用于任务错误通知的通知集成名称,应用特定[ 任务错误负载] ( ./10-task-error-integration-payload.md ) |
40
- | COMMENT | 可选。任务注释或描述的字符串字面量 |
41
- | session_parameter | 可选。指定任务运行时使用的会话参数(必须位于所有其他参数之后) |
42
- | sql | 任务执行的 SQL 语句(单语句或脚本,必填) |
43
-
44
- ### 使用说明
45
-
46
- - 独立任务或 DAG(有向无环图)根任务必须定义计划,否则只能通过 EXECUTE TASK 手动执行
47
- - DAG 子任务不可指定计划
48
- - 创建任务后需执行 ALTER TASK … RESUME 才能按定义参数运行
49
- - WHEN 条件仅支持 ` <boolean_expression> ` 子集:
50
- - 支持在 SQL 表达式中使用 [ STREAM_STATUS] ( ../../../20-sql-functions/17-table-functions/stream-status.md ) 函数评估流是否含变更数据
51
- - 支持布尔运算符(AND/OR/NOT 等)
52
- - 支持数值/字符串/布尔类型转换
53
- - 支持比较运算符(等于/不等于/大于/小于等)
30
+ | 参数 | 描述 |
31
+ | ------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
32
+ | IF NOT EXISTS | 可选。若指定,仅当同名任务不存在时才创建任务。 |
33
+ | name | 任务名称,必填。 |
34
+ | WAREHOUSE | 必填。指定任务使用的虚拟计算集群(Virtual Warehouse)。 |
35
+ | SCHEDULE | 必填。定义任务运行计划,可按分钟指定,或使用 CRON 表达式与时区。 |
36
+ | SUSPEND_TASK_AFTER_NUM_FAILURES | 可选。连续失败多少次后自动挂起任务。 |
37
+ | AFTER | 列出必须完成后才启动此任务的任务。 |
38
+ | WHEN boolean_expr | 任务运行前必须为真的条件。 |
39
+ | [ ERROR_INTEGRATION] ( ../16-notification/index.md ) | 可选。用于任务错误通知的通知集成(Notification Integration)名称,并应用特定的[ 任务错误负载] ( ./10-task-error-integration-payload.md ) 。 |
40
+ | COMMENT | 可选。作为任务注释或描述的字符串字面量。 |
41
+ | session_parameter | 可选。指定任务运行时的会话参数。注意,会话参数必须放在 CREATE TASK 语句中所有其他任务参数之后。 |
42
+ | sql | 任务将执行的 SQL 语句,可为单条语句或脚本,必填。 |
43
+
44
+ ### 使用须知
45
+
46
+ - 必须为独立任务或任务 DAG 中的根任务定义计划;否则,任务仅在手动执行 ` EXECUTE TASK ` 时运行。
47
+ - 不能为 DAG 中的子任务指定计划。
48
+ - 创建任务后,必须执行 ` ALTER TASK … RESUME ` ,任务才会按定义中的参数运行。
49
+ - WHEN 条件仅支持 ` <boolean_expression> ` 的子集。
50
+ 任务 WHEN 子句支持以下内容:
51
+
52
+ - SQL 表达式中支持 [ STREAM_STATUS] ( ../../../20-sql-functions/17-table-functions/stream-status.md ) 函数求值。该函数指示指定 Stream 是否包含变更跟踪数据。可在当前运行开始前评估指定 Stream 是否包含变更数据;若结果为 FALSE,则任务不运行。
53
+ - 布尔运算符,如 AND、OR、NOT 等。
54
+ - 数值、字符串与布尔类型之间的类型转换。
55
+ - 比较运算符,如等于、不等于、大于、小于等。
54
56
55
57
::: note
56
- 警告:任务中使用 STREAM_STATUS 时,引用流必须包含数据库名(如 ` STREAM_STATUS('mydb.stream_name') ` )
58
+ 警告:在任务中使用 STREAM_STATUS 时,引用 Stream 必须包含数据库名(例如 ` STREAM_STATUS('mydb.stream_name') ` )。
57
59
:::
58
60
59
- - 多个任务消费同一表流时会获取不同增量数据。当任务通过 DML 消费流数据时,流偏移量会推进,后续任务无法再消费相同数据。建议单任务消费单流,可为同表创建多流供不同任务使用
60
- - 任务执行不重试,每次串行执行。脚本 SQL 按顺序逐一执行,无并行处理,确保任务执行顺序和依赖关系
61
- - 基于间隔的任务严格遵循固定间隔:若当前任务超时,下一任务立即执行;若提前完成,则等待下一间隔触发 。例如 1 秒间隔任务:执行 1.5 秒则下一任务立即启动;执行 0.5 秒则等待至下一秒触发
62
- - 会话参数可在创建时指定,也可通过 ALTER TASK 修改:
61
+ - 多个任务从同一 Table Stream 消费变更数据时,会获取不同的增量。当某任务通过 DML 语句消费 Stream 中的变更数据后,Stream 会推进 Offset,变更数据将不再对后续任务可用。当前建议仅让一个任务消费同一 Stream 的变更数据;可为同一表创建多个 Stream,由不同任务分别消费。
62
+ - 任务每次执行不会重试;执行均为串行。脚本中的 SQL 逐一执行,无并行,确保任务执行顺序与依赖关系。
63
+ - 基于间隔的任务严格遵循固定间隔点。若当前任务执行时间超过间隔单位,则下一任务立即执行;否则,下一任务等待至下一间隔单位触发 。例如,若任务定义 1 秒间隔,而某次执行耗时 1.5 秒,则下一任务立即执行;若耗时 0.5 秒,则下一任务等待至下一 1 秒间隔开始。
64
+ - 创建任务时可指定会话参数,也可后续通过 ` ALTER TASK ` 修改,例如 :
63
65
``` sql
64
66
ALTER TASK simple_task SET
65
67
enable_query_result_cache = 1 ,
68
70
69
71
### Cron 表达式重要说明
70
72
71
- - ` SCHEDULE ` 的 cron 表达式必须包含 ** 6 个字段** :
72
- 1 . ** 秒** (0-59)
73
- 2 . ** 分钟** (0-59)
74
- 3 . ** 小时** (0-23)
75
- 4 . ** 日** (1-31)
76
- 5 . ** 月** (1-12 或 JAN-DEC)
77
- 6 . ** 星期** (0-6,0=周日 或 SUN-SAT)
73
+ - ` SCHEDULE ` 参数中的 cron 表达式必须** 恰好包含 6 个字段** 。
74
+ - 各字段含义如下:
75
+ 1 . ** 秒** (0–59)
76
+ 2 . ** 分钟** (0–59)
77
+ 3 . ** 小时** (0–23)
78
+ 4 . ** 日** (1–31)
79
+ 5 . ** 月** (1–12 或 JAN–DEC)
80
+ 6 . ** 星期** (0–6,0 表示星期日,或 SUN–SAT)
78
81
79
82
#### Cron 表达式示例:
80
83
81
- - ** 太平洋时间每天 9:00:00:**
84
+ - ** 太平洋时间每天上午 9:00:00:**
82
85
- ` USING CRON '0 0 9 * * *' 'America/Los_Angeles' `
83
86
84
87
- ** 每分钟:**
85
88
- ` USING CRON '0 * * * * *' 'UTC' `
86
- - 每分钟开始时执行
89
+ - 在每分钟开始时运行任务。
87
90
88
91
- ** 每小时第 15 分钟:**
89
92
- ` USING CRON '0 15 * * * *' 'UTC' `
90
- - 每小时过 15 分钟时执行
93
+ - 在每小时的第 15 分钟运行任务。
91
94
92
- - ** 每周一 12:00:00:**
95
+ - ** 每周一中午 12:00:00:**
93
96
- ` USING CRON '0 0 12 * * 1' 'UTC' `
94
- - 每周一中午执行
97
+ - 在每周一中午运行任务。
95
98
96
- - ** 每月首日午夜 :**
99
+ - ** 每月第一天午夜 :**
97
100
- ` USING CRON '0 0 0 1 * *' 'UTC' `
98
- - 每月第一天午夜执行
101
+ - 在每月第一天的午夜运行任务。
99
102
100
- - ** 工作日 8:30:00:**
103
+ - ** 每个工作日上午 8:30:00:**
101
104
- ` USING CRON '0 30 8 * * 1-5' 'UTC' `
102
- - 周一至周五 8:30 执行
105
+ - 在周一至周五上午 8:30 运行任务。
103
106
104
107
## 使用示例
105
108
@@ -110,11 +113,11 @@ CREATE TASK my_daily_task
110
113
WAREHOUSE = ' compute_wh'
111
114
SCHEDULE = USING CRON ' 0 0 9 * * *' ' America/Los_Angeles'
112
115
COMMENT = ' Daily summary task'
113
- AS
116
+ AS
114
117
INSERT INTO summary_table SELECT * FROM source_table;
115
118
```
116
119
117
- 此示例创建任务 ` my_daily_task ` ,使用 ** compute_wh** 计算集群(Warehouse)将 source_table 数据插入 summary_table。通过 ** CRON 表达式** 设定 ** 每天太平洋时间 9:00 ** 执行。
120
+ 本例创建名为 ` my_daily_task ` 的任务(Task)。它使用 ** compute_wh** 计算集群(Warehouse)运行 SQL,将数据从 source_table 插入 summary_table,并按 ** CRON 表达式** 于 ** 太平洋时间每天上午 9 点 ** 执行。
118
121
119
122
### 自动挂起
120
123
@@ -124,10 +127,10 @@ CREATE TASK IF NOT EXISTS mytask
124
127
SCHEDULE = 2 MINUTE
125
128
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
126
129
AS
127
- INSERT INTO compaction_test .test VALUES ((1 ));
130
+ INSERT INTO compaction_test .test VALUES ((1 ));
128
131
```
129
132
130
- 此示例创建任务 ` mytask ` (不存在时),分配至 ** system** 计算集群(Warehouse),** 每 2 分钟** 运行。 若** 连续失败 3 次** 则 ** 自动挂起** ,向 compaction_test.test 表插入数据 。
133
+ 本例创建名为 ` mytask ` 的任务(Task)(若不存在)。该任务分配至 ** system** 计算集群(Warehouse),计划 ** 每 2 分钟** 运行一次, 若** 连续失败 3 次** 将 ** 自动挂起** ,并对 compaction_test.test 表执行 INSERT 。
131
134
132
135
### 秒级调度
133
136
@@ -136,38 +139,40 @@ CREATE TASK IF NOT EXISTS daily_sales_summary
136
139
WAREHOUSE = ' analytics'
137
140
SCHEDULE = 30 SECOND
138
141
AS
139
- SELECT sales_date, SUM (amount) AS daily_total
140
- FROM sales_data
141
- GROUP BY sales_date;
142
+ SELECT sales_date, SUM (amount) AS daily_total
143
+ FROM sales_data
144
+ GROUP BY sales_date;
142
145
```
143
146
144
- 此示例创建 ** 秒级调度** 任务 ` daily_sales_summary ` , ** 每 30 秒** 运行。使用 ** analytics** 计算集群(Warehouse)聚合 sales_data 表数据生成每日销售摘要 。
147
+ 本例创建名为 ` daily_sales_summary ` 的任务(Task),具备 ** 秒级调度** ,计划 ** 每 30 秒** 运行一次。它使用 ** analytics** 计算集群(Warehouse), 聚合 sales_data 表数据计算每日销售汇总 。
145
148
146
149
### 任务依赖
147
150
148
151
``` sql
149
152
CREATE TASK IF NOT EXISTS process_orders
150
153
WAREHOUSE = ' etl'
151
154
AFTER task1, task2
152
- ASINSERT INTO data_warehouse . orders
153
- SELECT * FROM staging .orders ;
155
+ AS
156
+ INSERT INTO data_warehouse . orders SELECT * FROM staging .orders ;
154
157
```
155
158
156
- 此示例创建任务 ` process_orders ` ,在 ** task1** 和 ** task2** ** 成功完成后** 运行。通过 ** etl** 计算集群(Warehouse)将暂存区数据迁移至数据仓库,建立 ** DAG(有向无环图)依赖关系** 。
159
+ 本例创建名为 ` process_orders ` 的任务(Task),定义为在 ** task1** 与 ** task2** ** 成功完成后** 运行,用于在任务 ** DAG** 中建立** 依赖关系** 。它使用 ** etl** 计算集群(Warehouse),将数据从 Staging Area 传输至 Data Warehouse。
160
+
161
+ > 提示:使用 AFTER 参数时无需设置 SCHEDULE 参数。
157
162
158
163
### 条件执行
159
164
160
165
``` sql
161
166
CREATE TASK IF NOT EXISTS hourly_data_cleanup
162
167
WAREHOUSE = ' maintenance'
163
- SCHEDULE = ' 0 0 * * * *'
168
+ SCHEDULE = USING CRON ' 0 0 9 * * *' ' America/Los_Angeles '
164
169
WHEN STREAM_STATUS(' db1.change_stream' ) = TRUE
165
170
AS
166
- DELETE FROM archived_data
167
- WHERE archived_date < DATEADD(HOUR, - 24 , CURRENT_TIMESTAMP ());
171
+ DELETE FROM archived_data
172
+ WHERE archived_date < DATEADD(HOUR, - 24 , CURRENT_TIMESTAMP ());
168
173
```
169
174
170
- 此示例创建任务 ` hourly_data_cleanup ` ,使用 ** maintenance** 计算集群(Warehouse)** 每小时** 清理 archived_data 表中 24 小时前数据。仅当 ** STREAM_STATUS** 检测到 ` db1.change_stream ` 含变更数据时执行 。
175
+ 本例创建名为 ` hourly_data_cleanup ` 的任务(Task)。它使用 ** maintenance** 计算集群(Warehouse),计划 ** 每小时** 运行,删除 archived_data 表中 24 小时前的数据,并仅在 ** STREAM_STATUS** 函数确认 ` db1.change_stream ` 包含变更数据时运行 。
171
176
172
177
### 错误集成
173
178
@@ -177,15 +182,15 @@ CREATE TASK IF NOT EXISTS mytask
177
182
SCHEDULE = 30 SECOND
178
183
ERROR_INTEGRATION = ' myerror'
179
184
AS
180
- BEGIN
185
+ BEGIN
181
186
BEGIN ;
182
187
INSERT INTO mytable(ts) VALUES (CURRENT_TIMESTAMP );
183
188
DELETE FROM mytable WHERE ts < DATEADD(MINUTE, - 5 , CURRENT_TIMESTAMP ());
184
189
COMMIT ;
185
- END;
190
+ END;
186
191
```
187
192
188
- 此示例创建任务 ` mytask ` ,使用 ** mywh** 计算集群(Warehouse)** 每 30 秒** 执行含 INSERT/ DELETE 的 ** BEGIN 块** 。失败时触发 ** myerror** ** 错误集成** 。
193
+ 本例创建名为 ` mytask ` 的任务(Task)。它使用 ** mywh** 计算集群(Warehouse),计划 ** 每 30 秒** 运行一次,执行包含 INSERT 与 DELETE 语句的 ** BEGIN 块** ,并在两条语句执行后提交事务。任务失败时将触发名为 ** myerror** 的 ** 错误集成(Error Integration) ** 。
189
194
190
195
### 会话参数
191
196
@@ -197,10 +202,10 @@ CREATE TASK IF NOT EXISTS cache_enabled_task
197
202
enable_query_result_cache = 1 ,
198
203
query_result_cache_min_execute_secs = 5
199
204
AS
200
- SELECT SUM (amount) AS total_sales
201
- FROM sales_data
202
- WHERE transaction_date >= DATEADD(DAY, - 7 , CURRENT_DATE ())
203
- GROUP BY product_category;
205
+ SELECT SUM (amount) AS total_sales
206
+ FROM sales_data
207
+ WHERE transaction_date >= DATEADD(DAY, - 7 , CURRENT_DATE ())
208
+ GROUP BY product_category;
204
209
```
205
210
206
- 此示例创建任务 ` cache_enabled_task ` ,使用 ** analytics ** 计算集群(Warehouse )** 每 5 分钟** 生成销售汇总。通过 ** 会话参数 ** ** ` enable_query_result_cache = 1 ` ** 和 ** ` query_result_cache_min_execute_secs = 5 ` ** 为执行超 5 秒的查询启用结果缓存, ** 提升重复执行性能 ** 。
211
+ 本例创建名为 ` cache_enabled_task ` 的任务(Task),并启用查询结果缓存的 ** 会话参数(Session Parameter )** 。任务计划 ** 每 5 分钟** 运行,使用 ** analytics ** 计算集群(Warehouse)。会话参数(Session Parameter) ** ` enable_query_result_cache = 1 ` ** 与 ** ` query_result_cache_min_execute_secs = 5 ` ** 置于 ** 所有其他任务参数之后 ** ,为执行时间 ≥ 5 秒的查询启用结果缓存。若底层数据未变,可提升相同任务后续执行的性能 。
0 commit comments