Skip to content

Commit 97a35a4

Browse files
pyatkovmaennchen
authored andcommitted
Storage adapter implementation: Implemented Quantum.Storage.Adapter behavior via PersistentETS.
1 parent 4985280 commit 97a35a4

4 files changed

Lines changed: 304 additions & 1 deletion

File tree

config/.credo.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
checks: [
1010
# For others you can also set parameters
1111
{Credo.Check.Readability.MaxLineLength, priority: :low, max_length: 120},
12+
{Credo.Check.Design.TagTODO, false}
1213
]
1314
}
1415
]

lib/quantum_storage_ets.ex

Lines changed: 297 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,301 @@
11
defmodule QuantumStorageEts do
22
@moduledoc """
3-
Documentation for QuantumStorageEts.
3+
persistent_ets based implementation of a `Quantum.Storage.Adapter`.
4+
See https://hexdocs.pm/persistent_ets
45
"""
6+
require Logger
7+
use GenServer
8+
defstruct [:schedulers]
9+
10+
def start_link do
11+
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
12+
end
13+
14+
# Callbacks
15+
16+
defp __server__, do: __MODULE__
17+
18+
def init(_) do
19+
{:ok, %__MODULE__{schedulers: %{}}}
20+
end
21+
22+
def handle_call(
23+
{:add_job, scheduler_module, job},
24+
_from,
25+
%__MODULE__{schedulers: schedulers} = state
26+
) do
27+
{
28+
:reply,
29+
do_add_job(scheduler_module, job),
30+
%{
31+
state
32+
| schedulers:
33+
schedulers
34+
|> Map.put_new_lazy(scheduler_module, fn ->
35+
create_scheduler_module_atom(scheduler_module)
36+
end)
37+
}
38+
}
39+
end
40+
41+
def handle_call({:jobs, scheduler_module}, _from, %__MODULE__{schedulers: schedulers} = state) do
42+
{
43+
:reply,
44+
do_get_jobs(scheduler_module),
45+
%{
46+
state
47+
| schedulers:
48+
schedulers
49+
|> Map.put_new_lazy(scheduler_module, fn ->
50+
create_scheduler_module_atom(scheduler_module)
51+
end)
52+
}
53+
}
54+
end
55+
56+
def handle_call(
57+
{:delete_job, scheduler_module, job},
58+
_from,
59+
%__MODULE__{schedulers: schedulers} = state
60+
) do
61+
{
62+
:reply,
63+
do_delete_job(scheduler_module, job),
64+
%{
65+
state
66+
| schedulers:
67+
schedulers
68+
|> Map.put_new_lazy(scheduler_module, fn ->
69+
create_scheduler_module_atom(scheduler_module)
70+
end)
71+
}
72+
}
73+
end
74+
75+
def handle_call(
76+
{:update_job_state, scheduler_module, job_name, job_state},
77+
_from,
78+
%__MODULE__{schedulers: schedulers} = state
79+
) do
80+
{
81+
:reply,
82+
do_update_job_state(scheduler_module, job_name, job_state),
83+
%{
84+
state
85+
| schedulers:
86+
schedulers
87+
|> Map.put_new_lazy(scheduler_module, fn ->
88+
create_scheduler_module_atom(scheduler_module)
89+
end)
90+
}
91+
}
92+
end
93+
94+
def handle_call(
95+
{:last_execution_date, scheduler_module},
96+
_from,
97+
%__MODULE__{schedulers: schedulers} = state
98+
) do
99+
{
100+
:reply,
101+
do_get_last_execution_date(scheduler_module),
102+
%{
103+
state
104+
| schedulers:
105+
schedulers
106+
|> Map.put_new_lazy(scheduler_module, fn ->
107+
create_scheduler_module_atom(scheduler_module)
108+
end)
109+
}
110+
}
111+
end
112+
113+
def handle_call(
114+
{:update_last_execution_date, scheduler_module, last_execution_date},
115+
_from,
116+
%__MODULE__{schedulers: schedulers} = state
117+
) do
118+
{
119+
:reply,
120+
do_update_last_execution_date(scheduler_module, last_execution_date),
121+
%{
122+
state
123+
| schedulers:
124+
schedulers
125+
|> Map.put_new_lazy(scheduler_module, fn ->
126+
create_scheduler_module_atom(scheduler_module)
127+
end)
128+
}
129+
}
130+
end
131+
132+
def handle_call(
133+
{:purge, scheduler_module},
134+
_from,
135+
%__MODULE__{schedulers: schedulers} = state
136+
) do
137+
{
138+
:reply,
139+
do_purge(scheduler_module),
140+
%{
141+
state
142+
| schedulers:
143+
schedulers
144+
|> Map.put_new_lazy(scheduler_module, fn ->
145+
create_scheduler_module_atom(scheduler_module)
146+
end)
147+
}
148+
}
149+
end
150+
151+
# Helpers
152+
defp create_scheduler_module_atom(scheduler_module) do
153+
scheduler_module
154+
end
155+
156+
defp job_key(job_name) do
157+
{:job, job_name}
158+
end
159+
160+
defp get_ets_by_scheduler(scheduler_module) do
161+
scheduler_module_atom = create_scheduler_module_atom(scheduler_module)
162+
163+
if ets_exist?(scheduler_module_atom) do
164+
scheduler_module_atom
165+
else
166+
PersistentEts.new(scheduler_module_atom, "#{scheduler_module_atom}.tab", [
167+
:named_table,
168+
:set
169+
])
170+
end
171+
end
172+
173+
defp ets_exist?(ets_name) do
174+
Logger.debug(fn ->
175+
"[#{inspect(Node.self())}][#{__MODULE__}] Determining whether ETS table with name [#{
176+
inspect(ets_name)
177+
}] exists"
178+
end)
179+
180+
result =
181+
case :ets.info(ets_name) do
182+
:undefined -> false
183+
_ -> true
184+
end
185+
186+
Logger.debug(fn ->
187+
"[#{inspect(Node.self())}][#{__MODULE__}] ETS table with name [#{inspect(ets_name)}] #{
188+
if result, do: ~S|exists|, else: ~S|does not exist|
189+
}"
190+
end)
191+
192+
result
193+
end
194+
195+
# Private functions
196+
defp do_add_job(scheduler_module, job) do
197+
table = get_ets_by_scheduler(scheduler_module)
198+
:ets.insert(table, entry = {job_key(job.name), job})
199+
200+
Logger.debug(fn ->
201+
"[#{inspect(Node.self())}][#{__MODULE__}] inserting [#{inspect(entry)}] into Persistent ETS table [#{
202+
table
203+
}]"
204+
end)
205+
206+
:ok
207+
end
208+
209+
defp do_get_jobs(scheduler_module) do
210+
table = get_ets_by_scheduler(scheduler_module)
211+
212+
result =
213+
case :ets.match(table, {{:job, :_}, :"$1"}) do
214+
[] -> :not_applicable
215+
[_h | _t] = jobs -> jobs |> List.flatten()
216+
end
217+
218+
Logger.debug(fn ->
219+
"[#{inspect(Node.self())}][#{__MODULE__}] jobs are: #{inspect(result)}"
220+
end)
221+
222+
result
223+
end
224+
225+
defp do_delete_job(scheduler_module, job_name) do
226+
table = get_ets_by_scheduler(scheduler_module)
227+
:ets.delete(table, job_key(job_name))
228+
:ok
229+
end
230+
231+
defp do_update_job_state(scheduler_module, job_name, state) do
232+
table = get_ets_by_scheduler(scheduler_module)
233+
234+
job =
235+
case :ets.lookup(table, {:job, job_name}) do
236+
# TODO: should we raise here or should we handle the situation with a return value of a special kind?
237+
[] ->
238+
raise "Job #{job_name} does not exist in the storage"
239+
240+
[j | _t] ->
241+
j
242+
end
243+
244+
upd_job = %{job | state: state}
245+
:ets.update_element(table, job_key(job_name), {1, upd_job})
246+
:ok
247+
end
248+
249+
defp do_get_last_execution_date(scheduler_module) do
250+
table = get_ets_by_scheduler(scheduler_module)
251+
252+
case :ets.lookup(table, :last_execution_date) do
253+
[] -> :unknown
254+
[{:last_execution_date, date} | _t] -> date
255+
{:last_execution_date, d} -> d
256+
end
257+
end
258+
259+
defp do_update_last_execution_date(scheduler_module, last_execution_date) do
260+
table = get_ets_by_scheduler(scheduler_module)
261+
:ets.insert(table, {:last_execution_date, last_execution_date})
262+
:ok
263+
end
264+
265+
defp do_purge(scheduler_module) do
266+
table = get_ets_by_scheduler(scheduler_module)
267+
:ets.delete_all_objects(table)
268+
:ok
269+
end
270+
271+
@behaviour Quantum.Storage.Adapter
272+
273+
def jobs(scheduler_module) do
274+
__server__ |> GenServer.call({:jobs, scheduler_module})
275+
end
276+
277+
def add_job(scheduler_module, job) do
278+
__server__ |> GenServer.call({:add_job, scheduler_module, job})
279+
end
280+
281+
def delete_job(scheduler_module, job_name) do
282+
__server__ |> GenServer.call({:delete_job, scheduler_module, job_name})
283+
end
284+
285+
def update_job_state(scheduler_module, job_name, state) do
286+
__server__ |> GenServer.call({:update_job_state, scheduler_module, job_name, state})
287+
end
288+
289+
def last_execution_date(scheduler_module) do
290+
__server__ |> GenServer.call({:last_execution_date, scheduler_module})
291+
end
292+
293+
def update_last_execution_date(scheduler_module, last_execution_date) do
294+
__server__
295+
|> GenServer.call({:update_last_execution_date, scheduler_module, last_execution_date})
296+
end
297+
298+
def purge(scheduler_module) do
299+
__server__ |> GenServer.call({:purge, scheduler_module})
300+
end
5301
end

mix.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ defmodule QuantumStorageEts.MixProject do
6060
# Run "mix help deps" to learn about dependencies.
6161
defp deps do
6262
[
63+
{:persistent_ets, "~> 0.1.0"},
64+
{:quantum, github: "c-rack/quantum-elixir", branch: "storage"},
6365
{:earmark, "~> 1.0", only: [:dev, :docs], runtime: false},
6466
{:ex_doc, "~> 0.13", only: [:dev, :docs], runtime: false},
6567
{:excoveralls, "~> 0.5", only: [:dev, :test], runtime: false},

mix.lock

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,22 @@
22
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"},
33
"certifi": {:hex, :certifi, "2.0.0", "a0c0e475107135f76b8c1d5bc7efb33cd3815cb3cf3dea7aefdd174dabead064", [:rebar3], [], "hexpm"},
44
"credo": {:hex, :credo, "0.8.10", "261862bb7363247762e1063713bb85df2bbd84af8d8610d1272cd9c1943bba63", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}], "hexpm"},
5+
"crontab": {:hex, :crontab, "1.1.2", "4784a50987b4a19af07a908f98e8a308b00f9c93efc5a7892155dc10cd8fc7d9", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 2.1", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm"},
56
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"},
67
"earmark": {:hex, :earmark, "1.2.4", "99b637c62a4d65a20a9fb674b8cffb8baa771c04605a80c911c4418c69b75439", [:mix], [], "hexpm"},
78
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
89
"excoveralls": {:hex, :excoveralls, "0.8.1", "0bbf67f22c7dbf7503981d21a5eef5db8bbc3cb86e70d3798e8c802c74fa5e27", [:mix], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
910
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"},
11+
"gen_stage": {:hex, :gen_stage, "0.13.1", "edff5bca9cab22c5d03a834062515e6a1aeeb7665fb44eddae086252e39c4378", [:mix], [], "hexpm"},
1012
"hackney": {:hex, :hackney, "1.11.0", "4951ee019df102492dabba66a09e305f61919a8a183a7860236c0fde586134b6", [:rebar3], [{:certifi, "2.0.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
1113
"idna": {:hex, :idna, "5.1.0", "d72b4effeb324ad5da3cab1767cb16b17939004e789d8c0ad5b70f3cea20c89a", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
1214
"inch_ex": {:hex, :inch_ex, "0.5.6", "418357418a553baa6d04eccd1b44171936817db61f4c0840112b420b8e378e67", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},
1315
"jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm"},
1416
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
1517
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"},
18+
"persistent_ets": {:hex, :persistent_ets, "0.1.0", "a9ea1d6e41094441bb7fd3ea6a5717b66de82343bc6a2899e678a5bad46e495b", [:mix], [], "hexpm"},
1619
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
20+
"quantum": {:git, "https://github.com/c-rack/quantum-elixir.git", "d66854ffad75dcd8c0cbed978eee25fcf89e0ea6", [branch: "storage"]},
1721
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"},
1822
"unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"},
1923
}

0 commit comments

Comments
 (0)