Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SERVICE_NAME=cdc_notifier
OTP_VERSION=27.1.2
REBAR_VERSION=3.24
OTP_VERSION=28.5.0
REBAR_VERSION=3.26
THRIFT_VERSION=0.14.2.3
CONFLUENT_PLATFORM_VERSION=7.2.15
116 changes: 85 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ cdc_notifier является umbrella приложением и может со
В текущей версии оно содержит только cdc_progressor.

Описание конфигурации cdc_progressor
```
```erlang
[
{cdc_progressor, [
{streams, #{
Expand Down Expand Up @@ -65,6 +65,23 @@ cdc_notifier является umbrella приложением и может со
а каждому неймспейсу соотвествует свой стрим: клиент kafka (задается в конфиге brod, может быть переиспользован),
топик событий жизенного цикла lifecycle_topic, топик обновлений eventsink_topic (ДОЛЖНЫ быть заданы оба топика).

Рекомендуется создавать один слот репликации на одну БД. Слот представляет собой отдельный WAL log, поэтому их неоправданное размножение нежелательно.
Все необходимые неймспейсы (публикации) должны быть перечислены внутри одного слота для соответствующей БД:
```erlang
{streams, #{
fistful_db => #{
"cdc_slot_fistful" => #{
'ff/withdrawal' => #{...},
'ff/deposit' => #{...}
}
}
}}
```

Использование нескольких слотов на одну БД оправдано только при специфическом профиле нагрузки,
например, когда высоконагруженный неймспейс создаёт задержки для критичных к скорости оповещений и
его требуется вынести на отдельный экземпляр CDC. В общем случае достаточно одного слота.

Также cdc_progressor имеет опциональные параметры:
- resend_timeout - таймаут повторной попытки отправки сообщения в kafka в случае неудачной попытки передачи
- max_retries - максимальное количество попыток переотправки в kafka, по достижению этого значения захват обновлений
Expand All @@ -76,10 +93,10 @@ cdc_notifier является umbrella приложением и может со

### Старт

- создание необходимых публикаций в БД, если еще не созданы (однократно, при первом запуске после добавления стрима в конфиг)
- создание персистентного слота репликации, если еще не создан (однократно, при первом запуске после добавления слота в конфиг)
- определение состава публикаций, которые будет читать слот (при каждом старте, в соответствии с текущим конфигом)
- получение текущего LSN в PostgreSQL ("0/0" при первом запуске)
- старт репликации с соответствующим LSN
- старт репликации с соответствующими LSN и составом публикаций

Создание слота и старт репликации осуществляется посредством epg_wal_reader:subscribe.
Создаваемый слот является персистентным, что позволяет хранить подтвержденные LSN (Log Sequence Number) на стороне PostgreSQL.
Expand All @@ -106,36 +123,73 @@ cdc_notifier является umbrella приложением и может со
- старт репликации со значения LSN, следующего за последним подтверждённым
- ожидание сообщения репликации (в начало)

## Запуск стримов (деплой и эксплуатация)
## Запуск/останов стримов (деплой и эксплуатация)

ВАЖНО: CDC полагается, что требуемые БД и таблицы существуют. Таблицы progressor создаются в ходе миграций
при старте соответствующего сервиса-процессора, поэтому настройка неймспейса ОБЯЗАТЕЛЬНО должна быть выполнена
ДО настройки стримов на cdc_progressor.

### Запуск стрима на существующем неймспейсе, в котором включены механизмы нотификации progressor

Основное допущение: сервисы-потребители топиков kafka корректно обрабатывают дубликаты сообщений

Порядок запуска:
- настроить параметры стриминга в cdc_progressor для требуемого неймспейса
- рестартануть cdc_notifier (с этого момента сообщения в соответствующих топиках "задваиваются")
- отключить механизмы нотификации progressor для неймспейса в соответствующем сервисе-процессоре
- рестартануть сервис-процессор (с этого момента остаётся единственный источник нотификций)

### Запуск стрима на существующем неймспейсе с отключенными механимзмами нотификации progressor

Порядок запуска:
- остановить трафик неймспейса (чтобы избежать неконсистентности данных в даунстриме)
- настроить параметры стриминга в cdc_progressor для требуемого неймспейса
- рестартануть cdc_notifier
- подать трафик нового неймспейса

### Запуск стрима на вновь создаваемом неймспейсе

Порядок запуска:
- трафик нового неймспейса не подавать до завершения настройки
- настроить неймспейс в progressor соответствующего сервиса-процессора
- рестартануть сервис-процессор
- настроить параметры стриминга для неймспейса в cdc_progressor
- рестартануть cdc_progressor
- подать трафик нового неймспейса
### Порядок запуска стрима

- создать неймспейс в отслеживаемом сервисе (добавить в конфиг progressor`а и перезапустить сервис)
- вручную создать публикацию для неймспейса. имя публикации ДОЛЖНО совпадать с именем неймспейса,
в публикацию должны быть включены таблица процессов и таблица событий. пример для неймспейса ff/withdrawal,
на БД fistful необходимо выполнить команду:
```SQL
CREATE PUBLICATION "ff/withdeawal" FOR TABLE "ff/withdrawal_processes", "ff/withdrawal_events";
```
- сконфигурировать стрим в конфиге cdc_progressor. пример:
```
[
{cdc_progressor, [
{streams, #{
fistful => #{
"cdc_slot_fistful" => #{
'ff/withdrawal' => #{
kafka_client => default_kafka_client,
eventsink_topic => <<"ff/withdrawal_eventsink_topic">>,
lifecycle_topic => <<"ff/withdrawal_lifecycle_topic">>
}
}
}
}}
]}
]
```
здесь, fistful - ссылка (db_ref) на требуюемую БД, cdc_slot_fistful - имя слота репликации,
ff/withdrawal (и остальные ключи этой мапы) - это имена публикаций, на которые подпишется соответсвующий слот

примечание: не требуется создавать новый слот репликации под каждый неймспейс. Если слот к требуемой БД
(например, "cdc_slot_fistful") уже существует, достаточно добавить описание нового неймспейса (стрима)
внутрь существующего слота.
- перезапустить сервис cdc_notifier. при перезапуске слот запустится с обновленным списком публикаций

ВАЖНО: если при старте публикация не существует, то она будет проигнорирована и появится warning запись в логе
"unknown publication: ~p"

### Останов стрима

Удаление публикации должно происходить в обратном порядке.

ВАЖНО! Публикация не должна удаляться при работающем слоте репликации, который на неё подписана.
Сначала нужно перезапустить слот с новым спископ публикаций (без удаляемой), потом выполняется удаление.

Порядок останова стрима и удаления публикации:

- удалить стрим из конфигурации cdc_notifier
```
[
{cdc_progressor, [
{streams, #{
fistful => #{
"cdc_slot_fistful" => #{}
}
}}
]}
]
```
- перезапустить сервис cdc_notifier
- вручную удалить публикацию для неймспейса. для этого выполнить команду:
```SQL
DROP PUBLICATION "ff/withdeawal";
```
18 changes: 18 additions & 0 deletions apps/cdc_progressor/src/cdc_prg_utils.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-module(cdc_prg_utils).

-export([tables/1]).
-export([construct_table_name/2]).

-spec tables(atom()) -> map().
tables(NsId) ->
#{
processes => construct_table_name(NsId, "_processes"),
tasks => construct_table_name(NsId, "_tasks"),
schedule => construct_table_name(NsId, "_schedule"),
running => construct_table_name(NsId, "_running"),
events => construct_table_name(NsId, "_events")
}.

-spec construct_table_name(atom(), string()) -> string().
construct_table_name(NsId, Postfix) ->
"\"" ++ erlang:atom_to_list(NsId) ++ Postfix ++ "\"".
46 changes: 11 additions & 35 deletions apps/cdc_progressor/src/cdc_progressor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,19 @@ handle_replication_stop(Pid, ReplStop) ->

-spec init([any()]) -> {ok, state()}.
init([DbOpts, ReplSlot, Streams]) ->
_ = create_metrics(),
NsIDs = maps:keys(Streams),
{ok, Connection} = epgsql:connect(DbOpts),
Publications = lists:foldl(
fun(NsID, Acc) ->
{ok, PubName} = create_publication_if_not_exists(Connection, NsID),
_ = create_metrics(),
[PubName | Acc]
PubName = erlang:atom_to_list(NsID),
case is_publication_exists(Connection, PubName) of
true ->
[PubName | Acc];
false ->
logger:warning("unknown publication: ~p", [PubName]),
Acc
end
end,
[],
NsIDs
Expand Down Expand Up @@ -181,43 +187,13 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

create_publication_if_not_exists(Connection, NsID) ->
PubName = erlang:atom_to_list(NsID),
PubNameEscaped = "\"" ++ PubName ++ "\"",
#{
processes := ProcessesTable,
events := EventsTable
} = tables(NsID),
is_publication_exists(Connection, PubName) ->
{ok, _, [{IsPublicationExists}]} = epgsql:equery(
Connection,
"SELECT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = $1)",
[PubName]
),
case IsPublicationExists of
true ->
logger:info("publication ~p for tables ~p already exists", [PubName, [ProcessesTable, EventsTable]]),
{ok, PubName};
false ->
{ok, _, _} = epgsql:equery(
Connection,
"CREATE PUBLICATION " ++ PubNameEscaped ++
" FOR TABLE " ++ ProcessesTable ++ " , " ++ EventsTable
),
logger:info("publication ~p for tables ~p created", [PubName, [ProcessesTable, EventsTable]]),
{ok, PubName}
end.

tables(NsId) ->
#{
processes => construct_table_name(NsId, "_processes"),
tasks => construct_table_name(NsId, "_tasks"),
schedule => construct_table_name(NsId, "_schedule"),
running => construct_table_name(NsId, "_running"),
events => construct_table_name(NsId, "_events")
}.

construct_table_name(NsId, Postfix) ->
"\"" ++ erlang:atom_to_list(NsId) ++ Postfix ++ "\"".
IsPublicationExists.

-spec parse_repl_data(
[{Table :: binary(), Op :: insert | update | delete, Row :: map(), PrevRow :: map()}],
Expand Down
12 changes: 9 additions & 3 deletions apps/cdc_progressor/test/cdc_prg_base_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,19 @@
).

init_per_suite(Config) ->
Apps = [brod, epg_connector, progressor, cdc_progressor],
Apps = [brod, epg_connector, progressor],
lists:foreach(fun(Application) -> ok = cdc_prg_ct_helper:start_app(Application) end, Apps),
%% must be before cdc_progressor started (before replication started)
%% but after progressor (after progressor migrations)
ok = cdc_prg_ct_helper:create_publication(),
ok = cdc_prg_ct_helper:start_app(cdc_progressor),
Config.

end_per_suite(_Config) ->
Apps = lists:reverse([brod, epg_connector, progressor, cdc_progressor]),
lists:foreach(fun(Application) -> ok = cdc_prg_ct_helper:stop_app(Application) end, Apps),
%% must be after cdc_progressor stopped (after replication stoped)
ok = cdc_prg_ct_helper:delete_publication(),
ok.

init_per_group(_, C) ->
Expand Down Expand Up @@ -312,7 +318,7 @@ gen_id() ->
event(Id) ->
#{
event_id => Id,
timestamp => erlang:system_time(second),
timestamp => erlang:system_time(microsecond),
metadata => #{<<"format_version">> => 1},
%% msg_pack compatibility for mg_proto
payload => erlang:term_to_binary({bin, <<Id>>})
Expand Down Expand Up @@ -373,7 +379,7 @@ mock_processor(simple_success_test = TestCase) ->
Result = #{
response => <<"response">>,
events => [event(3)],
action => #{set_timer => erlang:system_time(second)}
action => timeout
},
{ok, Result};
({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) ->
Expand Down
15 changes: 8 additions & 7 deletions apps/cdc_progressor/test/cdc_prg_ct.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,7 @@
]).
-define(DEFAULT_EPG_CONF, [
{databases, #{
?EPG_DB_REF => #{
host => "postgres",
port => 5432,
database => "progressor_db",
username => "progressor",
password => "progressor"
}
?EPG_DB_REF => ?DEFAULT_DB_OPTS
}},
{pools, #{
?EPG_POOL => #{
Expand All @@ -56,6 +50,13 @@
}
}}
]).
-define(DEFAULT_DB_OPTS, #{
host => "postgres",
port => 5432,
database => "progressor_db",
username => "progressor",
password => "progressor"
}).

-define(DEFAULT_PRG_CONF, [
{namespaces, #{
Expand Down
63 changes: 63 additions & 0 deletions apps/cdc_progressor/test/cdc_prg_ct_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
-export([stop_app/1]).
-export([create_kafka_topics/0]).
-export([delete_kafka_topics/0]).
-export([create_publication/0]).
-export([create_publication/2]).
-export([delete_publication/0]).
-export([delete_publication/2]).

-define(TOPIC_CONFIG(Topic), #{
configs => [],
Expand Down Expand Up @@ -80,6 +84,65 @@ delete_kafka_topics() ->
delete_kafka_topics(Topics) ->
ok = brod:delete_topics(?BROKERS, Topics, 5000).

-spec create_publication() -> ok.
create_publication() ->
{ok, Connection} = epgsql:connect(?DEFAULT_DB_OPTS),
ok = create_publication(Connection, ?NAMESPACE),
ok = epgsql:close(Connection),
ok.

-spec create_publication(pid(), atom()) -> ok.
create_publication(Connection, NsID) ->
PubName = erlang:atom_to_list(NsID),
PubNameEscaped = "\"" ++ PubName ++ "\"",
#{
processes := ProcessesTable,
events := EventsTable
} = cdc_prg_utils:tables(NsID),
{ok, _, [{IsPublicationExists}]} = epgsql:equery(
Connection,
"SELECT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = $1)",
[PubName]
),
case IsPublicationExists of
true ->
ok;
false ->
{ok, _, _} = epgsql:equery(
Connection,
"CREATE PUBLICATION " ++ PubNameEscaped ++
" FOR TABLE " ++ ProcessesTable ++ " , " ++ EventsTable
),
ok
end.

-spec delete_publication() -> ok.
delete_publication() ->
{ok, Connection} = epgsql:connect(?DEFAULT_DB_OPTS),
ok = delete_publication(Connection, ?NAMESPACE),
ok = epgsql:close(Connection),
ok.

-spec delete_publication(pid(), atom()) -> ok.
delete_publication(Connection, NsID) ->
PubName = erlang:atom_to_list(NsID),
PubNameEscaped = "\"" ++ PubName ++ "\"",
{ok, _, [{IsPublicationExists}]} = epgsql:equery(
Connection,
"SELECT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = $1)",
[PubName]
),
case IsPublicationExists of
true ->
{ok, _, _} = epgsql:equery(
Connection,
"DROP PUBLICATION " ++ PubNameEscaped
),
ok;
false ->
ok
end.

%% Internal functions
load_app(AppName) ->
maybe
Expand Down
Loading
Loading