使用 COPY TO 和 COPY FROM
Psycopg 允许使用 PostgreSQL COPY 协议进行操作。COPY
是将数据加载到数据库中(并修改它,带有有一些 SQL 的创造力)的最有效方法之一。
可以使用 Cursor.copy()
方法执行 COPY,向其传递COPY ... FROM STDIN
或COPY ... TO STDOUT
的查询形式,并在with
块中管理生成的 Copy
对象:
with cursor.copy("COPY table_name (col1, col2) FROM STDIN") as copy:
# pass data to the 'copy' object using write()/write_row()
您可以使用 psycopg.sql
模块中的对象动态生成 COPY 语句:
with cursor.copy(
sql.SQL("COPY {} TO STDOUT").format(sql.Identifier("table_name"))
) as copy:
# read data from the 'copy' object using read()/read_row()
*在 3.1 版本更改:*你也可以将参数传递给copy()
,就像在 execute()
中一样:
with cur.copy("COPY (SELECT * FROM table_name LIMIT %s) TO STDOUT", (3,)) as copy:
# expect no more than three records
连接受通常的事务行为的约束,因此,除非连接处于自动提交状态,否则在 COPY 操作结束时,您仍然必须提交产生的更改,当然您还可以回滚它们。有关详细信息,请参阅事务管理。
使用 COPY 操作,您可以从任何 Python 可迭代对象(元组列表或任何可迭代的序列)将数据加载到数据库中:Python 值会像在正常查询中一样进行适配。要执行此类操作,可以通过 Cursor.copy()
方法执行COPY ... FROM STDIN
操作,并对with
块中的Copy
对象使用 write_row()
。退出代码块后,操作将结束:
records = [(10, 20, "hello"), (40, None, "world")]
with cursor.copy("COPY sample (col1, col2, col3) FROM STDIN") as copy:
for record in records:
copy.write_row(record)
如果在代码块内引发异常,则会中断操作,并丢弃到目前为止插入的记录。
为了从Copy
逐行读取或写入数据,您不得指定诸如FORMAT CSV
、DELIMITER
、NULL
等COPY
选项,请不要理会这些详细信息,谢谢 :)
你也可以做反向的事情,通过迭代 rows()
从COPY ... TO STDOUT
操作中读取行。但是,这可能不是您希望正常执行的操作:通常正常的查询过程会更容易使用。
PostgreSQL 目前在COPY TO
的结果中没有提供完整的类型信息,因此返回的行将具有未解析的数据,根据格式可能是字符串或字节。
with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
for row in copy.rows():
print(row) # return unparsed data: ('10', '2046-12-24')
您可以在读取前使用 set_types()
来改进结果数据,但您必须自己指定它们。
with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
copy.set_types(["int4", "date"])
for row in copy.rows():
print(row) # (10, datetime.date(2046, 12, 24))
如果数据已经以适合 COPY 的方式格式化(例如,因为它来自先前的COPY TO
操作生成的文件),则可以使用 Copy.write()
将其加载到数据库中。
with open("data", "r") as f:
with cursor.copy("COPY data FROM STDIN") as copy:
while data := f.read(BLOCK_SIZE):
copy.write(data)
在这种情况下,您可以使用任何COPY
选项和格式,只要输入数据与copy()
操作的预期兼容即可。如果 COPY 指定了FORMAT TEXT
格式,则数据可以作为str
传递;对于FORMAT TEXT
和FORMAT BINARY
两种格式,数据都可以作为bytes
传递。
为了以COPY
格式生成数据,您可以使用COPY ... TO STDOUT
语句并迭代访问生成的 Copy
对象,这将产生一个bytes
对象流:
with open("data.out", "wb") as f:
with cursor.copy("COPY table_name TO STDOUT") as copy:
for data in copy:
f.write(data)
通过在COPY
语句中指定FORMAT BINARY
可以进行二进制 COPY。为了使用 write_row()
导入二进制数据,传递给数据库的所有类型都必须注册二进制转储器;如果使用 write()
逐块复制数据,则不需要这样做。
PostgreSQL 在二进制模式下加载数据时特别挑剔,并且不会应用强制转换规则。例如,这意味着将值 100 传递给
integer
列将失败,因为 Psycopg 会将其作为smallint
值传递,而服务器将拒绝它,因为它的大小与预期不匹配。您可以使用
Copy
对象的set_types()
方法并仔细指定要加载的类型来解决此问题。
有关二进制查询的详细信息,请参阅二进制参数和结果。
使用通过 AsyncConnection
获取的对象,可以使用与上述相同的模式进行异步操作。例如,如果对象f
支持返回COPY
数据的异步的read()
方法,则完全异步复制操作可以是:
async with cursor.copy("COPY data FROM STDIN") as copy:
while data := await f.read():
await copy.write(data)
AsyncCopy
对象文档描述了异步方法的签名以及与其同步 Copy
的对应方法的区别。
有关使用异步对象的详细信息,请参阅异步操作。
为了跨服务器复制表或一部分表数据,可以在两个不同的连接上使用两个 COPY 操作,同时从第一个连接读取并写入第二个连接。
with psycopg.connect(dsn_src) as conn1, psycopg.connect(dsn_tgt) as conn2:
with conn1.cursor().copy("COPY src TO STDOUT (FORMAT BINARY)") as copy1:
with conn2.cursor().copy("COPY tgt FROM STDIN (FORMAT BINARY)") as copy2:
for data in copy1:
copy2.write(data)
使用FORMAT BINARY
通常可以提高性能,但仅当源模式和目标模式完全相同时,它才有效。如果表仅兼容(例如,如果要将integer
字段复制到目标bigint
字段),则应去掉BINARY
选项并执行基于文本的复制。有关详细信息,请参阅二进制 COPY。