9.3 Asynchronous Connectivity

Installing Connector/Python also installs the mysql.connector.aio package, which integrates asyncio with the connector so that asynchronous MySQL interactions can be incorporated into applications.

The following code examples show how to integrate the mysql.connector.aio functionality:

Basic usage

from mysql.connector.aio import connect

# Connect to a MySQL server and get a cursor
cnx = await connect(user="myuser", password="mypass")
cur = await cnx.cursor()

# Execute a non-blocking query
await cur.execute("SELECT version()")

# Retrieve the results of the query asynchronously
results = await cur.fetchall()
print(results)

# Close cursor and connection
await cur.close()
await cnx.close()

Usage with context managers

from mysql.connector.aio import connect

# Connect to a MySQL server and get a cursor
async with await connect(user="myuser", password="mypass") as cnx:
    async with await cnx.cursor() as cur:
        # Execute a non-blocking query
        await cur.execute("SELECT version()")

        # Retrieve the results of the query asynchronously
        results = await cur.fetchall()
        print(results)

Running Multiple Tasks Asynchronously

This example showcases how to run tasks asynchronously, along with the usage of to_thread, which is the backbone for running blocking functions asynchronously.

Note

The non-asynchronous version of this example implements coroutines rather than following the common synchronous approach; this is done to explicitly show that merely awaiting coroutines does not make the code run asynchronously. Functions from the asyncio API must be used to achieve asynchronicity.
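The contrast the note describes can be seen in a minimal pure-asyncio sketch before the full example below (no MySQL involved; the function names and timings here are illustrative only):

```python
import asyncio
import time


async def nap(n):
    """A generic awaitable task that just sleeps for n seconds."""
    await asyncio.sleep(n)
    return n


async def sequential():
    # Awaiting coroutines one by one runs them back to back:
    # the second sleep does not start until the first finishes.
    return [await nap(0.2), await nap(0.2)]


async def concurrent():
    # asyncio.gather() schedules both coroutines on the event loop,
    # so their sleeps overlap.
    return await asyncio.gather(nap(0.2), nap(0.2))


start = time.time()
asyncio.run(sequential())
sequential_elapsed = time.time() - start  # roughly 0.4s

start = time.time()
asyncio.run(concurrent())
concurrent_elapsed = time.time() - start  # roughly 0.2s

print(f"sequential: {sequential_elapsed:0.2f}s, concurrent: {concurrent_elapsed:0.2f}s")
```

Both functions await the same coroutines; only the one using the asyncio API (gather) actually overlaps them.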

import asyncio
import os
import time

from mysql.connector.aio import connect

# Global variable which will help to format the job sequence output.
# DISCLAIMER: this is an example for showcasing/demo purposes,
# you should avoid global variables usage for production code.
indent = 0

# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}


async def job_sleep(n):
    """Take a nap for n seconds.

    This job represents any generic task - it may or may not be an IO task.
    """
    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # Emulating a generic job/task
    print(f"{offset}START_SLEEP")
    await asyncio.sleep(n)
    print(f"{offset}END_SLEEP")

    return f"I slept for {n} seconds"


async def job_mysql():
    """Connect to a MySQL Server and do some operations.

    Run queries, run procedures, insert data, etc.
    """
    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # MySQL operations
    print(f"{offset}START_MYSQL_OPS")
    async with await connect(**config) as cnx:
        async with await cnx.cursor() as cur:
            await cur.execute("SELECT @@version")
            res = await cur.fetchone()
            time.sleep(1)  # for simulating that the fetch isn't immediate
    print(f"{offset}END_MYSQL_OPS")

    # return server version
    return res


async def job_io():
    """Emulate an IO operation.

    `to_thread` allows running a blocking function asynchronously.

    References:
        [asyncio.to_thread]: https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread
    """

    # Emulating a native blocking IO procedure
    def io():
        """Blocking IO operation."""
        time.sleep(5)

    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # Showcasing how a native blocking IO procedure can be awaited.
    print(f"{offset}START_IO")
    await asyncio.to_thread(io)
    print(f"{offset}END_IO")

    return "I am an IO operation"


async def main_asynchronous():
    """Running tasks asynchronously.

    References:
        [asyncio.gather]: https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
    """
    print("-------------------- ASYNCHRONOUS --------------------")

    # reset indent
    global indent
    indent = 0

    clock = time.time()

    # `asyncio.gather()` allows running the awaitable objects
    # in the `aws` sequence asynchronously.
    # If all awaitables complete successfully,
    # the result is an aggregate list of returned values.
    aws = (job_io(), job_mysql(), job_sleep(4))
    returned_vals = await asyncio.gather(*aws)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # The order of result values corresponds to the
    # order of awaitables in aws.
    print(returned_vals, end="\n" * 2)

    # Example expected output
    # -------------------- ASYNCHRONOUS --------------------
    # START_IO
    #         START_MYSQL_OPS
    #                 START_SLEEP
    #         END_MYSQL_OPS
    #                 END_SLEEP
    # END_IO
    # Elapsed time: 5.01
    # ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']


async def main_non_asynchronous():
    """Running tasks non-asynchronously"""
    print("------------------- NON-ASYNCHRONOUS -------------------")

    # reset indent
    global indent
    indent = 0

    clock = time.time()

    # Sequence of awaitable objects
    aws = (job_io(), job_mysql(), job_sleep(4))

    # The line below this comment block is the short version of:
    #     coro1, coro2, coro3 = aws
    #     res1 = await coro1
    #     res2 = await coro2
    #     res3 = await coro3
    #     returned_vals = [res1, res2, res3]
    # NOTE: Simply awaiting a coro does not make the code run asynchronously!
    returned_vals = [await coro for coro in aws]  # this will run synchronously

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    print(returned_vals, end="\n")

    # Example expected output
    # ------------------- NON-ASYNCHRONOUS -------------------
    # START_IO
    # END_IO
    #         START_MYSQL_OPS
    #         END_MYSQL_OPS
    #                 START_SLEEP
    #                 END_SLEEP
    # Elapsed time: 10.07
    # ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']


if __name__ == "__main__":
    # `asyncio.run()` allows executing a coroutine (`coro`) and returns the result.
    # You cannot run a coro without it.

    # References:
    #     [asyncio.run]: https://docs.python.org/3/library/asyncio-runner.html#asyncio.run
    assert asyncio.run(main_asynchronous()) == asyncio.run(main_non_asynchronous())

It shows these three jobs running asynchronously:

  • job_io: emulates an I/O operation; uses to_thread to run a blocking function asynchronously.

    It starts first and takes five seconds to complete, so it is the last job to finish.

  • job_mysql: connects to a MySQL server to run operations such as queries and stored procedures.

    It starts second and takes one second to complete, so it is the first job to finish.

  • job_sleep: sleeps for n seconds to represent a generic task.

    It starts last and takes four seconds to complete, so it is the second job to finish.

Note

No lock/mutex is added around the indent variable because no multithreading is used; instead, the single active thread executes all the jobs. The whole point of asynchronous execution is to get other work done while waiting on the result of an I/O operation.
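The note can be illustrated with a small pure-asyncio sketch (no MySQL; the function names are illustrative): coroutines all execute on the event loop's thread, whereas a function offloaded with to_thread runs on a worker thread and would need synchronization if it touched shared state.

```python
import asyncio
import threading


async def loop_thread_id():
    """Coroutines run on the event loop's single thread."""
    return threading.get_ident()


async def main():
    main_thread = threading.get_ident()

    # Every coroutine awaited here executes on the same thread as main(),
    # so shared state such as `indent` needs no lock.
    ids = await asyncio.gather(loop_thread_id(), loop_thread_id())
    assert all(thread_id == main_thread for thread_id in ids)

    # A function offloaded with to_thread runs on a worker thread; shared
    # state touched from inside it WOULD need synchronization.
    worker_thread = await asyncio.to_thread(threading.get_ident)
    assert worker_thread != main_thread

    print("coroutines share the loop thread; to_thread uses a worker thread")


asyncio.run(main())
```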

Asynchronous MySQL Queries

This is a similar example that uses MySQL queries instead of generic jobs.

Note

Although cursors are not used in these examples, the principles and workflow can be applied to cursors by having each connection object create one cursor to operate with.

Synchronous code that creates and drops hundreds of tables

import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple

from mysql.connector import connect

if TYPE_CHECKING:
    from mysql.connector.abstracts import (
        MySQLConnectionAbstract,
    )


# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}

exec_sequence = []


def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Creates a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    stmt = f"""
    CREATE TABLE IF NOT EXISTS {table_names[i]} (
        dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
        category TEXT,
        dish_name TEXT,
        price FLOAT,
        servings INT,
        order_time TIME
    )
    """
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True


def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drops a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True


def main(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], bool],
    table_names: List[str],
) -> Tuple[List, List]:

    exec_seq = []
    database_name = "TABLE_CREATOR"

    with connect(**config) as cnx:
        # Create/Setup database
        cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        cnx.cmd_query(f"USE {database_name}")

        # Execute Kernel: Create or Delete tables
        for i in range(len(table_names)):
            kernel(exec_seq, table_names, cnx, i)

        # Show tables
        cnx.cmd_query("SHOW tables")
        show_tables = cnx.get_rows()[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables


if __name__ == "__main__":
    # with num_tables=511 -> Elapsed time ~ 25.86
    clock = time.time()
    print_exec_seq = False
    num_tables = 511
    table_names = [f"table_sync_{n}" for n in range(num_tables)]

    print("-------------------- SYNC CREATOR --------------------")
    exec_seq, show_tables = main(kernel=create_table, table_names=table_names)
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- SYNC DROPPER --------------------")
    exec_seq, show_tables = main(kernel=drop_table, table_names=table_names)
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # Expected output with num_tables = 11:
    # -------------------- SYNC CREATOR --------------------
    # [
    #     "start_0",
    #     "end_0",
    #     "start_1",
    #     "end_1",
    #     "start_2",
    #     "end_2",
    #     "start_3",
    #     "end_3",
    #     "start_4",
    #     "end_4",
    #     "start_5",
    #     "end_5",
    #     "start_6",
    #     "end_6",
    #     "start_7",
    #     "end_7",
    #     "start_8",
    #     "end_8",
    #     "start_9",
    #     "end_9",
    #     "start_10",
    #     "end_10",
    # ]
    # -------------------- SYNC DROPPER --------------------
    # [
    #     "start_0",
    #     "end_0",
    #     "start_1",
    #     "end_1",
    #     "start_2",
    #     "end_2",
    #     "start_3",
    #     "end_3",
    #     "start_4",
    #     "end_4",
    #     "start_5",
    #     "end_5",
    #     "start_6",
    #     "end_6",
    #     "start_7",
    #     "end_7",
    #     "start_8",
    #     "end_8",
    #     "start_9",
    #     "end_9",
    #     "start_10",
    #     "end_10",
    # ]

This script creates and deletes {num_tables} tables, executing fully sequentially: table_{i} is created and deleted before moving on to table_{i+1}.

An asynchronous code example for the same tasks

import asyncio
import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple

from mysql.connector.aio import connect

if TYPE_CHECKING:
    from mysql.connector.aio.abstracts import (
        MySQLConnectionAbstract,
    )


# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}

exec_sequence = []


async def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Creates a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    stmt = f"""
    CREATE TABLE IF NOT EXISTS {table_names[i]} (
        dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
        category TEXT,
        dish_name TEXT,
        price FLOAT,
        servings INT,
        order_time TIME
    )
    """
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    await cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True


async def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drops a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True


async def main_async(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], bool],
    table_names: List[str],
    num_jobs: int = 2,
) -> Tuple[List, List]:
    """The asynchronous tables creator...
    Reference:
        [as_completed]: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
    """
    exec_seq = []
    database_name = "TABLE_CREATOR"

    # Create/Setup database
    # ---------------------
    # No asynchronous execution is done here.
    # NOTE: observe usage WITH context manager.
    async with await connect(**config) as cnx:
        await cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        await cnx.cmd_query(f"USE {database_name}")
    config["database"] = database_name

    # Open connections
    # ----------------
    # `as_completed` allows running the awaitable objects in the `aws` iterable asynchronously.
    # NOTE: observe usage WITHOUT context manager.
    aws = [connect(**config) for _ in range(num_jobs)]
    cnxs: List["MySQLConnectionAbstract"] = [
        await coro for coro in asyncio.as_completed(aws)
    ]

    # Execute Kernel: Create or Delete tables
    # -------------
    # N tables must be created/deleted and we can run up to `num_jobs` jobs
    # asynchronously, therefore we execute jobs in batches of size `num_jobs`.
    returned_values, i = [True], 0
    while any(returned_values):  # Keep running until i >= len(table_names) for all jobs
        # Prepare coros: map connections/cursors and table-name IDs to jobs.
        aws = [
            kernel(exec_seq, table_names, cnx, i + idx) for idx, cnx in enumerate(cnxs)
        ]
        # When i >= len(table_names) coro simply returns False, else True.
        returned_values = [await coro for coro in asyncio.as_completed(aws)]
        # Update table-name ID offset based on the number of jobs
        i += num_jobs

    # Close connections
    # -----------------
    # `as_completed` allows running the `close()` coroutines asynchronously.
    for coro in asyncio.as_completed([cnx.close() for cnx in cnxs]):
        await coro

    # Load table names
    # ----------------
    # No asynchronous execution is done here.
    async with await connect(**config) as cnx:
        # Show tables
        await cnx.cmd_query("SHOW tables")
        show_tables = (await cnx.get_rows())[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables


if __name__ == "__main__":
    # `asyncio.run()` allows executing a coroutine (`coro`) and returns the result.
    # You cannot run a coro without it.

    # References:
    #     [asyncio.run]: https://docs.python.org/3/library/asyncio-runner.html#asyncio.run

    # with num_tables=511 and num_jobs=3 -> Elapsed time ~ 19.09
    # with num_tables=511 and num_jobs=12 -> Elapsed time ~ 13.15
    clock = time.time()
    print_exec_seq = False
    num_tables = 511
    num_jobs = 12
    table_names = [f"table_async_{n}" for n in range(num_tables)]

    print("-------------------- ASYNC CREATOR --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=create_table, table_names=table_names, num_jobs=num_jobs)
    )
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- ASYNC DROPPER --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=drop_table, table_names=table_names, num_jobs=num_jobs)
    )
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # Expected output with num_tables = 11 and num_jobs = 3:
    # -------------------- ASYNC CREATOR --------------------
    # [
    #     "start_2",
    #     "start_1",
    #     "start_0",
    #     "end_2",
    #     "end_0",
    #     "end_1",
    #     "start_5",
    #     "start_3",
    #     "start_4",
    #     "end_3",
    #     "end_5",
    #     "end_4",
    #     "start_8",
    #     "start_7",
    #     "start_6",
    #     "end_7",
    #     "end_8",
    #     "end_6",
    #     "start_10",
    #     "start_9",
    #     "end_9",
    #     "end_10",
    # ]
    # -------------------- ASYNC DROPPER --------------------
    # [
    #     "start_1",
    #     "start_2",
    #     "start_0",
    #     "end_1",
    #     "end_2",
    #     "end_0",
    #     "start_3",
    #     "start_5",
    #     "start_4",
    #     "end_4",
    #     "end_5",
    #     "end_3",
    #     "start_6",
    #     "start_8",
    #     "start_7",
    #     "end_7",
    #     "end_6",
    #     "end_8",
    #     "start_10",
    #     "start_9",
    #     "end_9",
    #     "end_10",
    # ]

This output shows that the workflow is not sequential: up to {num_jobs} jobs run asynchronously. The jobs execute in batches of size {num_jobs}, with all jobs in a batch awaited before the next batch starts, and the loop ends when there are no tables left to create.
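The batch loop described above can be sketched independently of MySQL with a dummy job (the names, timings, and counts here are illustrative, not part of the original example):

```python
import asyncio


async def job(job_id, total):
    """Dummy job standing in for create_table/drop_table: it returns False
    once job_id runs past the available work, mirroring the kernels above."""
    if job_id >= total:
        return False
    await asyncio.sleep(0.01)  # stand-in for the MySQL round trip
    return True


async def run_in_batches(total_jobs, num_jobs):
    """Run total_jobs jobs asynchronously in batches of size num_jobs."""
    completed, i = 0, 0
    returned_values = [True]
    while any(returned_values):  # keep going until a whole batch returns False
        aws = [job(i + idx, total_jobs) for idx in range(num_jobs)]
        # Same pattern as the main example: await the batch via as_completed.
        returned_values = [await coro for coro in asyncio.as_completed(aws)]
        completed += sum(returned_values)
        i += num_jobs
    return completed


print(asyncio.run(run_in_batches(total_jobs=11, num_jobs=3)))  # prints 11
```

The last batch is allowed to be partially "empty": jobs whose ID exceeds the amount of work simply return False, which is what eventually terminates the loop.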

Comparing the performance of these examples: the asynchronous implementation is about 26% faster with 3 jobs and about 49% faster with 12 jobs. Note that increasing the number of jobs also increases the job-management overhead, which at some point cancels out the initial speed-up. The optimal number of jobs depends on the problem and is determined empirically.

As demonstrated, the asynchronous version requires more code than the synchronous variant to work. Is it worth the effort? It depends on the goal: asynchronous code can be better tuned for performance, such as CPU utilization, whereas standard synchronous code is simpler to write.

For more information about the asyncio module, see the official Python documentation on asynchronous I/O.