[SpringCloud] Hystrix Source Code Analysis

07-01 1195 views

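Hystrix is a fault-tolerance component for microservices that provides resource isolation, service degradation, and circuit breaking. This chapter focuses on how Hystrix implements them.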

1. Service degradation

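The CAP theorem is one of the theoretical foundations of distributed systems. Its three properties are consistency, availability, and partition tolerance. When the server hosting a service instance is overloaded, or network problems keep it from responding in time, the whole task blocks. A system like this tolerates faults poorly and can easily grind to a halt. Hystrix provides a fault-tolerance mechanism for this case: when a service instance cannot respond in time, the call is degraded and fails fast, which keeps the system as a whole stable.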
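The fail-fast idea described above can be illustrated without Hystrix at all. The following is a minimal JDK-only sketch, not Hystrix code: the class `DegradeSketch` and the method `callWithFallback` are hypothetical names, and the sketch only covers the "late or failed call returns a fallback value" part of the behaviour.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper, not part of Hystrix: runs a task with a time budget
// and returns a fallback value when the task is late or fails.
class DegradeSketch {
    static <T> T callWithFallback(Callable<T> primary, Supplier<T> fallback, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<T> future = pool.submit(primary);
        try {
            // Wait for the real result, but only up to the time budget
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true);     // interrupt the slow or failed call
            return fallback.get();   // fail fast with the degraded result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The first call sleeps past its 100 ms budget, the second finishes in time
        String slow = callWithFallback(() -> { Thread.sleep(2000); return "real"; }, () -> "fallback", 100);
        String fast = callWithFallback(() -> "real", () -> "fallback", 1000);
        System.out.println(slow + " / " + fast);
    }
}
```

Creating a thread pool per call keeps the sketch short; a real implementation would share one pool across calls.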

Service degradation is tied to the @HystrixCommand annotation. Here is its source:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface HystrixCommand {
    ...
    String fallbackMethod() default "";
}

The annotation source tells us very little. To understand what the annotation does, we have to find the class that processes it: HystrixCommandAspect

@Aspect
public class HystrixCommandAspect {
    ...
    
    // Around advice
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = AopUtils.getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
        } else {
            MetaHolderFactory metaHolderFactory = (MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
            try {
                Object result;
                if (!metaHolder.isObservable()) {
                    // Execute the method through the command
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = this.executeObservable(invokable, executionType, metaHolder);
                }
                return result;
            } catch (HystrixBadRequestException var9) {
                throw var9.getCause();
            } catch (HystrixRuntimeException var10) {
                throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);
            }
        }
    }
}
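The aspect above intercepts the annotated method, reads its metadata, and runs the call through a command that knows about the fallback. A rough JDK-only sketch of that interception idea follows. It is an illustration under stated assumptions, not how javanica is written: it uses a `java.lang.reflect.Proxy` instead of AspectJ, and the names `WithFallback`, `GreetingService`, `FlakyGreetingService`, and `FallbackProxy` are all hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical annotation standing in for @HystrixCommand(fallbackMethod = ...)
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface WithFallback {
    String fallbackMethod() default "";
}

interface GreetingService {
    String greet(String name);
}

class FlakyGreetingService implements GreetingService {
    @WithFallback(fallbackMethod = "greetFallback")
    public String greet(String name) {
        if (name.isEmpty()) {
            throw new IllegalArgumentException("empty name");
        }
        return "hello " + name;
    }

    public String greetFallback(String name) {
        return "hello stranger";
    }
}

class FallbackProxy {
    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Resolve the method on the target class, where the annotation lives
            Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
            impl.setAccessible(true); // only needed because the sketch classes are not public
            WithFallback meta = impl.getAnnotation(WithFallback.class);
            try {
                return impl.invoke(target, args);
            } catch (InvocationTargetException e) {
                if (meta == null || meta.fallbackMethod().isEmpty()) {
                    throw e.getCause(); // no fallback declared: rethrow the original error
                }
                // Fallback declared: call the method with the same parameter types
                Method fallback = target.getClass().getMethod(meta.fallbackMethod(), method.getParameterTypes());
                fallback.setAccessible(true);
                return fallback.invoke(target, args);
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        GreetingService service = wrap(new FlakyGreetingService(), GreetingService.class);
        System.out.println(service.greet("Ann"));
        System.out.println(service.greet(""));
    }
}
```

The point of the sketch is the shape of the flow: the caller never sees the fallback wiring, because the proxy decides at invocation time whether to return the real result or the fallback.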

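As its name suggests, HystrixCommandAspect is an aspect, which means service degradation is implemented through an AOP proxy. Next, trace the execute method of CommandExecutor.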

Call chain:
-> CommandExecutor.execute
-> castToExecutable(invokable, executionType).execute()
-> HystrixCommand.execute
-> this.queue().get()
public Future<R> queue() {
	// Obtain the underlying Future
	final Future<R> delegate = this.toObservable().toBlocking().toFuture();
	Future<R> f = new Future<R>() {
		...
		public R get() throws InterruptedException, ExecutionException {
			return delegate.get();
		}
		public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
			return delegate.get(timeout, unit);
		}
	};
	...
}
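The last two steps of the call chain show that the synchronous `execute` is simply `queue().get()`. A minimal sketch of that relationship, using a plain thread pool in place of the Observable pipeline, could look like this. `SimpleCommand` is a hypothetical class and is not part of Hystrix.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical command: one asynchronous entry point, one synchronous one
class SimpleCommand<R> {
    private final Callable<R> task;
    private final ExecutorService pool;

    SimpleCommand(Callable<R> task, ExecutorService pool) {
        this.task = task;
        this.pool = pool;
    }

    // Asynchronous entry point: submit the task and return a handle to the result
    Future<R> queue() {
        return pool.submit(task);
    }

    // Synchronous entry point: same path as queue(), then block on the handle
    R execute() throws InterruptedException, ExecutionException {
        return queue().get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            SimpleCommand<Integer> cmd = new SimpleCommand<>(() -> 6 * 7, pool);
            Future<Integer> pending = cmd.queue();               // caller decides when to wait
            Integer blocking = cmd.execute();                    // waits immediately
            System.out.println(blocking + " " + pending.get(1, TimeUnit.SECONDS));
        } finally {
            pool.shutdown();
        }
    }
}
```

Building the blocking call on top of the asynchronous one means there is only one execution path to maintain.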

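The queue method of HystrixCommand returns a Future, the type commonly used to retrieve the result of a task running on another thread. Here the Future is created by this.toObservable().toBlocking().toFuture(). Opening the toObservable method shows that it returns an Observable.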

public Observable toObservable() {
   ...
   final Func0 applyHystrixSemantics = new Func0() {
        public Observable call() {
              return ((CommandState)AbstractCommand.this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED)
                    ? Observable.never()
                    // Pass the command in for execution
                    : AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);
        }
   };
   ...
	return Observable.defer(new Func0() {
		public Observable call() {
                ...
                // The Observable is only created once a subscriber subscribes
				Observable hystrixObservable = 
                Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
				Observable afterCache;
				if (requestCacheEnabled && cacheKey != null) {
					HystrixCachedObservable toCache = HystrixCachedObservable.from(hystrixObservable, AbstractCommand.this);
					HystrixCommandResponseFromCache fromCache = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.putIfAbsent(cacheKey, toCache);
					if (fromCache != null) {
						toCache.unsubscribe();
						AbstractCommand.this.isResponseFromCache = true;
						return AbstractCommand.this.handleRequestCacheHitAndEmitValues(fromCache, AbstractCommand.this);
					}
					afterCache = toCache.toObservable();
				} else {
					afterCache = hystrixObservable;
				}
				return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
			...
		}
	});        
}
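The code above wraps its work in Observable.defer, so nothing is built until someone subscribes. The following sketch imitates that laziness with a plain `Supplier`. `Deferred` is a hypothetical stand-in for a deferred Observable and covers only the "factory runs once per subscription" behaviour, none of RxJava's operators.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

class DeferSketch {
    // Hypothetical stand-in for a deferred source: it holds a factory, not a value
    static final class Deferred<T> {
        private final Supplier<T> factory;

        Deferred(Supplier<T> factory) {
            this.factory = factory;
        }

        // The factory runs only here, once for every subscription
        void subscribe(Consumer<T> subscriber) {
            subscriber.accept(factory.get());
        }
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        Deferred<String> deferred = new Deferred<>(() -> "result-" + created.incrementAndGet());

        System.out.println("created before subscribe: " + created.get());
        deferred.subscribe(System.out::println);
        deferred.subscribe(System.out::println);
        System.out.println("created after two subscriptions: " + created.get());
    }
}
```

Deferring the work this way means that building a command has no side effects; execution starts only when a caller actually asks for the result.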

Creating the Observable is delegated to the AbstractCommand.this.applyHystrixSemantics method:

private Observable applyHystrixSemantics(AbstractCommand _cmd) {
	this.executionHook.onStart(_cmd);
	// Whether the request is allowed, i.e. check the circuit breaker state
	if (this.circuitBreaker.allowRequest()) {
		final TryableSemaphore executionSemaphore = this.getExecutionSemaphore();
		final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
		Action0 singleSemaphoreRelease = new Action0() {
			public void call() {
				if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
					executionSemaphore.release();
				}
			}
		};
		Action1 markExceptionThrown = new Action1() {
			public void call(Throwable t) {
				AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, AbstractCommand.this.commandKey);
			}
		};
		if (executionSemaphore.tryAcquire()) {
			try {
				this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
				// Execute the task
				return this.executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);
			} catch (RuntimeException var7) {
				return Observable.error(var7);
			}
		} else {
			return this.handleSemaphoreRejectionViaFallback();
		}
	} else {
		// The circuit is open: run the fallback task instead
		return this.handleShortCircuitViaFallback();
	}
}
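The method above makes three decisions in order: is the circuit open, can a semaphore permit be acquired, and how to make sure the permit is released exactly once. A simplified synchronous sketch of the same gating follows. `GateSketch` and its members are hypothetical, and catching the exception inline is a shortcut of this sketch; Hystrix routes failures through a separate fallback handler.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical gate: circuit check first, then a concurrency limit
class GateSketch {
    private final Semaphore permits;
    private volatile boolean circuitOpen = false;

    GateSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    void setCircuitOpen(boolean open) {
        this.circuitOpen = open;
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    <T> T run(Supplier<T> task, Supplier<T> fallback) {
        if (circuitOpen) {
            return fallback.get();           // short-circuited: skip the task entirely
        }
        if (!permits.tryAcquire()) {
            return fallback.get();           // too many concurrent calls: rejected
        }
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce = () -> {
            // compareAndSet lets only the first caller through
            if (released.compareAndSet(false, true)) {
                permits.release();
            }
        };
        try {
            return task.get();
        } catch (RuntimeException e) {
            return fallback.get();           // simplification of the failure path
        } finally {
            // Called twice on purpose, imitating the terminate and unsubscribe
            // callbacks both firing; the second call is a no-op
            releaseOnce.run();
            releaseOnce.run();
        }
    }

    public static void main(String[] args) {
        GateSketch gate = new GateSketch(1);
        System.out.println(gate.run(() -> "ok", () -> "fallback"));
        gate.setCircuitOpen(true);
        System.out.println(gate.run(() -> "ok", () -> "fallback"));
    }
}
```

Without the AtomicBoolean guard, a double release would hand out more permits than the configured limit.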

When this.circuitBreaker.allowRequest() returns true, the circuit is not open and execution continues in the executeCommandAndObserve method:

private Observable executeCommandAndObserve(AbstractCommand _cmd) {
	...
	Observable execution;
	if ((Boolean)this.properties.executionTimeoutEnabled().get()) {
		// Timeout monitoring is added here
		execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));
	} else {
		execution = this.executeCommandWithSpecifiedIsolation(_cmd);
	}
   ...
	// handleFallback: each kind of failure gets its own handler
	Func1 handleFallback = new Func1() {
		public Observable call(Throwable t) {
			Exception e = AbstractCommand.this.getExceptionFromThrowable(t);
			AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionException(e);
			if (e instanceof RejectedExecutionException) {
				return AbstractCommand.this.handleThreadPoolRejectionViaFallback(e);
			} else if (t instanceof HystrixTimeoutException) {
				// A timeout exception was thrown: handle it as a timeout
				return AbstractCommand.this.handleTimeoutViaFallback();
			} else if (t instanceof HystrixBadRequestException) {
				return AbstractCommand.this.handleBadRequestByEmittingError(e);
			} else if (e instanceof HystrixBadRequestException) {
				AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, AbstractCommand.this.commandKey);
				return Observable.error(e);
			} else {
				return AbstractCommand.this.handleFailureViaFallback(e);
			}
		}
	};
   ...
	return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted)
		  // Use handleFallback to handle errors
		  .onErrorResumeNext(handleFallback).doOnEach(setRequestContext);
}
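The handleFallback function above picks a handler based on the type of the failure, and a bad request is the one case that is passed back to the caller as an error rather than replaced by a fallback. A small sketch of that dispatch follows. Everything in it is hypothetical: `BadRequestException` stands in for HystrixBadRequestException, the JDK's `TimeoutException` stands in for HystrixTimeoutException, and the returned strings merely label which branch was taken.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

class FallbackDispatchSketch {
    // Hypothetical stand-in for HystrixBadRequestException
    static class BadRequestException extends RuntimeException {
        BadRequestException(String message) {
            super(message);
        }
    }

    // Chooses a fallback by failure type; bad requests are rethrown, not degraded
    static String handle(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return "fallback: thread pool rejected";
        } else if (t instanceof TimeoutException) {
            return "fallback: timed out";
        } else if (t instanceof BadRequestException) {
            throw (BadRequestException) t;   // caller error: no fallback
        } else {
            return "fallback: execution failed";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(new RejectedExecutionException("pool full")));
        System.out.println(handle(new TimeoutException("too slow")));
        System.out.println(handle(new IllegalStateException("boom")));
        try {
            handle(new BadRequestException("invalid argument"));
        } catch (BadRequestException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```
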
private static class HystrixObservableTimeoutOperator implements Observable.Operator {
        final AbstractCommand originalCommand;
        public HystrixObservableTimeoutOperator(AbstractCommand originalCommand) {
            this.originalCommand = originalCommand;
        }
        public Subscriber