Skip to content
Open

init #1546

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ public class DynamicThreadPoolConfig {

private static final int MAX_POOL_SIZE = AVAILABLE_PROCESSORS * 4;

/**
* 我定义了一个 SpringBoot 的配置类,在配置类中定义了一个 DynamicThreadPoolExecutor 对象交给了 SpringBoot 容器来管理,
* 并且还在该对象上面添加了 @DynamicThreadPool 注解。这样一来,我肯定就可以在
* DynamicThreadPoolPostProcessor 对象处理器的 postProcessAfterInitialization
* 方法中得到该对象,然后判断该对象是否为动态线程池对象,如果是的话,就可以把这个动态线程池的信息注册到服务端
* @return
*/
@Bean
@DynamicThreadPool
public Executor messageConsumeTtlDynamicThreadPool() {
Expand All @@ -78,6 +85,22 @@ public Executor messageConsumeTtlDynamicThreadPool() {
return ttlExecutor;
}

@Bean
@DynamicThreadPool
public Executor messageConsumeTestDynamicThreadPool() {
String threadPoolId = "test";
ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.executeTimeOut(EXECUTE_TIMEOUT)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(AWAIT_TERMINATION_MILLIS)
.taskDecorator(new TaskTraceBuilderHandler())
.build();
return customExecutor;
}

/**
* {@link Bean @Bean} and {@link DynamicThreadPool @DynamicThreadPool}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,25 @@ public class TransmittableThreadLocalExecutorAdapter implements DynamicThreadPoo

private static final String FIELD_NAME = "executor";

// 判断传进来的对象是否和当前适配器器对象匹配
@Override
public boolean match(Object executor) {
// 其实就是判断对象的类名是否为ExecutorTtlWrapper,如果是就意味着是第三方线程池
// 这个线程池中持有者动态线程池对象
return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName());
}

// 从ExecutorTtlWrapper对象中获得其持有的DynamicThreadPoolExecutor对象
@Override
public ThreadPoolExecutor unwrap(Object executor) {
// 通过反射获得ExecutorTtlWrapper对象的executor成员变量
// 在之前展示的ExecutorTtlWrapper类的代码中,可以看到,动态线程池会赋值给
// ExecutorTtlWrapper的executor成员变量
return (ThreadPoolExecutor) ReflectUtil.getFieldValue(executor, FIELD_NAME);
}

@Override
public void replace(Object executor, Executor dynamicThreadPoolExecutor) {
// 将dynamicThreadPoolExecutor对象替换到executor中
ReflectUtil.setFieldValue(executor, FIELD_NAME, dynamicThreadPoolExecutor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,24 @@
public class InstanceInfo {

private static final String UNKNOWN = "unknown";

//应用名称,未设置就是未知
private String appName = UNKNOWN;

//地址
private String hostName;

//这个就是命名空间+项目Id
private String groupKey;

//端口号
private String port;

//客户端服务实例Id,其实就是客户端地址+uuid (127.0.0.1:8088_eceeab1ab6a0471b838b97a47cfa1268)
private String instanceId;

private String ipApplicationName;

//客户端在配置文件中定义的上下文路径
private String clientBasePath;

//客户端回调地址,这个地址非常重要,一会就会为大家解释说明
private String callBackUrl;

//客户端唯一标识符,其实和instanceId一样、
//只不过这个标识符是要在web界面展示给用户的
private String identify;

private String active;
Expand All @@ -61,11 +62,11 @@ public class InstanceInfo {
private volatile ActionType actionType;

private volatile boolean isInstanceInfoDirty = false;

//客户端最后更新时间戳
private volatile Long lastUpdatedTimestamp;

private volatile Long lastDirtyTimestamp;

//服务实例的默认状态为up,也就是上线状态
private volatile InstanceStatus status = InstanceStatus.UP;

private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

/**
* Dynamic thread-pool register parameter.
* 封装动态线程池核心信息的对象
*/
@Data
@Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,16 @@

/**
* Dynamic thread-pool post processor.
* BeanPostProcessor 接口定义了两个主要方法:
* 1. postProcessBeforeInitialization(Object bean, String beanName):在 bean 初始化方法
* (如 @PostConstruct 注解的方法或 InitializingBean 接口的 afterPropertiesSet 方法)调用之前执行。
* 2. postProcessAfterInitialization(Object bean, String beanName):在 bean 初始化方法调用之后执行。
*/
@Slf4j
@AllArgsConstructor
public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {

// 配置信息对象
private final BootstrapConfigProperties configProperties;

private static final int DEFAULT_ACTIVE_ALARM = 80;
Expand All @@ -62,32 +67,56 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {

private static final String DEFAULT_RECEIVES = "";

// bean前置处理方法
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}

// bean后置处理方法
// 在这里判断bean是否为动态线程池对象,如果是的话就可以把动态线程池信息注册到服务端
// 这个方法就是本类最核心的方法,用来处理DynamicThreadPoolExecutor对象
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 这里会先判断一下传进来的bean是否属于DynamicThreadPoolExecutor类型,如果大家看了我在DynamicThreadPoolConfig类提供的几个例子
// 就会发现我创建动态线程池对象最终是以Executor或者ThreadPoolExecutor形式返回的,如果是以Executor形式返回的,这个Executor接收的还并不是一个DynamicThreadPoolExecutor对象
// 而是一个ExecutorTtlWrapper对象,这个ExecutorTtlWrapper对象的作用我已经在DynamicThreadPoolConfig类中解释了,这时候,ExecutorTtlWrapper对象肯定就不属于DynamicThreadPoolExecutor类型了
// 但是先别急,虽然ExecutorTtlWrapper对象不属于DynamicThreadPoolExecutor类型,但是后面的DynamicThreadPoolAdapterChoose.match(bean)这个条件还是可以通过的,所以仍然可以进入下面的分支
// 那为什么要执行DynamicThreadPoolAdapterChoose.match(bean)这行代码呢?原因也很简单,因为有时候用户可能会使用spring本身的线程池,或者其他第三方形式的线程池,比如ExecutorTtl,比如spring的ThreadPoolTaskExecutor
// 该动态线程池框架也想收集这些线程池的信息,所以就会在DynamicThreadPoolAdapterChoose.match(bean)中判断程序内是否有这些第三方线程池的适配器,如果有,就可以使用这些适配器把这些第三方线程池转换成DynamicThreadPoolExecutor对象
// 之后的逻辑就和处理真正的DynamicThreadPoolExecutor对象一样了,无非就是把线程池信息注册到服务端,然后把线程池保存在线程池全局管理器中
// DynamicThreadPoolAdapterChoose.match(bean)就是判断bean的类型是否为ThreadPoolTaskExecutor、ExecutorTtlWrapper、ExecutorServiceTtlWrapper中的一个,这些都是第三方的线程池
if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) {
DynamicThreadPool dynamicThreadPool;
try {
// 判断该线程池bean对象上是否存在DynamicThreadPool注解
dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class);
// 如果找不到该注解,就进入下面这个分支
if (Objects.isNull(dynamicThreadPool)) {
// Adapt to lower versions of SpringBoot.
// 这里就是为了适配SpringBoot低版本,使用DynamicThreadPoolAnnotationUtil工具再次查找注解
dynamicThreadPool = DynamicThreadPoolAnnotationUtil.findAnnotationOnBean(beanName, DynamicThreadPool.class);
if (Objects.isNull(dynamicThreadPool)) {
// 还是找不到则直接返回bean即可
return bean;
}
}
} catch (Exception ex) {
log.error("Failed to create dynamic thread pool in annotation mode.", ex);
return bean;
}
// 走到这里意味着当前的bean上有DynamicThreadPool注解,也就意味着是一个动态线程池,下面就要收集动态线程池配置信息了
// 定义一个动态线程池
ThreadPoolExecutor dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean);
// 下面的if分支会先从适配器中获得真正的动态线程池,如果获得的线程池为空,说明当前bean本身就是动态线程池,如果不为空,则正好得到了真正的动态线程池,并且赋值给dynamicThreadPoolExecutor了
// 将bean转换为dynamicThreadPoolExecutor类型,确切地说不是把当前要交给容器的这个bean转换成dynamicThreadPoolExecutor对象
// 实际上ExecutorTtlWrapper只是持有了dynamicThreadPoolExecutor的引用,这里只不过是直接利用反射从ExecutorTtlWrapper把dynamicThreadPoolExecutor对象取出来了
if (dynamicThreadPoolExecutor == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
}
// 将刚刚得到的dynamicThreadPoolExecutor对象包装成一个DynamicThreadPoolWrapper对象,这个对象会被交给线程池全局管理器来管理
// 之后收集线程池运行信息时都要用到这个对象
// 在这里把动态线程池的信息注册给服务端了
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(((DynamicThreadPoolExecutor) dynamicThreadPoolExecutor).getThreadPoolId(), dynamicThreadPoolExecutor);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor;
Expand All @@ -100,10 +129,15 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
*
* @param threadPoolId dynamic thread-pool id
* @param executor dynamic thread-pool executor
* fillPoolAndRegister 方法实现思路非常简单,但要执行的操作就稍微多一些了,
* 我之所以说该方法实现思路简单,是因为在该方法中,只需要把动态线程池的配置信息封装到一个新的对象,
* 就是我即将要定义的 DynamicThreadPoolRegisterParameter 对象中,
* 然后将这个对象直接通过 HttpAgent 通信组件发送给服务端即可
*/
protected ThreadPoolExecutor fillPoolAndRegister(String threadPoolId, ThreadPoolExecutor executor) {
ExecutorProperties executorProperties = null;
if (configProperties.getExecutors() != null) {
// 从配置文件中获取线程池配置信息
executorProperties = configProperties.getExecutors()
.stream()
.filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,26 @@

/**
* Bootstrap properties.
* //动态线程池服务端的地址
* spring.dynamic.thread-pool.server-addr=http://localhost:6691
* //客户端namespace
* spring.dynamic.thread-pool.namespace=prescription
* //itemId
* spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
* //访问服务端时需要的用户名和密码
* spring.dynamic.thread-pool.username=admin
* spring.dynamic.thread-pool.password=123456
*/
@Slf4j
@Getter
@Setter
@ConfigurationProperties(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX)
/**
* 既然是要使用 BootstrapProperties 来封装配置类中的信息,那么 BootstrapProperties 类的对象肯定要被
* SpringBoot 容器来管理,所以接下来我要定义一个配置类,在这个配置类中,
* 把 BootstrapProperties 的对象交给 SpringBoot 的容器来管理。
* 这个配置类我也定义好了,DynamicThreadPoolAutoConfiguration
*/
public class BootstrapProperties implements BootstrapPropertiesInterface {

/**
Expand All @@ -50,31 +65,40 @@ public class BootstrapProperties implements BootstrapPropertiesInterface {

/**
* Netty server port
* //netty服务器的端口号,这个是可配置的
* //在hippo4j框架,提供了两种通信方式,一种是http,一种就是netty
* //在该框架中默认使用的是http,所以我就不引入netty了
*/
private String nettyServerPort;

/**
* Report type
* //客户端上报给服务端线程池历史信息的方法,这个也可以使用netty的方式上报
* //我仍然使用内部默认的http了,不引入netty
*/
private String reportType;

/**
* Namespace
* //命名空间
*/
private String namespace;

/**
* Item id
* //项目Id
*/
private String itemId;

/**
* Whether to enable dynamic thread pool
* //是否启动动态线程池
*/
private Boolean enable = true;

/**
* Print dynamic thread pool banner
* //是否在控制台打印hippo4j的启动图案
*/
private Boolean banner = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,15 @@
@Configuration
@AllArgsConstructor
@ConditionalOnBean(MarkerConfiguration.Marker.class)
//这个注解会使BootstrapProperties类上的ConfigurationProperties注解生效,
// BootstrapProperties对象就可以被SpringBoot容器管理了
@EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnProperty(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@ImportAutoConfiguration({WebAdapterConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class})
public class DynamicThreadPoolAutoConfiguration {

//在这里把配置文件中的相关信息封封装到这个成员变量中了
//properties对象会被自动注入
private final BootstrapProperties properties;

private final ConfigurableEnvironment environment;
Expand Down Expand Up @@ -137,6 +141,8 @@ public AdaptedThreadPoolDestroyPostProcessor adaptedThreadPoolDestroyPostProcess
return new AdaptedThreadPoolDestroyPostProcessor(applicationContext);
}

//动态线程池处理器,这个处理器其实是就是spring中的一个bean处理器,在这个bean处理器中把动态线程池包装成了DynamicThreadPoolRegisterWrapper对象
//然后开始向服务端注册该动态线程池的信息
@Bean
@ConditionalOnProperty(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@SuppressWarnings("all")
Expand Down Expand Up @@ -242,6 +248,7 @@ public DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig(ThreadP
return new DynamicThreadPoolSubscribeConfig(threadPoolDynamicRefresh, clientWorker, properties);
}

//远程通信组件,使用的是http通信方式
@Bean
public HttpAgent httpAgent(BootstrapProperties properties) {
return new ServerHttpAgent(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,43 @@

/**
* Server http agent.
* 用来访问服务端的http代理类,ServerHttpAgent实现了HttpAgent接口
*
* 这个组件将会在DynamicThreadPoolAutoConfiguration 中被注入为组件
*/
public class ServerHttpAgent implements HttpAgent {

//配置信息对象
private final BootstrapProperties dynamicThreadPoolProperties;

//服务地址管理器,这个对象中封装着可用的服务端地址列表,当然,服务端地址的列表信息,需要用户提前定义在配置文件中
//nacos中也有这个类
private final ServerListManager serverListManager;

//安全代理类,如果有朋友自己看过nacos客户端源码,肯定对这个类的作用不陌生,这就是要给安全代理类
//主要作用就是用来访问服务端的,通过这个类从服务端获得token,以后每次访问服务端都会携带这个token
//但是在我迭代的nacos客户端代码中,我把这个功能给省略了,如果大家感兴趣可以自己去看看nacos客户端的源码
private SecurityProxy securityProxy;

private ServerHealthCheck serverHealthCheck;

//定时任务执行器,这个定时任务执行器会定期刷新本地缓存的token
private ScheduledExecutorService executorService;

//定时任务执行间隔
private final long securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5);

public ServerHttpAgent(BootstrapProperties properties) {
this.dynamicThreadPoolProperties = properties;
this.serverListManager = new ServerListManager(dynamicThreadPoolProperties);
this.securityProxy = new SecurityProxy(properties);
//在这里已经先从服务端获取到了token了
this.securityProxy.applyToken(this.serverListManager.getServerUrls());
//创建定时任务执行器
this.executorService = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.token.security.updater").build());
//向定时任务执行器提交了定期执行的任务
this.executorService.scheduleWithFixedDelay(
//定期访问服务端,刷新本地token
() -> securityProxy.applyToken(serverListManager.getServerUrls()),
0,
securityInfoRefreshIntervalMills,
Expand Down Expand Up @@ -85,6 +98,8 @@ public Result httpGetSimple(String path) {
return HttpUtil.get(buildUrl(path), Result.class);
}

//下面就是具体的访问服务端的方法了,这里只需要强调一点,那就是访问服务端之前
//会调用本类的injectSecurityInfo方法,把本地token封装到请求中一起发送给服务端
@Override
public Result httpPost(String path, Object body) {
isHealthStatus();
Expand Down Expand Up @@ -129,6 +144,7 @@ private void isHealthStatus() {
serverHealthCheck.isHealthStatus();
}

//把本地token封装到map中的方法
private Map injectSecurityInfo(Map<String, String> params) {
if (StringUtil.isNotBlank(securityProxy.getAccessToken())) {
params.put(Constants.ACCESS_TOKEN, securityProxy.getAccessToken());
Expand Down
Loading