Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ progressor:put(#{
status => <<"running">>,
aux_state => <<"state_data">>
},
action => #{set_timer => 1640995200}
action => {schedule, #{at => 1640995200000000, action => timeout}}
}
}).
```
Expand Down Expand Up @@ -277,8 +277,8 @@ process({TaskType, Args, Process}, Options, Context) ->
% Опционально установить таймер
case should_set_timer(Process, TaskType) of
true ->
TimerTime = erlang:system_time(second) + 60,
{ok, Result#{action => #{set_timer => TimerTime}}};
TimerTime = erlang:system_time(microsecond) + 60 * 1000000,
{ok, Result#{action => {schedule, #{at => TimerTime, action => timeout}}}};
false ->
{ok, Result}
end.
Expand Down
16 changes: 7 additions & 9 deletions benchmark/base_bench/src/base_bench_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ process({init, Args, _Process}, _Opts, _Ctx) ->
Result = #{
metadata => #{finish => Fin},
events => [event(1)],
action => #{set_timer => erlang:system_time(second)}
action => timeout
},
{ok, Result};
%%
process({timeout, _Args, #{history := History, metadata := Meta} = _Process}, _Opts, _Ctx) ->
%Random = rand:uniform(40),
%timer:sleep(60 + Random),
#{finish := FinishTime} = Meta,
Action = case FinishTime > erlang:system_time(second) of
true -> #{set_timer => erlang:system_time(second)};
false -> unset_timer
end,
%Action = #{set_timer => erlang:system_time(second)},
Action =
case FinishTime > erlang:system_time(second) of
true -> timeout;
false -> suspend
end,
NextId = erlang:length(History) + 1,
Result = #{
events => [event(NextId)],
Expand All @@ -31,7 +29,7 @@ process({call, _Args, #{history := History} = _Process}, _Opts, _Ctx) ->
Result = #{
response => erlang:length(History),
events => [],
action => unset_timer
action => suspend
},
{ok, Result}.
%%
Expand Down
20 changes: 18 additions & 2 deletions include/progressor.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
process_id := id(),
task_id := task_id(),
event_id := event_id(),
timestamp := timestamp_sec(),
%% Stored as timestamptz with microsecond precision; the unit is auto-detected
%% (prg_utils:split_timestamp/to_microseconds), so both seconds and microseconds
%% are accepted on write.
timestamp := timestamp_us(),
metadata => #{format => pos_integer()},
payload := binary()
}.
Expand Down Expand Up @@ -157,6 +160,7 @@

-type processor_intent() :: #{
events := [event()],
%% отсутствие ключа = idle
action => action(),
response => term(),
aux_state => binary(),
Expand Down Expand Up @@ -186,7 +190,19 @@
%% (i.e., attempts are not exhausted, and the error is not marked as
%% non-retryable).

-type action() :: #{set_timer := timestamp_sec(), remove => true} | unset_timer.
-type scheduled_action() :: timeout | remove.

-type schedule() :: #{
%% Absolute unix time; unit is auto-detected on write (prg_utils:to_microseconds).
at := timestamp_us(),
action := scheduled_action()
}.

-type action() ::
idle
| suspend
| scheduled_action()
| {schedule, schedule()}.

-type task_result() :: #{
task_id := task_id(),
Expand Down
2 changes: 1 addition & 1 deletion src/prg_echo_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ process({_, _, #{history := History} = _Process}, _Opts, _Ctx) ->
Count ->
Result = #{
events => [event(Count + 1)],
action => #{set_timer => erlang:system_time(second)}
action => timeout
},
{ok, Result}
end.
Expand Down
7 changes: 7 additions & 0 deletions src/prg_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
-export([to_seconds/1]).
-export([split_timestamp/1]).
-export([format_microseconds/1]).
-export([action_to_task_type/1]).

-type time_unit() :: second | millisecond | microsecond.

Expand Down Expand Up @@ -141,3 +142,9 @@ format_microseconds(Val) ->
Bin = integer_to_binary(Val),
Pad = 6 - byte_size(Bin),
<<(binary:copy(<<"0">>, Pad))/binary, Bin/binary>>.

-spec action_to_task_type(scheduled_action()) -> task_type().
action_to_task_type(timeout) ->
<<"timeout">>;
action_to_task_type(remove) ->
<<"remove">>.
48 changes: 22 additions & 26 deletions src/prg_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,21 @@ maybe_restore_history(_, State) ->
State.

handle_result_success(Intent, TaskHeader, Task, Deadline, State) ->
Action = maps:get(action, Intent, undefined),
case Action of
#{set_timer := _Timestamp} ->
success_and_continue(Intent, TaskHeader, Task, Deadline, State);
#{remove := true} ->
success_and_remove(Intent, TaskHeader, Task, Deadline, State);
unset_timer ->
success_and_suspend(Intent, TaskHeader, Task, Deadline, State);
undefined ->
success_and_unlock(Intent, TaskHeader, Task, Deadline, State)
end.
Action = maps:get(action, Intent, idle),
dispatch_action(Action, Intent, TaskHeader, Task, Deadline, State).

dispatch_action(idle, Intent, TaskHeader, Task, Deadline, State) ->
success_and_unlock(Intent, TaskHeader, Task, Deadline, State);
dispatch_action(suspend, Intent, TaskHeader, Task, Deadline, State) ->
success_and_suspend(Intent, TaskHeader, Task, Deadline, State);
dispatch_action(remove, Intent, TaskHeader, Task, Deadline, State) ->
success_and_remove(Intent, TaskHeader, Task, Deadline, State);
dispatch_action(timeout, Intent, TaskHeader, Task, Deadline, State) ->
success_and_continue(
Intent, TaskHeader, Task, Deadline, State, timeout, erlang:system_time(microsecond)
);
dispatch_action({schedule, #{at := Timestamp0, action := Action}}, Intent, TaskHeader, Task, Deadline, State) ->
success_and_continue(Intent, TaskHeader, Task, Deadline, State, Action, Timestamp0).

handle_result_error(Result, {TaskType, _} = TaskHeader, Task, Deadline, State) when
TaskType =:= timeout;
Expand All @@ -253,8 +257,8 @@ handle_result_error(Result, {TaskType, _} = TaskHeader, Task, Deadline, State) w
->
error_and_stop(Result, TaskHeader, Task, Deadline, State).

success_and_continue(Intent, TaskHeader, Task, Deadline, State) ->
#{action := #{set_timer := Timestamp0} = Action, events := Events} = Intent,
success_and_continue(Intent, TaskHeader, Task, Deadline, State, Action, Timestamp0) ->
#{events := Events} = Intent,
#{context := Context} = Task,
#prg_worker_state{
ns_id = NsId,
Expand All @@ -269,7 +273,7 @@ success_and_continue(Intent, TaskHeader, Task, Deadline, State) ->
TaskResult = task_result(Task, <<"finished">>, Response),
NewTask = #{
process_id => ProcessId,
task_type => action_to_task_type(Action),
task_type => prg_utils:action_to_task_type(Action),
status => create_status(Timestamp, Now),
scheduled_time => Timestamp,
context => Context,
Expand Down Expand Up @@ -323,7 +327,7 @@ success_and_remove(Intent, TaskHeader, _Task, Deadline, State) ->
State#prg_worker_state{process = undefined}.

success_and_suspend(Intent, TaskHeader, Task, Deadline, State) ->
#{events := Events, action := unset_timer} = Intent,
#{events := Events} = Intent,
#prg_worker_state{
ns_id = NsId,
ns_opts = #{storage := StorageOpts} = NsOpts,
Expand Down Expand Up @@ -718,12 +722,9 @@ is_retryable(Error, {timeout, undefined}, RetryPolicy, Timeout, Attempts) ->
is_retryable(_Error, _TaskHeader, _RetryPolicy, _Timeout, _Attempts) ->
false.

%% Due to the difference in the time scales used for storage (microseconds)
%% and the schedule time (seconds), the following logic is required:
%% - If the difference between the schedule and the current time is less than a ~1 second
%% the task is assigned the status "running" and is processed immediately
%% - If the difference between the schedule and the current time exceeds ~1 second
%% the task is assigned the status "waiting" and is saved to the schedule
%% Sub-second schedules are coerced to immediate execution: if the gap to
%% `scheduled_time` is below ~1s (scheduler overhead), the task stays `running`
%% instead of `waiting`.
create_status(Timestamp, Now) when Timestamp =< Now ->
<<"running">>;
create_status(Timestamp, Now) ->
Expand All @@ -746,11 +747,6 @@ create_header(#{task_type := <<"repair">>}) ->
{repair, undefined};
create_header(#{task_type := <<"notify">>}) ->
{notify, undefined}.
%%
action_to_task_type(#{remove := true}) ->
<<"remove">>;
action_to_task_type(#{set_timer := _}) ->
<<"timeout">>.

last_event_id([]) ->
0;
Expand Down
25 changes: 14 additions & 11 deletions src/progressor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ do_put(
#{
process_id := ProcessId
} = Process,
Action = maps:get(action, Args, undefined),
Action = maps:get(action, Args, idle),
Context = maps:get(context, Opts, <<>>),
Now = erlang:system_time(microsecond),
InitTask = #{
Expand Down Expand Up @@ -513,23 +513,26 @@ check_for_run(undefined) ->
check_for_run(Pid) when is_pid(Pid) ->
<<"running">>.

action_to_task(undefined, _ProcessId, _Ctx) ->
action_to_task(idle, _ProcessId, _Ctx) ->
undefined;
action_to_task(unset_timer, _ProcessId, _Ctx) ->
action_to_task(suspend, _ProcessId, _Ctx) ->
undefined;
action_to_task(#{set_timer := Timestamp} = Action, ProcessId, Context) ->
TaskType =
case maps:get(remove, Action, false) of
true -> <<"remove">>;
false -> <<"timeout">>
end,
action_to_task(remove, _ProcessId, _Ctx) ->
undefined;
action_to_task(timeout, ProcessId, Context) ->
action_to_task(
{schedule, #{at => erlang:system_time(microsecond), action => timeout}},
ProcessId,
Context
);
action_to_task({schedule, #{at := Timestamp0, action := Action}}, ProcessId, Context) ->
#{
process_id => ProcessId,
task_type => TaskType,
task_type => prg_utils:action_to_task_type(Action),
status => <<"waiting">>,
args => <<>>,
context => Context,
scheduled_time => prg_utils:to_microseconds(Timestamp),
scheduled_time => prg_utils:to_microseconds(Timestamp0),
last_retry_interval => 0,
attempts_count => 0
}.
Expand Down
Loading
Loading