订阅 DDL 操作
PostgreSQL提供了事件触发器。事件触发器对一个特定数据库来说是全局的,并且可以捕捉该数据库中发生的DDL事件。Redrock Postgres 通过对PostgreSQL进行了如下改进,从而让用户可以通过事件触发器,实现对DDL操作的订阅:
- 在 Redrock Postgres 中,角色和表空间都是数据库级别的对象,事件触发器也支持作用于角色和表空间对象的DDL命令,详情可参见 事件触发器触发矩阵 ;
- 支持在命令结束处(对应
ddl_command_end
事件)捕捉删除对象的操作; - 提供函数 pg_ddl_command_deparse 将DDL命令从
pg_ddl_command
类型的内部格式反向解析为SQL语句;
下面将以示例的方式介绍如何实现DDL操作的订阅和同步功能,您可以根据业务情况对示例中的操作命令进行相应修改。
我们需要在发布端通过事件触发器记录DDL语句到DDL操作记录表中。在操作之前,请确保发布端数据库配置参数wal_level
的值为logical,该参数修改后需要重启实例才能生效。然后,就可以使用超级用户或者数据库拥有者登录数据库,执行如下操作:
CREATE SCHEMA IF NOT EXISTS audit;
REVOKE ALL ON SCHEMA audit FROM public;
-- create table for ddl record
CREATE TABLE IF NOT EXISTS audit.ddl_history(
ord int,
event_time timestamp,
username text,
object_type text,
schema_name text,
object_identity text,
command_tag text,
command text
);
-- grant privileges to all user
GRANT INSERT ON TABLE audit.ddl_history TO PUBLIC;
其中表audit.ddl_history
包括的列如下:
名称 | 描述 |
---|---|
ord |
当前DDL查询分解出来的DDL子命令编号 |
event_time |
DDL操作执行的时间 |
username |
执行该DDL操作的用户 |
object_type |
对象的类型。可能的类型是 table , index , sequence , view , materialized view , foreign table , aggregate , function , type , cast , collation , rule , trigger , schema , role , tablespace , foreign data wrapper , server , user mapping , extension , policy , publication 以及 subscription 。 |
schema_name |
该对象所属的模式的名称(如果有),如果没有则为NULL 。没有引号修饰。 |
object_identity |
对象标识的文本表现形式,用模式限定。如果必要,出现在该标识中的每一个标识符会被引号修饰。 |
command_tag |
命令标签。可能的值请参见 支持事件触发器的命令标签 。 |
command |
执行的DDL命令 |
在发布端执行发布DDL操作记录表:
CREATE PUBLICATION ddl_publication FOR TABLE ONLY audit.ddl_history;
-- create function for event triggers
CREATE OR REPLACE FUNCTION audit.ddl_pub_trigger_func()
RETURNS event_trigger AS $$
BEGIN
INSERT INTO audit.ddl_history
SELECT ordinality, now(), current_user,
object_type, schema_name, object_identity,
command_tag, pg_ddl_command_deparse(command)
FROM pg_event_trigger_ddl_commands() WITH ORDINALITY;
END;
$$ LANGUAGE plpgsql;
-- create ddl_command_end event trigger
CREATE EVENT TRIGGER ddl_event_trigger
ON ddl_command_end
EXECUTE PROCEDURE audit.ddl_pub_trigger_func();
执行完以上命令后,您的DDL操作将会记录在表audit.ddl_history
中。
您可以通过修改DDL事件触发器函数定义,对DDL操作的记录和发布进行配置。参照上面表audit.ddl_history
的定义,您可以根据DDL操作的执行用户、影响的对象类型、对象所属模式、对象名称以及命令标签等信息,在DDL事件触发器函数定义中添加筛选条件,对DDL操作的记录和发布进行配置。比如,我们可以将上面的事件触发器函数audit.ddl_pub_trigger_func
的定义修改如下,限定记录和发布的DDL操作涉及的模式和对象类型:
CREATE OR REPLACE FUNCTION audit.ddl_pub_trigger_func()
RETURNS event_trigger AS $$
BEGIN
INSERT INTO audit.ddl_history
SELECT ordinality, now(), current_user,
object_type, schema_name, object_identity,
command_tag, pg_ddl_command_deparse(command)
FROM pg_event_trigger_ddl_commands() WITH ORDINALITY
WHERE schema_name IN ('testschema', 'myschema')
AND object_type IN ('table', 'index');
END;
$$ LANGUAGE plpgsql;
在上面的例子中,我们将DDL操作涉及的模式限定在testschema
和myschema
,涉及的对象类型限定在table
和index
。
在发布端我们将已经执行了的DDL语句存储在audit.ddl_history
中。订阅端可以读取记录进行同步。使用超级用户或者数据库拥有者登录数据库,执行如下操作:
在订阅端创建和发布端一致的DDL操作记录表:
CREATE SCHEMA IF NOT EXISTS audit;
REVOKE ALL ON SCHEMA audit FROM public;
-- create table for ddl record
CREATE TABLE IF NOT EXISTS audit.ddl_history(
ord int,
event_time timestamp,
username text,
object_type text,
schema_name text,
object_identity text,
command_tag text,
command text
);
-- grant privileges to all user
GRANT INSERT ON TABLE audit.ddl_history TO PUBLIC;
在订阅端创建订阅:
CREATE SUBSCRIPTION ddl_subscriptin CONNECTION
'host=192.168.1.50 port=5432 dbname=testdb user=testuser password=pgpass'
PUBLICATION ddl_publication;
您需要根据实际的部署环境,修改上面SQL命令中发布端的访问信息。
在订阅端针对audit.ddl_history
表创建相应的触发器,实现DDL增量同步:
CREATE OR REPLACE FUNCTION audit.ddl_sub_trigger_func()
RETURNS trigger AS $$
BEGIN
EXECUTE NEW.command;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER ddl_sub_trigger
AFTER INSERT ON audit.ddl_history
FOR EACH ROW
EXECUTE PROCEDURE audit.ddl_sub_trigger_func();
您可以通过修改audit.ddl_history
表级触发器函数定义,对DDL操作的同步进行配置。参照上面表audit.ddl_history
的定义,您可以根据DDL操作的执行用户、影响的对象类型、对象所属模式、对象名称以及命令标签等信息,在audit.ddl_history
表级触发器函数定义中添加筛选条件,对DDL操作的同步进行配置。比如,我们可以将上面的表级触发器函数audit.ddl_sub_trigger_func
的定义修改如下,限定同步的DDL操作涉及的模式和对象类型:
CREATE OR REPLACE FUNCTION audit.ddl_sub_trigger_func()
RETURNS trigger AS $$
BEGIN
IF NEW.schema_name IN ('testschema', 'myschema') AND
NEW.object_type IN ('table', 'index')
THEN
EXECUTE NEW.command;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
在上面的例子中,我们将同步的DDL操作涉及的模式限定在testschema
和myschema
,涉及的对象类型限定在table
和index
。