Skip to content

Commit

Permalink
Merge pull request #4002 from esl/measured-sql-requests
Browse files Browse the repository at this point in the history
Add capability of measuring async sql execute requests
  • Loading branch information
NelsonVides authored Apr 14, 2023
2 parents be964c5 + 5ad1b66 commit 4df69e2
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 23 deletions.
39 changes: 20 additions & 19 deletions .circleci/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -295,23 +295,24 @@ commands:
restore_workspace:
steps:
- attach_workspace: {at: ~/project}
fetch_coverage_packages:
steps:
- run:
name: Install pip3 and codecov packages
command: |
tools/circle-install-packages.sh python3-pip && \
pip3 install codecov
run_coverage_analysis:
steps:
- fetch_coverage_packages
- run:
name: Coverage
when: on_success
command: |
echo "Success!"
$EXEC ./rebar3 codecov analyze
codecov --disable=gcov --env PRESET
# TODO migrate to a new codecov uploader, see - https://docs.codecov.com/docs/deprecated-uploader-migration-guide#python-uploader
# fetch_coverage_packages:
# steps:
# - run:
# name: Install pip3 and codecov packages
# command: |
# tools/circle-install-packages.sh python3-pip && \
# pip3 install codecov
# run_coverage_analysis:
# steps:
# - fetch_coverage_packages
# - run:
# name: Coverage
# when: on_success
# command: |
# echo "Success!"
# $EXEC ./rebar3 codecov analyze
# codecov --disable=gcov --env PRESET
run_small_tests:
steps:
- restore_workspace
Expand All @@ -323,7 +324,7 @@ commands:
name: Run Small Tests
command: |
$EXEC tools/test.sh -p small_tests -s true -e true
- run_coverage_analysis
# - run_coverage_analysis
- upload_results_to_aws
- publish_github_comment
run_docker_smoke_test:
Expand Down Expand Up @@ -650,7 +651,7 @@ jobs:
- store_test_results:
when: always
path: junit_report.xml
- run_coverage_analysis
# - run_coverage_analysis
- run:
name: Build Failed - Logs
when: on_fail
Expand Down
111 changes: 110 additions & 1 deletion big_tests/tests/rdbms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,13 @@ rdbms_queries_cases() ->
insert_batch_with_null_case,
test_cast_insert,
test_request_insert,
test_wrapped_request,
test_failed_wrapper,
test_request_transaction,
test_restart_transaction_with_execute,
test_restart_transaction_with_execute_eventually_passes,
test_failed_transaction_with_execute_wrapped,
test_failed_wrapper_transaction,
test_incremental_upsert,
arguments_from_two_tables].

Expand Down Expand Up @@ -102,7 +106,10 @@ init_per_testcase(CaseName, Config) ->

end_per_testcase(CaseName, Config)
when CaseName =:= test_restart_transaction_with_execute;
CaseName =:= test_restart_transaction_with_execute_eventually_passes ->
CaseName =:= test_restart_transaction_with_execute_eventually_passes;
CaseName =:= test_failed_transaction_with_execute_wrapped;
CaseName =:= test_failed_wrapper;
CaseName =:= test_failed_wrapper_transaction ->
rpc(mim(), meck, unload, []),
escalus:end_per_testcase(CaseName, Config);
end_per_testcase(test_incremental_upsert, Config) ->
Expand Down Expand Up @@ -387,6 +394,49 @@ test_request_insert(Config) ->
selected_to_sorted(SelectResult))
end, ok, #{name => request_queries}).

test_wrapped_request(Config) ->
% given
erase_table(Config),
sql_prepare(Config, insert_one, test_types, [unicode],
<<"INSERT INTO test_types(unicode) VALUES (?)">>),
rpc(mim(), mongoose_metrics, ensure_metric, [global, [test_metric], histogram]),
WrapperFun = fun(SqlExecute) ->
{Time, Result} = timer:tc(SqlExecute),
mongoose_metrics:update(global, [test_metric], Time),
Result
end,

% when
sql_execute_wrapped_request_and_wait_response(Config, insert_one, [<<"check1">>], WrapperFun),

% then
mongoose_helper:wait_until(
fun() ->
SelectResult = sql_query(Config, "SELECT unicode FROM test_types"),
?assertEqual({selected, [{<<"check1">>}]}, selected_to_sorted(SelectResult))
end, ok, #{name => request_queries}),

{ok, Metric} = rpc(mim(), mongoose_metrics, get_metric_value, [global, [test_metric]]),
MetricValue = proplists:get_value(mean, Metric),
?assert(MetricValue > 0).

test_failed_wrapper(Config) ->
% given
erase_table(Config),
sql_prepare(Config, insert_one, test_types, [unicode],
<<"INSERT INTO test_types(unicode) VALUES (?)">>),
ok = rpc(mim(), meck, new, [supervisor, [passthrough, no_link, unstick]]),
WrapperFun = fun(_SqlExecute) ->
error(wrapper_crashed)
end,

% when
Result = sql_execute_wrapped_request_and_wait_response(Config, insert_one, [<<"check1">>], WrapperFun),

% then
?assertEqual({reply,{error,wrapper_crashed}}, Result),
?assertEqual([], rpc(mim(), meck, history, [supervisor])).

test_request_transaction(Config) ->
erase_table(Config),
Queries = [<<"INSERT INTO test_types(unicode) VALUES ('check1')">>,
Expand Down Expand Up @@ -431,6 +481,45 @@ test_restart_transaction_with_execute_eventually_passes(Config) ->
called_times(3),
ok.

test_failed_transaction_with_execute_wrapped(Config) ->
% given
HostType = host_type(),
Pid = self(),
erase_table(Config),
prepare_insert_int8(Config),
ok = rpc(mim(), meck, new, [mongoose_rdbms_backend, [passthrough, no_link]]),
ok = rpc(mim(), meck, expect, [mongoose_rdbms_backend, execute, 4,
{error, simulated_db_error}]),
WrapperFun = fun(SqlExecute) ->
Pid ! msg_before,
Result = SqlExecute(),
Pid ! msg_after,
Result
end,

% when
F = fun() -> mongoose_rdbms:execute_wrapped_request(HostType, insert_int8, [2], WrapperFun) end,
{aborted, #{reason := simulated_db_error}} = sql_transaction(Config, F),

% then
check_not_received(msg_after).

test_failed_wrapper_transaction(Config) ->
% given
erase_table(Config),
prepare_insert_int8(Config),
ok = rpc(mim(), meck, new, [supervisor, [passthrough, no_link, unstick]]),
WrapperFun = fun(_SqlExecute) ->
error(wrapper_crashed)
end,

% when
F = fun() -> sql_execute_wrapped_request(Config, insert_one, [<<"check1">>], WrapperFun) end,
sql_transaction(Config, F),

% then
?assertEqual([], rpc(mim(), meck, history, [supervisor])).

prepare_insert_int8(Config) ->
Q = <<"INSERT INTO test_types(", (escape_column(<<"int8">>))/binary, ") VALUES (?)">>,
sql_prepare(Config, insert_int8, test_types, [int8], Q).
Expand Down Expand Up @@ -515,6 +604,16 @@ sql_query_cast(_Config, Query) ->
sql_execute_request(_Config, Name, Parameters) ->
slow_rpc(mongoose_rdbms, execute_request, [host_type(), Name, Parameters]).

sql_execute_wrapped_request(_Config, Name, Parameters, WrapperFun) ->
slow_rpc(mongoose_rdbms, execute_wrapped_request, [host_type(), Name, Parameters, WrapperFun]).

sql_execute_wrapped_request_and_wait_response(_Config, Name, Parameters, WrapperFun) ->
slow_rpc(?MODULE, execute_wrapped_request_and_wait_response, [host_type(), Name, Parameters, WrapperFun]).

execute_wrapped_request_and_wait_response(HostType, Name, Parameters, WrapperFun) ->
RequestId = mongoose_rdbms:execute_wrapped_request(HostType, Name, Parameters, WrapperFun),
gen_server:wait_response(RequestId, 100).

sql_execute_upsert(_Config, Name, Insert, Update, Unique) ->
slow_rpc(rdbms_queries, execute_upsert, [host_type(), Name, Insert, Update, Unique]).

Expand Down Expand Up @@ -1030,6 +1129,16 @@ slow_rpc(M, F, A) ->
Res
end.

check_not_received(Msg) ->
receive
Msg ->
error({msg_received, Msg});
_ ->
check_not_received(Msg)
after 0 ->
ok
end.

check_like_prep(Config, TextMap = #{text := TextValue,
matching := MatchingList,
not_matching := NotMatchingList}) ->
Expand Down
30 changes: 27 additions & 3 deletions src/rdbms/mongoose_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
execute/3,
execute_cast/3,
execute_request/3,
execute_wrapped_request/4,
execute_successfully/3,
sql_query/2,
sql_query_cast/2,
Expand Down Expand Up @@ -128,6 +129,7 @@

-ignore_xref([sql_query_cast/2, sql_query_request/2,
execute_cast/3, execute_request/3,
execute_wrapped_request/4,
sql_transaction_request/2,
sql_query_t/1, use_escaped/1,
escape_like/1, escape_like_prefix/1, use_escaped_like/1,
Expand Down Expand Up @@ -159,10 +161,12 @@
-define(CONNECT_RETRIES, 5).

-type server() :: mongooseim:host_type() | global.
-type request_wrapper() :: fun((fun(() -> T)) -> T).
-type rdbms_msg() :: {sql_query, _}
| {sql_transaction, fun()}
| {sql_dirty, fun()}
| {sql_execute, atom(), [binary() | boolean() | integer()]}.
| {sql_execute, atom(), [binary() | boolean() | integer()]}
| {sql_execute_wrapped, atom(), [binary() | boolean() | integer()], request_wrapper()}.
-type single_query_result() :: {selected, [tuple()]} |
{updated, non_neg_integer() | undefined} |
{updated, non_neg_integer(), [tuple()]} |
Expand Down Expand Up @@ -241,6 +245,15 @@ execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters)
execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
sql_request(HostType, {sql_execute, Name, Parameters}).

-spec execute_wrapped_request(
HostType :: server(),
Name :: atom(),
Parameters :: [term()],
Wrapper :: request_wrapper()) -> request_id().
execute_wrapped_request(HostType, Name, Parameters, Wrapper)
when is_atom(Name), is_list(Parameters), is_function(Wrapper) ->
sql_request(HostType, {sql_execute_wrapped, Name, Parameters, Wrapper}).

%% Same as execute/3, but would fail loudly on any error.
-spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
Expand Down Expand Up @@ -689,7 +702,16 @@ outer_op({sql_transaction, F}, State) ->
outer_op({sql_dirty, F}, State) ->
sql_dirty_internal(F, State);
outer_op({sql_execute, Name, Params}, State) ->
sql_execute(outer_op, Name, Params, State).
sql_execute(outer_op, Name, Params, State);
outer_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
try
Wrapper(fun() -> sql_execute(outer_op, Name, Params, State) end)
catch
_Class:Error ->
?LOG_ERROR(#{what => sql_execute_wrapped_failed, reason => Error,
statement_name => Name, wrapper_fun => Wrapper}),
{{error, Error}, State}
end.

%% @doc Called via sql_query/transaction/bloc from client code when inside a
%% nested operation
Expand All @@ -703,7 +725,9 @@ nested_op({sql_transaction, F}, State) ->
%% Transaction inside a transaction
inner_transaction(F, State);
nested_op({sql_execute, Name, Params}, State) ->
sql_execute(nested_op, Name, Params, State).
sql_execute(nested_op, Name, Params, State);
nested_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
Wrapper(fun() -> sql_execute(nested_op, Name, Params, State) end).

%% @doc Never retry nested transactions - only outer transactions
-spec inner_transaction(fun(), state()) -> transaction_result() | {'EXIT', any()}.
Expand Down

0 comments on commit 4df69e2

Please sign in to comment.