6. 日志输出与全链路traceId透传

一灰灰blog技术组件trace-watch-dog约 2857 字大约 10 分钟

前面封装耗时任务分布工具类的输出,主要是通过System.out.println进行控制台输出,这显然不符合实际的生产使用,接下来我们使用Slf4j进行输出的替换,额外需要注意的就是异步场景下,避免出现全链路的traceId的丢失

1. 日志集成

1.1 slf4j日志输出

在项目中使用日志比较简单,先添加依赖

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.16</version>
</dependency>

然后就是更新下prettyPrint的日志输出

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TraceRecoder implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(TraceRecoder.class);

    public Map<String, Long> prettyPrint() {
        // 在格式化输出时,要求所有任务执行完毕
        if (!this.markExecuteOver) {
            this.allExecuted();
        }


        StringBuilder sb = new StringBuilder();
        sb.append('\n');
        long totalCost = cost.remove(traceName);
        sb.append("TraceWatch '").append(traceName).append("': running time = ").append(totalCost).append(" ms");
        sb.append('\n');
        if (cost.isEmpty()) {
            sb.append("No task info kept");
        } else {
            sb.append("---------------------------------------------\n");
            sb.append("ms         %     Task name\n");
            sb.append("---------------------------------------------\n");
            NumberFormat pf = NumberFormat.getPercentInstance();
            pf.setMinimumIntegerDigits(2);
            pf.setMinimumFractionDigits(2);
            pf.setGroupingUsed(false);
            for (Map.Entry<String, Long> entry : cost.entrySet()) {
                sb.append(entry.getValue()).append("\t\t");
                sb.append(pf.format(entry.getValue() / (double) totalCost)).append("\t\t");
                sb.append(entry.getKey()).append("\n");
            }
        }

        if (LoggerFactory.getILoggerFactory() instanceof NOPLoggerFactory) {
            // 若项目中没有Slfj4的实现,则直接使用标准输出
            System.out.printf("\n---------------------\n%s\n--------------------\n%n", sb);
        } else if (log.isInfoEnabled()) {
            log.info("\n---------------------\n{}\n--------------------\n", sb);
        }
        return cost;
    }
}

重点关注下上面的实现,LoggerFactory.getILoggerFactory() instanceof NOPLoggerFactory 通过这一行来判断当前使用的项目中,是否已经集成了日志打印,如果是就利用 log.info() 打印日志,若没有集成类似logback/log4j之类的日志框架,那就依然使用System.out.println来输出耗时情况

1.2 全链路的日志集成

对全链路有过了解的小伙伴应该知晓,在整个执行链路中,即便是出现了异步(线程池调度)、跨进程(RPC)、跨服务等场景,仍然需要一个traceId从头到尾一直携带到最后

那么我们的工具类中的异步代码块执行,如何将这个traceId携带进去呢?

  • 借助MDC来实现

作为一个工具提供方,我们需要知晓如何从MDC中获取全链路的traceId,如果没有自定义的全链路traceId生成携带策略,我们也可以提供一个默认的实现进行支持

因此我们先封装一个MdcUtil工具类,来读写上下文中的traceId

public class MdcUtil {
    private static final Logger log = LoggerFactory.getLogger(MdcUtil.class);
    private static final String DEFAULT_TRACE_ID_TAG_KEY = "globalMsgId";
    /**
     * 生成msgId的方法
     */
    private static Supplier<String> genIdFunc = null;

    /**
     * 获取MDC上下文中持有msgId的tagKey
     */
    private static String traceIdTagKey = DEFAULT_TRACE_ID_TAG_KEY;


    /**
     * true 表示上下文中没有全链路traceId时,使用默认的生成策略来记录全链路id; 有则使用上下文中的全链路id
     * false 表示只有上下文中存在traceId时,才进行子线程的透传,不会额外生成
     */
    private static volatile Boolean traceIdAutoGen = false;


    /**
     * 注册traceId生成规则
     *
     * @param gen
     */
    public static void registerIdGenFunc(Supplier<String> gen) {
        genIdFunc = gen;
    }

    /**
     * 注册全链路traceId存储的key
     *
     * @param tagKey
     */
    public static void registerTraceTagKeyGetFunc(String tagKey) {
        traceIdTagKey = tagKey;
    }

    /**
     * 控制是否使用上下文的traceId
     *
     * @param traceIdAutoGen true 表示上下文中没有全链路traceId时,使用默认的生成策略来记录全链路id; 有则使用上下文中的全链路id
     *                       false 表示只有上下文中存在traceId时,才进行子线程的透传,不会额外生成
     */
    public static void setTraceIdAutoGen(Boolean traceIdAutoGen) {
        MdcUtil.traceIdAutoGen = traceIdAutoGen;
    }

    private static void autoInit() {
        if (genIdFunc == null) {
            registerIdGenFunc(MdcUtil::defaultGenGlobalTraceId);
        }
    }

    /**
     * 根据配置,来判断没有traceId时,是直接返回还是新创建一个
     *
     * @return traceId
     */
    public static String fetchGlobalMsgIdForTraceRecoder() {
        if (Objects.equals(Boolean.TRUE, traceIdAutoGen)) {
            return getOrInitGlobalTraceId();
        } else {
            return getGlobalTraceId();
        }
    }

    private static String getGlobalTraceId() {
        return MDC.get(traceIdTagKey);
    }

    /**
     * 获取全局的traceId,若不存在,则进行初始化
     *
     * @return traceId
     */
    private static String getOrInitGlobalTraceId() {
        String traceId = getGlobalTraceId();
        if (traceId == null || traceId.isEmpty()) {
            return newGlobalTraceId();
        }
        return traceId;
    }

    public static void setGlobalTraceId(String msgId) {
        if (msgId == null) {
            return;
        }

        try {
            MDC.put(traceIdTagKey, msgId);
        } catch (Exception e) {
            log.error("failed to init MDC globalMsgId! msgId:{}", msgId, e);
        }
    }

    public static String newGlobalTraceId() {
        autoInit();
        String id = genIdFunc.get();
        MDC.put(traceIdTagKey, id);
        return id;
    }

    public static void clear() {
        MDC.clear();
    }


    /**
     * 默认的全链路id生成规则
     *
     * @return traceId
     */
    public static String defaultGenGlobalTraceId() {
        // fixme: 对于已经有自己的一套全链路的监控的场景,需要在这里进行替换
        return UUID.randomUUID().toString().replace("-", "");
    }
}

上面的实现,这里主要提供了一个全局的配置traceIdAutoGen来控制,当上下文中拿不到traceId时,我们应该是重新生成一个还是压根就不管它

然后我们就需要在之前的实现层TraceRecoder,做一些改造,以实现异步执行时的traceId透传

  • 在执行代码块的封装层,在业务代码执行前初始化traceId (需要注意,不要将获取traceId的逻辑放在代码块中了)
/**
 * 封装一下执行业务逻辑,记录耗时时间
 *
 * @param run  执行的具体业务逻辑
 * @param name 任务名
 * @return
 */
private Runnable runWithTime(Runnable run, String name) {
    String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
    return () -> {
        MdcUtil.setGlobalTraceId(traceId);
        start(name);
        try {
            run.run();
        } finally {
            end(name);
            MdcUtil.clear();
        }
    };
}

/**
 * 封装一下执行业务逻辑,记录耗时时间
 *
 * @param call 执行的具体业务逻辑
 * @param name 任务名
 * @return 返回结果
 */
private <T> Supplier<T> supplyWithTime(Supplier<T> call, String name) {
    String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
    return () -> {
        MdcUtil.setGlobalTraceId(traceId);
        start(name);
        try {
            return call.get();
        } finally {
            end(name);
            MdcUtil.clear();
        }
    };
}

1.3 全链路traceId测试

接下来就需要我们来验证一下集成情况了,首先再看一下完整修改后的TraceRecoder工具类

public class TraceRecoder implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(TraceRecoder.class);
    /**
     * trace记录名
     */
    private final String traceName;

    /**
     * 异步任务执行的结果
     */
    private final List<CompletableFuture<?>> list;
    /**
     * 一个子任务的执行耗时
     */
    private final Map<String, Long> cost;

    /**
     * 异步调度的线程池
     */
    private final ExecutorService executorService;

    /**
     * 用于标记是否所有的任务执行完毕
     * 执行完毕之后,不在支持继续添加记录
     */
    private volatile boolean markExecuteOver;


    public TraceRecoder() {
        this(AsyncUtil.executorService, "TraceDog");
    }

    public TraceRecoder(ExecutorService executorService, String task) {
        this.traceName = task;
        list = new CopyOnWriteArrayList<>();
        // 支持排序的耗时记录
        cost = new ConcurrentSkipListMap<>();
        start(task);
        this.executorService = executorService;
        this.markExecuteOver = false;
        // ===> 这里是新增逻辑
        MdcUtil.setGlobalTraceId(MdcUtil.fetchGlobalMsgIdForTraceRecoder());
    }

    /**
     * 异步执行,带返回结果
     *
     * @param supplier 执行任务
     * @param name     耗时标识
     * @return
     */
    public <T> CompletableFuture<T> async(Supplier<T> supplier, String name) {
        CompletableFuture<T> ans = CompletableFuture.supplyAsync(supplyWithTime(supplier, name + "(异步)"), this.executorService);
        list.add(ans);
        return ans;
    }

    /**
     * 同步执行,待返回结果
     *
     * @param supplier 执行任务
     * @param name     耗时标识
     * @param <T>      返回类型
     * @return 任务的执行返回结果
     */
    public <T> T sync(Supplier<T> supplier, String name) {
        return supplyWithTime(supplier, name).get();
    }

    /**
     * 异步执行,无返回结果
     *
     * @param run  执行任务
     * @param name 耗时标识
     * @return
     */
    public CompletableFuture<Void> async(Runnable run, String name) {
        // 添加一个标识,区分同步执行与异步执行
        // 异步任务的执行,在整体的耗时占比只能作为参考
        CompletableFuture<Void> future = CompletableFuture.runAsync(runWithTime(run, name + "(异步)"), this.executorService);
        list.add(future);
        return future;
    }

    /**
     * 同步执行,无返回结果
     *
     * @param run  执行任务
     * @param name 耗时标识
     * @return
     */
    public void sync(Runnable run, String name) {
        runWithTime(run, name).run();
    }

    /**
     * 封装一下执行业务逻辑,记录耗时时间
     *
     * @param run  执行的具体业务逻辑
     * @param name 任务名
     * @return
     */
    private Runnable runWithTime(Runnable run, String name) {
        // ===> 这里是新增逻辑
        String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
        return () -> {
            MdcUtil.setGlobalTraceId(traceId);
            start(name);
            try {
                run.run();
            } finally {
                end(name);
            }
        };
    }

    /**
     * 封装一下执行业务逻辑,记录耗时时间
     *
     * @param call 执行的具体业务逻辑
     * @param name 任务名
     * @return 返回结果
     */
    private <T> Supplier<T> supplyWithTime(Supplier<T> call, String name) {
        // ===> 这里是新增逻辑
        String traceId = MdcUtil.fetchGlobalMsgIdForTraceRecoder();
        return () -> {
            MdcUtil.setGlobalTraceId(traceId);
            start(name);
            try {
                return call.get();
            } finally {
                end(name);
            }
        };
    }

    /**
     * 等待所有的任务执行完毕
     *
     * @return
     */
    public TraceRecoder allExecuted() {
        if (!list.isEmpty()) {
            CompletableFuture.allOf(list.toArray(new CompletableFuture[]{})).join();
        }
        // 记录整体结束
        end(this.traceName);
        this.markExecuteOver = true;
        return this;
    }

    private void start(String name) {
        if (markExecuteOver) {
            // 所有任务执行完毕,不再新增
            System.out.println("allTask ExecuteOver ignore:" + name);
            return;
        }
        cost.put(name, System.currentTimeMillis());
    }

    private void end(String name) {
        long now = System.currentTimeMillis();
        long last = cost.getOrDefault(name, now);
        if (last >= now / 1000) {
            // 之前存储的是时间戳,因此我们需要更新成执行耗时 ms单位
            cost.put(name, now - last);
        }
    }

    public Map<String, Long> prettyPrint() {
        // 在格式化输出时,要求所有任务执行完毕
        if (!this.markExecuteOver) {
            this.allExecuted();
        }

        StringBuilder sb = new StringBuilder();
        sb.append('\n');
        long totalCost = cost.remove(traceName);
        sb.append("TraceWatch '").append(traceName).append("': running time = ").append(totalCost).append(" ms");
        sb.append('\n');
        if (cost.isEmpty()) {
            sb.append("No task info kept");
        } else {
            sb.append("---------------------------------------------\n");
            sb.append("ms         %     Task name\n");
            sb.append("---------------------------------------------\n");
            NumberFormat pf = NumberFormat.getPercentInstance();
            pf.setMinimumIntegerDigits(2);
            pf.setMinimumFractionDigits(2);
            pf.setGroupingUsed(false);
            for (Map.Entry<String, Long> entry : cost.entrySet()) {
                sb.append(entry.getValue()).append("\t\t");
                sb.append(pf.format(entry.getValue() / (double) totalCost)).append("\t\t");
                sb.append(entry.getKey()).append("\n");
            }
        }

        if (LoggerFactory.getILoggerFactory() instanceof NOPLoggerFactory) {
            // 若项目中没有Slfj4的实现,则直接使用标准输出
            System.out.printf("\n---------------------\n%s\n--------------------\n%n", sb);
        } else if (log.isInfoEnabled()) {
            log.info("\n---------------------\n{}\n--------------------\n", sb);
        }
        return cost;
    }

    @Override
    public void close() {
        // 做一个兜底,避免业务侧没有手动结束,导致异步任务没有执行完就提前返回结果
        this.allExecuted().prettyPrint();
    }
}

上面将本次新增的逻辑标记了出来,对原有的改造较小,接下来,再写个测试用例,基本使用姿势与之前无异,唯一的区别在于我们再异步代码块中,使用日志输出看看traceId是否能打印出来

要使用slf4j,先添加一个具体的日志实现,比如logback

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.4.14</version>
    <scope>test</scope>
</dependency>

然后再资源目录下,添加配置文件 resources/logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- %m输出的信息,%p日志级别,%t线程名,%d日期,%c类的全名,%i索引【从数字0开始递增】,,, -->
    <!-- appender是configuration的子节点,是负责写日志的组件。 -->
    <!-- ConsoleAppender:把日志输出到控制台 -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d [%t] %-5level %mdc{globalMsgId} %logger{36}.%M\(%file:%line\) - %msg%n</pattern>
            <!-- 控制台也要使用UTF-8,不要使用GBK,否则会中文乱码 -->
            <charset>UTF-8</charset>
        </encoder>
    </appender>


    <!-- 指定项目中某个包,当有日志操作行为时的日志记录级别 -->
    <!-- 级别依次为【从高到低】:FATAL > ERROR > WARN > INFO > DEBUG > TRACE  -->
    <!-- additivity=false 表示匹配之后,不再继续传递给其他的logger-->
    <logger name="com.github.liuyueyi.hhui" level="INFO" additivity="false">
        <appender-ref ref="STDOUT"/>
    </logger>

    <!-- 控制台输出日志级别 -->
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

然后就是测试用例

private static Random random = new Random();

/**
 * 随机睡眠一段时间
 *
 * @param max
 */
private static void randSleep(String task, int max) {
    randSleepAndRes(task, max);
}

private static int randSleepAndRes(String task, int max) {
    int sleepMillSecond = random.nextInt(max);
    try {
        System.out.println(task + "==> 随机休眠 " + sleepMillSecond + "ms");
        Thread.sleep(sleepMillSecond);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return sleepMillSecond;
}

@Test
public void testCost1() {
    MdcUtil.setGlobalTraceId("666-666");
    try (TraceRecoder recorder = new TraceRecoder()) {
        randSleep("前置", 20);
        int ans = recorder.sync(() -> {
            int r = randSleepAndRes("task1", 200);
            log.info("task1 内部执行 --> {}", r);
            return r;
        }, "task1");
        recorder.async(() -> {
            int r = randSleepAndRes("task2", 100);
            log.info("task2 异步执行 --->{}", r);
        }, "task2");
        recorder.sync(() -> randSleep("task3", 40), "task3");
    }
}

先看下,直接借助已有的traceId场景

再看下,上下文中没有traceId,使用默认的的traceId生成策略

@Test
public void testCost2() {
    MdcUtil.initTraceIdAutoGen(true);
    try (TraceRecoder recorder = new TraceRecoder()) {
        randSleep("前置", 20);
        int ans = recorder.sync(() -> {
            int r = randSleepAndRes("task1", 200);
            log.info("task1 内部执行 --> {}", r);
            return r;
        }, "task1");
        recorder.async(() -> {
            int r = randSleepAndRes("task2", 100);
            log.info("task2 异步执行 --->{}", r);
        }, "task2");
        recorder.sync(() -> randSleep("task3", 40), "task3");
    }
}

1.4 小结

到这里我们就已经实现了trace-watch-dog的日志集成了,并且为全链路的traceId透传也提供了相应的解决方案

从上面的实现来看,我们还有有个更推荐的写法,再任务的执行前后各添加一个钩子,然后再钩子中进行任务执行前后的执行单元注册,将全链路的traceId透传放在钩子中执行,这样也可以提供更强的扩展能力

至于这个如何设计后续再来介绍

本文中的相关代码,可以到这里查看 trace-watch-dogopen in new window

Loading...