Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -76,6 +77,7 @@ public static class ChannelCondition extends PageCondition {
private String ownerType;
private String ownerCode;
private String queueName;
private HashSet<Integer> queueModes;
}

@NoArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TaskExecutor {
Expand All @@ -17,6 +18,10 @@ public static CompletableFuture<Void> submit(Runnable r) {
return CompletableFuture.runAsync(r, executor);
}

public static void scheduleAtFixedRate(Runnable r, int period) {
executor.scheduleAtFixedRate(r, 0, period, TimeUnit.SECONDS);
}

public static class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ private SelectSeekStep1<ChannelRecord, Long> constructSql(Condition.ChannelCondi
.and(StringUtils.isEmpty(op.getOwnerType()) ? DSL.noCondition() : CHANNEL.OWNER_TYPE.eq(op.getOwnerType()))
.and(StringUtils.isEmpty(op.getOwnerCode()) ? DSL.noCondition() : CHANNEL.OWNER_CODE.eq(op.getOwnerCode()))
.and(StringUtils.isEmpty(op.getQueueName()) ? DSL.noCondition() : CHANNEL.QUEUE_NAME.eq(op.getQueueName()))
.and(CollectionUtils.isEmpty(op.getQueueModes()) ? DSL.noCondition() : CHANNEL.QUEUE_MODE.in(op.getQueueModes()))
.orderBy(CHANNEL.ID.desc());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.common.collect.Lists;
import com.ke.bella.openapi.EndpointProcessData;

import com.ke.bella.openapi.script.LuaScriptExecutor;
import com.ke.bella.openapi.script.ScriptType;
import com.ke.bella.openapi.utils.DateTimeUtils;
Expand Down Expand Up @@ -64,16 +63,15 @@ public void incrementRequestCountPerMinute(String akCode, String entityCode, Str
}
}


public Long getRequestCountPerMinute(String akCode, String entityCode) {
String countKey = String.format(RPM_COUNT_KEY_FORMAT, entityCode, akCode);
Object count = redisson.getBucket(countKey).get();
return count != null ? Long.parseLong(count.toString()) : 0L;
}

public void incrementConcurrentCount(String akCode, String entityCode) {
String concurrentKey = String.format(CONCURRENT_KEY_FORMAT, entityCode, akCode);
List<Object> keys = Lists.newArrayList(concurrentKey);
List<Object> keys = Lists.newArrayList(concurrentKey, entityCode);
List<Object> params = new ArrayList<>();
params.add("INCR");
try {
Expand All @@ -82,10 +80,10 @@ public void incrementConcurrentCount(String akCode, String entityCode) {
log.warn(e.getMessage(), e);
}
}

public void decrementConcurrentCount(String akCode, String entityCode) {
String concurrentKey = String.format(CONCURRENT_KEY_FORMAT, entityCode, akCode);
List<Object> keys = Lists.newArrayList(concurrentKey);
List<Object> keys = Lists.newArrayList(concurrentKey, entityCode);
List<Object> params = new ArrayList<>();
params.add("DECR");
try {
Expand All @@ -94,7 +92,7 @@ public void decrementConcurrentCount(String akCode, String entityCode) {
throw new RuntimeException(e);
}
}

public Long getCurrentConcurrentCount(String akCode, String entityCode) {
String concurrentKey = String.format(CONCURRENT_KEY_FORMAT, entityCode, akCode);
Object count = redisson.getBucket(concurrentKey).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.Cached;
import com.alicp.jetcache.template.QuickConfig;
import com.google.common.collect.Sets;
import com.ke.bella.openapi.db.repo.ChannelRepo;
import com.ke.bella.openapi.db.repo.Page;
import com.ke.bella.openapi.metadata.Channel;
Expand All @@ -18,6 +19,7 @@
import com.ke.bella.openapi.tables.pojos.EndpointDB;
import com.ke.bella.openapi.tables.pojos.ModelDB;
import com.ke.bella.openapi.utils.JacksonUtils;
import com.ke.bella.queue.QueueMode;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -174,6 +176,13 @@ public List<ChannelDB> listActives(String entityType, String entityCode) {
return listActivesWithDb(entityType, entityCode);
}

public List<ChannelDB> listAllWorkerChannels() {
return listByCondition(Condition.ChannelCondition.builder()
.status(ACTIVE)
.queueModes(Sets.newHashSet(QueueMode.PULL.getCode(), QueueMode.BOTH.getCode()))
.build());
}

public List<String> listSuppliers() {
return channelRepo.listSuppliers();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package com.ke.bella.openapi.worker;

import com.ke.bella.openapi.protocol.limiter.LimiterManager;
import com.ke.bella.openapi.script.LuaScriptExecutor;
import com.ke.bella.openapi.script.ScriptType;
import com.ke.bella.openapi.tables.pojos.ChannelDB;
import com.ke.bella.openapi.utils.DateTimeUtils;
import com.ke.bella.openapi.utils.JacksonUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.redisson.api.RedissonClient;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

@Slf4j
@SuppressWarnings("all")
public class CapacityCalculator {

private String channelCode;
private ChannelDB channel;
private CapacityFittingAlgorithm fittingAlgorithm;
private RedissonClient redissonClient;
private LimiterManager limiterManager;
private LuaScriptExecutor luaScriptExecutor;

private volatile double cachedCapacity = -1.0;
private volatile long cacheTimestamp = 0;
private volatile long maxFinishRpm = 0;
private static final long CACHE_DURATION_MS = 5 * 60 * 1000;
private static final String RPM_429_HISTORY_METRIC = "rpm_429_history";

public CapacityCalculator(ChannelDB channelDB, RedissonClient redissonClient) {
this.channel = channelDB;
this.channelCode = channelDB.getChannelCode();
this.fittingAlgorithm = new EmaFittingAlgorithm(0.3);
this.redissonClient = redissonClient;
}

public CapacityCalculator(ChannelDB channelDB, RedissonClient redissonClient, LuaScriptExecutor luaScriptExecutor) {
this.channel = channelDB;
this.channelCode = channelDB.getChannelCode();
this.fittingAlgorithm = new EmaFittingAlgorithm(0.3);
this.redissonClient = redissonClient;
this.luaScriptExecutor = luaScriptExecutor;
}

public double getRemainingCapacity() {
double capacity = getCapacity();
if(capacity == 0) {
capacity = 0.7 * getCurrentMaxRpm();
}
if(capacity == 0) {
return 1.0;
}

long requestCapacity = getCurrentRequests() + getCompletedRpm();
double remainingCapacity = 1.0 - (requestCapacity / capacity);
return Math.max(0.0, Math.min(1.0, remainingCapacity));
}

private long getCurrentMaxRpm() {
long currentRpm = getCompletedRpm();

if(currentRpm > maxFinishRpm) {
maxFinishRpm = currentRpm;
}

return Math.max(currentRpm, maxFinishRpm);
}

private long getCompletedRpm() {
try {
List<Object> keys = Arrays.asList(channelCode);
List<Object> args = Arrays.asList(DateTimeUtils.getCurrentSeconds());
Object result = luaScriptExecutor.execute("/get_completed", ScriptType.metrics, keys, args);

if(result != null) {
return Long.parseLong(result.toString());
}
} catch (Exception e) {
log.warn("Failed to get completed RPM for channel {}: {}", channelCode, e.getMessage());
}
return 0;
}

private long getCurrentRequests() {
String concurrentKey = "bella-openapi-channel-concurrent:" + channel.getEntityCode();
Object count = redissonClient.getBucket(concurrentKey).get();
return count != null ? Long.parseLong(count.toString()) : 0L;
}

public double getCapacity() {
long currentTime = System.currentTimeMillis();

if(cachedCapacity >= 0 && (currentTime - cacheTimestamp) < CACHE_DURATION_MS) {
return cachedCapacity;
}

double capacity = computeCapacity();
cachedCapacity = capacity;
cacheTimestamp = currentTime;
return capacity;
}

private double computeCapacity() {
String historyKey = "bella-openapi-channel-metrics:" + channelCode + ":" + RPM_429_HISTORY_METRIC;
List<MetricsPoint> dataPoints = redissonClient.getList(historyKey).range(0, -1)
.stream()
.map(this::parseMetrics)
.filter(Objects::nonNull)
.collect(Collectors.toList());

if(dataPoints.isEmpty()) {
return 0.0;
}

return fittingAlgorithm.calculateCapacity(dataPoints);
}

private MetricsPoint parseMetrics(Object metric) {
try {
Map<String, Object> data = JacksonUtils.toMap(metric.toString());
Long timestamp = MapUtils.getLong(data, "timestamp");
Double rpm = MapUtils.getDouble(data, "rpm");
Double avgResponseTime = MapUtils.getDouble(data, "avg_response_time", 60 * 1000.0);
return new MetricsPoint(timestamp, rpm, avgResponseTime);
} catch (Exception e) {
log.warn("Failed to parse data point: {}", metric, e);
return null;
}
}

@Data
@AllArgsConstructor
public static class MetricsPoint {
private long timestamp;
private double rpm;
private double avgResponseTime;
}

public interface CapacityFittingAlgorithm {
double calculateCapacity(List<MetricsPoint> historyData);

String getAlgorithmName();
}

public static class EmaFittingAlgorithm implements CapacityFittingAlgorithm {
private final double alpha;

private static final double DEFAULT_EMA_ALPHA = 0.3;
private static final int DEFAULT_HISTORY_SIZE = 10;
private static final int DEFAULT_RECORD_MAX_INTERVAL = 120;
private static final double DEFAULT_RECORD_CHANGE_THRESHOLD = 0.15;
private static final int DEFAULT_DECAY_HALF_LIFE = 180;
private static final double DEFAULT_QUICK_CALC_CURRENT_WEIGHT = 0.3;
private static final double DEFAULT_QUICK_CALC_DECAYED_WEIGHT = 0.7;

public EmaFittingAlgorithm(double alpha) {
this.alpha = alpha;
}

@Override
public double calculateCapacity(List<MetricsPoint> historyData) {
if(historyData.isEmpty()) {
return 0.0;
}

double rpmEma = historyData.get(0).getRpm();

for (int i = 1; i < historyData.size(); i++) {
MetricsPoint point = historyData.get(i);
rpmEma = alpha * point.getRpm() + (1 - alpha) * rpmEma;
}

return Math.floor(rpmEma + 0.5);
}

@Override
public String getAlgorithmName() {
return "ema";
}
}
}
Loading