三种策略模式:
1)断路器模式:设置超时或者失败等熔断策略
2)后备策略模式:断路器模式触发后,如果存在后备则执行后备(后备模式 方法需写在类里,否则不起作用)
3)舱壁模式:类似于货船,将货船分为多个,当货船发生危险时,所在货仓进行隔离以降低整艘船的风险。Hystrix是通过线程池管理调用外部资源,默认情况下所有服务调用都公用一个线程池。一个性能低下的服务会耗尽Hystrix 线程池资源,进而牵连其他远程调用,最后会耗尽Java容器资源。舱壁模式就是为各个服务分别指定线程池。
1.引入依赖
<!-- hystrix -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
2.在启动类添加 @EnableCircuitBreaker // 启动 Hystrix 注解
3.使用注解形式
/**
* 使用 HystrixCommand 注解
* */
@Slf4j
@Service
public class UseHystrixCommandAnnotation {
private final NacosClientService nacosClientService;
public UseHystrixCommandAnnotation(NacosClientService nacosClientService) {
this.nacosClientService = nacosClientService;
}
@HystrixCommand(
// 用于对 Hystrix 命令进行分组, 分组之后便于统计展示于仪表盘、上传报告和预警等等
// 内部进行度量统计时候的分组标识, 数据上报和统计的最小维度就是 groupKey
groupKey = "NacosClientService",
// HystrixCommand 的名字, 默认是当前类的名字, 主要方便 Hystrix 进行监控、报警等
commandKey = "NacosClientService",
// 舱壁模式
threadPoolKey = "NacosClientService",
// 后备模式
fallbackMethod = "getNacosClientInfoFallback",
// 断路器模式
commandProperties = {
// 超时时间, 单位毫秒, 超时进 fallback
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1500"),
// 判断熔断的最少请求数, 默认是10; 只有在一定时间内请求数量达到该值, 才会进行成功率的计算
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),
// 熔断的阈值默认值 50, 表示在一定时间内有50%的请求处理失败, 会触发熔断
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10"),
},
// 舱壁模式
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "30"),
@HystrixProperty(name = "maxQueueSize", value = "101"),
@HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),
// 在时间窗口中, 收集统计信息的次数; 在 1440ms 的窗口中一共统计 12 次
@HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "12"),
// 时间窗口, 从监听到第一次失败开始计时
@HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "1440")
}
)
public List<ServiceInstance> getNacosClientInfo(String serviceId) {
log.info("use hystrix command annotation to get nacos client info: [{}], [{}]",
serviceId, Thread.currentThread().getName());
return nacosClientService.getNacosClientInfo(serviceId);
}
/**
* getNacosClientInfo 的兜底策略 - Hystrix 后备模式
* */
public List<ServiceInstance> getNacosClientInfoFallback(String serviceId) {
log.warn("can not get nacos client, trigger hystrix fallback: [{}], [{}]",
serviceId, Thread.currentThread().getName());
return Collections.emptyList();
}4.编码形式
/**
* <h1>给 NacosClientService 实现包装</h1>
* Hystrix 舱壁模式:
* 1. 线程池
* 2. 信号量: 算法 + 数据结构, 有限状态机
* */
@Slf4j
public class NacosClientHystrixCommand extends HystrixCommand<List<ServiceInstance>> {
/** 需要保护的服务 */
private final NacosClientService nacosClientService;
/** 方法需要传递的参数 */
private final String serviceId;
public NacosClientHystrixCommand(NacosClientService nacosClientService, String serviceId) {
super(
Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey("NacosClientService"))
.andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientHystrixCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("NacosClientPool"))
// 线程池 key 配置
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(THREAD) // 线程池隔离策略
.withFallbackEnabled(true) // 开启降级
.withCircuitBreakerEnabled(true) // 开启熔断器
)
);
// 可以配置信号量隔离策略
// Setter semaphore =
// Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("NacosClientService"))
// .andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientHystrixCommand"))
// .andCommandPropertiesDefaults(
// HystrixCommandProperties.Setter()
// .withCircuitBreakerRequestVolumeThreshold(10)
// .withCircuitBreakerSleepWindowInMilliseconds(5000)
// .withCircuitBreakerErrorThresholdPercentage(50)
// .withExecutionIsolationStrategy(SEMAPHORE) // 指定使用信号量隔离
// //.....
// );
this.nacosClientService = nacosClientService;
this.serviceId = serviceId;
}
/**
* <h2>要保护的方法调用写在 run 方法中</h2>
* */
@Override
protected List<ServiceInstance> run() throws Exception {
log.info("NacosClientService In Hystrix Command to Get Service Instance: [{}], [{}]",
this.serviceId, Thread.currentThread().getName());
return this.nacosClientService.getNacosClientInfo(this.serviceId);
}
/**
* <h2>降级处理策略</h2>
* */
@Override
protected List<ServiceInstance> getFallback() {
log.warn("NacosClientService run error: [{}], [{}]",
this.serviceId, Thread.currentThread().getName());
return Collections.emptyList();
}调用模式
@GetMapping("/simple-hystrix-command")
public List<ServiceInstance> getServiceInstanceByServiceId(
@RequestParam String serviceId) throws Exception {
// 第一种方式
List<ServiceInstance> serviceInstances01 = new NacosClientHystrixCommand(
nacosClientService, serviceId
).execute(); // 同步阻塞
log.info("use execute to get service instances: [{}], [{}]",
JSON.toJSONString(serviceInstances01), Thread.currentThread().getName());
// 第二种方式(常用)
List<ServiceInstance> serviceInstances02;
Future<List<ServiceInstance>> future = new NacosClientHystrixCommand(
nacosClientService, serviceId
).queue(); // 异步非阻塞
// 这里可以做一些别的事, 需要的时候再去拿结果
serviceInstances02 = future.get();
log.info("use queue to get service instances: [{}], [{}]",
JSON.toJSONString(serviceInstances02), Thread.currentThread().getName());
// 第三种方式
Observable<List<ServiceInstance>> observable = new NacosClientHystrixCommand(
nacosClientService, serviceId
).observe(); // 热响应调用
List<ServiceInstance> serviceInstances03 = observable.toBlocking().single();
log.info("use observe to get service instances: [{}], [{}]",
JSON.toJSONString(serviceInstances03), Thread.currentThread().getName());
// 第四种方式
Observable<List<ServiceInstance>> toObservable = new NacosClientHystrixCommand(
nacosClientService, serviceId
).toObservable(); // 异步冷响应调用
List<ServiceInstance> serviceInstances04 = toObservable.toBlocking().single();
log.info("use toObservable to get service instances: [{}], [{}]",
JSON.toJSONString(serviceInstances04), Thread.currentThread().getName());
// execute = queue + get
return serviceInstances01;
}5.信号量隔离策略
/**
* <h1>HystrixCommand, 隔离策略是基于信号量实现的</h1>
* */
@Slf4j
public class NacosClientHystrixObservableCommand
extends HystrixObservableCommand<List<ServiceInstance>> {
/** 要保护的服务 */
private final NacosClientService nacosClientService;
/** 方法需要传递的参数 */
private final List<String> serviceIds;
public NacosClientHystrixObservableCommand(NacosClientService nacosClientService,
List<String> serviceIds) {
super(
HystrixObservableCommand.Setter
.withGroupKey(HystrixCommandGroupKey
.Factory.asKey("NacosClientService"))
.andCommandKey(HystrixCommandKey
.Factory.asKey("NacosClientHystrixObservableCommand"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withFallbackEnabled(true) // 开启降级
.withCircuitBreakerEnabled(true) // 开启熔断器
)
);
this.nacosClientService = nacosClientService;
this.serviceIds = serviceIds;
}
/**
* <h2>要保护的方法调用写在这里</h2>
* */
@Override
protected Observable<List<ServiceInstance>> construct() {
return Observable.create(new Observable.OnSubscribe<List<ServiceInstance>>() {
// Observable 有三个关键的事件方法, 分别是 onNext、onCompleted、onError
@Override
public void call(Subscriber<? super List<ServiceInstance>> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
log.info("subscriber command task: [{}], [{}]",
JSON.toJSONString(serviceIds),
Thread.currentThread().getName());
serviceIds.forEach(
s -> subscriber
.onNext(nacosClientService.getNacosClientInfo(s))
);
subscriber.onCompleted();
log.info("command task completed: [{}], [{}]",
JSON.toJSONString(serviceIds),
Thread.currentThread().getName());
}
} catch (Exception ex) {
subscriber.onError(ex);
}
}
});
}
/**
* <h2>服务降级</h2>
* */
@Override
protected Observable<List<ServiceInstance>> resumeWithFallback() {
return Observable.create(new Observable.OnSubscribe<List<ServiceInstance>>() {
@Override
public void call(Subscriber<? super List<ServiceInstance>> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
log.info("(fallback) subscriber command task: [{}], [{}]",
JSON.toJSONString(serviceIds),
Thread.currentThread().getName());
subscriber.onNext(Collections.emptyList());
subscriber.onCompleted();
log.info("(fallback) command task completed: [{}], [{}]",
JSON.toJSONString(serviceIds),
Thread.currentThread().getName());
}
} catch (Exception ex) {
subscriber.onError(ex);
}
}
});
}
}@GetMapping("/hystrix-observable-command")
public List<ServiceInstance> getServiceInstancesByServiceIdObservable(
@RequestParam String serviceId) {
List<String> serviceIds = Arrays.asList(serviceId, serviceId, serviceId);
List<List<ServiceInstance>> result = new ArrayList<>(serviceIds.size());
NacosClientHystrixObservableCommand observableCommand =
new NacosClientHystrixObservableCommand(nacosClientService, serviceIds);
// 异步执行命令
Observable<List<ServiceInstance>> observe = observableCommand.observe();
// 注册获取结果
observe.subscribe(
new Observer<List<ServiceInstance>>() {
// 执行 onNext 之后再去执行 onCompleted
@Override
public void onCompleted() {
log.info("all tasks is complete: [{}], [{}]",
serviceId, Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(List<ServiceInstance> instances) {
result.add(instances);
}
}
);
log.info("observable command result is : [{}], [{}]",
JSON.toJSONString(result), Thread.currentThread().getName());
return result.get(0);
}