基于trace
之前写的两篇关于基于 trace_id 的链路追踪的文章:
- 基于trace_id的链路追踪(含Feign、Hystrix、线程池等场景)
- 基于trace_id的链路追踪(ForkJoinPool场景)
一、引言
在之前的文章中,我们讨论了基于 trace_id 的链路追踪的常见场景。然而,最近我意识到在微服务架构中,我们还缺少对一个非常常见场景的探讨:在网关中如何处理 trace_id,尤其是在 Reactor 异步模式下的处理。因此,我决定记录下这些思考和解决方案。
二、具体场景
在Spring Cloud Gateway网关中,我们需要实现请求访问日志的打印功能,以便更好地排查问题。具体的实现方式包括两个全局过滤器:
- TraceIdGlobalFilter:实现 trace_id 全局拦截(先执行)。
- AccessLogGlobalFilter:实现请求访问日志的打印(后执行)。
在正常情况下,这两个过滤器可以打印请求的 request 日志和 response 日志,并且日志中都包含相同的 trace_id。然而,在开发调试过程中,我发现了一种异常情况:request 日志中总能打印出 trace_id,而 response 日志中则有时能打印出 trace_id,有时却不能。这导致了 request 日志和 response 日志无法关联的问题。
三、分析
1. 为什么 response 日志没有打印 trace_id?
通过分析日志,我发现打印 response 日志的线程与打印 request 日志的线程并不是同一个线程。基于此,我们可以判断,trace_id 没有传递到打印 response 日志的线程中。
2. 为什么 trace_id 没有传递到打印 response 日志的线程中?
我们知道 Spring Cloud Gateway 是基于 WebFlux Reactor 异步模式实现的,因此一个请求的 request 和 response 可能由不同的线程来执行。在 TraceIdGlobalFilter 中,我们使用了 MDC来传递 trace_id。然而,MDC 在普通的多线程环境中有效,但在 Reactor 异步模式下并不起作用。这是因为 Reactor 异步模式需要通过另外一种方式来传递 trace_id。
四、解决方案
在 WebFlux Reactor 异步模式下,我们需要使用 reactor.util.context.Context 来传递 trace_id。核心逻辑如下:
透传 trace_id: 通过 Mono.contextWrite(context) 往 context 中设置 trace_id。
取出 trace_id: 通过 Flux.deferContextual(context) 从 context 中获取 trace_id。
具体实现代码示例如下:
// 设置 trace_id Mono.contextWrite(context -> context.put("trace_id", traceId)); // 获取 trace_id Flux.deferContextual(context -> { String traceId = context.get("trace_id"); // 可将 traceId 设置到MDC中供当前线程使用 return Flux.just(traceId); });
通过这种方式,我们可以确保 trace_id 在整个请求处理链路中都能被正确传递和使用,解决了 request 日志和 response 日志断联的问题。
五、具体代码
TraceIdGlobalFilter
/** * trace_id 全局拦截器 */ @Slf4j @Component public class TraceIdGlobalFilter implements GlobalFilter, Ordered { @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); String traceId = request.getHeaders().getFirst(TraceConsts.TRACE_ID); // trace_id traceId = MdcUtil.attachTraceId(traceId); // 将traceId传递给下游微服务 String finalTraceId = traceId; Consumer headersConsumer = httpHeaders -> { httpHeaders.set(TraceConsts.TRACE_ID, finalTraceId); }; ServerHttpRequest requestNew = exchange.getRequest().mutate().headers(headersConsumer).build(); return chain.filter(exchange.mutate().request(requestNew).build()) .doFinally(s -> { // 清除MDC MdcUtil.detachTraceId(); }); } @Override public int getOrder() { return -100; } }
AccessLogGlobalFilter
/** * 请求访问日志 全局拦截器 */ @Slf4j @Component public class AccessLogGlobalFilter implements GlobalFilter, Ordered { /** * gateway access log 日志开关 *
* 特别注意:高并发业务场景下,可以关闭日志来提升性能 */ @Value("${com.gateway.access.log.enabled:true}") private boolean logEnabled; private final HandlerStrategies handlerStrategies = HandlerStrategies.withDefaults(); @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { StopWatch stopWatch = new StopWatch(); stopWatch.start(); ServerHttpRequest httpRequest = exchange.getRequest(); // 日志开关,直接进入下一个Filter if (!logEnabled) { return chain.filter(exchange).then(Mono.fromRunnable(() -> { stopWatch.stop(); // 为了方便排查问题,还是打印一个简单的日志 if (log.isDebugEnabled()) { log.debug("请求参数 [{}] [{}] query:{}, time: {} ms", httpRequest.getURI().getPath(), httpRequest.getMethod(), httpRequest.getURI().getRawQuery(), stopWatch.getTotalTimeMillis()); } })); } // Request 处理 ServerRequest request = ServerRequest.create(exchange, handlerStrategies.messageReaders()); // header 参数 HttpHeaders httpHeaders = request.headers().asHttpHeaders(); // 是否为文件上传,若是文件上传,则不打印body boolean isFile = null != httpHeaders.getContentType() && AccessLogUtil.isBinayBodyData(httpHeaders.getContentType().toString()); // response 包装 ServerHttpResponseDecorator responseDecorator = responseDecoratorAndRecordLog(exchange, stopWatch); if (isFile) { // 打印请求日志 this.reqLog(request, isFile, null); // 执行过滤器 return chain.filter(exchange.mutate().request(request.exchange().getRequest()).response(responseDecorator).build()) // 从最初的Mono本身解析一个值,并将其放入上下文context中,以便下游可以通过上下文context API访问它 // webflux reactor 异步模式下:通过 contextWrite 往context中设置trace_id .contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId())); } Mono modifiedBody = request.bodyToMono(String.class).defaultIfEmpty(CommonConsts.NULL).flatMap(body -> { // 打印请求日志 this.reqLog(request, isFile, body); return Mono.just(body); }); // 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次 // BodyInserters.fromPublisher 不支持文件上传,所以不能用 BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); HttpHeaders headers = new HttpHeaders(); headers.putAll(exchange.getRequest().getHeaders()); headers.remove(HttpHeaders.CONTENT_LENGTH); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { // request 包装 ServerHttpRequestDecorator requestDecorator = requestDecorator(exchange, headers, outputMessage); // 执行过滤器 return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build()) // 从最初的Mono本身解析一个值,并将其放入上下文context中,以便下游可以通过上下文context API访问它 // webflux reactor 异步模式下:通过 contextWrite 往context中设置trace_id .contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId())); })); } @Override public int getOrder() { return -90; } /** * 打印 request log */ private void reqLog(ServerRequest request, boolean isFile, String body) { // URL query 参数 String queryString = request.uri().getRawQuery(); // header 参数 HttpHeaders headers = request.headers().asHttpHeaders(); String headersParams = headersToString(headers); if (isFile) { if (log.isInfoEnabled()) { log.info("请求参数 [{}] [{}] query:{}, headers:{}", request.uri().getPath(), request.methodName(), queryString, headersParams); } return; } // request body 长度处理,避免太长,打印耗性能 String requestBody = AccessLogUtil.fixFieldAndReplaceWhite(body, AccessLogUtil.DEF_MAX_LEN); if (log.isInfoEnabled()) { log.info("请求参数 [{}] [{}] query:{}, headers:{}, body:{}", request.uri().getPath(), request.methodName(), queryString, headersParams, requestBody); } } /** * 过滤headers,避免打印过多的日志 */ private String headersToString(HttpHeaders headers) { Map map = new HashMap(); for (Map.Entry entry : headers.entrySet()) { if (RequestParamUtil.containsHeader(entry.getKey())) { map.put(entry.getKey(), entry.getValue().toString()); } } return JSON.toJSONString(map); } /** * Request装饰器,重新计算 headers */ private ServerHttpRequestDecorator requestDecorator(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) { return new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public HttpHeaders getHeaders() { long contentLength = headers.getContentLength(); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(super.getHeaders()); if (contentLength > 0) { httpHeaders.setContentLength(contentLength); } else { httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked"); } return httpHeaders; } @Override public Flux getBody() { return outputMessage.getBody(); } }; } /** * Response装饰器,记录响应日志 *
* 通过 DataBufferFactory 解决响应体分段传输问题。 */ private ServerHttpResponseDecorator responseDecoratorAndRecordLog(ServerWebExchange exchange, StopWatch stopWatch) { ServerHttpResponse response = exchange.getResponse(); DataBufferFactory bufferFactory = response.bufferFactory(); return new ServerHttpResponseDecorator(response) { @Override public Mono writeWith(Publisher