db的读写操作可以算是io型,对于简单的sql,获取数据的数据可能远小于传输的时间,针对这种操作,异步的访问方式就比较占优势了。本篇主要介绍在python中如何借助aiomysql来实现db的异步读写
I. 异步基本使用 1. 配置 本地测试时的环境参数如下
mac操作系统
python3.7.1
aiomysql 0.0.19
PyMySQL 0.9.2
asyncio 3.4.3
db参数
mysq 5.7.22
配置参数: user=root, password=, port=6579, host=127.0.0.1, db=test
表信息及数据如下图
1 2 3 4 5 6 7 8 9 10 CREATE TABLE `user` ( `id` int (11 ) unsigned NOT NULL AUTO_INCREMENT, `name` varchar (80 ) NOT NULL DEFAULT '' COMMENT '用户名' , `pwd` varchar (26 ) NOT NULL DEFAULT '' COMMENT '密码' , `isDeleted` tinyint (1 ) NOT NULL DEFAULT '0' , `created` varchar (13 ) NOT NULL DEFAULT '0' , `updated` varchar (13 ) NOT NULL DEFAULT '0' , PRIMARY KEY (`id` ), KEY `name` (`name` ) ) ENGINE =InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET =utf8mb4;
2. 基础使用 在进入正式的操作之前,有必要先过一下使用步骤,先回顾一下使用PyMySql时的基本使用姿势
首先是与db建立连接 pymysql.connect(host='127.0.0.1', port=3306, user='root', password='', db='test', charset='utf8')
根据连接得到的connect来创建光标cursor
然后通过cursor.execute('sql')
来执行具体的sql语句
针对增加,删除和修改的sql命令,需要额外的提交来connect.commit()
来确保所有的操作被mysql接收并落盘
如果是查询,则可以根据 cursor.fetchXXX
来获取返回结果中的一条,多条或者所有数据
最后回收线程,关闭连接
对于异步的操作,基本步骤其实和上面也差不多,无非是某些耗时的io操作可以使用await
来修饰,从而借助协程来提升性能,那么一个简单的使用case可以如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import asyncioimport aiomysqlasync def basic_test () : conn = await aiomysql.connect(host="127.0.0.1" , port=3306 , user='root' , password='' , db='test' , charset='utf8' ) cursor = await conn.cursor() await cursor.execute('select * from user limit 3' ) result = await cursor.fetchall() for record in result: print("record: " , record) await cursor.close() conn.close() loop = asyncio.get_event_loop() loop.run_until_complete(basic_test())
上面的使用姿势,和前面的对比,基本上没有什么区别,唯一的不同就是前面的直接调用;这里需要将调用封装成协程丢到事件循环中执行
下面将主要介绍下,如何使用连接池来实现各种db的异步操作
3. 初始化 接下来我们的目的是尽量开发一个通用的sql读写工具类,所以在脚本启动时,实现db的连接初始化;然后其他的db读写地方,从连接池中获取连接执行db操作,用完之后释放连接;最后在脚本结束时,关闭连接池实现资源回收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 async def register () : ''' 初始化,获取数据库连接池 :return: ''' try : print("start to connect db!" ) pool = await aiomysql.create_pool(host='127.0.0.1' , port=3306 , user='root' , password='' , db='test' , charset='utf8' ) print("succeed to connect db!" ) return pool except asyncio.CancelledError: raise asyncio.CancelledError except Exception as ex: print("mysql数据库连接失败:{}" .format(ex.args[0 ])) return False
4. 获取连接和Cursor 根据前面的学习,我们要操作db,前提是要现拿到连接connection, 然后通过connection来绑定光标cursor,最后通过cursor执行sql语句,所以我们接下来就是需要根据前面的pool来获取所需的参数
1 2 3 4 5 6 7 8 9 async def getCurosr (pool) : ''' 获取db连接和cursor对象,用于db的读写操作 :param pool: :return: ''' conn = await pool.acquire() cur = await conn.cursor() return conn, cur
5. 批量写入操作 接下来的db操作就比较简单了,基本上和前面基础篇的使用姿势一样,并没有什么太大的区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 async def batchInsert (pool, sql, values) : start = now() conn, cur = await getCurosr(pool) try : await cur.executemany(sql, values) await conn.commit() return cur.rowcount finally : await pool.release(conn) print("execute insert cost: " , now() - start)
简单看一下上面的使用姿势
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 now = lambda : time.time() * 1000 loop = asyncio.get_event_loop() pool = loop.run_until_complete(register()) records = [('一灰灰1' , 'asdf' , 0 , int(time.time()), int(time.time())), ('一灰灰2' , 'qwer' , 0 , int(time.time()), int(time.time()))] sql = "insert into user(`name`, `pwd`, `isDeleted`, `created`, `updated`) values (%s, %s, %s, %s, %s)" task = asyncio.ensure_future(batchInsert(pool, sql, records)) result = loop.run_until_complete(task) print("insert res:" , result) loop.run_until_complete(close(pool)) loop.close()
输出结果如下
1 2 execute insert cost: 2.943115234375 insert res: 2
6. 查询 经过上面的写入操作之后,查询的逻辑也比较顺了,基本上也没有啥区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 async def query (pool, sql) : ''' 查询, 一般流程是首先获取连接,光标,获取数据之后,则需要释放连接 :param pool: :return: ''' start = now() conn, cur = await getCurosr(pool) try : await cur.execute(sql) return await cur.fetchall() finally : await pool.release(conn) print("execute %s cost %d" % (sql, now() - start))
对应的测试case如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 loop = asyncio.get_event_loop() pool = loop.run_until_complete(register()) start = now() tasks = [ asyncio.ensure_future(query(pool, 'select * from user where id<3 order by id desc limit 2' )), asyncio.ensure_future(query(pool, 'select * from user where id<7 order by id desc limit 2' )), asyncio.ensure_future(query(pool, 'select * from user where id<9 order by id desc limit 2' )), ] result = loop.run_until_complete(asyncio.gather(*tasks)) print("total cose: " , now() - start) for res in result: print("record: " , res) loop.run_until_complete(close(pool)) loop.close()
输出结果如下:
1 2 3 4 5 6 7 8 9 start to connect db! succeed to connect db! execute select * from user where id<3 order by id desc limit 2 cost 3 execute select * from user where id<7 order by id desc limit 2 cost 3 execute select * from user where id<9 order by id desc limit 2 cost 3 total cose: 4.31298828125 record: ((2, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' ), (1, '一灰灰_1542431218' , '123456' , 0, '1542431218' , '1542431218' )) record: ((6, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' ), (5, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' )) record: ((8, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' ), (7, '一灰灰_1542431296' , '123456' , 0, '1542431296' , '1542431296' ))
7. 关闭 最后就是资源回收关闭,前面的测试用例中也都用到了
1 2 3 4 async def close (pool) : pool.close() await pool.wait_closed() print("close pool!" )
II. 其他 参考
一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
2. 声明 尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激
3. 扫描关注 一灰灰blog
知识星球