9.耗时输出重定向

一灰灰blog技术组件trace-watch-dog约 2182 字大约 7 分钟

前面实现的耗时分布输出直接以日志的方式进行打印,在实际的业务场景中,可能希望以其他的方式(比如Prometheus)来收集耗时,基于这种场景,我们的组件又可以怎么进行演进呢?

1. 控制日志是否输出

首先我们来看一下,若我不希望输出日志,最简单的实现方式就是直接在TraceRecoder中添加变量,用于控制是否进行输出日志

首先改造一下DefaultTraceRecoder,新增logEnable变量

/**
 * 控制是否打印日志的条件
 */
private boolean logEnable;

public DefaultTraceRecoder() {
    this(AsyncUtil.executorService, "TraceDog", true);
}

public DefaultTraceRecoder(ExecutorService executorService, String task, boolean logEnable) {
    this.traceName = task;
    list = new CopyOnWriteArrayList<>();
    // 支持排序的耗时记录
    cost = new ConcurrentSkipListMap<>();
    this.executorService = TtlExecutors.getTtlExecutorService(executorService);
    this.markExecuteOver = false;
    this.logEnable = logEnable;
    start(task);
    MdcUtil.setGlobalTraceId(MdcUtil.fetchGlobalMsgIdForTraceRecoder());
}

接着是输出日志的时候,加一个判断

@Override
public Map<String, Long> prettyPrint() {
    // 在格式化输出时,要求所有任务执行完毕
    if (!this.markExecuteOver) {
        this.allExecuted();
    }

    if (!logEnable) {
        return cost;
    }

    // 省略
    ...
}

除此之外,我们还需要改造一下 trace-watch-dog-spring 工程中的 @TraceDog 注解,需要新增一个是否输入日志的逻辑

考虑到适用的场景,我们新增两个属性来判定是否输出日志

  • boolean logEnable() default true: 简单版,控制是否输出日志
  • String logSpEL() default ""; SpEL版,根据动态参数来判断是否输出日志

上面两个属性搭配使用;当不存在 logSpEL 时,我们直接以 logEnable 来判断是否要打印日志;若存在 logSpEL,那么我们就要要求上面两个同时为true才能输出日志;

核心的实现就是logSpEL的执行

private Boolean buildLogCondition(ProceedingJoinPoint joinPoint, TraceDog dog) {
    if (!dog.logEnable()) {
        return false;
    }

    if (dog.logSpEL() == null || dog.logSpEL().isEmpty()) {
        return true;
    }

    StandardEvaluationContext context = new StandardEvaluationContext();
    context.setBeanResolver(new BeanFactoryResolver(applicationContext));

    // 将请求参数也作为上下文参数
    MethodSignature methodSignature = ((MethodSignature) joinPoint.getSignature());
    String[] parameterNames = methodSignature.getParameterNames();
    Object[] args = joinPoint.getArgs();
    for (int i = 0; i < parameterNames.length; i++) {
        context.setVariable(parameterNames[i], args[i]);
    }
    return (Boolean) parser.parseExpression(dog.logSpEL()).getValue(context);
}

从上面的实际实现来看,控制是否打印日志比较简单,那么这个真的有用么?

下面给出几个应用场景:

  • 某个执行链路,我们希望测试环境打印日志,生产环境不打印日志(根据运行环境判断是否需要打印日志)
  • 某个执行链路,我们通过一个动态的配置来控制是否需要打印日志,当需要进行排查耗时情况分析时,打开配置输出日志;其他情况则进行关闭

2. 日志输出重定向

到目前为止,我们所有的耗时输出都是基于控制台/日志文件,当我们的项目集成了类似Prometheus监控系统时,我们可能更希望将耗时上报到Prometheus,因此就有了输出重定向的诉求

为了支持输出重定向,我们新增一个扩展,在prettyPrint方法中,根据用户自定义的实现来处理

先定义一个函数方法

@FunctionalInterface
public interface CostOutput {

    /**
     * 输出
     *
     * @param cost      任务耗时分布
     * @param traceName Trace
     */
    void output(Map<String, Long> cost, String traceName);
}

接入两个参数,其中 cost 记录总耗时 + 各子任务的耗时,traceName 表示入口任务名,主要用于获取总耗时

然后调整一下 DefaultTraceRecoder

  1. 新增private List<CostOutput> output; 保存输出重定向的规则
  2. 添加默认的log.info输出规则
  3. 支持传入自定义的输出重定向规则
private List<CostOutput> output;

public DefaultTraceRecoder() {
    this(AsyncUtil.executorService, "TraceDog", true);
}

public DefaultTraceRecoder(ExecutorService executorService, String task, boolean logEnable) {
    this.traceName = task;
    list = new CopyOnWriteArrayList<>();
    // 支持排序的耗时记录
    cost = new ConcurrentSkipListMap<>();
    this.executorService = TtlExecutors.getTtlExecutorService(executorService);
    this.markExecuteOver = false;
    this.logEnable = logEnable;
    this.output = new ArrayList<>();
    // 默认加载全局的输出重定向规则
    this.output.addAll(TraceWatch.getGlobalOutputStrategy());
    start(task);
    MdcUtil.setGlobalTraceId(MdcUtil.fetchGlobalMsgIdForTraceRecoder());
}

/**
 * 新增一个耗时定向
 *
 * @param costOutput
 * @return
 */
public DefaultTraceRecoder addOutput(CostOutput costOutput) {
    output.add(costOutput);
    return this;
}

调整下 prettyPrint 方法的实现

@Override
public Map<String, Long> prettyPrint() {
    // 在格式化输出时,要求所有任务执行完毕
    if (!this.markExecuteOver) {
        this.allExecuted();
    }

    // 根据自定义规则,对耗时输出进行处理
    outputList.forEach(o -> {
        if (!logEnable && o == LogOutput.defaultLogOutput) {
            return;
        }
        o.output(cost, traceName);
    });
    return cost;
}

然后就是默认的日志输出

public class LogOutput {
    private static final Logger log = LoggerFactory.getLogger(DefaultTraceRecoder.class);
    public static final CostOutput defaultLogOutput = LogOutput::logPrint;

    /**
     * 输出日志
     *
     * @param cost      耗时分布
     * @param traceName 总任务
     */
    public static void logPrint(Map<String, Long> cost, String traceName) {
        StringBuilder sb = new StringBuilder();
        sb.append('\n');
        long totalCost = cost.get(traceName);
        sb.append("TraceWatch '").append(traceName).append("': running time = ").append(totalCost).append(" ms");
        sb.append('\n');
        if (cost.isEmpty()) {
            sb.append("No task info kept");
        } else {
            sb.append("---------------------------------------------\n");
            sb.append("ms         %     Task name\n");
            sb.append("---------------------------------------------\n");
            NumberFormat pf = NumberFormat.getPercentInstance();
            pf.setMinimumIntegerDigits(2);
            pf.setMinimumFractionDigits(2);
            pf.setGroupingUsed(false);
            for (Map.Entry<String, Long> entry : cost.entrySet()) {
                if (entry.getKey().equals(traceName)) {
                    // 总耗时情况,不打印在分布中
                    continue;
                }

                sb.append(entry.getValue()).append("\t\t");
                sb.append(pf.format(entry.getValue() / (double) totalCost)).append("\t\t");
                sb.append(entry.getKey()).append("\n");
            }
        }

        if (LoggerFactory.getILoggerFactory() instanceof NOPLoggerFactory) {
            // 若项目中没有Slfj4的实现,则直接使用标准输出
            System.out.printf("\n---------------------\n%s\n--------------------\n%n", sb);
        } else if (log.isInfoEnabled()) {
            log.info("\n---------------------\n{}\n--------------------\n", sb);
        }
    }
}

TraceWatch 中维持全局的重定向规则

public class TraceWatch {

    private static final TransmittableThreadLocal<ITraceRecoder> THREAD_LOCAL = new TransmittableThreadLocal<>();

    /**
     * 全局的重定向策略
     */
    private static Set<CostOutput> globalOutputStrategy;

    static {
        globalOutputStrategy = new HashSet<>();
        globalOutputStrategy.add(LogOutput.defaultLogOutput);
    }

    /**
     * 注册全局的输出重定向规则
     *
     * @param costOutput
     */
    public static void registerOutput(CostOutput costOutput) {
        globalOutputStrategy.add(costOutput);
    }

    /**
     * 获取默认的全局输出重定向策略
     *
     * @return
     */
    public static Set<CostOutput> getGlobalOutputStrategy() {
        return globalOutputStrategy;
    }

}

我们在输出重定向的实现中,主要是借助函数方法来提供能力增强,由于TraceRecoder都是以实例的方式来运行的,如果一个自定义的重定向,需要在每个使用地方都手动注册,显然对使用者极不友好,因此我们提供了全局的注册逻辑,这样就可以实现一次注册,全局生效;同时单个实例也可以有自己的个性化逻辑

3. 测试

接下来我们写个测试用例,使用自定义的输出代替默认的日志输出规则

public class SelfOutputDemo {
    private static final Logger log = LoggerFactory.getLogger(SelfOutputDemo.class);
    private static Random random = new Random();

    /**
     * 随机睡眠一段时间
     *
     * @param max
     */
    private static void randSleep(int max) {
        int sleepMillSecond = random.nextInt(max);
        try {
            Thread.sleep(sleepMillSecond);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private String fun1WithReturn() {
        long start = System.currentTimeMillis();
        randSleep(50);
        log.info("fun1WithReturn执行完毕 -> " + (System.currentTimeMillis() - start));
        return "fun1";
    }

    private void fun2NoReturn(String txt) {
        long start = System.currentTimeMillis();
        randSleep(50);
        log.info("fun2 -->" + txt + " --> " + (System.currentTimeMillis() - start));
    }


    /**
     * 异步执行,不关心返回结果
     *
     * @param ans
     */
    private void runAsyncNoReturn(String ans) {
        long start = System.currentTimeMillis();
        randSleep(15);
        log.info("runAsyncNoReturn ->" + ans + " --> " + (System.currentTimeMillis() - start));
    }

    private String runAsyncWithReturn(String ans) {
        long start = System.currentTimeMillis();
        randSleep(15);
        log.info("runAsyncWithReturn ->" + ans + " --> " + (System.currentTimeMillis() - start));
        return ans + "_over";
    }

    private String runAsyncWithReturn2(String ans) {
        long start = System.currentTimeMillis();
        randSleep(25);
        log.info("runAsyncWithReturn2 ->" + ans + " --> " + (System.currentTimeMillis() - start));
        return ans + "_over2";
    }

    /**
     * 预热逻辑
     */
    private void preLoad() {
        String uuid = MdcUtil.newGlobalTraceId();
        System.out.println("uuid -> " + uuid);
        // 这里的执行,主要是为了解决 TraceWatch等相关类的初始化耗时对整体结果的影响
        try (ITraceRecoder recoder = TraceWatch.startTrace("预热", false)) {
            recoder.async(() -> {
                randSleep(30);
            }, "preLoad0");
            recoder.sync(() -> {
                randSleep(20);
            }, "preLoad1");
        } finally {
            log.info("========================= 预热 ========================= \n\n");
        }
    }


    /**
     * 根据实际的场景,对一些逻辑调整为异步执行
     */
    @Test
    public void testWithAsyncTrace() {
        //  注册自定义的输出重定向
        TraceWatch.registerOutput(new CostOutput() {
            @Override
            public void output(Map<String, Long> cost, String traceName) {
                Long totalCost = cost.get(traceName);
                log.info("总耗时:{} 明细:{}", totalCost, cost);
            }
        });

        preLoad();
        MdcUtil.initTraceIdAutoGen(true);
        long start = System.currentTimeMillis();
        // 通过设置创建 TraceWatch 时,制定false,关闭默认的日志输出
        try (ITraceRecoder recoder = TraceWatch.startTrace("traceLog", false)) {
            String ans = recoder.sync(this::fun1WithReturn, "1.fun1WithReturn");
            recoder.sync(() -> fun2NoReturn(ans), "2.fun2NoReturn");
            recoder.async(() -> runAsyncNoReturn(ans), "3.runAsyncNoReturn");
            log.info("异步结果阻塞获取前耗时:" + (System.currentTimeMillis() - start));

            // 下面两个演示的是异步返回的场景,在合适的地方,将异步返回结果进行拼接
            CompletableFuture<String> a2 = recoder.async(() -> runAsyncWithReturn(ans), "4.runAsyncWithReturn");
            CompletableFuture<String> a3 = recoder.async(() -> runAsyncWithReturn2(ans), "5.runAsyncWithReturn");
            CompletableFuture.allOf(a2, a3).whenComplete((unused, throwable) -> log.info("最终的结果是: -> " + (a2.join() + a3.join())));
        }
        long end = System.currentTimeMillis();
        log.info("总耗时: " + (end - start));
    }
}

重点关注上面的testWithAsyncTrace的逻辑,首先是注册全局的自定义耗时输出规则; 然后就是在创建TraceRecoder时,通过传入false,表示不使用原来的日志输出

执行成功之后,我们将得到如下输出:

uuid -> e21a3eecbe4345be870cda5609137b2a
2024-09-02 15:45:10,306 [main] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.output(SelfOutputDemo.java:108) - 总耗时:25 明细:{preLoad0(异步)=19, preLoad1=7, 预热=25}
2024-09-02 15:45:10,312 [main] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.preLoad(SelfOutputDemo.java:93) - ========================= 预热 ========================= 


2024-09-02 15:45:10,331 [main] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.fun1WithReturn(SelfOutputDemo.java:42) - fun1WithReturn执行完毕 -> 18
2024-09-02 15:45:10,358 [main] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.fun2NoReturn(SelfOutputDemo.java:49) - fun2 -->fun1 --> 27
2024-09-02 15:45:10,359 [main] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.testWithAsyncTrace(SelfOutputDemo.java:120) - 异步结果阻塞获取前耗时:47
2024-09-02 15:45:10,371 [trace-watch-dog-3] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.runAsyncWithReturn(SelfOutputDemo.java:67) - runAsyncWithReturn ->fun1 --> 12
2024-09-02 15:45:10,372 [trace-watch-dog-2] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.runAsyncNoReturn(SelfOutputDemo.java:61) - runAsyncNoReturn ->fun1 --> 13
2024-09-02 15:45:10,384 [trace-watch-dog-4] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.runAsyncWithReturn2(SelfOutputDemo.java:74) - runAsyncWithReturn2 ->fun1 --> 25
2024-09-02 15:45:10,384 [trace-watch-dog-4] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.lambda$testWithAsyncTrace$6(SelfOutputDemo.java:125) - 最终的结果是: -> fun1_overfun1_over2
2024-09-02 15:45:10,384 [main] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.output(SelfOutputDemo.java:108) - 总耗时:72 明细:{1.fun1WithReturn=18, 2.fun2NoReturn=27, 3.runAsyncNoReturn(异步)=14, 4.runAsyncWithReturn(异步)=12, 5.runAsyncWithReturn(异步)=25, traceLog=72}
2024-09-02 15:45:10,384 [main] INFO  e21a3eecbe4345be870cda5609137b2a c.g.l.hhui.trace.test.SelfOutputDemo.testWithAsyncTrace(SelfOutputDemo.java:128) - 总耗时: 72
Loading...