基于trace

07-21 564阅读

之前写的两篇关于基于 trace_id 的链路追踪的文章:

基于trace
(图片来源网络,侵删)
  • 基于trace_id的链路追踪(含Feign、Hystrix、线程池等场景)
  • 基于trace_id的链路追踪(ForkJoinPool场景)

    一、引言

    在之前的文章中,我们讨论了基于 trace_id 的链路追踪的常见场景。然而,最近我意识到在微服务架构中,我们还缺少对一个非常常见场景的探讨:在网关中如何处理 trace_id,尤其是在 Reactor 异步模式下的处理。因此,我决定记录下这些思考和解决方案。

    二、具体场景

    在Spring Cloud Gateway网关中,我们需要实现请求访问日志的打印功能,以便更好地排查问题。具体的实现方式包括两个全局过滤器:

    • TraceIdGlobalFilter:实现 trace_id 全局拦截(先执行)。
    • AccessLogGlobalFilter:实现请求访问日志的打印(后执行)。

      在正常情况下,这两个过滤器可以打印请求的 request 日志和 response 日志,并且日志中都包含相同的 trace_id。然而,在开发调试过程中,我发现了一种异常情况:request 日志中总能打印出 trace_id,而 response 日志中则有时能打印出 trace_id,有时却不能。这导致了 request 日志和 response 日志无法关联的问题。

      三、分析

      1. 为什么 response 日志没有打印 trace_id?

      通过分析日志,我发现打印 response 日志的线程与打印 request 日志的线程并不是同一个线程。基于此,我们可以判断,trace_id 没有传递到打印 response 日志的线程中。

      2. 为什么 trace_id 没有传递到打印 response 日志的线程中?

      我们知道 Spring Cloud Gateway 是基于 WebFlux Reactor 异步模式实现的,因此一个请求的 request 和 response 可能由不同的线程来执行。在 TraceIdGlobalFilter 中,我们使用了 MDC来传递 trace_id。然而,MDC 在普通的多线程环境中有效,但在 Reactor 异步模式下并不起作用。这是因为 Reactor 异步模式需要通过另外一种方式来传递 trace_id。

      四、解决方案

      在 WebFlux Reactor 异步模式下,我们需要使用 reactor.util.context.Context 来传递 trace_id。核心逻辑如下:

      透传 trace_id: 通过 Mono.contextWrite(context) 往 context 中设置 trace_id。

      取出 trace_id: 通过 Flux.deferContextual(context) 从 context 中获取 trace_id。

      具体实现代码示例如下:

      // 设置 trace_id 
      Mono.contextWrite(context -> context.put("trace_id", traceId));
      // 获取 trace_id
      Flux.deferContextual(context -> {
          String traceId = context.get("trace_id");
          // 可将 traceId 设置到MDC中供当前线程使用
          return Flux.just(traceId);
      });
      

      通过这种方式,我们可以确保 trace_id 在整个请求处理链路中都能被正确传递和使用,解决了 request 日志和 response 日志断联的问题。

      五、具体代码

      TraceIdGlobalFilter

      /**
       * trace_id 全局拦截器
       */
      @Slf4j
      @Component
      public class TraceIdGlobalFilter implements GlobalFilter, Ordered {
          @Override
          public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
              ServerHttpRequest request = exchange.getRequest();
              String traceId = request.getHeaders().getFirst(TraceConsts.TRACE_ID);
              // trace_id
              traceId = MdcUtil.attachTraceId(traceId);
              // 将traceId传递给下游微服务
              String finalTraceId = traceId;
              Consumer headersConsumer = httpHeaders -> {
                  httpHeaders.set(TraceConsts.TRACE_ID, finalTraceId);
              };
              ServerHttpRequest requestNew = exchange.getRequest().mutate().headers(headersConsumer).build();
              return chain.filter(exchange.mutate().request(requestNew).build())
                      .doFinally(s -> {
                          // 清除MDC
                          MdcUtil.detachTraceId();
                      });
          }
          @Override
          public int getOrder() {
              return -100;
          }
      }
      

      AccessLogGlobalFilter

      /**
       * 请求访问日志 全局拦截器
       */
      @Slf4j
      @Component
      public class AccessLogGlobalFilter implements GlobalFilter, Ordered {
          /**
           * gateway access log 日志开关
           * 

      * 特别注意:高并发业务场景下,可以关闭日志来提升性能 */ @Value("${com.gateway.access.log.enabled:true}") private boolean logEnabled; private final HandlerStrategies handlerStrategies = HandlerStrategies.withDefaults(); @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { StopWatch stopWatch = new StopWatch(); stopWatch.start(); ServerHttpRequest httpRequest = exchange.getRequest(); // 日志开关,直接进入下一个Filter if (!logEnabled) { return chain.filter(exchange).then(Mono.fromRunnable(() -> { stopWatch.stop(); // 为了方便排查问题,还是打印一个简单的日志 if (log.isDebugEnabled()) { log.debug("请求参数 [{}] [{}] query:{}, time: {} ms", httpRequest.getURI().getPath(), httpRequest.getMethod(), httpRequest.getURI().getRawQuery(), stopWatch.getTotalTimeMillis()); } })); } // Request 处理 ServerRequest request = ServerRequest.create(exchange, handlerStrategies.messageReaders()); // header 参数 HttpHeaders httpHeaders = request.headers().asHttpHeaders(); // 是否为文件上传,若是文件上传,则不打印body boolean isFile = null != httpHeaders.getContentType() && AccessLogUtil.isBinayBodyData(httpHeaders.getContentType().toString()); // response 包装 ServerHttpResponseDecorator responseDecorator = responseDecoratorAndRecordLog(exchange, stopWatch); if (isFile) { // 打印请求日志 this.reqLog(request, isFile, null); // 执行过滤器 return chain.filter(exchange.mutate().request(request.exchange().getRequest()).response(responseDecorator).build()) // 从最初的Mono本身解析一个值,并将其放入上下文context中,以便下游可以通过上下文context API访问它 // webflux reactor 异步模式下:通过 contextWrite 往context中设置trace_id .contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId())); } Mono modifiedBody = request.bodyToMono(String.class).defaultIfEmpty(CommonConsts.NULL).flatMap(body -> { // 打印请求日志 this.reqLog(request, isFile, body); return Mono.just(body); }); // 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次 // BodyInserters.fromPublisher 不支持文件上传,所以不能用 BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); HttpHeaders headers = new HttpHeaders(); headers.putAll(exchange.getRequest().getHeaders()); headers.remove(HttpHeaders.CONTENT_LENGTH); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { // request 包装 ServerHttpRequestDecorator requestDecorator = requestDecorator(exchange, headers, outputMessage); // 执行过滤器 return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build()) // 从最初的Mono本身解析一个值,并将其放入上下文context中,以便下游可以通过上下文context API访问它 // webflux reactor 异步模式下:通过 contextWrite 往context中设置trace_id .contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId())); })); } @Override public int getOrder() { return -90; } /** * 打印 request log */ private void reqLog(ServerRequest request, boolean isFile, String body) { // URL query 参数 String queryString = request.uri().getRawQuery(); // header 参数 HttpHeaders headers = request.headers().asHttpHeaders(); String headersParams = headersToString(headers); if (isFile) { if (log.isInfoEnabled()) { log.info("请求参数 [{}] [{}] query:{}, headers:{}", request.uri().getPath(), request.methodName(), queryString, headersParams); } return; } // request body 长度处理,避免太长,打印耗性能 String requestBody = AccessLogUtil.fixFieldAndReplaceWhite(body, AccessLogUtil.DEF_MAX_LEN); if (log.isInfoEnabled()) { log.info("请求参数 [{}] [{}] query:{}, headers:{}, body:{}", request.uri().getPath(), request.methodName(), queryString, headersParams, requestBody); } } /** * 过滤headers,避免打印过多的日志 */ private String headersToString(HttpHeaders headers) { Map map = new HashMap(); for (Map.Entry entry : headers.entrySet()) { if (RequestParamUtil.containsHeader(entry.getKey())) { map.put(entry.getKey(), entry.getValue().toString()); } } return JSON.toJSONString(map); } /** * Request装饰器,重新计算 headers */ private ServerHttpRequestDecorator requestDecorator(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) { return new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public HttpHeaders getHeaders() { long contentLength = headers.getContentLength(); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(super.getHeaders()); if (contentLength > 0) { httpHeaders.setContentLength(contentLength); } else { httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked"); } return httpHeaders; } @Override public Flux getBody() { return outputMessage.getBody(); } }; } /** * Response装饰器,记录响应日志 *

      * 通过 DataBufferFactory 解决响应体分段传输问题。 */ private ServerHttpResponseDecorator responseDecoratorAndRecordLog(ServerWebExchange exchange, StopWatch stopWatch) { ServerHttpResponse response = exchange.getResponse(); DataBufferFactory bufferFactory = response.bufferFactory(); return new ServerHttpResponseDecorator(response) { @Override public Mono writeWith(Publisher

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]