Skip to content

valitydev/cdc_notifier

Repository files navigation

cdc_notifier

change data capture service

Общие сведения

cdc_notifier в текущей реализации не является унифицированным средством для захвата и ретрансляции изменений данных. Текущая реализация предоставляет только функциональность отслеживания обновлений и смены состояний жизненного цикла процессов, реализованных на основе progressor (https://github.com/valitydev/progressor). Захват изменений реализован на основе механизма WAL PostgreSQL. Приложение функционирует как логическая реплика для одного или нескольких экземпляров PostgreSQL. Захваченные обновления сериализуются в thrift в соответствии со спецификацией mg_proto (https://github.com/valitydev/machinegun-proto/blob/master/proto/event_sink.thrift, https://github.com/valitydev/machinegun-proto/blob/master/proto/lifecycle_sink.thrift) и передаются в Kafka.

Конфигурирование

PostgreSQL должен быть сконфигурирован с поддержкой логической репликации (wal_level=logical), должно быть сконфигурировано необходимой количество wal_senders с учётом всех потребителей (max_wal_senders >= max_replication_slots). У роли пользователя, от имени которого подключается CDC, должны быть необходимые права на работу с функциями репликации (ALTER ROLE $user WITH REPLICATION;).

Приложения-зависимости (brod, epg_connector) должны быть сконфигурированы в соответствии с собственной документацией. Пример минимально необходимоой конфигурации приведен в config/sys.config

cdc_notifier является umbrella приложением и может содержать в своём составе специфические реализации для захвата и трансляции обновлений. В текущей версии оно содержит только cdc_progressor.

Описание конфигурации cdc_progressor

[
  {cdc_progressor, [
          {streams, #{
              %% db_ref
              example_db => #{
              %%%% replication slot name
                  "cdc_slot_example" =>
              %%%%%%% publications config (kafka client, source namespace, destionation topics)
                      #{
                          %% source namespace_id = example
                          example => #{
                              %% kafka client config for publication
                              kafka_client => default_kafka_client,
                              eventsink_topic => <<"example_eventsink_topic">>,
                              lifecycle_topic => <<"example_lifecycle_topic">>
                          },
                          %% source namespace_id = invoice
                          invoice => #{
                              kafka_client => default_kafka_client,
                              eventsink_topic => <<"invoice_eventsink_topic">>,
                              lifecycle_topic => <<"invoice_lifecycle_topic">>
                          }
                      }
              }
          }},
        %% optional parameters
        {resend_timeout, 3000},
        {max_retries, 3},
        {reconnect_timeout, 5000}
  ]}
]

Здесь параметр streams описывает к каким БД какие слоты репликации необходимо создать (db_ref - не имя БД, а имя подключения (или иначе - ссылка на БД) в epg_connector, подробнее см. в секции databases https://github.com/valitydev/epg_connector?tab=readme-ov-file#database-and-pool-configuration). Каждому слоту соответствует свой набор неймспейсов (ключ = ID неймспейса progressor`а), а каждому неймспейсу соотвествует свой стрим: клиент kafka (задается в конфиге brod, может быть переиспользован), топик событий жизенного цикла lifecycle_topic, топик обновлений eventsink_topic (ДОЛЖНЫ быть заданы оба топика).

Рекомендуется создавать один слот репликации на одну БД. Слот представляет собой отдельный WAL log, поэтому их неоправданное размножение нежелательно. Все необходимые неймспейсы (публикации) должны быть перечислены внутри одного слота для соответствующей БД:

{streams, #{
    fistful_db => #{
        "cdc_slot_fistful" => #{
            'ff/withdrawal' => #{...},
            'ff/deposit' => #{...}
        }
    }
}}

Использование нескольких слотов на одну БД оправдано только при специфическом профиле нагрузки, например, когда высоконагруженный неймспейс создаёт задержки для критичных к скорости оповещений и его требуется вынести на отдельный экземпляр CDC. В общем случае достаточно одного слота.

Также cdc_progressor имеет опциональные параметры:

  • resend_timeout - таймаут повторной попытки отправки сообщения в kafka в случае неудачной попытки передачи
  • max_retries - максимальное количество попыток переотправки в kafka, по достижению этого значения захват обновлений будет пристановлен (соеднинение с БД закрыто, репликация остановлена, восстановление через reconnect_timeout)
  • reconnect_timeout - таймаут возобновления репликации после потери соединения или после достижения max_retries Значения по умолчанию приведены в примере выше.

Функционирование

Старт

  • создание персистентного слота репликации, если еще не создан (однократно, при первом запуске после добавления слота в конфиг)
  • определение состава публикаций, которые будет читать слот (при каждом старте, в соответствии с текущим конфигом)
  • получение текущего LSN в PostgreSQL ("0/0" при первом запуске)
  • старт репликации с соответствующими LSN и составом публикаций

Создание слота и старт репликации осуществляется посредством epg_wal_reader:subscribe. Создаваемый слот является персистентным, что позволяет хранить подтвержденные LSN (Log Sequence Number) на стороне PostgreSQL.

Нормальный цикл работы

  • получение сообщения репликации от wal_reader
  • парсинг - конвертация в сообщение kafka
  • синхронная отправка сообщения в kafka
  • получение подтверждения успешной отправки сообщения в kafka
  • отправка подтверждения в wal_reader
  • wal_reader возвращает в PostgreSQL значение обработанного LSN
  • ожидание следующего сообщения репликации (в начало)

Сбой отправки сообщения в kafka

  • получение сообщения репликации от wal_reader
  • парсинг - конвертация в сообщение kafka
  • синхронная отправка сообщения в kafka
  • сбой отправки сообщения в kafka
  • достижение max_retries
  • остановка репликации
  • ожидание reconnect_timeout
  • старт репликации со значения LSN, следующего за последним подтверждённым
  • ожидание сообщения репликации (в начало)

Запуск/останов стримов (деплой и эксплуатация)

ВАЖНО: CDC полагается, что требуемые БД и таблицы существуют. Таблицы progressor создаются в ходе миграций при старте соответствующего сервиса-процессора, поэтому настройка неймспейса ОБЯЗАТЕЛЬНО должна быть выполнена ДО настройки стримов на cdc_progressor.

Порядок запуска стрима

  • создать неймспейс в отслеживаемом сервисе (добавить в конфиг progressor`а и перезапустить сервис)

  • вручную создать публикацию для неймспейса. имя публикации ДОЛЖНО совпадать с именем неймспейса, в публикацию должны быть включены таблица процессов и таблица событий. пример для неймспейса ff/withdrawal, на БД fistful необходимо выполнить команду:

    CREATE PUBLICATION "ff/withdeawal" FOR TABLE "ff/withdrawal_processes", "ff/withdrawal_events";
  • сконфигурировать стрим в конфиге cdc_progressor. пример:

    [
      {cdc_progressor, [
          {streams, #{
              fistful => #{
                  "cdc_slot_fistful" => #{
                      'ff/withdrawal' => #{
                          kafka_client => default_kafka_client,
                          eventsink_topic => <<"ff/withdrawal_eventsink_topic">>,
                          lifecycle_topic => <<"ff/withdrawal_lifecycle_topic">>
                      }
                  }
              }
          }}
      ]}
    ]
    

    здесь, fistful - ссылка (db_ref) на требуюемую БД, cdc_slot_fistful - имя слота репликации, ff/withdrawal (и остальные ключи этой мапы) - это имена публикаций, на которые подпишется соответсвующий слот

    примечание: не требуется создавать новый слот репликации под каждый неймспейс. Если слот к требуемой БД (например, "cdc_slot_fistful") уже существует, достаточно добавить описание нового неймспейса (стрима) внутрь существующего слота.

  • перезапустить сервис cdc_notifier. при перезапуске слот запустится с обновленным списком публикаций

    ВАЖНО: если при старте публикация не существует, то она будет проигнорирована и появится warning запись в логе "unknown publication: ~p"

Останов стрима

Удаление публикации должно происходить в обратном порядке.

ВАЖНО! Публикация не должна удаляться при работающем слоте репликации, который на неё подписана. Сначала нужно перезапустить слот с новым спископ публикаций (без удаляемой), потом выполняется удаление.

Порядок останова стрима и удаления публикации:

  • удалить стрим из конфигурации cdc_notifier
    [
      {cdc_progressor, [
          {streams, #{
              fistful => #{
                  "cdc_slot_fistful" => #{}
              }
          }}
      ]}
    ]
    
  • перезапустить сервис cdc_notifier
  • вручную удалить публикацию для неймспейса. для этого выполнить команду:
    DROP PUBLICATION "ff/withdeawal";

About

change data capture service

Resources

License

Code of conduct

Contributing

Security policy

Stars

Watchers

Forks

Packages

 
 
 

Contributors