Redrock Postgres 文档
主页 切换暗/亮/自动模式 切换暗/亮/自动模式 切换暗/亮/自动模式 返回首页

事务管理

相对于 psql 客户端程序,Psycopg 有一个行为似乎令人惊讶:默认情况下,任何数据库操作都将启动一个新事务。因此,在调用 Connection.commit() 之前,连接上的任何操作所做的更改将不可见,并且将被 Connection.rollback() 丢弃。在前面的事务结束后,同一连接上执行的后续操作也会启动新的事务。

如果一个数据库操作失败,服务器将可能拒绝进一步的命令,直到调用rollback()

如果在打开事务的情况下关闭连接,则不会向服务器发送 COMMIT 命令,服务器将丢弃连接中未提交的更改。某些中间件(如 PgBouncer)也会丢弃处于事务状态的连接,因此,如果可能的话,您需要在完成使用连接之前提交或回滚连接。

作为一个例子说明下这可能会引发什么样的问题,您第一次使用 Psycopg 时(并对此感到失望)可能是这样的:

conn = psycopg.connect()

# Creating a cursor doesn't start a transaction or affect the connection
# in any way.
cur = conn.cursor()

cur.execute("SELECT count(*) FROM my_table")
# This function call executes:
# - BEGIN
# - SELECT count(*) FROM my_table
# So now a transaction has started.

# If your program spends a long time in this state, the server will keep
# a connection "idle in transaction", which is likely something undesired

cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
# This statement is executed inside the transaction

conn.close()
# No COMMIT was sent: the INSERT was discarded.

这里有一些问题,让我们看看如何改进它们。

运行上述代码后的一个明显问题是,启动 psql 连接到数据库,您将在表data中看不到新记录。解决此问题的一种方法是在关闭连接之前调用conn.commit()。值得庆幸的是,如果您使用连接上下文,Psycopg 将在代码块的末尾提交连接(或者在块退出并出现异常时将其回滚)。

将代码使用连接上下文修改后,将产生下面这样的数据库语句序列:

with psycopg.connect() as conn:

    cur = conn.cursor()

    cur.execute("SELECT count(*) FROM my_table")
    # This function call executes:
    # - BEGIN
    # - SELECT count(*) FROM my_table
    # So now a transaction has started.

    cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
    # This statement is executed inside the transaction

# No exception at the end of the block:
# COMMIT is executed.

这样我们就不必记住调用close()commit(),并且数据库操作实际上具有持久的影响。代码可能仍会执行您意想不到的操作:将事务从第一个操作保留到连接关闭。您可以使用自动提交事务和/或事务上下文对事务进行更精细的控制。

默认情况下,即使是一个简单的SELECT语句也会启动事务,在长时间运行的程序中,如果不采取进一步的操作,会话将一直在事务中处于空闲状态。由于多种原因(会话持有锁,表膨胀……),这会让数据库处于不良状态。对于长期存在的脚本,请确保尽快终止事务或配置连接启用自动提交
如果数据库操作失败并显示错误消息,如 InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block,则表示上一个操作失败了,数据库会话处于错误状态。如果你想继续使用相同的连接,你需要调用 rollback()

自动提交事务

手动提交的要求可以使用autocommit作为连接属性或 connect() 的参数绕开掉。使用事务控制在存储过程中运行无法在事务内执行的操作(如 CREATE DATABASEVACUUMCALL)时,可能需要这样做。

使用自动提交事务,上述的操作序列将变成:

with psycopg.connect(autocommit=True) as conn:

    cur = conn.cursor()

    cur.execute("SELECT count(*) FROM my_table")
    # This function call now only executes:
    # - SELECT count(*) FROM my_table
    # and no transaction starts.

    cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
    # The result of this statement is persisted immediately by the database

# The connection is closed at the end of the block but, because it is not
# in a transaction state, no COMMIT is executed.

自动提交事务的行为更像习惯使用 psql 的人所期望的。这具有有益的性能效果,因为数据库发送的查询更少,执行的操作也更少。但是,这些语句不会在原子事务中执行;如果需要在事务中执行某些操作,也可以使用显式事务块通过自动提交连接来实现。

事务上下文

确保事务在正确的时间完成的一种更透明的方法是,使用 with Connection.transaction() 创建事务上下文。引入上下文后,将启动事务;离开上下文时,事务被提交,或者如果代码块内发生异常,则进行回滚。

继续上面的示例,如果要使用自动提交连接,但仍将选定的命令组包装在一个原子事务中,则可以使用transaction()上下文:

with psycopg.connect(autocommit=True) as conn:

    cur = conn.cursor()

    cur.execute("SELECT count(*) FROM my_table")
    # The connection is autocommit, so no BEGIN executed.

    with conn.transaction():
        # BEGIN is executed, a transaction started

        cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
        cur.execute("INSERT INTO times VALUES (now())")
        # These two operation run atomically in the same transaction

    # COMMIT is executed at the end of the block.
    # The connection is in idle state again.

# The connection is closed at the end of the block.

请注意,连接块也可以与非自动提交连接一起使用:在这种情况下,您仍然需要注意自动启动的最终事务。如果一个操作隐式启动了事务,则transaction()块将仅管理保存点子事务,主事务需要由调用方处理,如事务管理中所述:

conn = psycopg.connect()

cur = conn.cursor()

cur.execute("SELECT count(*) FROM my_table")
# This function call executes:
# - BEGIN
# - SELECT count(*) FROM my_table
# So now a transaction has started.

with conn.transaction():
    # The block starts with a transaction already open, so it will execute
    # - SAVEPOINT

    cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))

# The block was executing a sub-transaction so on exit it will only run:
# - RELEASE SAVEPOINT
# The transaction is still on.

conn.close()
# No COMMIT was sent: the INSERT was discarded.

如果一个transaction()块在没有事务处于活动状态时启动,那么它将管理一个正确的事务。从本质上讲,事务上下文试图将连接保留在它发现的状态中,并让您处理更广泛的上下文。

非自动提交事务和事务上下文之间的交互可能令人惊讶。尽管 DBAPI 要求事务应该默认是非自动提交的,一些经验丰富的开发人员会更偏好这样做:

  • 使用连接代码块:with psycopg.connect(...) as conn
  • 使用自动提交的连接:通过向connect()传递参数autocommit=True,或者设置属性conn.autocommit = True
  • 仅在需要的地方使用代码块with conn.transaction()来管理事务。

嵌套事务

事务块也可以嵌套(内部事务块使用 SAVEPOINT 实现):内部块内引发的异常有机会被处理,并且不会让外部的操作完全失败。下面是一系列与数据库交互的操作示例:允许操作失败;最后,我们还希望存储成功处理的操作数。

with conn.transaction() as tx1:
    num_ok = 0
    for operation in operations:
        try:
            with conn.transaction() as tx2:
                unreliable_operation(conn, operation)
        except Exception:
            logger.exception(f"{operation} failed")
        else:
            num_ok += 1

    save_number_of_successes(conn, num_ok)

如果unreliable_operation()发生错误(包括导致数据库错误的操作),则将回退其所有更改。异常会像往块外冒泡一样扩散:在示例中,它被try拦截,以便循环可以完成。最外层的块没有受影响(除非那里发生其他错误)。

您还可以编写代码,通过引发 Rollback 异常来显式回滚任何当前活动的事务块。异常“跳转”到事务块的末尾,回滚其事务,但允许程序从那里继续执行。默认情况下,异常回滚最里面的事务块,但可以将任何当前块指定为目标。在以下示例中,一个假设的CancelCommand可以停止当前的处理并取消以前执行但尚未完全提交的任何操作。

from psycopg import Rollback

with conn.transaction() as outer_tx:
    for command in commands():
        with conn.transaction() as inner_tx:
            if isinstance(command, CancelCommand):
                raise Rollback(outer_tx)
            process_command(command)

# If `Rollback` is raised, it would propagate only up to this block,
# and the program would continue from here with no exception.

事务特征

您可以为 Psycopg 处理的事务设置事务参数。它们会影响由非自动提交事务隐式启动的事务,以及由 Connection.transaction() 显式启动的事务,可用于自动提交和非自动提交事务。让这些参数保留为None,将使用服务器的默认行为(由服务端配置参数控制,如 default_transaction_isolation)。

为了设置这些参数,您可以使用连接属性isolation_levelread_onlydeferrable。对于异步连接,必须使用等效的 set_isolation_level() 方法和类似方法。仅当连接上没有已处于活动状态的事务时,才能更改参数。

可重复读可序列化隔离级别运行的应用程序可能会出现序列化失败的错误。在某些并发更新的情况下,PostgreSQL 将引发如下所示的异常:

psycopg2.errors.SerializationFailure: could not serialize access
due to concurrent update

在这种情况下,应用程序必须准备好重试发生异常的操作。

两阶段提交协议支持

版本 3.1 中的新功能。

Psycopg 提供了 PostgreSQL 中可用的两阶段提交功能,实现了 DBAPI 提出的两阶段提交扩展

两阶段提交的 DBAPI 模型受到了XA 规范的启发,根据该规范,事务 ID 由三个部分组成:

  • 格式 ID(非负的 32 位整数)
  • 全局事务 ID(长度不超过 64 个字节的字符串)
  • 分支限定符(长度不超过 64 个字节的字符串)

对于一个特定的全局事务,所有资源的前两个部分都是相同的。每个资源将会分配不同的分支限定符。

根据 DBAPI 规范,事务 ID 是使用 Connection.xid() 方法创建的。获得事务 ID 后,可以使用 Connection.tpc_begin() 启动分布式事务,使用 tpc_prepare() 准备并使用 tpc_commit()tpc_rollback() 完成。事务 ID 也可以使用 tpc_recover() 从数据库中恢复,并使用上述的tpc_commit()tpc_rollback()完成。

PostgreSQL 并不遵循 XA 标准,PostgreSQL 中的预备事务 ID 可以是任何长达200个字符的字符串。Psycopg 的 Xid 对象可以表示 XA 样式的事务 ID(例如由xid()方法创建的事务 ID)和由未解析字符串标识的 PostgreSQL 事务 ID。

将事务 ID 转换为传递给数据库的字符串的格式与 PostgreSQL JDBC 驱动程序采用的格式相同:这应该允许用 Python 和 Java 编写的工具之间的互操作。例如,用Python编写的恢复工具将能够识别 Java 程序产生的事务 ID。

有关更多详细信息,请参阅两阶段提交支持方法的文档。