如何解决PostgreSQL逻辑复制冲突

前言 逻辑复制过程中往往会出现冲突的情况。冲突的情况多种多样,我们今天就来研究一下怎么处理这些冲突。 根据PostgreSQL官方文档介绍,解决冲突的办法有两种: 手工修改订阅库上的数据,使...

前言

逻辑复制过程中往往会出现冲突的情况。冲突的情况多种多样,我们今天就来研究一下怎么处理这些冲突。

根据PostgreSQL官方文档介绍,解决冲突的办法有两种:

  • 手工修改订阅库上的数据,使发布端传过来的SQL可以在订阅端上执行。
  • 通过pg_replication_origin_advance()函数跳过当前的事务。

手工解决冲突

我们来通过实际案例来模拟一下问题。

--发布端和订阅端都创建T1表,id为主键
hr=# create table t1 (id int primary key,name text);
--发布端创建发布指定t1表
CREATE PUBLICATION testpub FOR TABLE t1;
--创建订阅
create subscription testsub connection 'host=192.168.56.119 port=5432 dbname=hr user=replication' publication testpub;


主库插入一些数据。


hr=# insert into t1 values(1,'aaa');
INSERT 0 1
hr=# insert into t1 values(2,'aaa');
INSERT 0 1

此时订阅端会顺利的把这些数据同步过来。

hr=# select * from t1;
 id | name 
----+------
  1 | aaa
  2 | aaa
(2 rows)

接下来我们在订阅端插入id号为3的记录。

hr=# insert into t1 values(3,'aaa');
INSERT 0 1

此时我们在发布端在插入同样的id号为3的记录,当它复制过来的时候,就会出现主键冲突的情况。我们来测试一下。

hr=# insert into t1 values(3,'aaa');
INSERT 0 1

顺利插入之后,在从库的后台日志上,我们看到了冲突的产生。

2021-03-29 16:52:29.901 CST [2141] ERROR:  duplicate key value violates unique constraint "t1_pkey"
2021-03-29 16:52:29.901 CST [2141] DETAIL:  Key (id)=(3) already exists.
2021-03-29 16:52:29.902 CST [1470] LOG:  background worker "logical replication worker" (PID 2141) exited with exit code 1
2021-03-29 16:52:29.905 CST [2156] LOG:  logical replication apply worker for subscription "testsub" has started
2021-03-29 16:52:29.911 CST [2156] ERROR:  duplicate key value violates unique constraint "t1_pkey"
2021-03-29 16:52:29.911 CST [2156] DETAIL:  Key (id)=(3) already exists.
2021-03-29 16:52:29.911 CST [1470] LOG:  background worker "logical replication worker" (PID 2156) exited with exit code 1

错误显示的比较明显,那么解决问题的第一种方法就是我们把订阅先disable一下,删除相应的记录,在enable订阅。

hr=# alter subscription testsub disable;
ALTER SUBSCRIPTION

hr=# delete from t1 where id=3;
DELETE 1


hr=# alter subscription testsub enable;
ALTER SUBSCRIPTION

执行完上述操作后,后台日志没有再报错。冲突已经解决。查看订阅侧的t1表数据已经被复制过来了。

hr=# select * from t1;
 id | name 
----+------
  1 | aaa
  2 | aaa
  3 | aaa
(3 rows)

使用pg_replication_origin_advance跳过事务

接下来我们在订阅端插入id号为4的记录。

hr=#  insert into t1 values(4,'aaa');
INSERT 0 1

然后发布端同样插入id号为4的记录。

hr=# insert into t1 values(4,'aaa');
INSERT 0 1

此时就产生了事务冲突。那么接下来我们需要找到主库冲突的的LSN。找到这个LSN是比较麻烦的,假设我们的发布端上一直有程序在执行插入。你会发现当前的LSN一直在往前增加。

insert into t1 values(5,'aaa');
insert into t1 values(6,'aaa');

hr=# SELECT pg_current_wal_lsn() ;
 pg_current_wal_lsn 
--------------------
 0/4A905B98
(1 row)

hr=# insert into t1 values(7,'aaa');
INSERT 0 1

hr=# SELECT pg_current_wal_lsn() ;  
 pg_current_wal_lsn 
--------------------
 0/4A905C48
(1 row)

此时你要跳跃的LSN位置就不知道从哪开始了?那么简单的做法是通过读取逻辑插槽来判断。

hr=#  select * from  pg_replication_slots;
-[ RECORD 1 ]-------+-----------
slot_name           | testsub
plugin              | pgoutput
slot_type           | logical
datoid              | 99550
database            | hr
temporary           | f
active              | f
active_pid          | 
xmin                | 
catalog_xmin        | 29946
restart_lsn         | 0/4A903990
confirmed_flush_lsn | 0/4A9039C8
wal_status          | reserved
safe_wal_size       | 

逻辑插槽记录了复制的位置。我们要查看逻辑插槽中的WAL内容。要查看WAL内容先需要复制一个逻辑插槽。然后再进行查看。

注:这里之所以要复制一个,是因为你没有办法直接查看当前再使用的逻辑插槽。复制一个就可以查看复制后的插槽。

hr=# SELECT * from pg_logical_slot_peek_changes('testsub',null,null);
ERROR:  replication slot "testsub" is active for PID 3137

hr=# SELECT pg_copy_logical_replication_slot('testsub', 'test_sub_copy', true, 'test_decoding');
 pg_copy_logical_replication_slot 
----------------------------------
 (test_sub_copy,0/4A9039C8)
(1 row)

hr=# SELECT * from pg_logical_slot_peek_changes('test_sub_copy', NULL, NULL);
    lsn     |  xid  |                          data                           
------------+-------+---------------------------------------------------------
 0/4A9039C8 | 29946 | BEGIN 29946
 0/4A9039C8 | 29946 | table public.t1: INSERT: id[integer]:4 name[text]:'aaa'
 0/4A903B98 | 29946 | COMMIT 29946   -->  一般在commit位置
 0/4A903BD0 | 29947 | BEGIN 29947
 0/4A903BD0 | 29947 | table public.t1: INSERT: id[integer]:5 name[text]:'aaa'
 0/4A903C80 | 29947 | COMMIT 29947
 0/4A905920 | 29948 | BEGIN 29948
 0/4A905920 | 29948 | table public.t1: INSERT: id[integer]:6 name[text]:'aaa'
 0/4A905B60 | 29948 | COMMIT 29948
 0/4A905B98 | 29949 | BEGIN 29949
 0/4A905B98 | 29949 | table public.t1: INSERT: id[integer]:7 name[text]:'aaa'
 0/4A905C48 | 29949 | COMMIT 29949
(12 rows)

我们对复制的插槽进行了分析,发现一共有4条记录,那么究竟在哪个位置出现了冲突。一般是第一个记录的commit这里。也就是0/4A903B98

找到这个lsn后,我们在订阅侧就可以执行pg_replication_origin_advance函数来跳过事务了,第一个参数是表的外部节点标识符。可以从pg_replication_origin_status视图中获取,第二个参数就是刚刚我们查的0/4A903B98。

hr=#  SELECT * FROM pg_replication_origin_status;
 local_id | external_id | remote_lsn | local_lsn  
----------+-------------+------------+------------
        1 | pg_24618    | 0/4A9038A8 | 0/7913EAC8

这里的external_id,就是表的外部节点标识符。

SELECT pg_replication_origin_advance ('pg_24618''0/4A903B98'::pg_lsn);

执行完成之后再次查看jobs。冲突的事务已经跳过。数据已经复制过来了。

hr=# select * from t1;
 id | name 
----+------
  1 | aaa
  2 | aaa
  5 | aaa
  6 | aaa
  7 | aaa
  3 | aaa
  4 | aaa
(7 rows)

后记

以上就是解决PostgreSQL逻辑复制冲突的办法,您学会了吗?

  • 发表于 2021-07-22 19:58
  • 阅读 ( 34 )

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
石天
石天

437 篇文章

作家榜 »

  1. shitian 662 文章
  2. 石天 437 文章
  3. 每天惠23 33 文章
  4. 小A 29 文章