Administrator
发布于 2024-01-24 / 2 阅读
0

SpringCloud Netflix Hystrix

三种策略模式:

1)断路器模式:设置超时或者失败等熔断策略

2)后备策略模式:断路器模式触发后,如果存在后备则执行后备(后备模式 方法需写在类里,否则不起作用)

3)舱壁模式:类似于货船,将货船分为多个,当货船发生危险时,所在货仓进行隔离以降低整艘船的风险。Hystrix是通过线程池管理调用外部资源,默认情况下所有服务调用都公用一个线程池。一个性能低下的服务会耗尽Hystrix 线程池资源,进而牵连其他远程调用,最后会耗尽Java容器资源。舱壁模式就是为各个服务分别指定线程池。

1.引入依赖

<!--  hystrix -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>

2.在启动类添加 @EnableCircuitBreaker // 启动 Hystrix 注解

3.使用注解形式


/**
 * 使用 HystrixCommand 注解
 * */
@Slf4j
@Service
public class UseHystrixCommandAnnotation {

    private final NacosClientService nacosClientService;

    public UseHystrixCommandAnnotation(NacosClientService nacosClientService) {
        this.nacosClientService = nacosClientService;
    }

    @HystrixCommand(
            // 用于对 Hystrix 命令进行分组, 分组之后便于统计展示于仪表盘、上传报告和预警等等
            // 内部进行度量统计时候的分组标识, 数据上报和统计的最小维度就是 groupKey
            groupKey = "NacosClientService",
            // HystrixCommand 的名字, 默认是当前类的名字, 主要方便 Hystrix 进行监控、报警等
            commandKey = "NacosClientService",
            // 舱壁模式
            threadPoolKey = "NacosClientService",
            // 后备模式
            fallbackMethod = "getNacosClientInfoFallback",
            // 断路器模式
            commandProperties = {
                    // 超时时间, 单位毫秒, 超时进 fallback
                    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1500"),
                    // 判断熔断的最少请求数, 默认是10; 只有在一定时间内请求数量达到该值, 才会进行成功率的计算
                    @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),
                    // 熔断的阈值默认值 50, 表示在一定时间内有50%的请求处理失败, 会触发熔断
                    @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10"),
            },
            // 舱壁模式
            threadPoolProperties = {
                    @HystrixProperty(name = "coreSize", value = "30"),
                    @HystrixProperty(name = "maxQueueSize", value = "101"),
                    @HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),
                    @HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),
                    // 在时间窗口中, 收集统计信息的次数; 在 1440ms 的窗口中一共统计 12 次
                    @HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "12"),
                    // 时间窗口, 从监听到第一次失败开始计时
                    @HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "1440")
            }
    )
    public List<ServiceInstance> getNacosClientInfo(String serviceId) {

        log.info("use hystrix command annotation to get nacos client info: [{}], [{}]",
                serviceId, Thread.currentThread().getName());
        return nacosClientService.getNacosClientInfo(serviceId);
    }

    /**
     * getNacosClientInfo 的兜底策略 - Hystrix 后备模式
     * */
    public List<ServiceInstance> getNacosClientInfoFallback(String serviceId) {

        log.warn("can not get nacos client, trigger hystrix fallback: [{}], [{}]",
                serviceId, Thread.currentThread().getName());
        return Collections.emptyList();
    }

4.编码形式


/**
 * <h1>给 NacosClientService 实现包装</h1>
 * Hystrix 舱壁模式:
 *  1. 线程池
 *  2. 信号量: 算法 + 数据结构, 有限状态机
 * */
@Slf4j
public class NacosClientHystrixCommand extends HystrixCommand<List<ServiceInstance>> {

    /** 需要保护的服务 */
    private final NacosClientService nacosClientService;

    /** 方法需要传递的参数 */
    private final String serviceId;

    public NacosClientHystrixCommand(NacosClientService nacosClientService, String serviceId) {

        super(
                Setter.withGroupKey(
                        HystrixCommandGroupKey.Factory.asKey("NacosClientService"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientHystrixCommand"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("NacosClientPool"))
                // 线程池 key 配置
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(THREAD) // 线程池隔离策略
                        .withFallbackEnabled(true)  // 开启降级
                        .withCircuitBreakerEnabled(true)    // 开启熔断器
                )
        );

        // 可以配置信号量隔离策略
//        Setter semaphore =
//                Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("NacosClientService"))
//                .andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientHystrixCommand"))
//                .andCommandPropertiesDefaults(
//                        HystrixCommandProperties.Setter()
//                        .withCircuitBreakerRequestVolumeThreshold(10)
//                        .withCircuitBreakerSleepWindowInMilliseconds(5000)
//                        .withCircuitBreakerErrorThresholdPercentage(50)
//                        .withExecutionIsolationStrategy(SEMAPHORE)  // 指定使用信号量隔离
//                        //.....
//                );

        this.nacosClientService = nacosClientService;
        this.serviceId = serviceId;
    }

    /**
     * <h2>要保护的方法调用写在 run 方法中</h2>
     * */
    @Override
    protected List<ServiceInstance> run() throws Exception {

        log.info("NacosClientService In Hystrix Command to Get Service Instance: [{}], [{}]",
                this.serviceId, Thread.currentThread().getName());
        return this.nacosClientService.getNacosClientInfo(this.serviceId);
    }

    /**
     * <h2>降级处理策略</h2>
     * */
    @Override
    protected List<ServiceInstance> getFallback() {

        log.warn("NacosClientService run error: [{}], [{}]",
                this.serviceId, Thread.currentThread().getName());
        return Collections.emptyList();
    }

调用模式

@GetMapping("/simple-hystrix-command")
    public List<ServiceInstance> getServiceInstanceByServiceId(
            @RequestParam String serviceId) throws Exception {

        // 第一种方式
        List<ServiceInstance> serviceInstances01 = new NacosClientHystrixCommand(
                nacosClientService, serviceId
        ).execute();    // 同步阻塞
        log.info("use execute to get service instances: [{}], [{}]",
                JSON.toJSONString(serviceInstances01), Thread.currentThread().getName());

        // 第二种方式(常用)
        List<ServiceInstance> serviceInstances02;
        Future<List<ServiceInstance>> future = new NacosClientHystrixCommand(
                nacosClientService, serviceId
        ).queue();      // 异步非阻塞
        // 这里可以做一些别的事, 需要的时候再去拿结果
        serviceInstances02 = future.get();
        log.info("use queue to get service instances: [{}], [{}]",
                JSON.toJSONString(serviceInstances02), Thread.currentThread().getName());

        // 第三种方式
        Observable<List<ServiceInstance>> observable = new NacosClientHystrixCommand(
                nacosClientService, serviceId
        ).observe();        // 热响应调用
        List<ServiceInstance> serviceInstances03 = observable.toBlocking().single();
        log.info("use observe to get service instances: [{}], [{}]",
                JSON.toJSONString(serviceInstances03), Thread.currentThread().getName());

        // 第四种方式
        Observable<List<ServiceInstance>> toObservable = new NacosClientHystrixCommand(
                nacosClientService, serviceId
        ).toObservable();        // 异步冷响应调用
        List<ServiceInstance> serviceInstances04 = toObservable.toBlocking().single();
        log.info("use toObservable to get service instances: [{}], [{}]",
                JSON.toJSONString(serviceInstances04), Thread.currentThread().getName());

        // execute = queue + get
        return serviceInstances01;
    }

5.信号量隔离策略


/**
 * <h1>HystrixCommand, 隔离策略是基于信号量实现的</h1>
 * */
@Slf4j
public class NacosClientHystrixObservableCommand
        extends HystrixObservableCommand<List<ServiceInstance>> {

    /** 要保护的服务 */
    private final NacosClientService nacosClientService;

    /** 方法需要传递的参数 */
    private final List<String> serviceIds;

    public NacosClientHystrixObservableCommand(NacosClientService nacosClientService,
                                               List<String> serviceIds) {
        super(
                HystrixObservableCommand.Setter
                        .withGroupKey(HystrixCommandGroupKey
                                .Factory.asKey("NacosClientService"))
                        .andCommandKey(HystrixCommandKey
                                .Factory.asKey("NacosClientHystrixObservableCommand"))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                        .withFallbackEnabled(true)          // 开启降级
                        .withCircuitBreakerEnabled(true)    // 开启熔断器
                )
        );

        this.nacosClientService = nacosClientService;
        this.serviceIds = serviceIds;
    }

    /**
     * <h2>要保护的方法调用写在这里</h2>
     * */
    @Override
    protected Observable<List<ServiceInstance>> construct() {

        return Observable.create(new Observable.OnSubscribe<List<ServiceInstance>>() {
            // Observable 有三个关键的事件方法, 分别是 onNext、onCompleted、onError
            @Override
            public void call(Subscriber<? super List<ServiceInstance>> subscriber) {

                try {
                    if (!subscriber.isUnsubscribed()) {
                        log.info("subscriber command task: [{}], [{}]",
                                JSON.toJSONString(serviceIds),
                                Thread.currentThread().getName());
                        serviceIds.forEach(
                                s -> subscriber
                                        .onNext(nacosClientService.getNacosClientInfo(s))
                        );
                        subscriber.onCompleted();
                        log.info("command task completed: [{}], [{}]",
                                JSON.toJSONString(serviceIds),
                                Thread.currentThread().getName());
                    }
                } catch (Exception ex) {
                    subscriber.onError(ex);
                }
            }
        });
    }

    /**
     * <h2>服务降级</h2>
     * */
    @Override
    protected Observable<List<ServiceInstance>> resumeWithFallback() {

        return Observable.create(new Observable.OnSubscribe<List<ServiceInstance>>() {
            @Override
            public void call(Subscriber<? super List<ServiceInstance>> subscriber) {

                try {
                    if (!subscriber.isUnsubscribed()) {
                        log.info("(fallback) subscriber command task: [{}], [{}]",
                                JSON.toJSONString(serviceIds),
                                Thread.currentThread().getName());
                        subscriber.onNext(Collections.emptyList());
                        subscriber.onCompleted();
                        log.info("(fallback) command task completed: [{}], [{}]",
                                JSON.toJSONString(serviceIds),
                                Thread.currentThread().getName());
                    }
                } catch (Exception ex) {
                    subscriber.onError(ex);
                }
            }
        });
    }
}
@GetMapping("/hystrix-observable-command")
    public List<ServiceInstance> getServiceInstancesByServiceIdObservable(
            @RequestParam String serviceId) {

        List<String> serviceIds = Arrays.asList(serviceId, serviceId, serviceId);
        List<List<ServiceInstance>> result = new ArrayList<>(serviceIds.size());

        NacosClientHystrixObservableCommand observableCommand =
                new NacosClientHystrixObservableCommand(nacosClientService, serviceIds);

        // 异步执行命令
        Observable<List<ServiceInstance>> observe = observableCommand.observe();

        // 注册获取结果
        observe.subscribe(
                new Observer<List<ServiceInstance>>() {

                    // 执行 onNext 之后再去执行 onCompleted
                    @Override
                    public void onCompleted() {
                        log.info("all tasks is complete: [{}], [{}]",
                                serviceId, Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onNext(List<ServiceInstance> instances) {
                        result.add(instances);
                    }
                }
        );

        log.info("observable command result is : [{}], [{}]",
                JSON.toJSONString(result), Thread.currentThread().getName());
        return result.get(0);
    }