diff --git a/.env b/.env index 6670bdc..6e44cff 100644 --- a/.env +++ b/.env @@ -1,5 +1,5 @@ SERVICE_NAME=cdc_notifier -OTP_VERSION=27.1.2 -REBAR_VERSION=3.24 +OTP_VERSION=28.5.0 +REBAR_VERSION=3.26 THRIFT_VERSION=0.14.2.3 CONFLUENT_PLATFORM_VERSION=7.2.15 diff --git a/README.md b/README.md index 3674bc5..d6f9d7b 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ cdc_notifier является umbrella приложением и может со В текущей версии оно содержит только cdc_progressor. Описание конфигурации cdc_progressor -``` +```erlang [ {cdc_progressor, [ {streams, #{ @@ -65,6 +65,23 @@ cdc_notifier является umbrella приложением и может со а каждому неймспейсу соотвествует свой стрим: клиент kafka (задается в конфиге brod, может быть переиспользован), топик событий жизенного цикла lifecycle_topic, топик обновлений eventsink_topic (ДОЛЖНЫ быть заданы оба топика). +Рекомендуется создавать один слот репликации на одну БД. Слот представляет собой отдельный WAL log, поэтому их неоправданное размножение нежелательно. +Все необходимые неймспейсы (публикации) должны быть перечислены внутри одного слота для соответствующей БД: +```erlang +{streams, #{ + fistful_db => #{ + "cdc_slot_fistful" => #{ + 'ff/withdrawal' => #{...}, + 'ff/deposit' => #{...} + } + } +}} +``` + +Использование нескольких слотов на одну БД оправдано только при специфическом профиле нагрузки, +например, когда высоконагруженный неймспейс создаёт задержки для критичных к скорости оповещений и +его требуется вынести на отдельный экземпляр CDC. В общем случае достаточно одного слота. + Также cdc_progressor имеет опциональные параметры: - resend_timeout - таймаут повторной попытки отправки сообщения в kafka в случае неудачной попытки передачи - max_retries - максимальное количество попыток переотправки в kafka, по достижению этого значения захват обновлений @@ -76,10 +93,10 @@ cdc_notifier является umbrella приложением и может со ### Старт -- создание необходимых публикаций в БД, если еще не созданы (однократно, при первом запуске после добавления стрима в конфиг) - создание персистентного слота репликации, если еще не создан (однократно, при первом запуске после добавления слота в конфиг) +- определение состава публикаций, которые будет читать слот (при каждом старте, в соответствии с текущим конфигом) - получение текущего LSN в PostgreSQL ("0/0" при первом запуске) -- старт репликации с соответствующим LSN +- старт репликации с соответствующими LSN и составом публикаций Создание слота и старт репликации осуществляется посредством epg_wal_reader:subscribe. Создаваемый слот является персистентным, что позволяет хранить подтвержденные LSN (Log Sequence Number) на стороне PostgreSQL. @@ -106,36 +123,73 @@ cdc_notifier является umbrella приложением и может со - старт репликации со значения LSN, следующего за последним подтверждённым - ожидание сообщения репликации (в начало) -## Запуск стримов (деплой и эксплуатация) +## Запуск/останов стримов (деплой и эксплуатация) ВАЖНО: CDC полагается, что требуемые БД и таблицы существуют. Таблицы progressor создаются в ходе миграций при старте соответствующего сервиса-процессора, поэтому настройка неймспейса ОБЯЗАТЕЛЬНО должна быть выполнена ДО настройки стримов на cdc_progressor. -### Запуск стрима на существующем неймспейсе, в котором включены механизмы нотификации progressor - -Основное допущение: сервисы-потребители топиков kafka корректно обрабатывают дубликаты сообщений - -Порядок запуска: -- настроить параметры стриминга в cdc_progressor для требуемого неймспейса -- рестартануть cdc_notifier (с этого момента сообщения в соответствующих топиках "задваиваются") -- отключить механизмы нотификации progressor для неймспейса в соответствующем сервисе-процессоре -- рестартануть сервис-процессор (с этого момента остаётся единственный источник нотификций) - -### Запуск стрима на существующем неймспейсе с отключенными механимзмами нотификации progressor - -Порядок запуска: -- остановить трафик неймспейса (чтобы избежать неконсистентности данных в даунстриме) -- настроить параметры стриминга в cdc_progressor для требуемого неймспейса -- рестартануть cdc_notifier -- подать трафик нового неймспейса - -### Запуск стрима на вновь создаваемом неймспейсе - -Порядок запуска: -- трафик нового неймспейса не подавать до завершения настройки -- настроить неймспейс в progressor соответствующего сервиса-процессора -- рестартануть сервис-процессор -- настроить параметры стриминга для неймспейса в cdc_progressor -- рестартануть cdc_progressor -- подать трафик нового неймспейса +### Порядок запуска стрима + +- создать неймспейс в отслеживаемом сервисе (добавить в конфиг progressor`а и перезапустить сервис) +- вручную создать публикацию для неймспейса. имя публикации ДОЛЖНО совпадать с именем неймспейса, + в публикацию должны быть включены таблица процессов и таблица событий. пример для неймспейса ff/withdrawal, + на БД fistful необходимо выполнить команду: + ```SQL + CREATE PUBLICATION "ff/withdeawal" FOR TABLE "ff/withdrawal_processes", "ff/withdrawal_events"; + ``` +- сконфигурировать стрим в конфиге cdc_progressor. пример: + ``` + [ + {cdc_progressor, [ + {streams, #{ + fistful => #{ + "cdc_slot_fistful" => #{ + 'ff/withdrawal' => #{ + kafka_client => default_kafka_client, + eventsink_topic => <<"ff/withdrawal_eventsink_topic">>, + lifecycle_topic => <<"ff/withdrawal_lifecycle_topic">> + } + } + } + }} + ]} + ] + ``` + здесь, fistful - ссылка (db_ref) на требуюемую БД, cdc_slot_fistful - имя слота репликации, + ff/withdrawal (и остальные ключи этой мапы) - это имена публикаций, на которые подпишется соответсвующий слот + + примечание: не требуется создавать новый слот репликации под каждый неймспейс. Если слот к требуемой БД + (например, "cdc_slot_fistful") уже существует, достаточно добавить описание нового неймспейса (стрима) + внутрь существующего слота. +- перезапустить сервис cdc_notifier. при перезапуске слот запустится с обновленным списком публикаций + + ВАЖНО: если при старте публикация не существует, то она будет проигнорирована и появится warning запись в логе + "unknown publication: ~p" + +### Останов стрима + +Удаление публикации должно происходить в обратном порядке. + +ВАЖНО! Публикация не должна удаляться при работающем слоте репликации, который на неё подписана. +Сначала нужно перезапустить слот с новым спископ публикаций (без удаляемой), потом выполняется удаление. + +Порядок останова стрима и удаления публикации: + +- удалить стрим из конфигурации cdc_notifier + ``` + [ + {cdc_progressor, [ + {streams, #{ + fistful => #{ + "cdc_slot_fistful" => #{} + } + }} + ]} + ] + ``` +- перезапустить сервис cdc_notifier +- вручную удалить публикацию для неймспейса. для этого выполнить команду: + ```SQL + DROP PUBLICATION "ff/withdeawal"; + ``` diff --git a/apps/cdc_progressor/src/cdc_prg_utils.erl b/apps/cdc_progressor/src/cdc_prg_utils.erl new file mode 100644 index 0000000..973659f --- /dev/null +++ b/apps/cdc_progressor/src/cdc_prg_utils.erl @@ -0,0 +1,18 @@ +-module(cdc_prg_utils). + +-export([tables/1]). +-export([construct_table_name/2]). + +-spec tables(atom()) -> map(). +tables(NsId) -> + #{ + processes => construct_table_name(NsId, "_processes"), + tasks => construct_table_name(NsId, "_tasks"), + schedule => construct_table_name(NsId, "_schedule"), + running => construct_table_name(NsId, "_running"), + events => construct_table_name(NsId, "_events") + }. + +-spec construct_table_name(atom(), string()) -> string(). +construct_table_name(NsId, Postfix) -> + "\"" ++ erlang:atom_to_list(NsId) ++ Postfix ++ "\"". diff --git a/apps/cdc_progressor/src/cdc_progressor.erl b/apps/cdc_progressor/src/cdc_progressor.erl index f5c72bc..63f2019 100644 --- a/apps/cdc_progressor/src/cdc_progressor.erl +++ b/apps/cdc_progressor/src/cdc_progressor.erl @@ -76,13 +76,19 @@ handle_replication_stop(Pid, ReplStop) -> -spec init([any()]) -> {ok, state()}. init([DbOpts, ReplSlot, Streams]) -> + _ = create_metrics(), NsIDs = maps:keys(Streams), {ok, Connection} = epgsql:connect(DbOpts), Publications = lists:foldl( fun(NsID, Acc) -> - {ok, PubName} = create_publication_if_not_exists(Connection, NsID), - _ = create_metrics(), - [PubName | Acc] + PubName = erlang:atom_to_list(NsID), + case is_publication_exists(Connection, PubName) of + true -> + [PubName | Acc]; + false -> + logger:warning("unknown publication: ~p", [PubName]), + Acc + end end, [], NsIDs @@ -181,43 +187,13 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== -create_publication_if_not_exists(Connection, NsID) -> - PubName = erlang:atom_to_list(NsID), - PubNameEscaped = "\"" ++ PubName ++ "\"", - #{ - processes := ProcessesTable, - events := EventsTable - } = tables(NsID), +is_publication_exists(Connection, PubName) -> {ok, _, [{IsPublicationExists}]} = epgsql:equery( Connection, "SELECT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = $1)", [PubName] ), - case IsPublicationExists of - true -> - logger:info("publication ~p for tables ~p already exists", [PubName, [ProcessesTable, EventsTable]]), - {ok, PubName}; - false -> - {ok, _, _} = epgsql:equery( - Connection, - "CREATE PUBLICATION " ++ PubNameEscaped ++ - " FOR TABLE " ++ ProcessesTable ++ " , " ++ EventsTable - ), - logger:info("publication ~p for tables ~p created", [PubName, [ProcessesTable, EventsTable]]), - {ok, PubName} - end. - -tables(NsId) -> - #{ - processes => construct_table_name(NsId, "_processes"), - tasks => construct_table_name(NsId, "_tasks"), - schedule => construct_table_name(NsId, "_schedule"), - running => construct_table_name(NsId, "_running"), - events => construct_table_name(NsId, "_events") - }. - -construct_table_name(NsId, Postfix) -> - "\"" ++ erlang:atom_to_list(NsId) ++ Postfix ++ "\"". + IsPublicationExists. -spec parse_repl_data( [{Table :: binary(), Op :: insert | update | delete, Row :: map(), PrevRow :: map()}], diff --git a/apps/cdc_progressor/test/cdc_prg_base_SUITE.erl b/apps/cdc_progressor/test/cdc_prg_base_SUITE.erl index 148da18..d561b15 100644 --- a/apps/cdc_progressor/test/cdc_prg_base_SUITE.erl +++ b/apps/cdc_progressor/test/cdc_prg_base_SUITE.erl @@ -63,13 +63,19 @@ ). init_per_suite(Config) -> - Apps = [brod, epg_connector, progressor, cdc_progressor], + Apps = [brod, epg_connector, progressor], lists:foreach(fun(Application) -> ok = cdc_prg_ct_helper:start_app(Application) end, Apps), + %% must be before cdc_progressor started (before replication started) + %% but after progressor (after progressor migrations) + ok = cdc_prg_ct_helper:create_publication(), + ok = cdc_prg_ct_helper:start_app(cdc_progressor), Config. end_per_suite(_Config) -> Apps = lists:reverse([brod, epg_connector, progressor, cdc_progressor]), lists:foreach(fun(Application) -> ok = cdc_prg_ct_helper:stop_app(Application) end, Apps), + %% must be after cdc_progressor stopped (after replication stoped) + ok = cdc_prg_ct_helper:delete_publication(), ok. init_per_group(_, C) -> @@ -312,7 +318,7 @@ gen_id() -> event(Id) -> #{ event_id => Id, - timestamp => erlang:system_time(second), + timestamp => erlang:system_time(microsecond), metadata => #{<<"format_version">> => 1}, %% msg_pack compatibility for mg_proto payload => erlang:term_to_binary({bin, <>}) @@ -373,7 +379,7 @@ mock_processor(simple_success_test = TestCase) -> Result = #{ response => <<"response">>, events => [event(3)], - action => #{set_timer => erlang:system_time(second)} + action => timeout }, {ok, Result}; ({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) -> diff --git a/apps/cdc_progressor/test/cdc_prg_ct.hrl b/apps/cdc_progressor/test/cdc_prg_ct.hrl index 93c54a6..3976871 100644 --- a/apps/cdc_progressor/test/cdc_prg_ct.hrl +++ b/apps/cdc_progressor/test/cdc_prg_ct.hrl @@ -41,13 +41,7 @@ ]). -define(DEFAULT_EPG_CONF, [ {databases, #{ - ?EPG_DB_REF => #{ - host => "postgres", - port => 5432, - database => "progressor_db", - username => "progressor", - password => "progressor" - } + ?EPG_DB_REF => ?DEFAULT_DB_OPTS }}, {pools, #{ ?EPG_POOL => #{ @@ -56,6 +50,13 @@ } }} ]). +-define(DEFAULT_DB_OPTS, #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" +}). -define(DEFAULT_PRG_CONF, [ {namespaces, #{ diff --git a/apps/cdc_progressor/test/cdc_prg_ct_helper.erl b/apps/cdc_progressor/test/cdc_prg_ct_helper.erl index 7a8d7c3..9deda37 100644 --- a/apps/cdc_progressor/test/cdc_prg_ct_helper.erl +++ b/apps/cdc_progressor/test/cdc_prg_ct_helper.erl @@ -7,6 +7,10 @@ -export([stop_app/1]). -export([create_kafka_topics/0]). -export([delete_kafka_topics/0]). +-export([create_publication/0]). +-export([create_publication/2]). +-export([delete_publication/0]). +-export([delete_publication/2]). -define(TOPIC_CONFIG(Topic), #{ configs => [], @@ -80,6 +84,65 @@ delete_kafka_topics() -> delete_kafka_topics(Topics) -> ok = brod:delete_topics(?BROKERS, Topics, 5000). +-spec create_publication() -> ok. +create_publication() -> + {ok, Connection} = epgsql:connect(?DEFAULT_DB_OPTS), + ok = create_publication(Connection, ?NAMESPACE), + ok = epgsql:close(Connection), + ok. + +-spec create_publication(pid(), atom()) -> ok. +create_publication(Connection, NsID) -> + PubName = erlang:atom_to_list(NsID), + PubNameEscaped = "\"" ++ PubName ++ "\"", + #{ + processes := ProcessesTable, + events := EventsTable + } = cdc_prg_utils:tables(NsID), + {ok, _, [{IsPublicationExists}]} = epgsql:equery( + Connection, + "SELECT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = $1)", + [PubName] + ), + case IsPublicationExists of + true -> + ok; + false -> + {ok, _, _} = epgsql:equery( + Connection, + "CREATE PUBLICATION " ++ PubNameEscaped ++ + " FOR TABLE " ++ ProcessesTable ++ " , " ++ EventsTable + ), + ok + end. + +-spec delete_publication() -> ok. +delete_publication() -> + {ok, Connection} = epgsql:connect(?DEFAULT_DB_OPTS), + ok = delete_publication(Connection, ?NAMESPACE), + ok = epgsql:close(Connection), + ok. + +-spec delete_publication(pid(), atom()) -> ok. +delete_publication(Connection, NsID) -> + PubName = erlang:atom_to_list(NsID), + PubNameEscaped = "\"" ++ PubName ++ "\"", + {ok, _, [{IsPublicationExists}]} = epgsql:equery( + Connection, + "SELECT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = $1)", + [PubName] + ), + case IsPublicationExists of + true -> + {ok, _, _} = epgsql:equery( + Connection, + "DROP PUBLICATION " ++ PubNameEscaped + ), + ok; + false -> + ok + end. + %% Internal functions load_app(AppName) -> maybe diff --git a/rebar.config b/rebar.config index 2299aef..9d3ec3f 100644 --- a/rebar.config +++ b/rebar.config @@ -2,8 +2,8 @@ {deps, [ {brod, "4.3.2"}, {cowboy, "2.12.0"}, - {prometheus, "6.1.2"}, - {prometheus_cowboy, "0.2.0"}, + {prometheus, "4.11.0"}, + {prometheus_cowboy, "0.1.9"}, {thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}}, {mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}}, {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}} @@ -60,7 +60,7 @@ ]}, {test, [ {deps, [ - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.19"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.25"}}}, {meck, "0.9.2"} ]}, {dialyzer, [ diff --git a/rebar.lock b/rebar.lock index 775cc1d..9eddb15 100644 --- a/rebar.lock +++ b/rebar.lock @@ -8,7 +8,6 @@ {<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.12.0">>},0}, {<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.13.0">>},1}, {<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.11">>},2}, - {<<"ddskerl">>,{pkg,<<"ddskerl">>,<<"0.4.3">>},1}, {<<"epg_connector">>, {git,"https://github.com/valitydev/epg_connector.git", {ref,"939a0d4ab3f7561a79b45381bbe13029d9263006"}}, @@ -24,9 +23,10 @@ {git,"https://github.com/valitydev/machinegun-proto.git", {ref,"cc2c27c30d30dc34c0c56fc7c7e96326d6bd6a14"}}, 0}, - {<<"prometheus">>,{pkg,<<"prometheus">>,<<"6.1.2">>},0}, - {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.2.0">>},0}, + {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, + {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, {<<"prometheus_httpd">>,{pkg,<<"prometheus_httpd">>,<<"2.1.15">>},1}, + {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1}, {<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},1}, {<<"thrift">>, {git,"https://github.com/valitydev/thrift_erlang.git", @@ -39,13 +39,13 @@ {<<"cowboy">>, <<"F276D521A1FF88B2B9B4C54D0E753DA6C66DD7BE6C9FCA3D9418B561828A3731">>}, {<<"cowlib">>, <<"DB8F7505D8332D98EF50A3EF34B34C1AFDDEC7506E4EE4DD4A3A266285D282CA">>}, {<<"crc32cer">>, <<"B550DA6D615FEB72A882D15D020F8F7DEE72DFB2CB1BCDF3B1EE8DC2AFD68CFC">>}, - {<<"ddskerl">>, <<"BB97B90EEF1B5906520CEBDD38DA29B18A770B5567F6297927F53566EAE9BEBB">>}, {<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"kafka_protocol">>, <<"F917B6C90C8DF0DE2B40A87D6B9AE1CFCE7788E91A65818E90E40CF76111097A">>}, - {<<"prometheus">>, <<"E5C3C567CD8B0994425920763405635211183C15BDC47D0CD524807DDE20CA1D">>}, - {<<"prometheus_cowboy">>, <<"526F75D9850A9125496F78BCEECCA0F237BC7B403C976D44508543AE5967DAD9">>}, + {<<"prometheus">>, <<"B95F8DE8530F541BD95951E18E355A840003672E5EDA4788C5FA6183406BA29A">>}, + {<<"prometheus_cowboy">>, <<"D9D5B300516A61ED5AE31391F8EEEEB202230081D32A1813F2D78772B6F274E1">>}, {<<"prometheus_httpd">>, <<"8F767D819A5D36275EAB9264AFF40D87279151646776069BF69FBDBBD562BD75">>}, + {<<"quantile_estimator">>, <<"EF50A361F11B5F26B5F16D0696E46A9E4661756492C981F7B2229EF42FF1CD15">>}, {<<"ranch">>, <<"8C7A100A139FD57F17327B6413E4167AC559FBC04CA7448E9BE9057311597A1D">>}]}, {pkg_hash_ext,[ {<<"accept">>, <<"CA69388943F5DAD2E7232A5478F16086E3C872F48E32B88B378E1885A59F5649">>}, @@ -53,12 +53,12 @@ {<<"cowboy">>, <<"8A7ABE6D183372CEB21CAA2709BEC928AB2B72E18A3911AA1771639BEF82651E">>}, {<<"cowlib">>, <<"E1E1284DC3FC030A64B1AD0D8382AE7E99DA46C3246B815318A4B848873800A4">>}, {<<"crc32cer">>, <<"A39B8F0B1990AC1BF06C3A247FC6A178B740CDFC33C3B53688DC7DD6B1855942">>}, - {<<"ddskerl">>, <<"4E0F6047C6A002CE38F6EA155276DD918CD635FD0A5EDB9E0B46AEB6F7AFF2C2">>}, {<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"kafka_protocol">>, <<"DF680A3706EAD8695F8B306897C0A33E8063C690DA9308DB87B462CFD7029D04">>}, - {<<"prometheus">>, <<"109662001328112B860DE112D49B575310BE8FF33FE04BF0482EEC4B1E5A1278">>}, - {<<"prometheus_cowboy">>, <<"2C7EB12F4B970D91E3B47BAAD0F138F6ADC34E53EEB0AE18068FF0AFAB441B24">>}, + {<<"prometheus">>, <<"719862351AABF4DF7079B05DC085D2BBCBE3AC0AC3009E956671B1D5AB88247D">>}, + {<<"prometheus_cowboy">>, <<"5F71C039DEB9E9FF9DD6366BC74C907A463872B85286E619EFF0BDA15111695A">>}, {<<"prometheus_httpd">>, <<"67736D000745184D5013C58A63E947821AB90CB9320BC2E6AE5D3061C6FFE039">>}, + {<<"quantile_estimator">>, <<"282A8A323CA2A845C9E6F787D166348F776C1D4A41EDE63046D72D422E3DA946">>}, {<<"ranch">>, <<"49FBCFD3682FAB1F5D109351B61257676DA1A2FDBE295904176D5E521A2DDFE5">>}]} ].