parent
70b0eb3cbe
commit
a29e4e7b11
@ -0,0 +1,90 @@
|
||||
package org.example.filter;
|
||||
|
||||
import com.netflix.hystrix.strategy.HystrixPlugins;
|
||||
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
|
||||
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
|
||||
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
|
||||
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
|
||||
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
|
||||
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
|
||||
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.servlet.*;
|
||||
import javax.servlet.annotation.WebFilter;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* <h1>初始化 Hystrix 请求上下文环境</h1>
|
||||
* */
|
||||
@Slf4j
|
||||
@Component
|
||||
@WebFilter(
|
||||
filterName = "HystrixRequestContextServletFilter",
|
||||
urlPatterns = "/*", // 所有的 URL 都经过该请求
|
||||
asyncSupported = true // 异步的请求调用也是支持的
|
||||
)
|
||||
public class HystrixRequestContextServletFilter implements Filter {
|
||||
|
||||
@Override
|
||||
public void doFilter(ServletRequest request, ServletResponse response,
|
||||
FilterChain chain) throws IOException, ServletException {
|
||||
|
||||
// 初始化 Hystrix 请求上下文
|
||||
// 在不同的 context 中缓存是不共享的
|
||||
// 这个初始化是必须的
|
||||
HystrixRequestContext context = HystrixRequestContext.initializeContext();
|
||||
|
||||
try {
|
||||
// 配置
|
||||
hystrixConcurrencyStrategyConfig();
|
||||
// 请求正常通过
|
||||
chain.doFilter(request, response);
|
||||
} finally {
|
||||
// 关闭 Hystrix 请求上下文
|
||||
context.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <h2>配置 Hystrix 的并发策略</h2>
|
||||
* */
|
||||
public void hystrixConcurrencyStrategyConfig() {
|
||||
|
||||
try {
|
||||
|
||||
HystrixConcurrencyStrategy target =
|
||||
HystrixConcurrencyStrategyDefault.getInstance();
|
||||
HystrixConcurrencyStrategy strategy =
|
||||
HystrixPlugins.getInstance().getConcurrencyStrategy();
|
||||
if (strategy instanceof HystrixConcurrencyStrategyDefault) {
|
||||
// 如果已经就是我们想要配置的
|
||||
return;
|
||||
}
|
||||
|
||||
// 将原来其他的配置保存下来
|
||||
HystrixCommandExecutionHook commandExecutionHook =
|
||||
HystrixPlugins.getInstance().getCommandExecutionHook();
|
||||
HystrixEventNotifier eventNotifier =
|
||||
HystrixPlugins.getInstance().getEventNotifier();
|
||||
HystrixMetricsPublisher metricsPublisher =
|
||||
HystrixPlugins.getInstance().getMetricsPublisher();
|
||||
HystrixPropertiesStrategy propertiesStrategy =
|
||||
HystrixPlugins.getInstance().getPropertiesStrategy();
|
||||
|
||||
// 先重置, 再把我们自定义的配置与原来的配置写回去
|
||||
HystrixPlugins.reset();
|
||||
HystrixPlugins.getInstance().registerConcurrencyStrategy(target);
|
||||
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
|
||||
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
|
||||
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
|
||||
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
|
||||
|
||||
log.info("config hystrix concurrency strategy success");
|
||||
} catch (Exception ex) {
|
||||
log.error("Failed to register Hystrix Concurrency Strategy: [{}]",
|
||||
ex.getMessage(), ex);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
package org.example.hystrix;
|
||||
|
||||
import com.netflix.hystrix.*;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.example.service.NacosClientService4HystrixDemo;
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy.THREAD;
|
||||
|
||||
/**
|
||||
* <h1>给 NacosClientService 实现包装</h1>
|
||||
* Hystrix 舱壁模式:
|
||||
* 1. 线程池
|
||||
* 2. 信号量: 算法 + 数据结构, 有限状态机 (伪线程隔离, 只是用代码的方式进行操作, 看似线程隔离, 不常用, 资源实际未被隔离)
|
||||
* */
|
||||
@Slf4j
|
||||
public class NacosClientHystrixCommand extends HystrixCommand<List<ServiceInstance>> {
|
||||
|
||||
/** 需要保护的服务 */
|
||||
private final NacosClientService4HystrixDemo nacosClientService;
|
||||
|
||||
/** 方法需要传递的参数 */
|
||||
private final String serviceId;
|
||||
|
||||
public NacosClientHystrixCommand(NacosClientService4HystrixDemo nacosClientService, String serviceId) {
|
||||
|
||||
// 线程隔离
|
||||
super(
|
||||
Setter.withGroupKey(
|
||||
HystrixCommandGroupKey.Factory.asKey("NacosClientService4HystrixDemo"))
|
||||
.andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientHystrixCommand"))
|
||||
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("NacosClientPool"))
|
||||
// 线程池 key 配置
|
||||
.andCommandPropertiesDefaults(
|
||||
HystrixCommandProperties.Setter()
|
||||
.withExecutionIsolationStrategy(THREAD) // 线程池隔离策略
|
||||
.withFallbackEnabled(true) // 开启降级
|
||||
.withCircuitBreakerEnabled(true) // 开启熔断器
|
||||
)
|
||||
);
|
||||
|
||||
// 可以配置信号量隔离策略
|
||||
// Setter semaphore =
|
||||
// Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("NacosClientService4HystrixDemo"))
|
||||
// .andCommandKey(HystrixCommandKey.Factory.asKey("NacosClientHystrixCommand"))
|
||||
// .andCommandPropertiesDefaults(
|
||||
// HystrixCommandProperties.Setter()
|
||||
// .withCircuitBreakerRequestVolumeThreshold(10) // 至少有10个请求开始, 熔断器才开始熔断计算
|
||||
// .withCircuitBreakerSleepWindowInMilliseconds(5000) // 熔断器的请求 5s 后会进入一个半打开的状态, 放开部分请求会进行一个重试
|
||||
// .withCircuitBreakerErrorThresholdPercentage(50) // 当错误率到 50% 就开启熔断保护
|
||||
// .withExecutionIsolationStrategy(SEMAPHORE) // 指定使用信号量隔离
|
||||
// //..... 可以参考上面的线程隔离策略的配置
|
||||
// );
|
||||
|
||||
this.nacosClientService = nacosClientService;
|
||||
this.serviceId = serviceId;
|
||||
}
|
||||
|
||||
/**
|
||||
* <h2>要保护的方法调用写在 run 方法中</h2>
|
||||
* */
|
||||
@Override
|
||||
protected List<ServiceInstance> run() throws Exception {
|
||||
// 保护逻辑
|
||||
log.info("NacosClientService4HystrixDemo In Hystrix Command to Get Service Instance: [{}], [{}]",
|
||||
this.serviceId, Thread.currentThread().getName());
|
||||
return this.nacosClientService.getNacosClientInfo(this.serviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* <h2>降级处理策略</h2>
|
||||
* */
|
||||
@Override
|
||||
protected List<ServiceInstance> getFallback() {
|
||||
|
||||
log.warn("NacosClientService4HystrixDemo run error: [{}], [{}]",
|
||||
this.serviceId, Thread.currentThread().getName());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
package org.example.hystrix;
|
||||
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.netflix.hystrix.HystrixCommandGroupKey;
|
||||
import com.netflix.hystrix.HystrixCommandKey;
|
||||
import com.netflix.hystrix.HystrixCommandProperties;
|
||||
import com.netflix.hystrix.HystrixObservableCommand;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.example.service.NacosClientService4HystrixDemo;
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
import rx.Observable;
|
||||
import rx.Subscriber;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <h1>HystrixCommand, 隔离策略是基于信号量实现的</h1>
|
||||
* */
|
||||
@Slf4j
|
||||
public class NacosClientHystrixObservableCommand extends HystrixObservableCommand<List<ServiceInstance>> {
|
||||
/** 要保护的服务 */
|
||||
private final NacosClientService4HystrixDemo nacosClientService4HystrixDemo;
|
||||
|
||||
/** 方法需要传递的参数 */
|
||||
private final List<String> serviceIds;
|
||||
|
||||
public NacosClientHystrixObservableCommand(NacosClientService4HystrixDemo nacosClientService4HystrixDemo,
|
||||
List<String> serviceIds) {
|
||||
super(
|
||||
HystrixObservableCommand.Setter
|
||||
.withGroupKey(HystrixCommandGroupKey
|
||||
.Factory.asKey("NacosClientService4HystrixDemo"))
|
||||
.andCommandKey(HystrixCommandKey
|
||||
.Factory.asKey("NacosClientHystrixObservableCommand"))
|
||||
.andCommandPropertiesDefaults(
|
||||
HystrixCommandProperties.Setter()
|
||||
.withFallbackEnabled(true) // 开启降级
|
||||
.withCircuitBreakerEnabled(true) // 开启熔断器
|
||||
)
|
||||
);
|
||||
|
||||
this.nacosClientService4HystrixDemo = nacosClientService4HystrixDemo;
|
||||
this.serviceIds = serviceIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* <h2>要保护的方法调用写在这里</h2>
|
||||
* */
|
||||
@Override
|
||||
protected Observable<List<ServiceInstance>> construct() {
|
||||
|
||||
// Observable 有三个关键的事件方法, 分别是 onNext、onCompleted、onError
|
||||
return Observable.unsafeCreate(subscriber -> {
|
||||
|
||||
try {
|
||||
if (!subscriber.isUnsubscribed()) {
|
||||
log.info("subscriber command task: [{}], [{}]",
|
||||
JSON.toJSONString(serviceIds),
|
||||
Thread.currentThread().getName());
|
||||
serviceIds.forEach(
|
||||
s -> subscriber
|
||||
.onNext(nacosClientService4HystrixDemo.getNacosClientInfo(s))
|
||||
);
|
||||
subscriber.onCompleted();
|
||||
log.info("command task completed: [{}], [{}]",
|
||||
JSON.toJSONString(serviceIds),
|
||||
Thread.currentThread().getName());
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
subscriber.onError(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* <h2>服务降级</h2>
|
||||
* */
|
||||
@Override
|
||||
protected Observable<List<ServiceInstance>> resumeWithFallback() {
|
||||
|
||||
return Observable.unsafeCreate(subscriber -> {
|
||||
|
||||
try {
|
||||
if (!subscriber.isUnsubscribed()) {
|
||||
log.info("(fallback) subscriber command task: [{}], [{}]",
|
||||
JSON.toJSONString(serviceIds),
|
||||
Thread.currentThread().getName());
|
||||
subscriber.onNext(Collections.emptyList());
|
||||
subscriber.onCompleted();
|
||||
log.info("(fallback) command task completed: [{}], [{}]",
|
||||
JSON.toJSONString(serviceIds),
|
||||
Thread.currentThread().getName());
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
subscriber.onError(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue