Redrock Postgres 文档
主页 切换暗/亮/自动模式 切换暗/亮/自动模式 切换暗/亮/自动模式 返回首页

使用 COPY TO 和 COPY FROM

Psycopg 允许使用 PostgreSQL COPY 协议进行操作。COPY是将数据加载到数据库中(并修改它,带有有一些 SQL 的创造力)的最有效方法之一。

可以使用 Cursor.copy() 方法执行 COPY,向其传递COPY ... FROM STDINCOPY ... TO STDOUT的查询形式,并在with块中管理生成的 Copy 对象:

with cursor.copy("COPY table_name (col1, col2) FROM STDIN") as copy:
    # pass data to the 'copy' object using write()/write_row()

您可以使用 psycopg.sql 模块中的对象动态生成 COPY 语句:

with cursor.copy(
    sql.SQL("COPY {} TO STDOUT").format(sql.Identifier("table_name"))
) as copy:
    # read data from the 'copy' object using read()/read_row()

*在 3.1 版本更改:*你也可以将参数传递给copy(),就像在 execute() 中一样:

with cur.copy("COPY (SELECT * FROM table_name LIMIT %s) TO STDOUT", (3,)) as copy:
    # expect no more than three records

连接受通常的事务行为的约束,因此,除非连接处于自动提交状态,否则在 COPY 操作结束时,您仍然必须提交产生的更改,当然您还可以回滚它们。有关详细信息,请参阅事务管理

逐行写入数据

使用 COPY 操作,您可以从任何 Python 可迭代对象(元组列表或任何可迭代的序列)将数据加载到数据库中:Python 值会像在正常查询中一样进行适配。要执行此类操作,可以通过 Cursor.copy() 方法执行COPY ... FROM STDIN操作,并对with块中的Copy对象使用 write_row()。退出代码块后,操作将结束:

records = [(10, 20, "hello"), (40, None, "world")]

with cursor.copy("COPY sample (col1, col2, col3) FROM STDIN") as copy:
    for record in records:
        copy.write_row(record)

如果在代码块内引发异常,则会中断操作,并丢弃到目前为止插入的记录。

为了从Copy逐行读取或写入数据,您不得指定诸如FORMAT CSVDELIMITERNULLCOPY选项,请不要理会这些详细信息,谢谢 :)

逐行读取数据

你也可以做反向的事情,通过迭代 rows()COPY ... TO STDOUT操作中读取行。但是,这可能不是您希望正常执行的操作:通常正常的查询过程会更容易使用。

PostgreSQL 目前在COPY TO的结果中没有提供完整的类型信息,因此返回的行将具有未解析的数据,根据格式可能是字符串或字节。

with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
    for row in copy.rows():
        print(row)  # return unparsed data: ('10', '2046-12-24')

您可以在读取前使用 set_types() 来改进结果数据,但您必须自己指定它们。

with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
    copy.set_types(["int4", "date"])
    for row in copy.rows():
        print(row)  # (10, datetime.date(2046, 12, 24))

逐块 COPY

如果数据已经以适合 COPY 的方式格式化(例如,因为它来自先前的COPY TO操作生成的文件),则可以使用 Copy.write() 将其加载到数据库中。

with open("data", "r") as f:
    with cursor.copy("COPY data FROM STDIN") as copy:
        while data := f.read(BLOCK_SIZE):
            copy.write(data)

在这种情况下,您可以使用任何COPY选项和格式,只要输入数据与copy()操作的预期兼容即可。如果 COPY 指定了FORMAT TEXT格式,则数据可以作为str传递;对于FORMAT TEXTFORMAT BINARY两种格式,数据都可以作为bytes传递。

为了以COPY格式生成数据,您可以使用COPY ... TO STDOUT语句并迭代访问生成的 Copy 对象,这将产生一个bytes对象流:

with open("data.out", "wb") as f:
    with cursor.copy("COPY table_name TO STDOUT") as copy:
        for data in copy:
            f.write(data)

二进制 COPY

通过在COPY语句中指定FORMAT BINARY可以进行二进制 COPY。为了使用 write_row() 导入二进制数据,传递给数据库的所有类型都必须注册二进制转储器;如果使用 write() 逐块复制数据,则不需要这样做。

PostgreSQL 在二进制模式下加载数据时特别挑剔,并且不会应用强制转换规则。例如,这意味着将值 100 传递给integer将失败,因为 Psycopg 会将其作为smallint值传递,而服务器将拒绝它,因为它的大小与预期不匹配。

您可以使用Copy对象的 set_types() 方法并仔细指定要加载的类型来解决此问题。

有关二进制查询的详细信息,请参阅二进制参数和结果

异步 COPY 支持

使用通过 AsyncConnection 获取的对象,可以使用与上述相同的模式进行异步操作。例如,如果对象f支持返回COPY数据的异步的read()方法,则完全异步复制操作可以是:

async with cursor.copy("COPY data FROM STDIN") as copy:
    while data := await f.read():
        await copy.write(data)

AsyncCopy 对象文档描述了异步方法的签名以及与其同步 Copy 的对应方法的区别。

有关使用异步对象的详细信息,请参阅异步操作

示例:跨服务器复制表

为了跨服务器复制表或一部分表数据,可以在两个不同的连接上使用两个 COPY 操作,同时从第一个连接读取并写入第二个连接。

with psycopg.connect(dsn_src) as conn1, psycopg.connect(dsn_tgt) as conn2:
    with conn1.cursor().copy("COPY src TO STDOUT (FORMAT BINARY)") as copy1:
        with conn2.cursor().copy("COPY tgt FROM STDIN (FORMAT BINARY)") as copy2:
            for data in copy1:
                copy2.write(data)

使用FORMAT BINARY通常可以提高性能,但仅当源模式和目标模式完全相同时,它才有效。如果表仅兼容(例如,如果要将integer字段复制到目标bigint字段),则应去掉BINARY选项并执行基于文本的复制。有关详细信息,请参阅二进制 COPY

要进行异步 COPY 时,相同的模式可以适应到使用异步对象上。