From 774a8d40e94c52dc6686740c313f292419be3277 Mon Sep 17 00:00:00 2001 From: Tristan Sloughter Date: Sun, 2 Dec 2018 10:10:45 -0700 Subject: [PATCH 1/5] wip: optional stat collector workers --- src/oc_stat.erl | 4 ++-- src/oc_stat_collector.erl | 46 +++++++++++++++++++++++++++++++++++++++ src/oc_stat_measure.erl | 25 +++++++++++++++------ src/opencensus_sup.erl | 17 ++++++++++++++- 4 files changed, 82 insertions(+), 10 deletions(-) create mode 100644 src/oc_stat_collector.erl diff --git a/src/oc_stat.erl b/src/oc_stat.erl index 03f709d..b8c736f 100644 --- a/src/oc_stat.erl +++ b/src/oc_stat.erl @@ -35,8 +35,8 @@ -define(RECORD(Tags, MeasureName, Value), begin - Module = oc_stat_measure:measure_module(MeasureName), - Module:record(Tags, Value), + Module = oc_stat_measure:record_module(MeasureName), + Module:record(MeasureName, Tags, Value), ok end). diff --git a/src/oc_stat_collector.erl b/src/oc_stat_collector.erl new file mode 100644 index 0000000..db9c047 --- /dev/null +++ b/src/oc_stat_collector.erl @@ -0,0 +1,46 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2018, OpenCensus Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc Worker process for sending recorded stats. +%% @end +%%%----------------------------------------------------------------------- +-module(oc_stat_collector). + +-export([start_link/0, + record/3]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2]). 
+ +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +record(MeasureName, Tags, Value) -> + oc_stat_collector ! {record, MeasureName, Tags, Value}. + +init([]) -> + {ok, #{}}. + +handle_call(_, _, State) -> + {noreply, State}. + +handle_cast(_, State) -> + {noreply, State}. + +handle_info({record, MeasureName, Tags, Value}, State) -> + Module = oc_stat_measure:measure_module(MeasureName), + Module:record(MeasureName, Tags, Value), + {noreply, State}. diff --git a/src/oc_stat_measure.erl b/src/oc_stat_measure.erl index a30e543..ad03e49 100644 --- a/src/oc_stat_measure.erl +++ b/src/oc_stat_measure.erl @@ -38,6 +38,7 @@ %% codegen -export([measure_module/1, + record_module/1, module_name/1, maybe_module_name/1, regen_record/2, @@ -56,10 +57,11 @@ unit/0, measure/0]). --record(measure, {name :: name(), - module :: module(), - description :: description(), - unit :: unit()}). +-record(measure, {name :: name(), + module :: module(), + record_module :: module(), + description :: description(), + unit :: unit()}). -type name() :: atom() | binary() | string(). -type description() :: binary() | string(). @@ -79,9 +81,11 @@ %% @end -spec new(name(), description(), unit()) -> oc_stat_view:measure(). new(Name, Description, Unit) -> + RecordModule = application:get_env(opencensus, stat_record_module, oc_stat_measure:module_name(Name)), gen_server:call(oc_stat, {measure_register, #measure{name=Name, module=oc_stat_measure:module_name(Name), + record_module=RecordModule, description=Description, unit=Unit}}). %% @doc @@ -163,6 +167,13 @@ measure_module(Name) -> _ -> erlang:error({unknown_measure, Name}) end. +record_module(Name) -> + case ets:lookup(?MEASURES_TABLE, Name) of + [#measure{record_module=Module}] -> + Module; + _ -> erlang:error({unknown_measure, Name}) + end. + %% @private -spec module_name(name()) -> module(). 
module_name(Name) -> @@ -208,11 +219,11 @@ regen_module(ModuleName, RecordBody, Subs) -> 1}}, {attribute, 1, module, ModuleName}, {attribute, 1, export, - [{record, 2}]}, + [{record, 3}]}, {attribute, 1, export, [{subs, 0}]}, - {function, 1, record, 2, - [{clause, 1, [{var, 1, 'ContextTags'}, {var, 1, 'Value'}], [], + {function, 1, record, 3, + [{clause, 1, [{var, 1, '_MeasureName'}, {var, 1, 'ContextTags'}, {var, 1, 'Value'}], [], RecordBody ++ [{atom, 1, ok}] }]}, {function, 1, subs, 0, diff --git a/src/opencensus_sup.erl b/src/opencensus_sup.erl index 4879373..4ab8c21 100644 --- a/src/opencensus_sup.erl +++ b/src/opencensus_sup.erl @@ -32,6 +32,13 @@ start_link() -> init([]) -> ok = oc_sampler:init(application:get_env(opencensus, sampler, {oc_sampler_always, []})), + StatCollector = #{id => oc_stat_collector, + start => {oc_stat_collector, start_link, []}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [oc_stat_collector]}, + Reporter = #{id => oc_reporter, start => {oc_reporter, start_link, []}, restart => permanent, @@ -60,6 +67,14 @@ init([]) -> type => worker, modules => [oc_server]}, + Children = + case application:get_env(opencensus, stat_record_module, undefined) of + oc_stat_collector -> + [Reporter, Exporter, ViewServer, TraceServer, StatCollector]; + _ -> + [Reporter, Exporter, ViewServer, TraceServer] + end, + {ok, {#{strategy => one_for_one, intensity => 1, - period => 5}, [Reporter, Exporter, ViewServer, TraceServer]}}. + period => 5}, Children}}. 
From 5b81eb463af7067ecb1328da0292d3e21d8a69b6 Mon Sep 17 00:00:00 2001 From: Tristan Sloughter Date: Sun, 2 Dec 2018 15:07:12 -0700 Subject: [PATCH 2/5] start a collector for each scheduler --- rebar.lock | 4 +-- src/oc_stat_collector.erl | 8 ++--- src/oc_stat_collectors.erl | 63 ++++++++++++++++++++++++++++++++++++++ src/opencensus_sup.erl | 18 +++++------ 4 files changed, 75 insertions(+), 18 deletions(-) create mode 100644 src/oc_stat_collectors.erl diff --git a/rebar.lock b/rebar.lock index 28955f9..db39064 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,12 +1,10 @@ {"1.1.0", -[{<<"counters">>,{pkg,<<"counters">>,<<"0.2.0">>},0}, - {<<"ctx">>,{pkg,<<"ctx">>,<<"0.4.1">>},0}, +[{<<"ctx">>,{pkg,<<"ctx">>,<<"0.4.1">>},0}, {<<"jsx">>,{pkg,<<"jsx">>,<<"2.9.0">>},0}, {<<"rfc3339">>,{pkg,<<"rfc3339">>,<<"0.9.0">>},1}, {<<"wts">>,{pkg,<<"wts">>,<<"0.3.0">>},0}]}. [ {pkg_hash,[ - {<<"counters">>, <<"EF00F33404FDD9BD233F9B7966233222469E4560DBE1C712EA2E1AB63BB8FEFD">>}, {<<"ctx">>, <<"E4297DD25CCDE992BC7DE298F514BEACD0A44FAA9126A1F2567306D94C519A13">>}, {<<"jsx">>, <<"D2F6E5F069C00266CAD52FB15D87C428579EA4D7D73A33669E12679E203329DD">>}, {<<"rfc3339">>, <<"2075653DC9407541C84B1E15F8BDA2ABE95FB17C9694025E079583F2D19C1060">>}, diff --git a/src/oc_stat_collector.erl b/src/oc_stat_collector.erl index db9c047..ed35a2f 100644 --- a/src/oc_stat_collector.erl +++ b/src/oc_stat_collector.erl @@ -17,8 +17,7 @@ %%%----------------------------------------------------------------------- -module(oc_stat_collector). --export([start_link/0, - record/3]). +-export([start_link/0]). -export([init/1, handle_call/3, @@ -26,10 +25,7 @@ handle_info/2]). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -record(MeasureName, Tags, Value) -> - oc_stat_collector ! {record, MeasureName, Tags, Value}. + gen_server:start_link(?MODULE, [], []). init([]) -> {ok, #{}}. 
diff --git a/src/oc_stat_collectors.erl b/src/oc_stat_collectors.erl new file mode 100644 index 0000000..6968909 --- /dev/null +++ b/src/oc_stat_collectors.erl @@ -0,0 +1,63 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2018, OpenCensus Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc Worker process for sending recorded stats. +%% @end +%%%----------------------------------------------------------------------- +-module(oc_stat_collectors). + +-behaviour(gen_server). + +-export([start_link/0, + record/3]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +record(MeasureName, Tags, Value) -> + Workers = persistent_term:get(?MODULE), + element(erlang:system_info(scheduler_id), Workers) ! {record, MeasureName, Tags, Value}. + +init([]) -> + erlang:process_flag(trap_exit, true), + NumSchedulers = erlang:system_info(schedulers), + Workers = + lists:map(fun(_) -> + {ok, Pid} = oc_stat_collector:start_link(), + Pid + end, lists:seq(1, NumSchedulers)), + persistent_term:put(?MODULE, list_to_tuple(Workers)), + {ok, Workers}. + +handle_call(_, _From, State) -> + {noreply, State}. + +handle_cast(_, State) -> + {noreply, State}. 
+ +handle_info({'EXIT', FromPid, _Reason}, Workers) -> + case lists:member(FromPid, Workers) of + true -> + {ok, Pid} = oc_stat_collector:start_link(), + Workers1 = [Pid | lists:delete(FromPid, Workers)], + persistent_term:put(?MODULE, list_to_tuple(Workers1)), + {noreply, Workers1}; + false -> + {noreply, Workers} + end. diff --git a/src/opencensus_sup.erl b/src/opencensus_sup.erl index 4ab8c21..0112568 100644 --- a/src/opencensus_sup.erl +++ b/src/opencensus_sup.erl @@ -32,12 +32,12 @@ start_link() -> init([]) -> ok = oc_sampler:init(application:get_env(opencensus, sampler, {oc_sampler_always, []})), - StatCollector = #{id => oc_stat_collector, - start => {oc_stat_collector, start_link, []}, - restart => permanent, - shutdown => 1000, - type => worker, - modules => [oc_stat_collector]}, + StatCollectors = #{id => oc_stat_collectors, + start => {oc_stat_collectors, start_link, []}, + restart => permanent, + shutdown => 1000, + type => worker, + modules => [oc_stat_collectors]}, Reporter = #{id => oc_reporter, start => {oc_reporter, start_link, []}, @@ -68,9 +68,9 @@ init([]) -> modules => [oc_server]}, Children = - case application:get_env(opencensus, stat_record_module, undefined) of - oc_stat_collector -> - [Reporter, Exporter, ViewServer, TraceServer, StatCollector]; + case application:get_env(opencensus, stat_record_module, oc_stat_collectors) of + oc_stat_collectors -> + [Reporter, Exporter, ViewServer, TraceServer, StatCollectors]; _ -> [Reporter, Exporter, ViewServer, TraceServer] end, From ecb303d3447735934d70f24c27d16ad347550f07 Mon Sep 17 00:00:00 2001 From: Tristan Sloughter Date: Mon, 3 Dec 2018 10:04:46 -0700 Subject: [PATCH 3/5] grab origin/master rebar.lock to get around use of _checkouts --- rebar.lock | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rebar.lock b/rebar.lock index db39064..28955f9 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,10 +1,12 @@ {"1.1.0", -[{<<"ctx">>,{pkg,<<"ctx">>,<<"0.4.1">>},0}, 
+[{<<"counters">>,{pkg,<<"counters">>,<<"0.2.0">>},0},
+ {<<"ctx">>,{pkg,<<"ctx">>,<<"0.4.1">>},0},
  {<<"jsx">>,{pkg,<<"jsx">>,<<"2.9.0">>},0},
  {<<"rfc3339">>,{pkg,<<"rfc3339">>,<<"0.9.0">>},1},
  {<<"wts">>,{pkg,<<"wts">>,<<"0.3.0">>},0}]}.
 [
 {pkg_hash,[
+ {<<"counters">>, <<"EF00F33404FDD9BD233F9B7966233222469E4560DBE1C712EA2E1AB63BB8FEFD">>},
  {<<"ctx">>, <<"E4297DD25CCDE992BC7DE298F514BEACD0A44FAA9126A1F2567306D94C519A13">>},
  {<<"jsx">>, <<"D2F6E5F069C00266CAD52FB15D87C428579EA4D7D73A33669E12679E203329DD">>},
  {<<"rfc3339">>, <<"2075653DC9407541C84B1E15F8BDA2ABE95FB17C9694025E079583F2D19C1060">>},
From d1cd525e06993d30fb50739e63fe0a277650817b Mon Sep 17 00:00:00 2001
From: Tristan Sloughter
Date: Mon, 3 Dec 2018 10:18:00 -0700
Subject: [PATCH 4/5] only update workers once in case of a flood of EXITs

---
 src/oc_stat_collectors.erl | 43 +++++++++++++++++++++++++++++---------
 1 file changed, 33 insertions(+), 10 deletions(-)

diff --git a/src/oc_stat_collectors.erl b/src/oc_stat_collectors.erl
index 6968909..5b0ff90 100644
--- a/src/oc_stat_collectors.erl
+++ b/src/oc_stat_collectors.erl
@@ -27,6 +27,9 @@
          handle_cast/2,
          handle_info/2]).
 
+-record(state, {workers :: [pid()],
+                num_workers :: integer()}).
+
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
@@ -43,7 +46,8 @@ init([]) ->
                       Pid
                   end, lists:seq(1, NumSchedulers)),
     persistent_term:put(?MODULE, list_to_tuple(Workers)),
-    {ok, Workers}.
+    {ok, #state{workers=Workers,
+                num_workers=NumSchedulers}}.
 
 handle_call(_, _From, State) ->
     {noreply, State}.
@@ -51,13 +55,32 @@ handle_call(_, _From, State) ->
 handle_cast(_, State) ->
     {noreply, State}.
 
-handle_info({'EXIT', FromPid, _Reason}, Workers) ->
-    case lists:member(FromPid, Workers) of
-        true ->
-            {ok, Pid} = oc_stat_collector:start_link(),
-            Workers1 = [Pid | lists:delete(FromPid, Workers)],
-            persistent_term:put(?MODULE, list_to_tuple(Workers1)),
-            {noreply, Workers1};
-        false ->
-            {noreply, Workers}
+handle_info({'EXIT', FromPid, _Reason}, State=#state{workers=Workers}) ->
+    %% hopefully this goes behind any other immediate EXITs in the mailbox
+    %% so we only update once if more than 1 crash at the same time
+    self() ! update_workers,
+    {noreply, State#state{workers=lists:delete(FromPid, Workers)}};
+handle_info(update_workers, State=#state{workers=Workers,
+                                         num_workers=NumWorkers}) ->
+    flush_update_workers(),
+    case NumWorkers - length(Workers) of
+        N when N > 0 ->
+            Started = lists:map(fun(_) ->
+                                    {ok, Pid} = oc_stat_collector:start_link(),
+                                    Pid
+                                end, lists:seq(1, N)),
+            %% track survivors too, or later crashes spawn surplus workers
+            Workers1 = Started ++ Workers,
+            persistent_term:put(?MODULE, list_to_tuple(Workers1)),
+            {noreply, State#state{workers=Workers1}};
+        _ ->
+            {noreply, State}
+    end.
+
+flush_update_workers() ->
+    receive
+        update_workers ->
+            flush_update_workers()
+    after 0 ->
+        ok
     end.
From de37fc236febfd7d3c79b14d33974a3f09474176 Mon Sep 17 00:00:00 2001
From: Tristan Sloughter
Date: Mon, 3 Dec 2018 10:21:17 -0700
Subject: [PATCH 5/5] update doc description of oc_stat_collectors

---
 src/oc_stat_collectors.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/oc_stat_collectors.erl b/src/oc_stat_collectors.erl
index 5b0ff90..ab50a9f 100644
--- a/src/oc_stat_collectors.erl
+++ b/src/oc_stat_collectors.erl
@@ -12,7 +12,7 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%
-%% @doc Worker process for sending recorded stats.
+%% @doc Starts and monitors stat collecting worker processes.
 %% @end
 %%%-----------------------------------------------------------------------
 -module(oc_stat_collectors).