181117-Python异步之asyncio

文章目录
  1. 1. 定义协程
  2. 2. 返回结果
  3. 3. 多任务执行
  4. 4. 线程+协程使用
  • II. 其他
    1. 1. 一灰灰Blog: https://liuyueyi.github.io/hexblog
    2. 2. 声明
    3. 3. 扫描关注
  • 本篇主要是asyncio这个包的使用,如何使用协程,以及协程和线程可以怎么配合使用,得到更加的使用效果

    1. 定义协程

    通过 async 来定义协程,并将协程丢到事件循环中执行,常用套路如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import asyncio
    import time

    now = lambda: time.time()


    # 定义一个协程,丢到事件循环中执行
    async def do_some_job(x):
    print("do some : ", x)
    time.sleep(1)


    print("------- start coroutine ----------")

    start = now()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_some_job(2))
    print("end cost: ", now() - start)

    说明

    • 通过关键字async修饰函数,然后直接调用函数的方式,返回的就是一个协程coroutine
    • 协程需要丢到事件循环中执行,通过 asyncio.get_event_loop() 获取事件循环 loop
    • 执行协程 loop.run_until_complete(coroutine)

    上面执行的输出如下

    1
    2
    3
    ------- start coroutine ----------
    do some : 2
    end cost: 1.0031189918518066

    2. 返回结果

    上面的函数如果有返回结果,丢到事件循环中执行后,可以怎么获取返回呢?

    case1:直接使用方式

    直接获取loop.run_until_complete(xxx)返回即可,ans就是协程返回的value

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import asyncio
    import time

    now = lambda: time.time()


    # 定义一个协程,丢到事件循环中执行
    async def do_some_job(x):
    print("do some : ", x)
    time.sleep(1)
    return x * x


    print("------- start coroutine ----------")
    start = now()
    loop = asyncio.get_event_loop()
    ans = loop.run_until_complete(do_some_job(2))
    print("end cost: ", now() - start, ans)

    case2:使用任务

    另外一种方式就是使用任务,任务还有一种更强的方式就是绑定回调函数,当协程处理完毕之后,将结果丢到回调函数中,继续执行某些操作

    一个简单的case如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    print("------- start callback ----------")


    async def do_some_return_job(data):
    print('do return data: ', data)
    return data * 2


    def callback(future):
    print("do callback: ", future.result())


    # 创建一个task,并绑定返回结果的回调
    start = now()
    task = asyncio.ensure_future(do_some_return_job(3))
    task.add_done_callback(callback)
    ans = loop.run_until_complete(task)
    print("end callback cost: ", now() - start, ans)

    这种使用场景也比较经典,步骤如下

    • 使用asyncio.ensure_future(coroutine)来创建一个task任务
    • 为task任务绑定回调task.add_done_callback(callback)
    • 将task丢到事件回调中执行

    3. 多任务执行

    前面两个只能算是演示使用协程的方式,但实际上因为只有一个task,所以并不能体现协程的并行优势;下面开始演示下多个任务的情况

    在这种场景下,我们执行的任务加一个耗时的等待,模拟让出cpu给其他线程执行的case

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    # 函数内部通过调用 `asyncio.sleep(1)` + `await` 来表示现在执行耗时的操作了,让出cpu
    async def do_some_wait_job(data):
    print("do wait job:", data)
    await asyncio.sleep(1)
    return data * 2

    print("------- start 多任务并行 ---------")

    start = now()
    tasks = [
    asyncio.ensure_future(do_some_wait_job(2)),
    asyncio.ensure_future(do_some_wait_job(3)),
    asyncio.ensure_future(do_some_wait_job(4)),
    ]
    # await方式,返回的结果通过task进行获取
    # loop.run_until_complete(asyncio.wait(tasks))
    # for task in tasks:
    # print('task return : ', task.result())
    # print("multi task cost : ", now() - start)

    # gather 方式直接获取返回的结果
    start = now()
    ans = loop.run_until_complete(asyncio.gather(*tasks))
    for v in ans:
    print('gather task return: ', v)
    print("multi2 task cost : ", now() - start)

    多个任务执行中,首先当然也是创建task任务列表,然后将所有的任务列表丢到事件循环中即可

    • 定义task列表 [ asyncio.ensure_futre(xxx)...]
    • 执行所有的任务
      • loop.run_until_complete(asyncio.wait(tasks)) 获取任务返回的结果就需要通过task#result来获取
      • loop.run_until_complete(asyncio.gather(*tasks)) 直接返回任务执行的结果

    上面代码执行之后输出如下

    1
    2
    3
    4
    5
    6
    7
    8
    ------- start 多任务并行 ---------
    do wait job: 2
    do wait job: 3
    do wait job: 4
    gather task return: 4
    gather task return: 6
    gather task return: 8
    multi2 task cost : 1.0029327869415283

    根据上面的输出结果,可以看到三个任务总共执行1s多一点,如果串行那么最少要3s

    4. 线程+协程使用

    我们考虑将定义的协程任务放在一个子线程中执行,然后主线程就负责往子线程中注册协程,这样就可以实现子线程负责干活,主线程则负责分配任务的功能,这种case可以如何处理呢?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import asyncio
    import threading
    import time

    '''
    主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环

    然后在主线程中通过`run_coroutine_threadsafe`来注册协程对象,子线程运行时间循环,主线程也不会被阻塞
    '''


    def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


    async def do_some_work(x):
    print("do some work: ", x)
    await asyncio.sleep(1)
    print("do next work: ", x)
    return x * x


    now = lambda: time.time()

    new_loop = asyncio.new_event_loop()

    # 创建线程,在线程中开启事件循环
    t = threading.Thread(target=start_loop, args=(new_loop,), name='loopThread')
    t.start()

    start = now()
    result = asyncio.run_coroutine_threadsafe(do_some_work(3), new_loop)
    print("haha---->", now() - start, result)

    result = asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
    print("haha---->", now() - start)
    print("end", now(), ' cost ', now() - start, result.result())

    上面需要注意的几点是

    • 创建一个新的事件循环, asyncio.new_event_loop()
    • 主线程注册协程的逻辑 asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop), 两个参数,第一个为协程,第二个为前面创建的时间循环
    • 如果需要获取返回的结果,则根据返回的 result() 函数来get

    II. 其他

    1. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

    一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

    2. 声明

    尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

    3. 扫描关注

    一灰灰blog

    QrCode

    知识星球

    goals

    评论

    Your browser is out-of-date!

    Update your browser to view this website correctly. Update my browser now

    ×