diff --git a/README.md b/README.md index 78f511a..047671d 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,7 @@ progressor:put(#{ status => <<"running">>, aux_state => <<"state_data">> }, - action => #{set_timer => 1640995200} + action => {schedule, #{at => 1640995200000000, action => timeout}} } }). ``` @@ -277,8 +277,8 @@ process({TaskType, Args, Process}, Options, Context) -> % Опционально установить таймер case should_set_timer(Process, TaskType) of true -> - TimerTime = erlang:system_time(second) + 60, - {ok, Result#{action => #{set_timer => TimerTime}}}; + TimerTime = erlang:system_time(microsecond) + 60 * 1000000, + {ok, Result#{action => {schedule, #{at => TimerTime, action => timeout}}}}; false -> {ok, Result} end. diff --git a/benchmark/base_bench/src/base_bench_processor.erl b/benchmark/base_bench/src/base_bench_processor.erl index 71e3898..45353b1 100644 --- a/benchmark/base_bench/src/base_bench_processor.erl +++ b/benchmark/base_bench/src/base_bench_processor.erl @@ -7,19 +7,17 @@ process({init, Args, _Process}, _Opts, _Ctx) -> Result = #{ metadata => #{finish => Fin}, events => [event(1)], - action => #{set_timer => erlang:system_time(second)} + action => timeout }, {ok, Result}; %% process({timeout, _Args, #{history := History, metadata := Meta} = _Process}, _Opts, _Ctx) -> - %Random = rand:uniform(40), - %timer:sleep(60 + Random), #{finish := FinishTime} = Meta, - Action = case FinishTime > erlang:system_time(second) of - true -> #{set_timer => erlang:system_time(second)}; - false -> unset_timer - end, - %Action = #{set_timer => erlang:system_time(second)}, + Action = + case FinishTime > erlang:system_time(second) of + true -> timeout; + false -> suspend + end, NextId = erlang:length(History) + 1, Result = #{ events => [event(NextId)], @@ -31,7 +29,7 @@ process({call, _Args, #{history := History} = _Process}, _Opts, _Ctx) -> Result = #{ response => erlang:length(History), events => [], - action => unset_timer + action => suspend }, {ok, Result}. %% diff --git a/include/progressor.hrl b/include/progressor.hrl index 0a58de1..7b9c3d7 100644 --- a/include/progressor.hrl +++ b/include/progressor.hrl @@ -50,7 +50,10 @@ process_id := id(), task_id := task_id(), event_id := event_id(), - timestamp := timestamp_sec(), + %% Stored as timestamptz with microsecond precision; the unit is auto-detected + %% (prg_utils:split_timestamp/to_microseconds), so both seconds and microseconds + %% are accepted on write. + timestamp := timestamp_us(), metadata => #{format => pos_integer()}, payload := binary() }. @@ -157,6 +160,7 @@ -type processor_intent() :: #{ events := [event()], + %% отсутствие ключа = idle action => action(), response => term(), aux_state => binary(), @@ -186,7 +190,19 @@ %% (i.e., attempts are not exhausted, and the error is not marked as %% non-retryable). --type action() :: #{set_timer := timestamp_sec(), remove => true} | unset_timer. +-type scheduled_action() :: timeout | remove. + +-type schedule() :: #{ + %% Absolute unix time; unit is auto-detected on write (prg_utils:to_microseconds). + at := timestamp_us(), + action := scheduled_action() +}. + +-type action() :: + idle + | suspend + | scheduled_action() + | {schedule, schedule()}. -type task_result() :: #{ task_id := task_id(), diff --git a/src/prg_echo_processor.erl b/src/prg_echo_processor.erl index d96f9af..c8429fd 100644 --- a/src/prg_echo_processor.erl +++ b/src/prg_echo_processor.erl @@ -13,7 +13,7 @@ process({_, _, #{history := History} = _Process}, _Opts, _Ctx) -> Count -> Result = #{ events => [event(Count + 1)], - action => #{set_timer => erlang:system_time(second)} + action => timeout }, {ok, Result} end. diff --git a/src/prg_utils.erl b/src/prg_utils.erl index f8d9ea0..782d169 100644 --- a/src/prg_utils.erl +++ b/src/prg_utils.erl @@ -18,6 +18,7 @@ -export([to_seconds/1]). -export([split_timestamp/1]). -export([format_microseconds/1]). +-export([action_to_task_type/1]). -type time_unit() :: second | millisecond | microsecond. @@ -141,3 +142,9 @@ format_microseconds(Val) -> Bin = integer_to_binary(Val), Pad = 6 - byte_size(Bin), <<(binary:copy(<<"0">>, Pad))/binary, Bin/binary>>. + +-spec action_to_task_type(scheduled_action()) -> task_type(). +action_to_task_type(timeout) -> + <<"timeout">>; +action_to_task_type(remove) -> + <<"remove">>. diff --git a/src/prg_worker.erl b/src/prg_worker.erl index 8939522..7a665d6 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -229,17 +229,21 @@ maybe_restore_history(_, State) -> State. handle_result_success(Intent, TaskHeader, Task, Deadline, State) -> - Action = maps:get(action, Intent, undefined), - case Action of - #{set_timer := _Timestamp} -> - success_and_continue(Intent, TaskHeader, Task, Deadline, State); - #{remove := true} -> - success_and_remove(Intent, TaskHeader, Task, Deadline, State); - unset_timer -> - success_and_suspend(Intent, TaskHeader, Task, Deadline, State); - undefined -> - success_and_unlock(Intent, TaskHeader, Task, Deadline, State) - end. + Action = maps:get(action, Intent, idle), + dispatch_action(Action, Intent, TaskHeader, Task, Deadline, State). + +dispatch_action(idle, Intent, TaskHeader, Task, Deadline, State) -> + success_and_unlock(Intent, TaskHeader, Task, Deadline, State); +dispatch_action(suspend, Intent, TaskHeader, Task, Deadline, State) -> + success_and_suspend(Intent, TaskHeader, Task, Deadline, State); +dispatch_action(remove, Intent, TaskHeader, Task, Deadline, State) -> + success_and_remove(Intent, TaskHeader, Task, Deadline, State); +dispatch_action(timeout, Intent, TaskHeader, Task, Deadline, State) -> + success_and_continue( + Intent, TaskHeader, Task, Deadline, State, timeout, erlang:system_time(microsecond) + ); +dispatch_action({schedule, #{at := Timestamp0, action := Action}}, Intent, TaskHeader, Task, Deadline, State) -> + success_and_continue(Intent, TaskHeader, Task, Deadline, State, Action, Timestamp0). handle_result_error(Result, {TaskType, _} = TaskHeader, Task, Deadline, State) when TaskType =:= timeout; @@ -253,8 +257,8 @@ handle_result_error(Result, {TaskType, _} = TaskHeader, Task, Deadline, State) w -> error_and_stop(Result, TaskHeader, Task, Deadline, State). -success_and_continue(Intent, TaskHeader, Task, Deadline, State) -> - #{action := #{set_timer := Timestamp0} = Action, events := Events} = Intent, +success_and_continue(Intent, TaskHeader, Task, Deadline, State, Action, Timestamp0) -> + #{events := Events} = Intent, #{context := Context} = Task, #prg_worker_state{ ns_id = NsId, @@ -269,7 +273,7 @@ success_and_continue(Intent, TaskHeader, Task, Deadline, State) -> TaskResult = task_result(Task, <<"finished">>, Response), NewTask = #{ process_id => ProcessId, - task_type => action_to_task_type(Action), + task_type => prg_utils:action_to_task_type(Action), status => create_status(Timestamp, Now), scheduled_time => Timestamp, context => Context, @@ -323,7 +327,7 @@ success_and_remove(Intent, TaskHeader, _Task, Deadline, State) -> State#prg_worker_state{process = undefined}. success_and_suspend(Intent, TaskHeader, Task, Deadline, State) -> - #{events := Events, action := unset_timer} = Intent, + #{events := Events} = Intent, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts} = NsOpts, @@ -718,12 +722,9 @@ is_retryable(Error, {timeout, undefined}, RetryPolicy, Timeout, Attempts) -> is_retryable(_Error, _TaskHeader, _RetryPolicy, _Timeout, _Attempts) -> false. -%% Due to the difference in the time scales used for storage (microseconds) -%% and the schedule time (seconds), the following logic is required: -%% - If the difference between the schedule and the current time is less than a ~1 second -%% the task is assigned the status "running" and is processed immediately -%% - If the difference between the schedule and the current time exceeds ~1 second -%% the task is assigned the status "waiting" and is saved to the schedule +%% Sub-second schedules are coerced to immediate execution: if the gap to +%% `scheduled_time` is below ~1s (scheduler overhead), the task stays `running` +%% instead of `waiting`. create_status(Timestamp, Now) when Timestamp =< Now -> <<"running">>; create_status(Timestamp, Now) -> @@ -746,11 +747,6 @@ create_header(#{task_type := <<"repair">>}) -> {repair, undefined}; create_header(#{task_type := <<"notify">>}) -> {notify, undefined}. -%% -action_to_task_type(#{remove := true}) -> - <<"remove">>; -action_to_task_type(#{set_timer := _}) -> - <<"timeout">>. last_event_id([]) -> 0; diff --git a/src/progressor.erl b/src/progressor.erl index 0351b10..723317e 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -373,7 +373,7 @@ do_put( #{ process_id := ProcessId } = Process, - Action = maps:get(action, Args, undefined), + Action = maps:get(action, Args, idle), Context = maps:get(context, Opts, <<>>), Now = erlang:system_time(microsecond), InitTask = #{ @@ -513,23 +513,26 @@ check_for_run(undefined) -> check_for_run(Pid) when is_pid(Pid) -> <<"running">>. -action_to_task(undefined, _ProcessId, _Ctx) -> +action_to_task(idle, _ProcessId, _Ctx) -> undefined; -action_to_task(unset_timer, _ProcessId, _Ctx) -> +action_to_task(suspend, _ProcessId, _Ctx) -> undefined; -action_to_task(#{set_timer := Timestamp} = Action, ProcessId, Context) -> - TaskType = - case maps:get(remove, Action, false) of - true -> <<"remove">>; - false -> <<"timeout">> - end, +action_to_task(remove, _ProcessId, _Ctx) -> + undefined; +action_to_task(timeout, ProcessId, Context) -> + action_to_task( + {schedule, #{at => erlang:system_time(microsecond), action => timeout}}, + ProcessId, + Context + ); +action_to_task({schedule, #{at := Timestamp0, action := Action}}, ProcessId, Context) -> #{ process_id => ProcessId, - task_type => TaskType, + task_type => prg_utils:action_to_task_type(Action), status => <<"waiting">>, args => <<>>, context => Context, - scheduled_time => prg_utils:to_microseconds(Timestamp), + scheduled_time => prg_utils:to_microseconds(Timestamp0), last_retry_interval => 0, attempts_count => 0 }. diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 8d30359..42c5cb2 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -39,6 +39,8 @@ -define(NS(C), proplists:get_value(ns_id, C, 'default/default')). -define(AWAIT_TIMEOUT(C), proplists:get_value(repl_timeout, C, 5)). +-define(TIMEOUT_IN(Sec), {schedule, #{at => erlang:system_time(microsecond) + (Sec) * 1000000, action => timeout}}). +-define(REMOVE_IN(Sec), {schedule, #{at => erlang:system_time(microsecond) + (Sec) * 1000000, action => remove}}). init_per_suite(Config) -> Config. @@ -844,7 +846,7 @@ put_process_with_timeout_test(C) -> status => <<"running">>, history => [event(1)] }, - action => #{set_timer => erlang:system_time(microsecond) + 1000000} + action => ?TIMEOUT_IN(1) }, {ok, ok} = progressor:put(#{ns => ?NS(C), id => Id, args => Args}), timer:sleep(?AWAIT_TIMEOUT(C)), @@ -942,7 +944,7 @@ put_process_with_remove_test(C) -> status => <<"running">>, history => [event(1)] }, - action => #{set_timer => erlang:system_time(microsecond) + 1000000, remove => true} + action => ?REMOVE_IN(1) }, {ok, ok} = progressor:put(#{ns => ?NS(C), id => Id, args => Args}), timer:sleep(?AWAIT_TIMEOUT(C)), @@ -987,7 +989,7 @@ mock_processor(simple_timers_test = TestCase) -> events => [event(1)], metadata => #{<<"k">> => <<"v">>}, %% postponed timer - action => #{set_timer => erlang:system_time(microsecond) + 2000000}, + action => ?TIMEOUT_IN(2), aux_state => erlang:term_to_binary(<<"aux_state1">>) }, Self ! 1, @@ -996,7 +998,7 @@ mock_processor(simple_timers_test = TestCase) -> Result = #{ events => [event(2)], %% continuation timer - action => #{set_timer => erlang:system_time(microsecond)}, + action => timeout, aux_state => erlang:term_to_binary(<<"aux_state2">>) }, Self ! 2, @@ -1017,7 +1019,7 @@ mock_processor(simple_call_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(microsecond) + 2000000} + action => ?TIMEOUT_IN(2) }, Self ! 1, {ok, Result}; @@ -1047,7 +1049,7 @@ mock_processor(reschedule_after_call_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(microsecond) + 2000000} + action => ?TIMEOUT_IN(2) }, Self ! 1, {ok, Result}; @@ -1106,7 +1108,7 @@ mock_processor(simple_call_with_range_test = TestCase) -> Result = #{ response => <<"response">>, events => [event(6)], - action => #{set_timer => erlang:system_time(microsecond)} + action => timeout }, Self ! 3, {ok, Result}; @@ -1127,7 +1129,7 @@ mock_processor(call_replace_timer_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(microsecond) + 2000000, remove => true} + action => ?REMOVE_IN(2) }, Self ! 1, {ok, Result}; @@ -1136,7 +1138,7 @@ mock_processor(call_replace_timer_test = TestCase) -> Result = #{ response => <<"response">>, events => [event(2), event(3)], - action => #{set_timer => erlang:system_time(microsecond)} + action => timeout }, Self ! 2, {ok, Result}; @@ -1157,7 +1159,7 @@ mock_processor(call_unset_timer_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(microsecond) + 2000000} + action => ?TIMEOUT_IN(2) }, Self ! 1, {ok, Result}; @@ -1166,7 +1168,7 @@ mock_processor(call_unset_timer_test = TestCase) -> Result = #{ response => <<"response">>, events => [], - action => unset_timer + action => suspend }, Self ! 2, {ok, Result}; @@ -1187,7 +1189,7 @@ mock_processor(postponed_call_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(microsecond)} + action => timeout }, Self ! 1, {ok, Result}; @@ -1195,7 +1197,7 @@ mock_processor(postponed_call_test = TestCase) -> timer:sleep(3000), Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(microsecond)} + action => timeout }, Self ! 2, {ok, Result}; @@ -1223,7 +1225,7 @@ mock_processor(postponed_call_to_suspended_process_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(microsecond)} + action => timeout }, Self ! 1, {ok, Result}; @@ -1271,7 +1273,7 @@ mock_processor(simple_repair_after_non_retriable_error_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(microsecond)} + action => timeout }, Self ! 1, {ok, Result}; @@ -1282,7 +1284,7 @@ mock_processor(simple_repair_after_non_retriable_error_test = TestCase) -> %% timeout via simple repair Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(microsecond)} + action => timeout }, Self ! 3, {ok, Result}; @@ -1302,7 +1304,7 @@ mock_processor(repair_after_non_retriable_error_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(microsecond)} + action => timeout }, Self ! 1, {ok, Result}; @@ -1331,7 +1333,7 @@ mock_processor(error_after_max_retries_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{set_timer => erlang:system_time(microsecond)} + action => timeout }, Self ! 1, {ok, Result}; @@ -1391,7 +1393,7 @@ mock_processor(remove_by_timer_test = TestCase) -> MockProcessor = fun({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1), event(2)], - action => #{set_timer => erlang:system_time(microsecond) + 2000000, remove => true} + action => ?REMOVE_IN(2) }, {ok, Result} end, @@ -1403,14 +1405,14 @@ mock_processor(remove_without_timer_test = TestCase) -> ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> Result = #{ events => [event(1)], - action => #{set_timer => erlang:system_time(microsecond) + 2000000} + action => ?TIMEOUT_IN(2) }, Self ! 1, {ok, Result}; ({timeout, <<>>, _Process}, _Opts, _Ctx) -> Result = #{ events => [], - action => #{remove => true} + action => remove }, Self ! 2, {ok, Result}