210412-JDK 常见并发控制手段

文章目录
  1. 1.1 同步代码块
  2. 1.2 CAS自旋方式
  3. 1.3 锁
  4. 1.4 阻塞队列
  5. 1.5 信号量Semaphore
  6. 1.6 计数器CountDownLatch
  7. 1.7 栅栏 CyclicBarrier
  8. 1.8 guava令牌桶
  9. 1.9 滑动窗口TimeWindow
  10. 1.10 小结
  • II. 其他
    1. 1. 一灰灰Blog: https://liuyueyi.github.io/hexblog
    2. 2. 声明
    3. 3. 扫描关注
  • 单实例的并发控制,主要是针对JVM内,我们常规的手段即可满足需求,常见的手段大概有下面这些

    • 同步代码块
    • CAS自旋
    • 阻塞队列,令牌桶等

    1.1 同步代码块

    通过同步代码块,来确保同一时刻只会有一个线程执行对应的业务逻辑,常见的使用姿势如下

    1
    2
    3
    public synchronized doProcess() {
    // 同步代码块,只会有一个线程执行
    }

    一般推荐使用最小区间使用原则,尽量不要直接在方法上加synchronized,比如经典的双重判定单例模式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class Single {
    private static volatile Single instance;
    private Single() {}
    public static Single getInstance() {
    if (instance == null) {
    synchronized(Single.class) {
    if (instance == null) instance = new Single();
    }
    }
    return instance;
    }
    }

    1.2 CAS自旋方式

    比如AtomicXXX原子类中的很多实现,就是借助unsafe的CAS来实现的,如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
    }


    // unsafe 实现
    // cas + 自选,不断的尝试更新设置,直到成功为止
    public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
    var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
    }

    1.3 锁

    jdk本身提供了不少的锁,为了实现单实例的并发控制,我们需要选择写锁;如果支持多读,单实例写,则可以考虑读写锁;一般使用姿势也比较简单

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private void doSome(ReentrantReadWriteLock.WriteLock writeLock) {
    try {
    writeLock.lock();
    System.out.println("持有锁成功 " + Thread.currentThread().getName());
    Thread.sleep(1000);
    System.out.println("执行完毕! " + Thread.currentThread().getName());
    writeLock.unlock();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    @Test
    public void lock() throws InterruptedException {
    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();

    Thread.sleep(20000);
    }

    1.4 阻塞队列

    借助同步阻塞队列,也可以实现并发控制的效果,比如队列中初始化n个元素,每次消费从队列中获取一个元素,如果拿不到则阻塞;执行完毕之后,重新塞入一个元素,这样就可以实现一个简单版的并发控制

    demo版演示,下面指定队列长度为2,表示最大并发数控制为2;设置为1时,可以实现单线程的访问控制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    AtomicInteger cnt = new AtomicInteger();

    private void consumer(LinkedBlockingQueue<Integer> queue) {
    try {
    // 同步阻塞拿去数据
    int val = queue.take();
    Thread.sleep(2000);
    System.out.println("成功拿到: " + val + " Thread: " + Thread.currentThread());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    // 添加数据
    System.out.println("结束 " + Thread.currentThread());
    queue.offer(cnt.getAndAdd(1));
    }
    }

    @Test
    public void blockQueue() throws InterruptedException {
    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
    queue.add(cnt.getAndAdd(1));
    queue.add(cnt.getAndAdd(1));


    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();

    Thread.sleep(10000);
    }

    1.5 信号量Semaphore

    上面队列的实现方式,可以使用信号量Semaphore来完成,通过设置信号量,来控制并发数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    private void semConsumer(Semaphore semaphore) {
    try {
    //同步阻塞,尝试获取信号
    semaphore.acquire(1);
    System.out.println("成功拿到信号,执行: " + Thread.currentThread());
    Thread.sleep(2000);
    System.out.println("执行完毕,释放信号: " + Thread.currentThread());
    semaphore.release(1);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    @Test
    public void semaphore() throws InterruptedException {
    Semaphore semaphore = new Semaphore(2);

    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();

    Thread.sleep(20_000);
    }

    1.6 计数器CountDownLatch

    计数,应用场景更偏向于多线程的协同,比如多个线程执行完毕之后,再处理某些事情;不同于上面的并发数的控制,它和栅栏一样,更多的是行为结果的统一

    这种场景下的使用姿势一般如下

    • 重点:countDownLatch 计数为0时放行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Test
    public void countDown() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {
    try {
    System.out.println("do something in " + Thread.currentThread());
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    countDownLatch.countDown();
    }
    }).start();

    new Thread(() -> {
    try {
    System.out.println("do something in t2: " + Thread.currentThread());
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    countDownLatch.countDown();
    }
    }).start();

    countDownLatch.await();
    System.out.printf("结束");
    }

    1.7 栅栏 CyclicBarrier

    CyclicBarrier的作用与上面的CountDownLatch相似,区别在于正向计数+1, 只有达到条件才放行; 且支持通过调用reset()重置计数,而CountDownLatch则不行

    一个简单的demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) {
    // 等待达到条件才放行
    try {
    System.out.println("准备执行: " + Thread.currentThread() + " at: " + LocalDateTime.now());
    Thread.sleep(sleep);
    int index = barrier.await();
    System.out.println("开始执行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now());
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    @Test
    public void testCyclicBarrier() throws InterruptedException {
    // 到达两个工作线程才能继续往后面执行
    CyclicBarrier barrier = new CyclicBarrier(2);
    // 三秒之后,下面两个线程的才会输出 开始执行
    new Thread(() -> cyclicBarrierLogic(barrier, 1000)).start();
    new Thread(() -> cyclicBarrierLogic(barrier, 3000)).start();

    Thread.sleep(4000);
    // 重置,可以再次使用
    barrier.reset();
    new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
    new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
    Thread.sleep(10000);
    }

    1.8 guava令牌桶

    guava封装了非常简单的并发控制工具类RateLimiter,作为单机的并发控制首选

    一个控制qps为2的简单demo如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    private void guavaProcess(RateLimiter rateLimiter) {
    try {
    // 同步阻塞方式获取
    System.out.println("准备执行: " + Thread.currentThread() + " > " + LocalDateTime.now());
    rateLimiter.acquire();
    System.out.println("执行中: " + Thread.currentThread() + " > " + LocalDateTime.now());
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    @Test
    public void testGuavaRate() throws InterruptedException {
    // 1s 中放行两个请求
    RateLimiter rateLimiter = RateLimiter.create(2.0d);
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();

    Thread.sleep(20_000);
    }

    输出:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    准备执行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263
    准备执行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263
    准备执行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264
    准备执行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264
    准备执行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263
    准备执行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264
    准备执行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263
    执行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267
    执行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722
    执行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225
    执行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721
    执行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221
    执行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720
    执行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219

    1.9 滑动窗口TimeWindow

    没有找到通用的滑动窗口jar包,一般来讲滑动窗口更适用于平滑的限流,解决瞬时高峰问题

    一个供参考的实现方式:

    固定大小队列,队列中每个数据代表一个时间段的计数,

    访问 -》 队列头拿数据(注意不出队)-》判断是否跨时间段 -》 同一时间段,计数+1 -》跨时间段,新增数据入队,若扔不进去,表示时间窗满,队尾数据出队

    问题:当流量稀疏时,导致不会自动释放过期的数据
    解决方案:根据时间段设置定时任务,模拟访问操作,只是将计数改为 + 0

    1.10 小结

    本文给出了几种单机版的并发控制的技术手段,主要目的是介绍了一些可选的方案,技术细节待后续补全完善,当然如果有其他的建议,欢迎评论交流

    II. 其他

    1. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

    一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

    2. 声明

    尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

    3. 扫描关注

    一灰灰blog

    QrCode

    # JDK

    评论

    Your browser is out-of-date!

    Update your browser to view this website correctly. Update my browser now

    ×