Skip to content

Commit 7eaec54

Browse files
muverystrongfengyizhu
authored andcommitted
feat: openapi as worker
1 parent 9837152 commit 7eaec54

File tree

13 files changed

+802
-54
lines changed

13 files changed

+802
-54
lines changed

api/sdk/src/main/java/com/ke/bella/openapi/metadata/Condition.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import lombok.EqualsAndHashCode;
99
import lombok.NoArgsConstructor;
1010

11+
import java.util.HashSet;
1112
import java.util.List;
1213
import java.util.Set;
1314

@@ -76,6 +77,7 @@ public static class ChannelCondition extends PageCondition {
7677
private String ownerType;
7778
private String ownerCode;
7879
private String queueName;
80+
private HashSet<Integer> queueModes;
7981
}
8082

8183
@NoArgsConstructor

api/server/src/main/java/com/ke/bella/openapi/TaskExecutor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.concurrent.Executors;
66
import java.util.concurrent.ScheduledExecutorService;
77
import java.util.concurrent.ThreadFactory;
8+
import java.util.concurrent.TimeUnit;
89
import java.util.concurrent.atomic.AtomicInteger;
910

1011
public class TaskExecutor {
@@ -17,6 +18,10 @@ public static CompletableFuture<Void> submit(Runnable r) {
1718
return CompletableFuture.runAsync(r, executor);
1819
}
1920

21+
public static void scheduleAtFixedRate(Runnable r, int period) {
22+
executor.scheduleAtFixedRate(r, 0, period, TimeUnit.SECONDS);
23+
}
24+
2025
public static class NamedThreadFactory implements ThreadFactory {
2126
private final String prefix;
2227
private final AtomicInteger threadNumber = new AtomicInteger(1);

api/server/src/main/java/com/ke/bella/openapi/db/repo/ChannelRepo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ private SelectSeekStep1<ChannelRecord, Long> constructSql(Condition.ChannelCondi
6464
.and(StringUtils.isEmpty(op.getOwnerType()) ? DSL.noCondition() : CHANNEL.OWNER_TYPE.eq(op.getOwnerType()))
6565
.and(StringUtils.isEmpty(op.getOwnerCode()) ? DSL.noCondition() : CHANNEL.OWNER_CODE.eq(op.getOwnerCode()))
6666
.and(StringUtils.isEmpty(op.getQueueName()) ? DSL.noCondition() : CHANNEL.QUEUE_NAME.eq(op.getQueueName()))
67+
.and(CollectionUtils.isEmpty(op.getQueueModes()) ? DSL.noCondition() : CHANNEL.QUEUE_MODE.in(op.getQueueModes()))
6768
.orderBy(CHANNEL.ID.desc());
6869
}
6970

api/server/src/main/java/com/ke/bella/openapi/protocol/limiter/LimiterManager.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.google.common.collect.Lists;
44
import com.ke.bella.openapi.EndpointProcessData;
5-
65
import com.ke.bella.openapi.script.LuaScriptExecutor;
76
import com.ke.bella.openapi.script.ScriptType;
87
import com.ke.bella.openapi.utils.DateTimeUtils;
@@ -64,16 +63,15 @@ public void incrementRequestCountPerMinute(String akCode, String entityCode, Str
6463
}
6564
}
6665

67-
6866
public Long getRequestCountPerMinute(String akCode, String entityCode) {
6967
String countKey = String.format(RPM_COUNT_KEY_FORMAT, entityCode, akCode);
7068
Object count = redisson.getBucket(countKey).get();
7169
return count != null ? Long.parseLong(count.toString()) : 0L;
7270
}
73-
71+
7472
public void incrementConcurrentCount(String akCode, String entityCode) {
7573
String concurrentKey = String.format(CONCURRENT_KEY_FORMAT, entityCode, akCode);
76-
List<Object> keys = Lists.newArrayList(concurrentKey);
74+
List<Object> keys = Lists.newArrayList(concurrentKey, entityCode);
7775
List<Object> params = new ArrayList<>();
7876
params.add("INCR");
7977
try {
@@ -82,10 +80,10 @@ public void incrementConcurrentCount(String akCode, String entityCode) {
8280
log.warn(e.getMessage(), e);
8381
}
8482
}
85-
83+
8684
public void decrementConcurrentCount(String akCode, String entityCode) {
8785
String concurrentKey = String.format(CONCURRENT_KEY_FORMAT, entityCode, akCode);
88-
List<Object> keys = Lists.newArrayList(concurrentKey);
86+
List<Object> keys = Lists.newArrayList(concurrentKey, entityCode);
8987
List<Object> params = new ArrayList<>();
9088
params.add("DECR");
9189
try {
@@ -94,7 +92,7 @@ public void decrementConcurrentCount(String akCode, String entityCode) {
9492
throw new RuntimeException(e);
9593
}
9694
}
97-
95+
9896
public Long getCurrentConcurrentCount(String akCode, String entityCode) {
9997
String concurrentKey = String.format(CONCURRENT_KEY_FORMAT, entityCode, akCode);
10098
Object count = redisson.getBucket(concurrentKey).get();

api/server/src/main/java/com/ke/bella/openapi/service/ChannelService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.alicp.jetcache.anno.CacheType;
66
import com.alicp.jetcache.anno.Cached;
77
import com.alicp.jetcache.template.QuickConfig;
8+
import com.google.common.collect.Sets;
89
import com.ke.bella.openapi.db.repo.ChannelRepo;
910
import com.ke.bella.openapi.db.repo.Page;
1011
import com.ke.bella.openapi.metadata.Channel;
@@ -182,6 +183,13 @@ public List<ChannelDB> listActives(String entityType, String entityCode) {
182183
return listActivesWithDb(entityType, entityCode);
183184
}
184185

186+
public List<ChannelDB> listAllWorkerChannels() {
187+
return listByCondition(Condition.ChannelCondition.builder()
188+
.status(ACTIVE)
189+
.queueModes(Sets.newHashSet(QueueMode.PULL.getCode(), QueueMode.BOTH.getCode()))
190+
.build());
191+
}
192+
185193
public List<String> listSuppliers() {
186194
return channelRepo.listSuppliers();
187195
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package com.ke.bella.openapi.worker;
2+
3+
import com.ke.bella.openapi.protocol.limiter.LimiterManager;
4+
import com.ke.bella.openapi.script.LuaScriptExecutor;
5+
import com.ke.bella.openapi.script.ScriptType;
6+
import com.ke.bella.openapi.tables.pojos.ChannelDB;
7+
import com.ke.bella.openapi.utils.DateTimeUtils;
8+
import com.ke.bella.openapi.utils.JacksonUtils;
9+
import lombok.AllArgsConstructor;
10+
import lombok.Data;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.apache.commons.collections4.MapUtils;
13+
import org.redisson.api.RedissonClient;
14+
15+
import java.util.Arrays;
16+
import java.util.List;
17+
import java.util.Map;
18+
import java.util.Objects;
19+
import java.util.stream.Collectors;
20+
21+
@Slf4j
22+
@SuppressWarnings("all")
23+
public class CapacityCalculator {
24+
25+
private String channelCode;
26+
private ChannelDB channel;
27+
private CapacityFittingAlgorithm fittingAlgorithm;
28+
private RedissonClient redissonClient;
29+
private LimiterManager limiterManager;
30+
private LuaScriptExecutor luaScriptExecutor;
31+
32+
private volatile double cachedCapacity = -1.0;
33+
private volatile long cacheTimestamp = 0;
34+
private volatile long maxFinishRpm = 0;
35+
private static final long CACHE_DURATION_MS = 5 * 60 * 1000;
36+
private static final String RPM_429_HISTORY_METRIC = "rpm_429_history";
37+
38+
public CapacityCalculator(ChannelDB channelDB, RedissonClient redissonClient) {
39+
this.channel = channelDB;
40+
this.channelCode = channelDB.getChannelCode();
41+
this.fittingAlgorithm = new EmaFittingAlgorithm(0.3);
42+
this.redissonClient = redissonClient;
43+
}
44+
45+
public CapacityCalculator(ChannelDB channelDB, RedissonClient redissonClient, LuaScriptExecutor luaScriptExecutor) {
46+
this.channel = channelDB;
47+
this.channelCode = channelDB.getChannelCode();
48+
this.fittingAlgorithm = new EmaFittingAlgorithm(0.3);
49+
this.redissonClient = redissonClient;
50+
this.luaScriptExecutor = luaScriptExecutor;
51+
}
52+
53+
public double getRemainingCapacity() {
54+
double capacity = getCapacity();
55+
if(capacity == 0) {
56+
capacity = 0.7 * getCurrentMaxRpm();
57+
}
58+
if(capacity == 0) {
59+
return 1.0;
60+
}
61+
62+
long requestCapacity = getCurrentRequests() + getCompletedRpm();
63+
double remainingCapacity = 1.0 - (requestCapacity / capacity);
64+
return Math.max(0.0, Math.min(1.0, remainingCapacity));
65+
}
66+
67+
private long getCurrentMaxRpm() {
68+
long currentRpm = getCompletedRpm();
69+
70+
if(currentRpm > maxFinishRpm) {
71+
maxFinishRpm = currentRpm;
72+
}
73+
74+
return Math.max(currentRpm, maxFinishRpm);
75+
}
76+
77+
private long getCompletedRpm() {
78+
try {
79+
List<Object> keys = Arrays.asList(channelCode);
80+
List<Object> args = Arrays.asList(DateTimeUtils.getCurrentSeconds());
81+
Object result = luaScriptExecutor.execute("/get_completed", ScriptType.metrics, keys, args);
82+
83+
if(result != null) {
84+
return Long.parseLong(result.toString());
85+
}
86+
} catch (Exception e) {
87+
log.warn("Failed to get completed RPM for channel {}: {}", channelCode, e.getMessage());
88+
}
89+
return 0;
90+
}
91+
92+
private long getCurrentRequests() {
93+
String concurrentKey = "bella-openapi-channel-concurrent:" + channel.getEntityCode();
94+
Object count = redissonClient.getBucket(concurrentKey).get();
95+
return count != null ? Long.parseLong(count.toString()) : 0L;
96+
}
97+
98+
public double getCapacity() {
99+
long currentTime = System.currentTimeMillis();
100+
101+
if(cachedCapacity >= 0 && (currentTime - cacheTimestamp) < CACHE_DURATION_MS) {
102+
return cachedCapacity;
103+
}
104+
105+
double capacity = computeCapacity();
106+
cachedCapacity = capacity;
107+
cacheTimestamp = currentTime;
108+
return capacity;
109+
}
110+
111+
private double computeCapacity() {
112+
String historyKey = "bella-openapi-channel-metrics:" + channelCode + ":" + RPM_429_HISTORY_METRIC;
113+
List<MetricsPoint> dataPoints = redissonClient.getList(historyKey).range(0, -1)
114+
.stream()
115+
.map(this::parseMetrics)
116+
.filter(Objects::nonNull)
117+
.collect(Collectors.toList());
118+
119+
if(dataPoints.isEmpty()) {
120+
return 0.0;
121+
}
122+
123+
return fittingAlgorithm.calculateCapacity(dataPoints);
124+
}
125+
126+
private MetricsPoint parseMetrics(Object metric) {
127+
try {
128+
Map<String, Object> data = JacksonUtils.toMap(metric.toString());
129+
Long timestamp = MapUtils.getLong(data, "timestamp");
130+
Double rpm = MapUtils.getDouble(data, "rpm");
131+
Double avgResponseTime = MapUtils.getDouble(data, "avg_response_time", 60 * 1000.0);
132+
return new MetricsPoint(timestamp, rpm, avgResponseTime);
133+
} catch (Exception e) {
134+
log.warn("Failed to parse data point: {}", metric, e);
135+
return null;
136+
}
137+
}
138+
139+
@Data
140+
@AllArgsConstructor
141+
public static class MetricsPoint {
142+
private long timestamp;
143+
private double rpm;
144+
private double avgResponseTime;
145+
}
146+
147+
public interface CapacityFittingAlgorithm {
148+
double calculateCapacity(List<MetricsPoint> historyData);
149+
150+
String getAlgorithmName();
151+
}
152+
153+
public static class EmaFittingAlgorithm implements CapacityFittingAlgorithm {
154+
private final double alpha;
155+
156+
private static final double DEFAULT_EMA_ALPHA = 0.3;
157+
private static final int DEFAULT_HISTORY_SIZE = 10;
158+
private static final int DEFAULT_RECORD_MAX_INTERVAL = 120;
159+
private static final double DEFAULT_RECORD_CHANGE_THRESHOLD = 0.15;
160+
private static final int DEFAULT_DECAY_HALF_LIFE = 180;
161+
private static final double DEFAULT_QUICK_CALC_CURRENT_WEIGHT = 0.3;
162+
private static final double DEFAULT_QUICK_CALC_DECAYED_WEIGHT = 0.7;
163+
164+
public EmaFittingAlgorithm(double alpha) {
165+
this.alpha = alpha;
166+
}
167+
168+
@Override
169+
public double calculateCapacity(List<MetricsPoint> historyData) {
170+
if(historyData.isEmpty()) {
171+
return 0.0;
172+
}
173+
174+
double rpmEma = historyData.get(0).getRpm();
175+
176+
for (int i = 1; i < historyData.size(); i++) {
177+
MetricsPoint point = historyData.get(i);
178+
rpmEma = alpha * point.getRpm() + (1 - alpha) * rpmEma;
179+
}
180+
181+
return Math.floor(rpmEma + 0.5);
182+
}
183+
184+
@Override
185+
public String getAlgorithmName() {
186+
return "ema";
187+
}
188+
}
189+
}

0 commit comments

Comments
 (0)