PostgreSQLには物理レプリケーションと論理レプリケーションの2種類のレプリケーションがありますが、どちらもサーバ間の通信にはストリーミングレプリケーションプロトロル(以下、長いのでレプリケーションプロトロルとします)を使用して、データを送っています。

+---------+  Replication Protocol   +---------+
| primary | ----------------------> | replica |
+---------+                         +---------+

レプリケーションプロトロルで接続してみる

クライアントがPostgreSQLサーバに接続するのと同じようにPostgreSQLサーバに接続要求を出しますが、接続文字列にreplicationパラメータを使用します。例えば、psqlを使ってレプリケーションプロトロルが使えるように接続することもできます。

$ psql -d "dbname=postgres replication=database"
psql (16devel)
Type "help" for help.

postgres(1:1636174)=#

replication=databaseとしている所がポイントです。これで論理レプリケーションが使えるモードに入ります。replication=onとすると、物理レプリケーションのモードに入ります。

psqlで接続したままそれに対応するPostgreSQLのサーバプロセスを確認すると、walsenderが起動していることがわかります(psqlのプロンプトに出ている1636174が接続しているPostgreSQLのサーバプロセスです)。

$ ps x | grep 1636174
1636174 ?        Ss     0:00 postgres: walsender masahiko postgres [local] idle

このように、PostgreSQLではレプリケーションプロトロルを利用できるように接続すると、クライアントはwalsenderプロセスを通信することになります。

レプリケーションプロトロルでは何ができる?

指定するreplicationパラメータによって、物理walsenderモードと論理walsenderモードに別れます。どちらもモードでもレプリケーションコマンドは使えますが、論理walsenderモードではそれに加えて通常のSQLを実行することができます。

レプリケーションプロトロルで使えるコマンドの一覧は公式ドキュメントに載っていて、例えば物理レプリケーションを開始したいときは、START_REPLICATIONコマンドを使用します。

$ bin/psql -d "dbname=postgres replication=on" # 物理walsenderモード
psql (16devel)
Type "help" for help.

postgres(1:1636516)=# IDENTIFY_SYSTEM;
      systemid       | timeline |  xlogpos   | dbname
---------------------+----------+------------+--------
 7218079788637499376 |        1 | F/FE96A3E0 |
(1 row)

postgres(1:1636516)=# select version();
ERROR:  cannot execute SQL commands in WAL sender for physical replication
$ psql -d "dbname=postgres replication=database" # 論理walsenderモード
psql (16devel)
Type "help" for help.

postgres(1:1636464)=# IDENTIFY_SYSTEM;
      systemid       | timeline |  xlogpos   |  dbname
---------------------+----------+------------+----------
 7218079788637499376 |        1 | F/FE96A3E0 | postgres
(1 row)

postgres(1:1636464)=# select version();
                                     version
---------------------------------------------------------------------------------
 PostgreSQL 16devel on x86_64-pc-linux-gnu, compiled by gcc (GCC) 12.2.0, 64-bit
(1 row)

レプリケーションコマンドを使ってレプリケーションしてみる

ここまで紹介した内容でレプリケーションコマンドが使える様になったので、PostgreSQLの物理レプリケーションや論理レプリケーションが内部でやっていることはpsqlでもできそうです。試しにpsqlを使って論理レプリケーションをやってみたいと思います。手順は簡単で、まずレプリケーションスロットを作成し、それを使ってレプリケーションを開始するだけです。

$ psql -d "dbname=postgres replication=database" # 論理walsenderモードで接続
psql (16devel)
Type "help" for help.

postgres(1:1636464)=# CREATE_REPLICATION_SLOT myslot LOGICAL test_decoding; -- test_decodingというプラグインを使ってスロットを作成
 slot_name | consistent_point |    snapshot_name    | output_plugin
-----------+------------------+---------------------+---------------
 myslot    | F/FE96C5A8       | 00000003-00000002-1 | test_decoding
(1 row)

postgres(1:1636464)=# START_REPLICATION SLOT myslot LOGICAL 0/0; -- レプリケーションを開始
unexpected PQresultStatus: 8

START_REPLICATIONコマンドを使ってレプリケーションを開始しようとしたのですが、unexpected PQresultStatus: 8というメッセージが出て止まってしまいました。これは、psqlがレプリケーションプロトロルに対応していないことが原因で出た、psqlのエラーです。

レプリケーションコマンドを使ってレプリケーションしてみる(リベンジ)

せっかくなので、簡単なクライアントプログラムを使って論理レプリケーションのデータを受信してみます。

論理walsenderモードを使ってPostgreSQLサーバに接続し、CREATE_REPLICATION_SLOTコマンドでレプリケーションスロットを作成した後、START_REPLICATIONコマンドで受信を開始します。START_REPLICATIONコマンドが成功すると結果はPGRES_COPY_BOTHとなり、これはレプリケーションプロトロルでのみ利用されます(サーバからもクライアントからもデータを贈り合う、という意味)。psqlはこれに対応していないため、unexpected PQresultStatus: 8を出していました(PGRES_COPY_BOTHは8ですコードはこちら)。

レプリケーションプロトロルは、libpq的にはCOPY ... TO stdinのような感じで動くので、データの受信にはPQgetCopyData()が利用できます。受信したデータのヘッダを除いて出力します。

#include "libpq-fe.h"
#include <stdlib.h>
#include <stdio.h>

int main()
{
    PGresult *res;
    PGconn *conn;
    char *buf = NULL;

    conn = PQconnectdb("dbname=postgres replication=database");
    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection error: %s", PQerrorMessage(conn));
        exit(1);
    }

    res = PQexec(conn, "CREATE_REPLICATION_SLOT myslot LOGICAL test_decoding");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr, "could not create logical replication slot: %s",
                PQresultErrorMessage(res));
        exit(1);
    }

    res = PQexec(conn, "START_REPLICATION SLOT myslot LOGICAL 0/0");
    if (PQresultStatus(res) != PGRES_COPY_BOTH)
    {
        fprintf(stderr, "could not start replication: %s",
                PQresultErrorMessage(res));
        exit(1);
    }
    PQclear(res);

    for (;;)
    {
        int r;

        if (buf != NULL)
        {
            PQfreemem(buf);
            buf = NULL;
        }

        r = PQgetCopyData(conn, &buf, 0);
        if (r <= 0)
            break;

        /* ignore other than decoded WAL data */
        if (buf[0] != 'w')
            continue;

        /* the message header is 24 bytes */
        printf("%s\n", &(buf[25]));
    }

    if (buf != NULL)
        PQfreemem(buf);

    PQfinish(conn);
    return 0;
}

これをコンパイルして実行します(libpqライブラリのパスは調整してください)。

$ gcc -o test test.c -L/usr/local/pgsql/lib -lpq
$ ./test

プログラム実行直後は何も出力されませんが、サーバ側でなにかテーブルを変更すると、そのWALをデコードしたデータがサーバから送信され、それが表示されます。

$ psql
postgres(1:1709135)=# create table test (c int);
CREATE TABLE
postgres(1:1709135)=# insert into test values (1);
INSERT 0 1
postgres(1:1709135)=# insert into test values (2);
INSERT 0 1

出力されるデータの形式はデコーディング・プラグインによって変わります。今回は、PostgreSQLに同梱されているtest_decodingを使っています。この他にも、例えばwal2jsonを使うと、JSON形式でデータを取得できます。

$ ./test
BEGIN 766
COMMIT 766
BEGIN 767
table public.test: INSERT: c[integer]:1
COMMIT 767
BEGIN 768
table public.test: INSERT: c[integer]:2
COMMIT 768

サンプルプログラムはCtl-cで終了できます。

まとめ

ストリーミングレプリケーションプロトロルで遊んでみました。

PostgreSQLにはpg_recvlogicalという論理レプリケーションをするクライアントプログラムが同梱されています。最後に作ったサンプルプログラムは、pg_recvlogicalをかなり単純化したものと言えます。