@@ -9,251 +9,109 @@ defmodule QuantumStoragePersistentEts do
99
1010 alias __MODULE__ . State
1111
12- @ server __MODULE__
13-
1412 @ behaviour Quantum.Storage
1513
1614 @ doc false
1715 def start_link ( opts ) ,
18- do: GenServer . start_link ( __MODULE__ , opts , name: Keyword . get ( opts , :name , @ server ) )
16+ do: GenServer . start_link ( __MODULE__ , opts , opts )
1917
2018 @ doc false
2119 @ impl GenServer
22- def init ( opts ) , do: { :ok , % State { schedulers: % { } , name: Keyword . get ( opts , :name , @ server ) } }
20+ def init ( opts ) do
21+ table_name =
22+ opts
23+ |> Keyword . fetch! ( :name )
24+ |> Module . concat ( Table )
25+
26+ path =
27+ Application . app_dir (
28+ :quantum_storage_persistent_ets ,
29+ "priv/tables/#{ table_name } .tab"
30+ )
31+
32+ File . mkdir_p! ( Path . dirname ( path ) )
33+
34+ table =
35+ PersistentEts . new ( table_name , path , [
36+ :named_table ,
37+ :ordered_set ,
38+ :protected
39+ ] )
2340
24- @ doc false
25- @ impl Quantum.Storage
26- def jobs ( server \\ @ server , scheduler_module ) ,
27- do: GenServer . call ( server , { :jobs , scheduler_module } )
41+ { :ok , % State { table: table } }
42+ end
2843
2944 @ doc false
3045 @ impl Quantum.Storage
31- def add_job ( server \\ @ server , scheduler_module , job ) ,
32- do: GenServer . call ( server , { :add_job , scheduler_module , job } )
46+ def jobs ( storage_pid ) , do: GenServer . call ( storage_pid , :jobs )
3347
3448 @ doc false
3549 @ impl Quantum.Storage
36- def delete_job ( server \\ @ server , scheduler_module , job_name ) ,
37- do: GenServer . call ( server , { :delete_job , scheduler_module , job_name } )
50+ def add_job ( storage_pid , job ) , do: GenServer . call ( storage_pid , { :add_job , job } )
3851
3952 @ doc false
4053 @ impl Quantum.Storage
41- def update_job_state ( server \\ @ server , scheduler_module , job_name , state ) ,
42- do: GenServer . call ( server , { :update_job_state , scheduler_module , job_name , state } )
54+ def delete_job ( storage_pid , job_name ) , do: GenServer . call ( storage_pid , { :delete_job , job_name } )
4355
4456 @ doc false
4557 @ impl Quantum.Storage
46- def last_execution_date ( server \\ @ server , scheduler_module ) ,
47- do: GenServer . call ( server , { :last_execution_date , scheduler_module } )
58+ def update_job_state ( storage_pid , job_name , state ) ,
59+ do: GenServer . call ( storage_pid , { :update_job_state , job_name , state } )
4860
4961 @ doc false
5062 @ impl Quantum.Storage
51- def update_last_execution_date ( server \\ @ server , scheduler_module , last_execution_date ) ,
52- do:
53- GenServer . call ( server , { :update_last_execution_date , scheduler_module , last_execution_date } )
63+ def last_execution_date ( storage_pid ) , do: GenServer . call ( storage_pid , :last_execution_date )
5464
5565 @ doc false
5666 @ impl Quantum.Storage
57- def purge ( server \\ @ server , scheduler_module ) ,
58- do: GenServer . call ( server , { :purge , scheduler_module } )
67+ def update_last_execution_date ( storage_pid , last_execution_date ) ,
68+ do: GenServer . call ( storage_pid , { :update_last_execution_date , last_execution_date } )
5969
6070 @ doc false
61- def purge_all ( server \\ @ server ) , do: GenServer . call ( server , :purge_all )
71+ @ impl Quantum.Storage
72+ def purge ( storage_pid ) , do: GenServer . call ( storage_pid , :purge )
6273
6374 @ doc false
6475 @ impl GenServer
65- def handle_call (
66- { :add_job , scheduler_module , job } ,
67- _from ,
68- % State { schedulers: schedulers , name: name } = state
69- ) do
70- {
71- :reply ,
72- do_add_job ( name , scheduler_module , job ) ,
73- % {
74- state
75- | schedulers:
76- schedulers
77- |> Map . put_new_lazy ( scheduler_module , fn ->
78- create_scheduler_module_atom ( name , scheduler_module )
79- end )
80- }
81- }
82- end
83-
84- def handle_call (
85- { :jobs , scheduler_module } ,
86- _from ,
87- % State { schedulers: schedulers , name: name } = state
88- ) do
89- {
90- :reply ,
91- do_get_jobs ( name , scheduler_module ) ,
92- % {
93- state
94- | schedulers:
95- schedulers
96- |> Map . put_new_lazy ( scheduler_module , fn ->
97- create_scheduler_module_atom ( name , scheduler_module )
98- end )
99- }
100- }
76+ def handle_call ( { :add_job , job } , _from , % State { table: table } = state ) do
77+ { :reply , do_add_job ( table , job ) , state }
10178 end
10279
103- def handle_call (
104- { :delete_job , scheduler_module , job } ,
105- _from ,
106- % State { schedulers: schedulers , name: name } = state
107- ) do
108- {
109- :reply ,
110- do_delete_job ( name , scheduler_module , job ) ,
111- % {
112- state
113- | schedulers:
114- schedulers
115- |> Map . put_new_lazy ( scheduler_module , fn ->
116- create_scheduler_module_atom ( name , scheduler_module )
117- end )
118- }
119- }
80+ def handle_call ( :jobs , _from , % State { table: table } = state ) do
81+ { :reply , do_get_jobs ( table ) , state }
12082 end
12183
122- def handle_call (
123- { :update_job_state , scheduler_module , job_name , job_state } ,
124- _from ,
125- % State { schedulers: schedulers , name: name } = state
126- ) do
127- {
128- :reply ,
129- do_update_job_state ( name , scheduler_module , job_name , job_state ) ,
130- % {
131- state
132- | schedulers:
133- schedulers
134- |> Map . put_new_lazy ( scheduler_module , fn ->
135- create_scheduler_module_atom ( name , scheduler_module )
136- end )
137- }
138- }
84+ def handle_call ( { :delete_job , job } , _from , % State { table: table } = state ) do
85+ { :reply , do_delete_job ( table , job ) , state }
13986 end
14087
141- def handle_call (
142- { :last_execution_date , scheduler_module } ,
143- _from ,
144- % State { schedulers: schedulers , name: name } = state
145- ) do
146- {
147- :reply ,
148- do_get_last_execution_date ( name , scheduler_module ) ,
149- % {
150- state
151- | schedulers:
152- schedulers
153- |> Map . put_new_lazy ( scheduler_module , fn ->
154- create_scheduler_module_atom ( name , scheduler_module )
155- end )
156- }
157- }
88+ def handle_call ( { :update_job_state , job_name , job_state } , _from , % State { table: table } = state ) do
89+ { :reply , do_update_job_state ( table , job_name , job_state ) , state }
15890 end
15991
160- def handle_call (
161- { :update_last_execution_date , scheduler_module , last_execution_date } ,
162- _from ,
163- % State { schedulers: schedulers , name: name } = state
164- ) do
165- {
166- :reply ,
167- do_update_last_execution_date ( name , scheduler_module , last_execution_date ) ,
168- % {
169- state
170- | schedulers:
171- schedulers
172- |> Map . put_new_lazy ( scheduler_module , fn ->
173- create_scheduler_module_atom ( name , scheduler_module )
174- end )
175- }
176- }
92+ def handle_call ( :last_execution_date , _from , % State { table: table } = state ) do
93+ { :reply , do_get_last_execution_date ( table ) , state }
17794 end
17895
17996 def handle_call (
180- { :purge , scheduler_module } ,
97+ { :update_last_execution_date , last_execution_date } ,
18198 _from ,
182- % State { schedulers: schedulers , name: name } = state
99+ % State { table: table } = state
183100 ) do
184- {
185- :reply ,
186- do_purge ( name , scheduler_module ) ,
187- % {
188- state
189- | schedulers:
190- schedulers
191- |> Map . put_new_lazy ( scheduler_module , fn ->
192- create_scheduler_module_atom ( name , scheduler_module )
193- end )
194- }
195- }
196- end
197-
198- def handle_call ( :purge_all , _from , % State { schedulers: schedulers , name: name } = state ) do
199- schedulers |> Map . values ( ) |> Enum . each ( fn scheduler -> :ok = do_purge ( name , scheduler ) end )
200- { :reply , :ok , state }
101+ { :reply , do_update_last_execution_date ( table , last_execution_date ) , state }
201102 end
202103
203- defp create_scheduler_module_atom ( storage_name , scheduler_module ) do
204- Module . concat ( storage_name , scheduler_module )
104+ def handle_call ( :purge , _from , % State { table: table } = state ) do
105+ { :reply , do_purge ( table ) , state }
205106 end
206107
207108 defp job_key ( job_name ) do
208109 { :job , job_name }
209110 end
210111
211- defp get_ets_by_scheduler ( storage_name , scheduler_module ) do
212- scheduler_module_atom = create_scheduler_module_atom ( storage_name , scheduler_module )
213-
214- if ets_exist? ( scheduler_module_atom ) do
215- scheduler_module_atom
216- else
217- path =
218- Application . app_dir (
219- :quantum_storage_persistent_ets ,
220- "priv/tables/#{ scheduler_module_atom } .tab"
221- )
222-
223- File . mkdir_p! ( Path . dirname ( path ) )
224-
225- PersistentEts . new ( scheduler_module_atom , path , [
226- :named_table ,
227- :ordered_set
228- ] )
229- end
230- end
231-
232- defp ets_exist? ( ets_name ) do
233- Logger . debug ( fn ->
234- "[#{ inspect ( Node . self ( ) ) } ][#{ __MODULE__ } ] Determining whether ETS table with name [#{
235- inspect ( ets_name )
236- } ] exists"
237- end )
238-
239- result =
240- case :ets . info ( ets_name ) do
241- :undefined -> false
242- _ -> true
243- end
244-
245- Logger . debug ( fn ->
246- "[#{ inspect ( Node . self ( ) ) } ][#{ __MODULE__ } ] ETS table with name [#{ inspect ( ets_name ) } ] #{
247- if result , do: ~S| exists| , else: ~S| does not exist|
248- } "
249- end )
250-
251- result
252- end
253-
254- defp do_add_job ( storage_name , scheduler_module , job ) do
255- table = get_ets_by_scheduler ( storage_name , scheduler_module )
112+ defp do_add_job ( table , job ) do
256113 :ets . insert ( table , entry = { job_key ( job . name ) , job } )
114+ :ets . insert ( table , { :init_jobs } )
257115
258116 Logger . debug ( fn ->
259117 "[#{ inspect ( Node . self ( ) ) } ][#{ __MODULE__ } ] inserting [#{ inspect ( entry ) } ] into Persistent ETS table [#{
@@ -264,31 +122,27 @@ defmodule QuantumStoragePersistentEts do
264122 :ok
265123 end
266124
267- defp do_get_jobs ( storage_name , scheduler_module ) do
268- storage_name
269- |> create_scheduler_module_atom ( scheduler_module )
270- |> ets_exist? ( )
271- |> if do
272- storage_name
273- |> get_ets_by_scheduler ( scheduler_module )
274- |> :ets . match ( { { :job , :_ } , :"$1" } )
275- |> List . flatten ( )
276- else
277- :not_applicable
125+ defp do_get_jobs ( table ) do
126+ table
127+ |> :ets . lookup ( :init_jobs )
128+ |> case do
129+ [ { :init_jobs } ] ->
130+ table
131+ |> :ets . match ( { { :job , :_ } , :"$1" } )
132+ |> List . flatten ( )
133+
134+ [ ] ->
135+ :not_applicable
278136 end
279137 end
280138
281- defp do_delete_job ( storage_name , scheduler_module , job_name ) do
282- storage_name
283- |> get_ets_by_scheduler ( scheduler_module )
284- |> :ets . delete ( job_key ( job_name ) )
139+ defp do_delete_job ( table , job_name ) do
140+ :ets . delete ( table , job_key ( job_name ) )
285141
286142 :ok
287143 end
288144
289- defp do_update_job_state ( storage_name , scheduler_module , job_name , state ) do
290- table = get_ets_by_scheduler ( storage_name , scheduler_module )
291-
145+ defp do_update_job_state ( table , job_name , state ) do
292146 table
293147 |> :ets . lookup ( job_key ( job_name ) )
294148 |> Enum . map ( & { elem ( & 1 , 0 ) , % { elem ( & 1 , 1 ) | state: state } } )
@@ -297,24 +151,21 @@ defmodule QuantumStoragePersistentEts do
297151 :ok
298152 end
299153
300- defp do_get_last_execution_date ( storage_name , scheduler_module ) do
301- storage_name
302- |> get_ets_by_scheduler ( scheduler_module )
154+ defp do_get_last_execution_date ( table ) do
155+ table
303156 |> :ets . lookup ( :last_execution_date )
304157 |> case do
305158 [ ] -> :unknown
306159 [ { :last_execution_date , date } | _t ] -> date
307160 end
308161 end
309162
310- defp do_update_last_execution_date ( storage_name , scheduler_module , last_execution_date ) do
311- table = get_ets_by_scheduler ( storage_name , scheduler_module )
163+ defp do_update_last_execution_date ( table , last_execution_date ) do
312164 :ets . insert ( table , { :last_execution_date , last_execution_date } )
313165 :ok
314166 end
315167
316- defp do_purge ( storage_name , scheduler_module ) do
317- table = get_ets_by_scheduler ( storage_name , scheduler_module )
168+ defp do_purge ( table ) do
318169 :ets . delete_all_objects ( table )
319170 :ok
320171 end
0 commit comments