11defmodule QuantumStorageEts do
22 @ moduledoc """
3- persistent_ets based implementation of a `Quantum.Storage.Adapter`.
4- See https://hexdocs.pm/persistent_ets
3+ `PersistentEts` based implementation of a `Quantum.Storage`.
54 """
6- require Logger
5+
76 use GenServer
8- defstruct [ :schedulers ]
97
10- def start_link do
11- GenServer . start_link ( __MODULE__ , nil , name: __MODULE__ )
12- end
8+ require Logger
139
14- # Callbacks
10+ alias __MODULE__ . State
1511
16- defp __server__ , do: __MODULE__
12+ @ server __MODULE__
1713
18- def init ( _ ) do
19- { :ok , % __MODULE__ { schedulers: % { } } }
20- end
14+ @ behaviour Quantum.Storage
15+
16+ def start_link ( opts ) ,
17+ do: GenServer . start_link ( __MODULE__ , opts , name: Keyword . get ( opts , :name , @ server ) )
18+
19+ @ impl GenServer
20+ def init ( opts ) , do: { :ok , % State { schedulers: % { } , name: Keyword . get ( opts , :name , @ server ) } }
21+
22+ @ impl Quantum.Storage
23+ def jobs ( server \\ @ server , scheduler_module ) ,
24+ do: GenServer . call ( server , { :jobs , scheduler_module } )
25+
26+ @ impl Quantum.Storage
27+ def add_job ( server \\ @ server , scheduler_module , job ) ,
28+ do: GenServer . call ( server , { :add_job , scheduler_module , job } )
29+
30+ @ impl Quantum.Storage
31+ def delete_job ( server \\ @ server , scheduler_module , job_name ) ,
32+ do: GenServer . call ( server , { :delete_job , scheduler_module , job_name } )
33+
34+ @ impl Quantum.Storage
35+ def update_job_state ( server \\ @ server , scheduler_module , job_name , state ) ,
36+ do: GenServer . call ( server , { :update_job_state , scheduler_module , job_name , state } )
37+
38+ @ impl Quantum.Storage
39+ def last_execution_date ( server \\ @ server , scheduler_module ) ,
40+ do: GenServer . call ( server , { :last_execution_date , scheduler_module } )
41+
42+ @ impl Quantum.Storage
43+ def update_last_execution_date ( server \\ @ server , scheduler_module , last_execution_date ) ,
44+ do:
45+ GenServer . call ( server , { :update_last_execution_date , scheduler_module , last_execution_date } )
2146
47+ @ impl Quantum.Storage
48+ def purge ( server \\ @ server , scheduler_module ) ,
49+ do: GenServer . call ( server , { :purge , scheduler_module } )
50+
51+ def purge_all ( server \\ @ server ) , do: GenServer . call ( server , :purge_all )
52+
53+ @ impl GenServer
2254 def handle_call (
2355 { :add_job , scheduler_module , job } ,
2456 _from ,
25- % __MODULE__ { schedulers: schedulers } = state
57+ % State { schedulers: schedulers , name: name } = state
2658 ) do
2759 {
2860 :reply ,
29- do_add_job ( scheduler_module , job ) ,
61+ do_add_job ( name , scheduler_module , job ) ,
3062 % {
3163 state
3264 | schedulers:
3365 schedulers
3466 |> Map . put_new_lazy ( scheduler_module , fn ->
35- create_scheduler_module_atom ( scheduler_module )
67+ create_scheduler_module_atom ( name , scheduler_module )
3668 end )
3769 }
3870 }
3971 end
4072
41- def handle_call ( { :jobs , scheduler_module } , _from , % __MODULE__ { schedulers: schedulers } = state ) do
73+ def handle_call (
74+ { :jobs , scheduler_module } ,
75+ _from ,
76+ % State { schedulers: schedulers , name: name } = state
77+ ) do
4278 {
4379 :reply ,
44- do_get_jobs ( scheduler_module ) ,
80+ do_get_jobs ( name , scheduler_module ) ,
4581 % {
4682 state
4783 | schedulers:
4884 schedulers
4985 |> Map . put_new_lazy ( scheduler_module , fn ->
50- create_scheduler_module_atom ( scheduler_module )
86+ create_scheduler_module_atom ( name , scheduler_module )
5187 end )
5288 }
5389 }
@@ -56,17 +92,17 @@ defmodule QuantumStorageEts do
5692 def handle_call (
5793 { :delete_job , scheduler_module , job } ,
5894 _from ,
59- % __MODULE__ { schedulers: schedulers } = state
95+ % State { schedulers: schedulers , name: name } = state
6096 ) do
6197 {
6298 :reply ,
63- do_delete_job ( scheduler_module , job ) ,
99+ do_delete_job ( name , scheduler_module , job ) ,
64100 % {
65101 state
66102 | schedulers:
67103 schedulers
68104 |> Map . put_new_lazy ( scheduler_module , fn ->
69- create_scheduler_module_atom ( scheduler_module )
105+ create_scheduler_module_atom ( name , scheduler_module )
70106 end )
71107 }
72108 }
@@ -75,17 +111,17 @@ defmodule QuantumStorageEts do
75111 def handle_call (
76112 { :update_job_state , scheduler_module , job_name , job_state } ,
77113 _from ,
78- % __MODULE__ { schedulers: schedulers } = state
114+ % State { schedulers: schedulers , name: name } = state
79115 ) do
80116 {
81117 :reply ,
82- do_update_job_state ( scheduler_module , job_name , job_state ) ,
118+ do_update_job_state ( name , scheduler_module , job_name , job_state ) ,
83119 % {
84120 state
85121 | schedulers:
86122 schedulers
87123 |> Map . put_new_lazy ( scheduler_module , fn ->
88- create_scheduler_module_atom ( scheduler_module )
124+ create_scheduler_module_atom ( name , scheduler_module )
89125 end )
90126 }
91127 }
@@ -94,17 +130,17 @@ defmodule QuantumStorageEts do
94130 def handle_call (
95131 { :last_execution_date , scheduler_module } ,
96132 _from ,
97- % __MODULE__ { schedulers: schedulers } = state
133+ % State { schedulers: schedulers , name: name } = state
98134 ) do
99135 {
100136 :reply ,
101- do_get_last_execution_date ( scheduler_module ) ,
137+ do_get_last_execution_date ( name , scheduler_module ) ,
102138 % {
103139 state
104140 | schedulers:
105141 schedulers
106142 |> Map . put_new_lazy ( scheduler_module , fn ->
107- create_scheduler_module_atom ( scheduler_module )
143+ create_scheduler_module_atom ( name , scheduler_module )
108144 end )
109145 }
110146 }
@@ -113,17 +149,17 @@ defmodule QuantumStorageEts do
113149 def handle_call (
114150 { :update_last_execution_date , scheduler_module , last_execution_date } ,
115151 _from ,
116- % __MODULE__ { schedulers: schedulers } = state
152+ % State { schedulers: schedulers , name: name } = state
117153 ) do
118154 {
119155 :reply ,
120- do_update_last_execution_date ( scheduler_module , last_execution_date ) ,
156+ do_update_last_execution_date ( name , scheduler_module , last_execution_date ) ,
121157 % {
122158 state
123159 | schedulers:
124160 schedulers
125161 |> Map . put_new_lazy ( scheduler_module , fn ->
126- create_scheduler_module_atom ( scheduler_module )
162+ create_scheduler_module_atom ( name , scheduler_module )
127163 end )
128164 }
129165 }
@@ -132,40 +168,48 @@ defmodule QuantumStorageEts do
132168 def handle_call (
133169 { :purge , scheduler_module } ,
134170 _from ,
135- % __MODULE__ { schedulers: schedulers } = state
171+ % State { schedulers: schedulers , name: name } = state
136172 ) do
137173 {
138174 :reply ,
139- do_purge ( scheduler_module ) ,
175+ do_purge ( name , scheduler_module ) ,
140176 % {
141177 state
142178 | schedulers:
143179 schedulers
144180 |> Map . put_new_lazy ( scheduler_module , fn ->
145- create_scheduler_module_atom ( scheduler_module )
181+ create_scheduler_module_atom ( name , scheduler_module )
146182 end )
147183 }
148184 }
149185 end
150186
151- # Helpers
152- defp create_scheduler_module_atom ( scheduler_module ) do
153- scheduler_module
187+ def handle_call ( :purge_all , _from , % State { schedulers: schedulers , name: name } = state ) do
188+ schedulers |> Map . values ( ) |> Enum . each ( fn scheduler -> :ok = do_purge ( name , scheduler ) end )
189+ { :reply , :ok , state }
190+ end
191+
192+ defp create_scheduler_module_atom ( storage_name , scheduler_module ) do
193+ Module . concat ( storage_name , scheduler_module )
154194 end
155195
156196 defp job_key ( job_name ) do
157197 { :job , job_name }
158198 end
159199
160- defp get_ets_by_scheduler ( scheduler_module ) do
161- scheduler_module_atom = create_scheduler_module_atom ( scheduler_module )
200+ defp get_ets_by_scheduler ( storage_name , scheduler_module ) do
201+ scheduler_module_atom = create_scheduler_module_atom ( storage_name , scheduler_module )
162202
163203 if ets_exist? ( scheduler_module_atom ) do
164204 scheduler_module_atom
165205 else
166- PersistentEts . new ( scheduler_module_atom , "#{ scheduler_module_atom } .tab" , [
206+ path = Application . app_dir ( :quantum_storage_ets , "priv/tables/#{ scheduler_module_atom } .tab" )
207+
208+ File . mkdir_p! ( Path . dirname ( path ) )
209+
210+ PersistentEts . new ( scheduler_module_atom , path , [
167211 :named_table ,
168- :set
212+ :ordered_set
169213 ] )
170214 end
171215 end
@@ -192,9 +236,8 @@ defmodule QuantumStorageEts do
192236 result
193237 end
194238
195- # Private functions
196- defp do_add_job ( scheduler_module , job ) do
197- table = get_ets_by_scheduler ( scheduler_module )
239+ defp do_add_job ( storage_name , scheduler_module , job ) do
240+ table = get_ets_by_scheduler ( storage_name , scheduler_module )
198241 :ets . insert ( table , entry = { job_key ( job . name ) , job } )
199242
200243 Logger . debug ( fn ->
@@ -206,48 +249,41 @@ defmodule QuantumStorageEts do
206249 :ok
207250 end
208251
209- defp do_get_jobs ( scheduler_module ) do
210- table = get_ets_by_scheduler ( scheduler_module )
211-
212- result =
213- case :ets . match ( table , { { :job , :_ } , :"$1" } ) do
214- [ ] -> :not_applicable
215- [ _h | _t ] = jobs -> jobs |> List . flatten ( )
216- end
217-
218- Logger . debug ( fn ->
219- "[#{ inspect ( Node . self ( ) ) } ][#{ __MODULE__ } ] jobs are: #{ inspect ( result ) } "
220- end )
221-
222- result
252+ defp do_get_jobs ( storage_name , scheduler_module ) do
253+ storage_name
254+ |> create_scheduler_module_atom ( scheduler_module )
255+ |> ets_exist? ( )
256+ |> if do
257+ storage_name
258+ |> get_ets_by_scheduler ( scheduler_module )
259+ |> :ets . match ( { { :job , :_ } , :"$1" } )
260+ |> List . flatten ( )
261+ else
262+ :not_applicable
263+ end
223264 end
224265
225- defp do_delete_job ( scheduler_module , job_name ) do
226- table = get_ets_by_scheduler ( scheduler_module )
227- :ets . delete ( table , job_key ( job_name ) )
266+ defp do_delete_job ( storage_name , scheduler_module , job_name ) do
267+ storage_name
268+ |> get_ets_by_scheduler ( scheduler_module )
269+ |> :ets . delete ( job_key ( job_name ) )
270+
228271 :ok
229272 end
230273
231- defp do_update_job_state ( scheduler_module , job_name , state ) do
232- table = get_ets_by_scheduler ( scheduler_module )
233-
234- job =
235- case :ets . lookup ( table , { :job , job_name } ) do
236- # TODO: should we raise here or should we handle the situation with a return value of a special kind?
237- [ ] ->
238- raise "Job #{ job_name } does not exist in the storage"
274+ defp do_update_job_state ( storage_name , scheduler_module , job_name , state ) do
275+ table = get_ets_by_scheduler ( storage_name , scheduler_module )
239276
240- [ j | _t ] ->
241- j
242- end
277+ table
278+ |> :ets . lookup ( job_key ( job_name ) )
279+ |> Enum . map ( & { elem ( & 1 , 0 ) , % { elem ( & 1 , 1 ) | state: state } } )
280+ |> Enum . each ( & :ets . update_element ( table , elem ( & 1 , 0 ) , { 2 , elem ( & 1 , 1 ) } ) )
243281
244- upd_job = % { job | state: state }
245- :ets . update_element ( table , job_key ( job_name ) , { 1 , upd_job } )
246282 :ok
247283 end
248284
249- defp do_get_last_execution_date ( scheduler_module ) do
250- table = get_ets_by_scheduler ( scheduler_module )
285+ defp do_get_last_execution_date ( storage_name , scheduler_module ) do
286+ table = get_ets_by_scheduler ( storage_name , scheduler_module )
251287
252288 case :ets . lookup ( table , :last_execution_date ) do
253289 [ ] -> :unknown
@@ -256,46 +292,15 @@ defmodule QuantumStorageEts do
256292 end
257293 end
258294
259- defp do_update_last_execution_date ( scheduler_module , last_execution_date ) do
260- table = get_ets_by_scheduler ( scheduler_module )
295+ defp do_update_last_execution_date ( storage_name , scheduler_module , last_execution_date ) do
296+ table = get_ets_by_scheduler ( storage_name , scheduler_module )
261297 :ets . insert ( table , { :last_execution_date , last_execution_date } )
262298 :ok
263299 end
264300
265- defp do_purge ( scheduler_module ) do
266- table = get_ets_by_scheduler ( scheduler_module )
301+ defp do_purge ( storage_name , scheduler_module ) do
302+ table = get_ets_by_scheduler ( storage_name , scheduler_module )
267303 :ets . delete_all_objects ( table )
268304 :ok
269305 end
270-
271- @ behaviour Quantum.Storage.Adapter
272-
273- def jobs ( scheduler_module ) do
274- __server__ |> GenServer . call ( { :jobs , scheduler_module } )
275- end
276-
277- def add_job ( scheduler_module , job ) do
278- __server__ |> GenServer . call ( { :add_job , scheduler_module , job } )
279- end
280-
281- def delete_job ( scheduler_module , job_name ) do
282- __server__ |> GenServer . call ( { :delete_job , scheduler_module , job_name } )
283- end
284-
285- def update_job_state ( scheduler_module , job_name , state ) do
286- __server__ |> GenServer . call ( { :update_job_state , scheduler_module , job_name , state } )
287- end
288-
289- def last_execution_date ( scheduler_module ) do
290- __server__ |> GenServer . call ( { :last_execution_date , scheduler_module } )
291- end
292-
293- def update_last_execution_date ( scheduler_module , last_execution_date ) do
294- __server__
295- |> GenServer . call ( { :update_last_execution_date , scheduler_module , last_execution_date } )
296- end
297-
298- def purge ( scheduler_module ) do
299- __server__ |> GenServer . call ( { :purge , scheduler_module } )
300- end
301306end
0 commit comments