Spring Cloud 基础组件 - Hystrix 之线程上下文

April . 26 . 2021

前言

项目地址 spring-cloud-hystrix-demo

       Hystrix 在执行时可以使用两种不同的隔离策咯 - Thread(线程)和 Semaphore(信号量)来运行。在默认情况下,Hystrix 以 Thread 隔离策略运行。每个 Hystrix 命令都在一个单独的线程池中运行,该线程池不与父线程共享它的上下文,可以自行的控制线程的执行与中断,而不必担心中断父线程其他活动。

      基于 Semaphore 的隔离,Hystrix 不需要启动新线程,如果调用超时,就会中断父线程。在容器服务环境(Tomcat)中, 中断父线程将导致抛出开发人员无法捕获的异常。要控制命令池的隔离设置,可以在 @HystrixCommand 注解上设置 commandProperties 属性,如下所示:

@HystrixCommand(
 	commandProperties = {
        @HystrixProperty(name="execution.isolation.strategy", value="SEMAPHORE")
    }
)

      Hystrix 团队建议开发人员对大多数命令使用 Thread 隔离策咯。这可以保持开发人员与父线程之间更高层次的隔离。Thread 隔离比 Semaphore 隔离更重,Semaphore 隔离模型更轻量级,Semaphore 隔离模型适用于服务量很大且正在使用异步I/O编程模型(Netty)运行的情况。



ThreadLocal 与 Hystrix

      在默认情况下(Thread 隔离级别),Hystrix不会将父线程的上下文传播到自身所管理的子线程中,在父线程中设置的 ThreadLocal 值在子线程中都是不可用的。

      Hystrix 允许开发人员定义一种自定义的并发策略,它将包装 Hystrix 调用,并允许开发人员将附加的父线程上下文注入由 Hystrix 管理的子线程中。

      开发人员需要执行以下3个操作:

  1. 自定义Hystrix 并发策略类
  2. 定义 Callable 子类,将 ThreadLocal 注入到 Hystrix 命令中
  3. 配置 Spring Cloud 以使用自定义 hystrix 并发策略



自定义 Hystrix 并发策略类

ThreadLocalAwareStrategy

public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy {

    private final HystrixConcurrencyStrategy existingConcurrencyStrategy;

    public ThreadLocalAwareStrategy(HystrixConcurrencyStrategy existingConcurrencyStrategy) {
        this.existingConcurrencyStrategy = existingConcurrencyStrategy;
    }

    @Override
    public BlockingQueue getBlockingQueue(int maxQueueSize) {
        return existingConcurrencyStrategy != null
                ? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
                : super.getBlockingQueue(maxQueueSize);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixProperty corePoolSize,
                                            HystrixProperty maximumPoolSize,
                                            HystrixProperty keepAliveTime,
                                            TimeUnit unit,
                                            BlockingQueue workQueue) {
        return existingConcurrencyStrategy != null
                ? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize,
                maximumPoolSize, keepAliveTime, unit, workQueue)
                : super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue);
    }

    @Override
    public  Callable wrapCallable(Callable callable) {
        // 注入 Callable 实现, 他将设置UserContext
        return existingConcurrencyStrategy != null
                ? existingConcurrencyStrategy.wrapCallable(DelegatingUserContextCallable.create(callable, UserContextHolder.getContext()))
                : super.wrapCallable(DelegatingUserContextCallable.create(callable, UserContextHolder.getContext()));
    }
}

      因为 Spring Cloud 已经定义了一个 HystrixConcurrencyStrategy,所以需要检查现有的策略类是否存在,然后调用现有的并发策略的方法或调用基类的并发策略方法。



定义 Callable 子类

DelegatingUserContextCallable

@Slf4j
public class DelegatingUserContextCallable implements Callable {

    private final Callable delegate;
    private UserContext originalUserContext;

    public DelegatingUserContextCallable(Callable delegate, UserContext userContext) {
        this.delegate = delegate;
        this.originalUserContext = userContext;
    }

    /**
     * 方法会在被 {@link HystrixCommand} 注解保护的方法之前调用
     *
     * @author nza
     * @createTime 2021/3/7 13:58
     */
    @Override
    public V call() throws Exception {
        // 设置当前线程的上下文对象
        UserContextHolder.setUserContext(originalUserContext);

        try {
            return delegate.call();
        } finally {
            this.originalUserContext = null;
            log.info("删除 hystrix 线程本地变量, currentIp: {}", UserContextHolder.getContext().getCurrentIp());
            UserContextHolder.remove();
        }
    }

    public static  Callable create(Callable delegate, UserContext userContext) {
        return new DelegatingUserContextCallable(delegate, userContext);
    }
}

      将父线程的 ThreadLocal 值设置到子线程中。



配置Spring Cloud 使用自定义并发策略

ThreadLocalConfiguration

@Configuration
public class ThreadLocalConfiguration {

    @Autowired(required = false)
    private HystrixConcurrencyStrategy existingConcurrencyStrategy;

    @PostConstruct
    public void init() {

        // 保留现有的插件引用
        HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
                .getEventNotifier();
        HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
                .getMetricsPublisher();
        HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
                .getPropertiesStrategy();
        HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance()
                .getCommandExecutionHook();

        HystrixPlugins.reset();
        HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalAwareStrategy(existingConcurrencyStrategy));
        HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
        HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
        HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
    }
}

      保留已有的插件引用,并将自定义的并发策略类挂勾到 Spring Cloud 与 Hystrix 中。



小结

      Hystrix 支持两种不同的隔离策咯 - Thread(线程)和 Semaphore(信号量),默认使用 Thread 模式。

      Hystrix 允许自定义并发策略的实现,将父线程上下文注入 HyStrix 管理的子线程中。