Installing Connector/Python also installs the mysql.connector.aio package, which integrates asyncio with the connector so that asynchronous MySQL interactions can be built into an application.
Here are some code examples that integrate mysql.connector.aio functionality:
Basic Usage
from mysql.connector.aio import connect
# Connect to a MySQL server and get a cursor
cnx = await connect(user="myuser", password="mypass")
cur = await cnx.cursor()
# Execute a non-blocking query
await cur.execute("SELECT version()")
# Retrieve the results of the query asynchronously
results = await cur.fetchall()
print(results)
# Close cursor and connection
await cur.close()
await cnx.close()
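Note that `await` is only valid inside a coroutine, so a snippet like the one above is typically wrapped in an `async def` entry point and driven with `asyncio.run()`. Below is a minimal sketch of that pattern; `asyncio.sleep` stands in for the connector calls, which assume a live server:

```python
import asyncio


async def main():
    # In a real script, the connector calls from the snippet above
    # (`await connect(...)`, `await cur.execute(...)`, ...) go here;
    # `asyncio.sleep` stands in for any awaitable.
    await asyncio.sleep(0)
    return "done"


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints "done"
```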
Usage with Context Managers
from mysql.connector.aio import connect

# Connect to a MySQL server and get a cursor
async with await connect(user="myuser", password="mypass") as cnx:
    async with await cnx.cursor() as cur:
        # Execute a non-blocking query
        await cur.execute("SELECT version()")

        # Retrieve the results of the query asynchronously
        results = await cur.fetchall()
        print(results)
Running Multiple Tasks Asynchronously
This example showcases how to run tasks asynchronously, and also the usage of to_thread, which is the backbone for asynchronously running blocking functions.
The synchronous version of this example implements coroutines, rather than following the common synchronous approach; this is done to explicitly demonstrate that simply awaiting coroutines does not make the code run asynchronously. Functions included in the asyncio API must be used to achieve asynchronicity.
import asyncio
import os
import time

from mysql.connector.aio import connect

# Global variable which will help to format the job sequence output.
# DISCLAIMER: this is an example for showcasing/demo purposes,
# you should avoid global variables usage for production code.
global indent
indent = 0

# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}


async def job_sleep(n):
    """Take a nap for n seconds.

    This job represents any generic task - it may be or not an IO task.
    """
    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # Emulating a generic job/task
    print(f"{offset}START_SLEEP")
    await asyncio.sleep(n)
    print(f"{offset}END_SLEEP")
    return f"I slept for {n} seconds"


async def job_mysql():
    """Connect to a MySQL Server and do some operations.

    Run queries, run procedures, insert data, etc.
    """
    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # MySQL operations
    print(f"{offset}START_MYSQL_OPS")
    async with await connect(**config) as cnx:
        async with await cnx.cursor() as cur:
            await cur.execute("SELECT @@version")
            res = await cur.fetchone()
            time.sleep(1)  # for simulating that the fetch isn't immediate
    print(f"{offset}END_MYSQL_OPS")

    # return server version
    return res


async def job_io():
    """Emulate an IO operation.

    `to_thread` allows to run a blocking function asynchronously.

    References:
        [asyncio.to_thread]: https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread
    """

    # Emulating a native blocking IO procedure
    def io():
        """Blocking IO operation."""
        time.sleep(5)

    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # Showcasing how a native blocking IO procedure can be awaited
    print(f"{offset}START_IO")
    await asyncio.to_thread(io)
    print(f"{offset}END_IO")
    return "I am an IO operation"


async def main_asynchronous():
    """Running tasks asynchronously.

    References:
        [asyncio.gather]: https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
    """
    print("-------------------- ASYNCHRONOUS --------------------")

    # reset indent
    global indent
    indent = 0

    clock = time.time()

    # `asyncio.gather()` allows to run awaitable objects
    # in the aws sequence asynchronously.
    # If all awaitables are completed successfully,
    # the result is an aggregate list of returned values.
    aws = (job_io(), job_mysql(), job_sleep(4))
    returned_vals = await asyncio.gather(*aws)
    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # The order of result values corresponds to the
    # order of awaitables in aws.
    print(returned_vals, end="\n" * 2)

    # Example expected output
    # -------------------- ASYNCHRONOUS --------------------
    # START_IO
    # START_MYSQL_OPS
    # START_SLEEP
    # END_MYSQL_OPS
    # END_SLEEP
    # END_IO
    # Elapsed time: 5.01
    # ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']


async def main_non_asynchronous():
    """Running tasks non-asynchronously."""
    print("------------------- NON-ASYNCHRONOUS -------------------")

    # reset indent
    global indent
    indent = 0

    clock = time.time()

    # Sequence of awaitable objects
    aws = (job_io(), job_mysql(), job_sleep(4))

    # The line below is the short version of:
    #     coro1, coro2, coro3 = aws
    #     res1 = await coro1
    #     res2 = await coro2
    #     res3 = await coro3
    #     returned_vals = [res1, res2, res3]
    # NOTE: Simply awaiting a coro does not make the code run asynchronously!
    returned_vals = [await coro for coro in aws]  # this will run synchronously

    print(f"Elapsed time: {time.time() - clock:0.2f}")
    print(returned_vals, end="\n")

    # Example expected output
    # ------------------- NON-ASYNCHRONOUS -------------------
    # START_IO
    # END_IO
    # START_MYSQL_OPS
    # END_MYSQL_OPS
    # START_SLEEP
    # END_SLEEP
    # Elapsed time: 10.07
    # ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']


if __name__ == "__main__":
    # `asyncio.run()` allows to execute a coroutine (`coro`) and return the result.
    # You cannot run a coro without it.
    # References:
    #     [asyncio.run]: https://docs.python.org/3/library/asyncio-runner.html#asyncio.run
    assert asyncio.run(main_asynchronous()) == asyncio.run(main_non_asynchronous())
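The same timing contrast can be reproduced without a MySQL server. A minimal sketch (job durations are illustrative) showing that `asyncio.gather` overlaps the waits while a plain loop of awaits does not:

```python
import asyncio
import time


async def nap(n):
    # Stand-in for any awaitable job (an I/O wait, a MySQL query, ...).
    await asyncio.sleep(n)
    return n


async def concurrent():
    # The waits overlap: total time is roughly max(0.2, 0.1).
    return await asyncio.gather(nap(0.2), nap(0.1))


async def sequential():
    # Awaiting one coroutine after another: total time is roughly 0.2 + 0.1.
    return [await coro for coro in (nap(0.2), nap(0.1))]


start = time.time()
assert asyncio.run(concurrent()) == [0.2, 0.1]
t_gather = time.time() - start

start = time.time()
assert asyncio.run(sequential()) == [0.2, 0.1]
t_seq = time.time() - start

assert t_gather < t_seq  # the gather version finishes sooner
```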
It shows the three jobs running asynchronously:
- job_io: emulates an I/O operation; to_thread is used to run the blocking function asynchronously. It starts first and takes five seconds to complete, so it is the last job to finish.
- job_mysql: connects to a MySQL server to perform operations such as queries and stored procedures. It starts second and takes one second to complete, so it is the first job to finish.
- job_sleep: sleeps for n seconds to represent a generic task. It starts last and takes four seconds to complete, so it is the second job to finish.
A lock/mutex is not added to the indent variable because multithreading is not being used; instead, the one and only active thread executes all of the jobs. The point of asynchronous execution is that other work gets done while awaiting the result of an I/O operation.
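If a job did hand control back to the event loop in the middle of a read-modify-write on shared state, an asyncio.Lock would make the update atomic even without threads. A minimal sketch (the `counter`/`bump` names are hypothetical, not part of the example above):

```python
import asyncio

counter = 0


async def bump(lock):
    global counter
    async with lock:  # serialize the read-modify-write across the await point
        value = counter
        await asyncio.sleep(0)  # control returns to the event loop here
        counter = value + 1


async def main():
    lock = asyncio.Lock()  # created inside the running event loop
    await asyncio.gather(*(bump(lock) for _ in range(100)))


asyncio.run(main())
print(counter)  # 100; without the lock, concurrent updates could be lost
```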
Asynchronous MySQL Queries
Here is a similar example that uses MySQL queries instead of generic jobs.
While cursors are not used in these examples, the principles and workflow can be applied to cursors by having every connection object create a single cursor to operate with.
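For instance, the asynchronous table-creation kernel shown later could be adapted so that each connection operates through its own cursor. A hypothetical sketch (not part of the original examples; the `create_table_with_cursor` name and the simplified CREATE statement are illustrative):

```python
from typing import List


async def create_table_with_cursor(
    exec_seq: List[str], table_names: List[str], cnx, i: int
) -> bool:
    """Like the example kernels, but issuing statements through a cursor."""
    if i >= len(table_names):
        return False
    exec_seq.append(f"start_{i}")
    # One cursor per connection object, as suggested above.
    async with await cnx.cursor() as cur:
        await cur.execute(f"DROP TABLE IF EXISTS {table_names[i]}")
        await cur.execute(f"CREATE TABLE {table_names[i]} (id INT)")
    exec_seq.append(f"end_{i}")
    return True
```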
Synchronous code that creates and fills hundreds of tables:
import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple

from mysql.connector import connect

if TYPE_CHECKING:
    from mysql.connector.abstracts import (
        MySQLConnectionAbstract,
    )

# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}

exec_sequence = []


def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Creates a table."""
    if i >= len(table_names):
        return False
    exec_seq.append(f"start_{i}")
    stmt = f"""
        CREATE TABLE IF NOT EXISTS {table_names[i]} (
            dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
            category TEXT,
            dish_name TEXT,
            price FLOAT,
            servings INT,
            order_time TIME
        )
    """
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True


def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drops a table."""
    if i >= len(table_names):
        return False
    exec_seq.append(f"start_{i}")
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True


def main(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], bool],
    table_names: List[str],
) -> Tuple[List, List]:
    exec_seq = []
    database_name = "TABLE_CREATOR"
    with connect(**config) as cnx:
        # Create/Setup database
        cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        cnx.cmd_query(f"USE {database_name}")

        # Execute Kernel: Create or Delete tables
        for i in range(len(table_names)):
            kernel(exec_seq, table_names, cnx, i)

        # Show tables
        cnx.cmd_query("SHOW tables")
        show_tables = cnx.get_rows()[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables


if __name__ == "__main__":
    # with num_tables=511 -> Elapsed time ~ 25.86
    clock = time.time()
    print_exec_seq = False
    num_tables = 511
    table_names = [f"table_sync_{n}" for n in range(num_tables)]

    print("-------------------- SYNC CREATOR --------------------")
    exec_seq, show_tables = main(kernel=create_table, table_names=table_names)
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- SYNC DROPPER --------------------")
    exec_seq, show_tables = main(kernel=drop_table, table_names=table_names)
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

# Expected output with num_tables = 11:
# -------------------- SYNC CREATOR --------------------
# [
#     "start_0",
#     "end_0",
#     "start_1",
#     "end_1",
#     "start_2",
#     "end_2",
#     "start_3",
#     "end_3",
#     "start_4",
#     "end_4",
#     "start_5",
#     "end_5",
#     "start_6",
#     "end_6",
#     "start_7",
#     "end_7",
#     "start_8",
#     "end_8",
#     "start_9",
#     "end_9",
#     "start_10",
#     "end_10",
# ]
# -------------------- SYNC DROPPER --------------------
# [
#     "start_0",
#     "end_0",
#     "start_1",
#     "end_1",
#     "start_2",
#     "end_2",
#     "start_3",
#     "end_3",
#     "start_4",
#     "end_4",
#     "start_5",
#     "end_5",
#     "start_6",
#     "end_6",
#     "start_7",
#     "end_7",
#     "start_8",
#     "end_8",
#     "start_9",
#     "end_9",
#     "start_10",
#     "end_10",
# ]
The script creates and drops {num_tables} tables, and is fully sequential: table_{i} is created and dropped before moving on to table_{i+1}.
An asynchronous code example for the same task:
import asyncio
import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple

from mysql.connector.aio import connect

if TYPE_CHECKING:
    from mysql.connector.aio.abstracts import (
        MySQLConnectionAbstract,
    )

# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}

exec_sequence = []


async def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Creates a table."""
    if i >= len(table_names):
        return False
    exec_seq.append(f"start_{i}")
    stmt = f"""
        CREATE TABLE IF NOT EXISTS {table_names[i]} (
            dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
            category TEXT,
            dish_name TEXT,
            price FLOAT,
            servings INT,
            order_time TIME
        )
    """
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    await cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True


async def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drops a table."""
    if i >= len(table_names):
        return False
    exec_seq.append(f"start_{i}")
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True


async def main_async(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], bool],
    table_names: List[str],
    num_jobs: int = 2,
) -> Tuple[List, List]:
    """The asynchronous tables creator...

    Reference:
        [as_completed]: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
    """
    exec_seq = []
    database_name = "TABLE_CREATOR"

    # Create/Setup database
    # ---------------------
    # No asynchronous execution is done here.
    # NOTE: observe usage WITH context manager.
    async with await connect(**config) as cnx:
        await cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        await cnx.cmd_query(f"USE {database_name}")
    config["database"] = database_name

    # Open connections
    # ----------------
    # `as_completed` allows to run awaitable objects in the `aws` iterable asynchronously.
    # NOTE: observe usage WITHOUT context manager.
    aws = [connect(**config) for _ in range(num_jobs)]
    cnxs: List["MySQLConnectionAbstract"] = [
        await coro for coro in asyncio.as_completed(aws)
    ]

    # Execute Kernel: Create or Delete tables
    # ---------------------------------------
    # N tables must be created/deleted and we can run up to `num_jobs` jobs
    # asynchronously, therefore we execute jobs in batches of size `num_jobs`.
    returned_values, i = [True], 0
    while any(returned_values):  # Keep running until i >= len(table_names) for all jobs
        # Prepare coros: map connections and table-name IDs to jobs.
        aws = [
            kernel(exec_seq, table_names, cnx, i + idx) for idx, cnx in enumerate(cnxs)
        ]
        # When i >= len(table_names) the coro simply returns False, else True.
        returned_values = [await coro for coro in asyncio.as_completed(aws)]
        # Update table-name ID offset based on the number of jobs
        i += num_jobs

    # Close connections
    # -----------------
    # `as_completed` allows to run awaitable objects in the `aws` iterable asynchronously.
    for coro in asyncio.as_completed([cnx.close() for cnx in cnxs]):
        await coro

    # Load table names
    # ----------------
    # No asynchronous execution is done here.
    async with await connect(**config) as cnx:
        # Show tables
        await cnx.cmd_query("SHOW tables")
        show_tables = (await cnx.get_rows())[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables


if __name__ == "__main__":
    # `asyncio.run()` allows to execute a coroutine (`coro`) and return the result.
    # You cannot run a coro without it.
    # References:
    #     [asyncio.run]: https://docs.python.org/3/library/asyncio-runner.html#asyncio.run
    # with num_tables=511 and num_jobs=3 -> Elapsed time ~ 19.09
    # with num_tables=511 and num_jobs=12 -> Elapsed time ~ 13.15
    clock = time.time()
    print_exec_seq = False
    num_tables = 511
    num_jobs = 12
    table_names = [f"table_async_{n}" for n in range(num_tables)]

    print("-------------------- ASYNC CREATOR --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=create_table, table_names=table_names, num_jobs=num_jobs)
    )
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- ASYNC DROPPER --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=drop_table, table_names=table_names, num_jobs=num_jobs)
    )
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

# Expected output with num_tables = 11 and num_jobs = 3:
# -------------------- ASYNC CREATOR --------------------
# [
#     "start_2",
#     "start_1",
#     "start_0",
#     "end_2",
#     "end_0",
#     "end_1",
#     "start_5",
#     "start_3",
#     "start_4",
#     "end_3",
#     "end_5",
#     "end_4",
#     "start_8",
#     "start_7",
#     "start_6",
#     "end_7",
#     "end_8",
#     "end_6",
#     "start_10",
#     "start_9",
#     "end_9",
#     "end_10",
# ]
# -------------------- ASYNC DROPPER --------------------
# [
#     "start_1",
#     "start_2",
#     "start_0",
#     "end_1",
#     "end_2",
#     "end_0",
#     "start_3",
#     "start_5",
#     "start_4",
#     "end_4",
#     "end_5",
#     "end_3",
#     "start_6",
#     "start_8",
#     "start_7",
#     "end_7",
#     "end_6",
#     "end_8",
#     "start_10",
#     "start_9",
#     "end_9",
#     "end_10",
# ]
This output shows how the workflow is not sequential, and that at most {num_jobs} jobs can run asynchronously. The jobs execute in batches of size {num_jobs}, all jobs in a batch are awaited before the next batch starts, and the loop ends when no tables remain to create.
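The batch strategy waits for the slowest job in each batch before starting the next one. An alternative, not used in the example above, is to cap concurrency with asyncio.Semaphore so that a new job starts as soon as any slot frees up. A minimal server-free sketch (`run_capped`, `demo`, and `job` are hypothetical names; `asyncio.sleep` stands in for a MySQL operation):

```python
import asyncio


async def run_capped(jobs, num_jobs):
    sem = asyncio.Semaphore(num_jobs)

    async def worker(job):
        async with sem:  # at most `num_jobs` jobs hold a slot at a time
            return await job()

    return await asyncio.gather(*(worker(job) for job in jobs))


async def demo():
    running = 0
    peak = 0

    async def job():
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for a MySQL operation
        running -= 1
        return True

    results = await run_capped([job] * 10, num_jobs=3)
    return results, peak


results, peak = asyncio.run(demo())
print(len(results), peak)  # 10 jobs completed; no more than 3 ran at once
```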
A performance comparison of these examples: the asynchronous implementation is about 26% faster when using 3 jobs, and about 49% faster when using 12 jobs. Note that increasing the number of jobs also increases the job-management overhead, which at some point cancels out the initial speed-up. The optimal number of jobs depends on the problem and is a value found empirically.
As demonstrated, the asynchronous version requires more code than the synchronous variant to work. Is it worth the effort? It depends on the goal: asynchronous code allows better optimization of performance, such as CPU usage, whereas writing standard synchronous code is simpler.
For more information about the asyncio module, see the official Python asyncio documentation.