【SpringCloud】Hystrix源码解析
hystrix是一个微服务容错组件,提供了资源隔离、服务降级、服务熔断的功能。这一章重点分析hystrix的实现原理
1、服务降级
CAP原则是分布式系统的一个理论基础,它的三个关键属性分别是一致性、可用性和容错性。当服务实例所在服务器承受过大的压力或者受到网络因素影响没法及时响应请求时,整个任务将处于阻塞状态,这样的系统容错性不高,稍有不慎就会陷入瘫痪,hystrix为此提供了一种容错机制:当服务实例没法及时响应请求,可以采用服务降级的方式快速失败,维持系统的稳定性
服务降级和@HystrixCommand注解绑定,查看它的源码
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface HystrixCommand {
...
String fallbackMethod() default "";
}
源码提供的信息很少,想要分析注解的功能,还得找到处理注解信息的类:HystrixCommandAspect
@Aspect
public class HystrixCommandAspect {
...
// 环绕通知
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
Method method = AopUtils.getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
} else {
MetaHolderFactory metaHolderFactory = (MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
try {
Object result;
if (!metaHolder.isObservable()) {
// 代理执行方法
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = this.executeObservable(invokable, executionType, metaHolder);
}
return result;
} catch (HystrixBadRequestException var9) {
throw var9.getCause();
} catch (HystrixRuntimeException var10) {
throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);
}
}
}
}
从命名上我们能看出这是一个切面,说明服务降级是通过aop代理实现的,跟踪CommandExecutor的execute方法
调用链: -> CommandExecutor.execute -> castToExecutable(invokable, executionType).execute() -> HystrixCommand.execute -> this.queue().get()
public Future queue() {
// 获取Future对象
final Future delegate = this.toObservable().toBlocking().toFuture();
Future f = new Future() {
...
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
...
}
HystrixCommand类的queue方法返回了一个Future对象,在线程任务中常用Future对象来获取任务执行的结果。这里的Future对象是通过this.toObservable().toBlocking().toFuture()创建的,点击查看toObservable方法,它返回一个Observable对象
public Observable toObservable() {
...
final Func0 applyHystrixSemantics = new Func0() {
public Observable call() {
return
((CommandState)AbstractCommand.this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED) ? Observable.never() :
// 传入指令执行任务
AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);
}
};
...
return Observable.defer(new Func0() {
public Observable call() {
...
// 有订阅者订阅了才创建Observable对象
Observable hystrixObservable =
Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
Observable afterCache;
if (requestCacheEnabled && cacheKey != null) {
HystrixCachedObservable toCache = HystrixCachedObservable.from(hystrixObservable, AbstractCommand.this);
HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
toCache.unsubscribe();
AbstractCommand.this.isResponseFromCache = true;
return AbstractCommand.this.handleRequestCacheHitAndEmitValues(fromCache, AbstractCommand.this);
}
afterCache = toCache.toObservable();
} else {
afterCache = hystrixObservable;
}
return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
...
}
});
}
Observable对象的创建任务委托了给了AbstractCommand.this.applyHystrixSemantics方法
private Observable applyHystrixSemantics(AbstractCommand _cmd) {
this.executionHook.onStart(_cmd);
// 是否允许请求,判断熔断状态
if (this.circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = this.getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
Action0 singleSemaphoreRelease = new Action0() {
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
Action1 markExceptionThrown = new Action1() {
public void call(Throwable t) {
AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, AbstractCommand.this.commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
// 执行任务
return this.executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException var7) {
return Observable.error(var7);
}
} else {
return this.handleSemaphoreRejectionViaFallback();
}
} else {
// 处于熔断状态,执行备用任务
return this.handleShortCircuitViaFallback();
}
}
this.circuitBreaker.allowReques返回true表示没有熔断,走executeCommandAndObserve方法
private Observable executeCommandAndObserve(AbstractCommand _cmd) {
...
Observable execution;
if ((Boolean)this.properties.executionTimeoutEnabled().get()) {
// 添加了超时监控
execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));
} else {
execution = this.executeCommandWithSpecifiedIsolation(_cmd);
}
...
// handleFallback:不同异常状况下使用不同的处理方法
Func1 handleFallback = new Func1() {
public Observable call(Throwable t) {
Exception e = AbstractCommand.this.getExceptionFromThrowable(t);
AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return AbstractCommand.this.handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
// 抛出超时异常时,做超时处理
return AbstractCommand.this.handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return AbstractCommand.this.handleBadRequestByEmittingError(e);
} else if (e instanceof HystrixBadRequestException) {
AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, AbstractCommand.this.commandKey);
return Observable.error(e);
} else {
return AbstractCommand.this.handleFailureViaFallback(e);
}
}
};
...
return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted)
// 调用handleFallback处理异常
.onErrorResumeNext(handleFallback).doOnEach(setRequestContext);
}
private static class HystrixObservableTimeoutOperator implements Observable.Operator {
final AbstractCommand originalCommand;
public HystrixObservableTimeoutOperator(AbstractCommand originalCommand) {
this.originalCommand = originalCommand;
}
public Subscriber
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

