change data capture service
cdc_notifier в текущей реализации не является унифицированным средством для захвата и ретрансляции изменений данных. Текущая реализация предоставляет только функциональность отслеживания обновлений и смены состояний жизненного цикла процессов, реализованных на основе progressor (https://github.com/valitydev/progressor). Захват изменений реализован на основе механизма WAL PostgreSQL. Приложение функционирует как логическая реплика для одного или нескольких экземпляров PostgreSQL. Захваченные обновления сериализуются в thrift в соответствии со спецификацией mg_proto (https://github.com/valitydev/machinegun-proto/blob/master/proto/event_sink.thrift, https://github.com/valitydev/machinegun-proto/blob/master/proto/lifecycle_sink.thrift) и передаются в Kafka.
PostgreSQL должен быть сконфигурирован с поддержкой логической репликации (wal_level=logical), должно быть сконфигурировано необходимой количество wal_senders с учётом всех потребителей (max_wal_senders >= max_replication_slots). У роли пользователя, от имени которого подключается CDC, должны быть необходимые права на работу с функциями репликации (ALTER ROLE $user WITH REPLICATION;).
Приложения-зависимости (brod, epg_connector) должны быть сконфигурированы в соответствии с собственной документацией. Пример минимально необходимоой конфигурации приведен в config/sys.config
cdc_notifier является umbrella приложением и может содержать в своём составе специфические реализации для захвата и трансляции обновлений. В текущей версии оно содержит только cdc_progressor.
Описание конфигурации cdc_progressor
[
{cdc_progressor, [
{streams, #{
%% db_ref
example_db => #{
%%%% replication slot name
"cdc_slot_example" =>
%%%%%%% publications config (kafka client, source namespace, destionation topics)
#{
%% source namespace_id = example
example => #{
%% kafka client config for publication
kafka_client => default_kafka_client,
eventsink_topic => <<"example_eventsink_topic">>,
lifecycle_topic => <<"example_lifecycle_topic">>
},
%% source namespace_id = invoice
invoice => #{
kafka_client => default_kafka_client,
eventsink_topic => <<"invoice_eventsink_topic">>,
lifecycle_topic => <<"invoice_lifecycle_topic">>
}
}
}
}},
%% optional parameters
{resend_timeout, 3000},
{max_retries, 3},
{reconnect_timeout, 5000}
]}
]Здесь параметр streams описывает к каким БД какие слоты репликации необходимо создать (db_ref - не имя БД, а имя подключения (или иначе - ссылка на БД) в epg_connector, подробнее см. в секции databases https://github.com/valitydev/epg_connector?tab=readme-ov-file#database-and-pool-configuration). Каждому слоту соответствует свой набор неймспейсов (ключ = ID неймспейса progressor`а), а каждому неймспейсу соотвествует свой стрим: клиент kafka (задается в конфиге brod, может быть переиспользован), топик событий жизенного цикла lifecycle_topic, топик обновлений eventsink_topic (ДОЛЖНЫ быть заданы оба топика).
Рекомендуется создавать один слот репликации на одну БД. Слот представляет собой отдельный WAL log, поэтому их неоправданное размножение нежелательно. Все необходимые неймспейсы (публикации) должны быть перечислены внутри одного слота для соответствующей БД:
{streams, #{
fistful_db => #{
"cdc_slot_fistful" => #{
'ff/withdrawal' => #{...},
'ff/deposit' => #{...}
}
}
}}Использование нескольких слотов на одну БД оправдано только при специфическом профиле нагрузки, например, когда высоконагруженный неймспейс создаёт задержки для критичных к скорости оповещений и его требуется вынести на отдельный экземпляр CDC. В общем случае достаточно одного слота.
Также cdc_progressor имеет опциональные параметры:
- resend_timeout - таймаут повторной попытки отправки сообщения в kafka в случае неудачной попытки передачи
- max_retries - максимальное количество попыток переотправки в kafka, по достижению этого значения захват обновлений будет пристановлен (соеднинение с БД закрыто, репликация остановлена, восстановление через reconnect_timeout)
- reconnect_timeout - таймаут возобновления репликации после потери соединения или после достижения max_retries Значения по умолчанию приведены в примере выше.
- создание персистентного слота репликации, если еще не создан (однократно, при первом запуске после добавления слота в конфиг)
- определение состава публикаций, которые будет читать слот (при каждом старте, в соответствии с текущим конфигом)
- получение текущего LSN в PostgreSQL ("0/0" при первом запуске)
- старт репликации с соответствующими LSN и составом публикаций
Создание слота и старт репликации осуществляется посредством epg_wal_reader:subscribe. Создаваемый слот является персистентным, что позволяет хранить подтвержденные LSN (Log Sequence Number) на стороне PostgreSQL.
- получение сообщения репликации от wal_reader
- парсинг - конвертация в сообщение kafka
- синхронная отправка сообщения в kafka
- получение подтверждения успешной отправки сообщения в kafka
- отправка подтверждения в wal_reader
- wal_reader возвращает в PostgreSQL значение обработанного LSN
- ожидание следующего сообщения репликации (в начало)
- получение сообщения репликации от wal_reader
- парсинг - конвертация в сообщение kafka
- синхронная отправка сообщения в kafka
- сбой отправки сообщения в kafka
- достижение max_retries
- остановка репликации
- ожидание reconnect_timeout
- старт репликации со значения LSN, следующего за последним подтверждённым
- ожидание сообщения репликации (в начало)
ВАЖНО: CDC полагается, что требуемые БД и таблицы существуют. Таблицы progressor создаются в ходе миграций при старте соответствующего сервиса-процессора, поэтому настройка неймспейса ОБЯЗАТЕЛЬНО должна быть выполнена ДО настройки стримов на cdc_progressor.
-
создать неймспейс в отслеживаемом сервисе (добавить в конфиг progressor`а и перезапустить сервис)
-
вручную создать публикацию для неймспейса. имя публикации ДОЛЖНО совпадать с именем неймспейса, в публикацию должны быть включены таблица процессов и таблица событий. пример для неймспейса ff/withdrawal, на БД fistful необходимо выполнить команду:
CREATE PUBLICATION "ff/withdeawal" FOR TABLE "ff/withdrawal_processes", "ff/withdrawal_events";
-
сконфигурировать стрим в конфиге cdc_progressor. пример:
[ {cdc_progressor, [ {streams, #{ fistful => #{ "cdc_slot_fistful" => #{ 'ff/withdrawal' => #{ kafka_client => default_kafka_client, eventsink_topic => <<"ff/withdrawal_eventsink_topic">>, lifecycle_topic => <<"ff/withdrawal_lifecycle_topic">> } } } }} ]} ]здесь, fistful - ссылка (db_ref) на требуюемую БД, cdc_slot_fistful - имя слота репликации, ff/withdrawal (и остальные ключи этой мапы) - это имена публикаций, на которые подпишется соответсвующий слот
примечание: не требуется создавать новый слот репликации под каждый неймспейс. Если слот к требуемой БД (например, "cdc_slot_fistful") уже существует, достаточно добавить описание нового неймспейса (стрима) внутрь существующего слота.
-
перезапустить сервис cdc_notifier. при перезапуске слот запустится с обновленным списком публикаций
ВАЖНО: если при старте публикация не существует, то она будет проигнорирована и появится warning запись в логе "unknown publication: ~p"
Удаление публикации должно происходить в обратном порядке.
ВАЖНО! Публикация не должна удаляться при работающем слоте репликации, который на неё подписана. Сначала нужно перезапустить слот с новым спископ публикаций (без удаляемой), потом выполняется удаление.
Порядок останова стрима и удаления публикации:
- удалить стрим из конфигурации cdc_notifier
[ {cdc_progressor, [ {streams, #{ fistful => #{ "cdc_slot_fistful" => #{} } }} ]} ] - перезапустить сервис cdc_notifier
- вручную удалить публикацию для неймспейса. для этого выполнить команду:
DROP PUBLICATION "ff/withdeawal";