4. 异步使用能力增强

一灰灰blog技术组件trace-watch-dog约 2537 字大约 8 分钟

接上文的耗时统计工具类,对于TraceWatch工具类的基本使用,都是基于同步的策略来的,如


// 无返回结果
traceWatch.cost(() -> xxx, "task1");

// 有返回结果
ans = traceWatch.cost(() -> { xxx;  return xxx; }, "task1");

接下来我们考虑对它的能力进行增强,我们希望可以简单的指定需要统计的代码块是同步执行还是异步执行

比如

// 同步执行
traceWatch.sync(() -> xxx, "task1");
// 异步执行
traceWatch.async(() -> xxx, "task1");

// or

// 通过传参控制,false表示同步执行, true表示异步执行
traceWatch.cost(() -> xxx, "task1", false);
traceWatch.cost(() -> xxx, "task1", true);

1. 异步增强

1.1 通用的线程池工具类

既然是异步能力增强,支持代码块的异步调度,那么我们可以封装一个简单的线程池工具类,主要用于异步执行的线程池的创建

/**
 * 异步工具类
 *
 * @author YiHui
 * @date 2024/8/11
 */
public class AsyncUtil {
    public static ExecutorService executorService;

    static {
        initExecutorService(Runtime.getRuntime().availableProcessors() * 2, 50);
    }


    public static void initExecutorService(int core, int max) {
        // 异步工具类的默认线程池构建
        max = Math.max(core, max);
        ThreadFactory THREAD_FACTORY = new ThreadFactory() {
            private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
            private final AtomicInteger threadNumber = new AtomicInteger(1);

            public Thread newThread(Runnable r) {
                Thread thread = this.defaultFactory.newThread(r);
                if (!thread.isDaemon()) {
                    thread.setDaemon(true);
                }

                thread.setName("trace-watch-dog-" + this.threadNumber.getAndIncrement());
                return thread;
            }
        };
        executorService = new ThreadPoolExecutor(core, max, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

1.2 异步能力增强

当我们希望包装的代码块可以同步/异步执行时,首先是在创建对象的时候,指定一下线程池

public class TraceRecoder implements Closeable {
    /**
     * trace记录名
     */
    private final String traceName;

    /**
     * 一个子任务的执行耗时
     */
    private final Map<String, Long> cost;

    /**
     * 异步调度的线程池
     */
    private final ExecutorService executorService;

    /**
     * 用于标记是否所有的任务执行完毕
     * 执行完毕之后,不在支持继续添加记录
     */
    private volatile boolean markExecuteOver;


    public DefaultTraceRecoder() {
        this(AsyncUtil.executorService, "TraceBridge");
    }

    public DefaultTraceRecoder(ExecutorService executorService, String task) {
        this.traceName = task;
        this.cost = new ConcurrentHashMap<>();
        startRecord(task);
        this.executorService = TtlExecutors.getTtlExecutorService(executorService);
        this.markExecuteOver = false;
    }


    private void start(String name) {
        if (markExecuteOver) {
            // 所有任务执行完毕,不再新增
            System.out.println("allTask ExecuteOver ignore:" + name);
            return;
        }
        cost.put(name, System.currentTimeMillis());
    }

    private void end(String name) {
        long now = System.currentTimeMillis();
        long last = cost.getOrDefault(name, now);
        if (last >= now / 1000) {
            // 之前存储的是时间戳,因此我们需要更新成执行耗时 ms单位
            cost.put(name, now - last);
        }
    }
}

接下来我们封装一下上面的start, end方法的使用姿势,与前面的工具类的实现,重要的区别在于返回

/**
 * 封装一下执行业务逻辑,记录耗时时间
 *
 * @param run  执行的具体业务逻辑
 * @param name 任务名
 * @return
 */
private Runnable runWithTime(Runnable run, String name) {
    return () -> {
        start(name);
        try {
            run.run();
        } finally {
            end(name);
        }
    };
}

/**
 * 封装一下执行业务逻辑,记录耗时时间
 *
 * @param call 执行的具体业务逻辑
 * @param name 任务名
 * @return 返回结果
 */
private <T> Supplier<T> supplyWithTime(Supplier<T> call, String name) {
    return () -> {
        start(name);
        try {
            return call.get();
        } finally {
            end(name);
        }
    };
}

为什么返回的是 Runnable/Supplier

这个就需要从同步/异步的调用方法来看为什么这么设计了

对于同步执行的场景,我们可以分别为有返回/无返回提供两个方法

/**
 * 同步执行,待返回结果
 *
 * @param supplier 执行任务
 * @param name     耗时标识
 * @param <T>      返回类型
 * @return 任务的执行返回结果
 */
public <T> T sync(Supplier<T> supplier, String name) {
    // 直接执行代码块,并同步返回结果
    return supplyWithTime(supplier, name).get();
}



/**
 * 同步执行,无返回结果
 *
 * @param run  执行任务
 * @param name 耗时标识
 * @return
 */
public void sync(Runnable run, String name) {
    // 直接执行代码块
    runWithTime(run, name).run();
}

对于异步执行的场景,我们需要将xxWithTime的返回丢到线程池中执行

/**
 * 异步执行,带返回结果
 *
 * @param supplier 执行任务
 * @param name     耗时标识
 * @return
 */
public <T> CompletableFuture<T> async(Supplier<T> supplier, String name) {
    CompletableFuture<T> ans = CompletableFuture.supplyAsync(supplyWithTime(supplier, name + "(异步)"), this.executorService);
    return ans;
}

/**
 * 异步执行,无返回结果
 *
 * @param run  执行任务
 * @param name 耗时标识
 * @return
 */
public CompletableFuture<Void> async(Runnable run, String name) {
    // 添加一个标识,区分同步执行与异步执行
    // 异步任务的执行,在整体的耗时占比只能作为参考
    CompletableFuture<Void> future = CompletableFuture.runAsync(runWithTime(run, name + "(异步)"), this.executorService);
    return future;
}

看到上面的实现之后,我们很容易想到,在输出耗时分布时,判断是否所有任务执行完毕,实际上不需要用之前的自旋等待的方式判断是否都执行完毕了,我们可以持有异步的返回的CompletableFuture,通过join的方式来等待所有任务执行完毕

因此我们可以声明一个全局的异步结果缓存容器,在实例化对象的时候进行初始化

/**
 * 异步任务执行的结果
 */
private final List<CompletableFuture<?>> list = new ArrayList<>();

接下来就是在异步调度时,保存一下结果

/**
 * 异步执行,带返回结果
 *
 * @param supplier 执行任务
 * @param name     耗时标识
 * @return
 */
public <T> CompletableFuture<T> async(Supplier<T> supplier, String name) {
    CompletableFuture<T> ans = CompletableFuture.supplyAsync(supplyWithTime(supplier, name + "(异步)"), this.executorService);
    list.add(ans);
    return ans;
}

/**
 * 异步执行,无返回结果
 *
 * @param run  执行任务
 * @param name 耗时标识
 * @return
 */
public CompletableFuture<Void> async(Runnable run, String name) {
    // 添加一个标识,区分同步执行与异步执行
    // 异步任务的执行,在整体的耗时占比只能作为参考
    CompletableFuture<Void> future = CompletableFuture.runAsync(runWithTime(run, name + "(异步)"), this.executorService);
    list.add(ans);
    return future;
}

等待所有任务执行完毕就可以直接借助CompletableFuture.allOf().join来实现了

/**
 * 等待所有的任务执行完毕
 *
 * @return
 */
public TraceRecoder allExecuted() {
    if (!list.isEmpty()) {
        CompletableFuture.allOf(list.toArray(new CompletableFuture[]{})).join();
    }
    // 记录整体结束
    end(this.traceName);
    this.markExecuteOver = true;
    return this;
}


public void close() {
    // 做一个兜底,避免业务侧没有手动结束,导致异步任务没有执行完就提前返回结果
    this.allExecuted().prettyPrint();
}

到此,我们的异步能力增强已基本实现完毕, 接下来进入实测体验

2. 使用体验

2.1 完整工具类

下面是完整的工具实现类

public class TraceRecorder implements Closeable {
    /**
     * trace记录名
     */
    private final String traceName;

    /**
     * 异步任务执行的结果
     */
    private final List<CompletableFuture<?>> list;
    /**
     * 一个子任务的执行耗时
     */
    private final Map<String, Long> cost;

    /**
     * 异步调度的线程池
     */
    private final ExecutorService executorService;

    /**
     * 用于标记是否所有的任务执行完毕
     * 执行完毕之后,不在支持继续添加记录
     */
    private volatile boolean markExecuteOver;


    public TraceRecorder() {
        this(AsyncUtil.executorService, "TraceDog");
    }

    public TraceRecorder(ExecutorService executorService, String task) {
        this.traceName = task;
        list = new CopyOnWriteArrayList<>();
        // 支持排序的耗时记录
        cost = new ConcurrentSkipListMap<>();
        start(task);
        this.executorService = executorService;
        this.markExecuteOver = false;
    }

    /**
     * 异步执行,带返回结果
     *
     * @param supplier 执行任务
     * @param name     耗时标识
     * @return
     */
    public <T> CompletableFuture<T> async(Supplier<T> supplier, String name) {
        CompletableFuture<T> ans = CompletableFuture.supplyAsync(supplyWithTime(supplier, name + "(异步)"), this.executorService);
        list.add(ans);
        return ans;
    }

    /**
     * 同步执行,待返回结果
     *
     * @param supplier 执行任务
     * @param name     耗时标识
     * @param <T>      返回类型
     * @return 任务的执行返回结果
     */
    public <T> T sync(Supplier<T> supplier, String name) {
        return supplyWithTime(supplier, name).get();
    }

    /**
     * 异步执行,无返回结果
     *
     * @param run  执行任务
     * @param name 耗时标识
     * @return
     */
    public CompletableFuture<Void> async(Runnable run, String name) {
        // 添加一个标识,区分同步执行与异步执行
        // 异步任务的执行,在整体的耗时占比只能作为参考
        CompletableFuture<Void> future = CompletableFuture.runAsync(runWithTime(run, name + "(异步)"), this.executorService);
        list.add(future);
        return future;
    }

    /**
     * 同步执行,无返回结果
     *
     * @param run  执行任务
     * @param name 耗时标识
     * @return
     */
    public void sync(Runnable run, String name) {
        runWithTime(run, name).run();
    }

    /**
     * 封装一下执行业务逻辑,记录耗时时间
     *
     * @param run  执行的具体业务逻辑
     * @param name 任务名
     * @return
     */
    private Runnable runWithTime(Runnable run, String name) {
        return () -> {
            start(name);
            try {
                run.run();
            } finally {
                end(name);
            }
        };
    }

    /**
     * 封装一下执行业务逻辑,记录耗时时间
     *
     * @param call 执行的具体业务逻辑
     * @param name 任务名
     * @return 返回结果
     */
    private <T> Supplier<T> supplyWithTime(Supplier<T> call, String name) {
        return () -> {
            start(name);
            try {
                return call.get();
            } finally {
                end(name);
            }
        };
    }

    /**
     * 等待所有的任务执行完毕
     *
     * @return
     */
    public TraceRecorder allExecuted() {
        if (!list.isEmpty()) {
            CompletableFuture.allOf(list.toArray(new CompletableFuture[]{})).join();
        }
        // 记录整体结束
        end(this.traceName);
        this.markExecuteOver = true;
        return this;
    }

    private void start(String name) {
        if (markExecuteOver) {
            // 所有任务执行完毕,不再新增
            System.out.println("allTask ExecuteOver ignore:" + name);
            return;
        }
        cost.put(name, System.currentTimeMillis());
    }

    private void end(String name) {
        long now = System.currentTimeMillis();
        long last = cost.getOrDefault(name, now);
        if (last >= now / 1000) {
            // 之前存储的是时间戳,因此我们需要更新成执行耗时 ms单位
            cost.put(name, now - last);
        }
    }

    public Map<String, Long> prettyPrint() {
        // 在格式化输出时,要求所有任务执行完毕
        if (!this.markExecuteOver) {
            this.allExecuted();
        }

        StringBuilder sb = new StringBuilder();
        sb.append('\n');
        long totalCost = cost.remove(traceName);
        sb.append("TraceWatch '").append(traceName).append("': running time = ").append(totalCost).append(" ms");
        sb.append('\n');
        if (cost.isEmpty()) {
            sb.append("No task info kept");
        } else {
            sb.append("---------------------------------------------\n");
            sb.append("ms         %     Task name\n");
            sb.append("---------------------------------------------\n");
            NumberFormat pf = NumberFormat.getPercentInstance();
            pf.setMinimumIntegerDigits(2);
            pf.setMinimumFractionDigits(2);
            pf.setGroupingUsed(false);
            for (Map.Entry<String, Long> entry : cost.entrySet()) {
                sb.append(entry.getValue()).append("\t\t");
                sb.append(pf.format(entry.getValue() / (double) totalCost)).append("\t\t");
                sb.append(entry.getKey()).append("\n");
            }
        }

        System.out.printf("\n---------------------\n%s\n--------------------\n%n", sb);
        return cost;
    }

    @Override
    public void close() {
        // 做一个兜底,避免业务侧没有手动结束,导致异步任务没有执行完就提前返回结果
        this.allExecuted().prettyPrint();
    }
}

2.2 使用体验

接下来看一下具体的使用实例

private static Random random = new Random();

/**
 * 随机睡眠一段时间
 *
 * @param max
 */
private static void randSleep(String task, int max) {
    randSleepAndRes(task, max);
}

private static int randSleepAndRes(String task, int max) {
    int sleepMillSecond = random.nextInt(max);
    try {
        System.out.println(task + "==> 随机休眠 " + sleepMillSecond + "ms");
        Thread.sleep(sleepMillSecond);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return sleepMillSecond;
}

@Test
public void testCost() {
    try (TraceRecorder recorder = new TraceRecorder()) {
        randSleep("前置", 20);
        int f1 = recorder.sync(() -> randSleepAndRes("task1", 200), "task1");
        CompletableFuture<Integer> f2 = recorder.async(() -> randSleepAndRes("task2", f1), "task2");
        recorder.sync(() -> randSleep("task3", 40), "task3");
        recorder.async(() -> randSleep("task4", 100), "task4");
        System.out.println("打印f2的结果 -> " + f2.join());
    }
}

输出示例

前置==> 随机休眠 15ms
task1==> 随机休眠 194ms
task3==> 随机休眠 30ms
task2==> 随机休眠 152ms
task4==> 随机休眠 97ms
打印f2的结果 -> 152

---------------------

TraceWatch 'TraceDog': running time = 366 ms
---------------------------------------------
ms         %     Task name
---------------------------------------------
194   53.01%    task1
156   42.62%    task2(异步)
42    11.48%    task3
98    26.78%    task4(异步)

--------------------

2.3 小结

从上面的使用来看,现在的工具类相对来说就完整很多了,使用上也比较简单(至少代码阅读起来,还算优雅),但是依然没能解决业务代码侵入问题

此外,在异步的场景下,我们需要注意多线程/线程池场景下的上下文传递问题,若上面的async代码块中,希望获取主线程中的上下文参数,可以正常拿到么?

本文中的相关代码,可以到这里查看 trace-watch-dogopen in new window

Loading...