|
| 1 | +# openapi作为bella-queue Worker - 设计文档 |
| 2 | + |
| 3 | +## 1. 概述 |
| 4 | + |
| 5 | +openapi作为 bella-queue Worker 的一个具体实现,通过充分利用渠道闲时余量来处理排队任务,基于实时容量指标动态调整任务消费,在不影响在线流量的前提下最大化渠道利用率。 |
| 6 | + |
| 7 | +### 目标 |
| 8 | +- 能够较准确评估渠道的实时负载,基于负载容量动态调整消费任务 |
| 9 | +## 2. 设计 |
| 10 | + |
| 11 | +### 主流程 |
| 12 | + |
| 13 | +WorkerManager 作为核心协调组件,负责管理 Worker 生命周期和任务执行流程: |
| 14 | + |
| 15 | +#### 2.1 流程概述 |
| 16 | + |
| 17 | +1. **Worker 初始化** |
| 18 | + - 定期(每10分钟)扫描可用渠道,刷新 Worker 列表(剔除不可用、非queue_mode为pull的渠道,新增符合条件的渠道) |
| 19 | + - 为每个渠道创建对应的 WorkerContext 实例,封装 Worker、CapacityCalculator 和 TaskProcessor |
| 20 | + |
| 21 | +2. **Worker 启动** |
| 22 | + - WorkerContext 启动 BackoffTask |
| 23 | + - BackoffTask 运行在单独的线程中,负责任务轮询和退避策略 |
| 24 | + |
| 25 | +3. **任务执行轮询循环** |
| 26 | + - **容量检查**:CapacityCalculator 基于历史429错误记录评估渠道最大容量 |
| 27 | + - **条件执行**:仅在余量大于一定比例时(默认70%)时调用 `worker.takeAndRun(take)` |
| 28 | + - **连续处理**:通过 do-while 循环持续获取任务(每次1个)直到队列为空 |
| 29 | + - **委托执行**:TaskProcessor 处理具体任务,转换协议格式并委托给 Channel 处理 |
| 30 | + - **退避策略**:BackoffTask 实现指数退避,成功时减少间隔,失败时增加间隔 |
| 31 | + |
| 32 | +#### 2.2 决策流程 |
| 33 | + |
| 34 | +``` |
| 35 | +┌─────────────────┐ |
| 36 | +│ BackoffTask │ |
| 37 | +│ 轮询循环 │ |
| 38 | +└─────────┬───────┘ |
| 39 | + │ |
| 40 | + ▼ |
| 41 | +┌─────────────────┐ ┌─────────────────┐ |
| 42 | +│ CapacityCalculator│─────▶│ 余量 > 70%? │ |
| 43 | +│ 评估渠道容量 │ └─────────┬───────┘ |
| 44 | +└─────────────────┘ │ |
| 45 | + │ |
| 46 | + ┌──────────────┼──────────────┐ |
| 47 | + │ 是 │ 否 │ |
| 48 | + ▼ ▼ |
| 49 | + ┌─────────────────┐ ┌─────────────────┐ |
| 50 | + │ 批量获取任务 │ │ 跳过本轮 │ |
| 51 | + │ takeAndRun() │ │ 退避等待 │ |
| 52 | + └─────────┬───────┘ └─────────────────┘ |
| 53 | + │ |
| 54 | + ▼ |
| 55 | + ┌─────────────────┐ |
| 56 | + │ TaskProcessor │ |
| 57 | + │ 协议转换+执行 │ |
| 58 | + └─────────┬───────┘ |
| 59 | + │ |
| 60 | + ▼ |
| 61 | + ┌─────────────────┐ |
| 62 | + │ 处理响应状态码 │ |
| 63 | + │ 200:完成 │ |
| 64 | + │ 429/503:重试 │ |
| 65 | + │ 其他:失败 │ |
| 66 | + └─────────┬───────┘ |
| 67 | + │ |
| 68 | + ▼ |
| 69 | + ┌─────────────────┐ |
| 70 | + │ 记录Metrics │ |
| 71 | + │ (429错误更新容量)│ |
| 72 | + └─────────────────┘ |
| 73 | +``` |
| 74 | + |
| 75 | +### 组件交互 |
| 76 | + |
| 77 | +``` |
| 78 | +┌─────────────────┐ |
| 79 | +│ WorkerManager │ ──── 每10分钟刷新Workers |
| 80 | +│ │ ──── 管理WorkerContext生命周期 |
| 81 | +└─────────┬───────┘ |
| 82 | + │ 创建和管理 |
| 83 | + ▼ |
| 84 | +┌──────────────────────────────┐ |
| 85 | +│ WorkerContext │ ──── 封装单个渠道的Worker组件 |
| 86 | +│ - Worker (任务消费) │ ──── 启动BackoffTask |
| 87 | +│ - CapacityCalculator(容量) │ ──── 协调各组件交互 |
| 88 | +│ - TaskProcessor (处理器) │ |
| 89 | +└──────────┬───────────────────┘ |
| 90 | + │ |
| 91 | + ▼ |
| 92 | +┌──────────────────────────────┐ |
| 93 | +│ BackoffTask │ ──── 退避策略轮询 |
| 94 | +│ │ ──── 容量检查 |
| 95 | +│ │ ──── 任务批量获取 |
| 96 | +└──────────┬───────────────────┘ |
| 97 | + │ |
| 98 | + ▼ |
| 99 | +┌──────────────────────────────┐ |
| 100 | +│ CapacityCalculator │ ──── 基于429历史数据 |
| 101 | +│ │ ──── EMA算法拟合容量 |
| 102 | +│ │ ──── 本地缓存(5分钟) |
| 103 | +└──────────┬───────────────────┘ |
| 104 | + │ |
| 105 | + │ Redis存储 |
| 106 | + ▼ |
| 107 | +┌──────────────────────────────┐ |
| 108 | +│ Redis (429历史记录) │ ──── rpm_429_history指标 |
| 109 | +│ │ ──── 时间戳+RPM+响应时间 |
| 110 | +│ │ ──── 支持多种拟合算法 |
| 111 | +└──────────────────────────────┘ |
| 112 | +``` |
| 113 | + |
| 114 | + |
| 115 | + |
| 116 | +### 基于429历史的渠道容量评估 |
| 117 | + |
| 118 | +通过收集渠道历史429限流错误时的RPM数据,使用拟合算法预测渠道的最大处理能力,为Worker任务消费决策提供依据。 |
| 119 | + |
| 120 | +#### 1. 容量计算核心组件 |
| 121 | + |
| 122 | +**CapacityCalculator** 负责渠道容量计算,主要包含: |
| 123 | +- **本地缓存**:5分钟缓存避免频繁计算 |
| 124 | +- **历史数据获取**:从Redis读取`rpm_429_history`指标 |
| 125 | +- **拟合算法**:支持可插拔的容量计算算法 |
| 126 | + |
| 127 | +#### 2. 429历史数据结构 |
| 128 | + |
| 129 | +系统在渠道出现429限流错误时记录关键指标到Redis: |
| 130 | + |
| 131 | +**存储Key**: `bella-openapi-channel-metrics:{channelCode}:rpm_429_history` |
| 132 | + |
| 133 | +**数据格式**: |
| 134 | +```json |
| 135 | +{ |
| 136 | + "timestamp": 1634567890000, |
| 137 | + "rpm": 485.0, |
| 138 | + "avg_response_time": 60000.0 |
| 139 | +} |
| 140 | +``` |
| 141 | + |
| 142 | +#### 3. EMA拟合算法实现 |
| 143 | + |
| 144 | +**EmaFittingAlgorithm** 作为默认容量计算算法: |
| 145 | + |
| 146 | +**算法特性**: |
| 147 | +- **指数移动平均**:默认α=0.3,平滑处理历史RPM数据 |
| 148 | +- **响应时间惩罚**:响应时间越长,容量估算越保守 |
| 149 | +- **向下取整**:确保容量估算不会过于乐观 |
| 150 | + |
| 151 | +#### 4. 容量评估示例 |
| 152 | + |
| 153 | +**场景1: 充足历史数据** |
| 154 | +``` |
| 155 | +历史429记录: |
| 156 | + [时间戳1: RPM=400, 响应时间=5s] |
| 157 | + [时间戳2: RPM=450, 响应时间=8s] |
| 158 | + [时间戳3: RPM=380, 响应时间=12s] |
| 159 | +
|
| 160 | +EMA计算: |
| 161 | + RPM_EMA = 0.3*380 + 0.7*(0.3*450 + 0.7*400) = 408 |
| 162 | + ResponseTime_EMA = 0.3*12000 + 0.7*(0.3*8000 + 0.7*5000) = 8600ms |
| 163 | + |
| 164 | +响应时间惩罚 = min(1.0, 10000/8600) = 1.0 |
| 165 | +最终容量 = floor(408 * 1.0 + 0.5) = 408 |
| 166 | +``` |
| 167 | + |
| 168 | +**场景2: 响应时间过长惩罚** |
| 169 | +``` |
| 170 | +历史429记录: |
| 171 | + [时间戳1: RPM=500, 响应时间=25s] |
| 172 | + |
| 173 | +EMA计算: |
| 174 | + RPM_EMA = 500 |
| 175 | + ResponseTime_EMA = 25000ms |
| 176 | + |
| 177 | +响应时间惩罚 = min(1.0, 10000/25000) = 0.4 |
| 178 | +最终容量 = floor(500 * 0.4 + 0.5) = 200 |
| 179 | +``` |
| 180 | + |
| 181 | +**场景3: 无历史数据** |
| 182 | +``` |
| 183 | +历史429记录: 空 |
| 184 | +返回容量: 取得当前rpm值 |
| 185 | +``` |
| 186 | + |
| 187 | +## 3. 退避策略详细设计 |
| 188 | + |
| 189 | +### 3.1 退避策略概述 |
| 190 | + |
| 191 | +BackoffTask 实现智能退避机制,根据任务获取成功/失败情况动态调整轮询间隔,避免无效轮询的同时确保及时处理新任务。 |
| 192 | + |
| 193 | +### 3.2 退避参数配置 |
| 194 | +#### 退避策略 |
| 195 | +- **有任务时**: 保持最小间隔(5ms)持续轮询,确保及时处理 |
| 196 | +- **无任务时**: 从5秒开始退避,按1.5倍增长到最大5分钟 |
| 197 | +- **最大等待后**: 达到5分钟后保持该间隔,直到再次获取到任务 |
| 198 | +- **任务恢复**: 一旦有任务,立即重置到最小间隔持续轮询 |
| 199 | + |
| 200 | +### 3.3 双重退避策略 |
| 201 | + |
| 202 | +BackoffTask 实现两种不同的退避策略,根据失败原因选择合适的等待机制: |
| 203 | + |
| 204 | +#### 容量不足退避(基于响应时间) |
| 205 | +- **触发条件**: `余量 <= 70%` |
| 206 | +- **等待时间**: 使用渠道平均响应时间 |
| 207 | + |
| 208 | +#### 任务队列空退避 |
| 209 | +- **触发条件**: 有余量但 `takeAndRun()` 返回 0 |
| 210 | +- **调整策略**: |
| 211 | + - 有任务时:立即重置到最小间隔(5ms)持续轮询 |
| 212 | + - 无任务时:从5秒开始按1.5倍增长到5分钟 |
| 213 | +- **最大等待**: 达到5分钟后保持该间隔不再增长 |
| 214 | +- **立即响应**: 一旦检测到任务,立即返回高频轮询模式 |
| 215 | + |
| 216 | +## 4. 计费逻辑集成 |
| 217 | + |
| 218 | +### 4.2 现有计费系统架构 |
| 219 | +#### 计费流程 |
| 220 | +``` |
| 221 | +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ |
| 222 | +│ TaskProcessor │───▶│ EndpointLogger │───▶│ Disruptor队列 │ |
| 223 | +│ 成功处理任务 │ │ logger.log() │ │ 异步事件处理 │ |
| 224 | +└─────────────────┘ └─────────────────┘ └─────────┬───────┘ |
| 225 | + │ |
| 226 | +┌─────────────────┐ ┌─────────────────┐ ┌─────────▼───────┐ |
| 227 | +│ 数据库更新 │◀───│ CostCounter │◀───│ CostLogHandler │ |
| 228 | +│ 原子性费用累计 │ │ 内存累计(1分钟) │ │ 计算具体费用 │ |
| 229 | +└─────────────────┘ └─────────────────┘ └─────────────────┘ |
| 230 | +``` |
| 231 | + |
| 232 | +### 4.3 Worker计费集成实现 |
| 233 | +在TaskProcessor的`executeTask`调用具体能力点的处理方法,拿到响应后,同步计费日志: |
| 234 | +```java |
| 235 | +EndpointProcessData processData = EndpointContext.getProcessData(); |
| 236 | +processData.setResponse(response); |
| 237 | +logger.log(processData); |
| 238 | +``` |
| 239 | +## 5. 如何防止在线(blocking, streaming)饿死 |
| 240 | +问题出现在有一段时间,无法知道系统的实际容量,或者容量估算不准确。 |
0 commit comments