Skip to content

Commit ab51302

Browse files
committed
Add processes
1 parent 2cf4a2b commit ab51302

6 files changed

Lines changed: 166 additions & 3 deletions

File tree

src/SimJulia.jl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ module SimJulia
77

88
using DataStructures
99

10-
import Base.run, Base.now, Base.isless, Base.show
10+
import Base.run, Base.now, Base.isless, Base.show, Base.interrupt, Base.yield
1111
import Base.(&), Base.(|)
1212

1313
export AbstractEvent
@@ -18,10 +18,13 @@ module SimJulia
1818
export succeed, fail, append_callback, @callback, remove_callback
1919
export Operator
2020
export (&), (|)
21+
export Process, @process
22+
export yield, interrupt
2123

2224
include("base.jl")
2325
include("simulations.jl")
2426
include("events.jl")
2527
include("operators.jl")
26-
28+
include("tasks/base.jl")
29+
include("processes.jl")
2730
end

src/processes.jl

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
type Process <: AbstractProcess
2+
bev :: BaseEvent
3+
task :: Task
4+
target :: AbstractEvent
5+
resume :: Function
6+
function Process(func::Function, env::Environment, args::Any...)
7+
proc = new()
8+
proc.bev = BaseEvent(env)
9+
proc.task = @task func(env, args...)
10+
proc.target = Timeout(env)
11+
proc.resume = @callback execute(proc.target, proc)
12+
return proc
13+
end
14+
end
15+
16+
macro process(expr)
17+
expr.head != :call && error("Expression is not a function call!")
18+
func = esc(expr.args[1])
19+
args = [esc(expr.args[n]) for n in 2:length(expr.args)]
20+
:(Process($(func), $(args...)))
21+
end
22+
23+
function yield(target::AbstractEvent)
24+
env = environment(target)
25+
proc = active_process(env)
26+
proc.target = state(target) == triggered ? Timeout(env, value=value(target)) : target
27+
proc.resume = append_callback(execute, proc.target, proc)
28+
ret = SimJulia.produce(nothing)
29+
isa(ret, Exception) && throw(ret)
30+
return ret
31+
end
32+
33+
function execute(ev::AbstractEvent, proc::Process)
34+
try
35+
env = environment(ev)
36+
set_active_process(env, proc)
37+
ret = SimJulia.consume(proc.task, value(ev))
38+
reset_active_process(env)
39+
istaskdone(proc.task) && schedule(proc.bev, value=ret)
40+
catch exc
41+
rethrow(exc)
42+
end
43+
end
44+
45+
function interrupt(proc::Process, cause::Any=nothing)
46+
if !istaskdone(proc.task)
47+
remove_callback(proc.resume, proc.target)
48+
proc.target = Timeout(environment(proc), priority=typemax(Int8), value=InterruptException(proc, cause))
49+
proc.resume = append_callback(execute, proc.target, proc)
50+
end
51+
end

src/tasks/base.jl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
function produce(v)
2+
ct = current_task()
3+
consumer = ct.consumers
4+
ct.consumers = nothing
5+
Base.schedule_and_wait(consumer, v)
6+
return consumer.result
7+
end
8+
9+
function consume(producer::Task, values...)
10+
istaskdone(producer) && return producer.value
11+
ct = current_task()
12+
ct.result = length(values)==1 ? values[1] : values
13+
producer.consumers = ct
14+
Base.schedule_and_wait(producer)
15+
end

test/processes.jl

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using SimJulia
2+
3+
function fibonnaci(sim::Simulation)
4+
a = 0
5+
b = 1
6+
while true
7+
println(a)
8+
yield(Timeout(sim, 1))
9+
a, b = b, a+b
10+
end
11+
end
12+
13+
function test_process(sim::Simulation, ev::AbstractEvent)
14+
yield(ev)
15+
end
16+
17+
function test_process_exception(sim::Simulation, ev::AbstractEvent)
18+
try
19+
value = yield(ev)
20+
println("hi")
21+
catch exc
22+
println("$exc has been thrown")
23+
end
24+
end
25+
26+
function test_interrupter(sim::Simulation, proc::Process)
27+
yield(Timeout(sim, 2))
28+
interrupt(proc)
29+
end
30+
31+
function test_interrupted(sim::Simulation)
32+
try
33+
yield(Timeout(sim, 10))
34+
catch exc
35+
if isa(exc, SimJulia.InterruptException)
36+
println("$(active_process(sim)) interrupted")
37+
end
38+
end
39+
yield(Timeout(sim, 10))
40+
throw(TestException())
41+
end
42+
43+
sim = Simulation()
44+
@process fibonnaci(sim)
45+
run(sim, 10)
46+
47+
sim = Simulation()
48+
@process test_process(sim, succeed(Event(sim)))
49+
run(sim)
50+
51+
sim = Simulation()
52+
@process test_process_exception(sim, Timeout(sim, 1, value=TestException()))
53+
try
54+
run(sim)
55+
catch exc
56+
println("$exc has been thrown")
57+
end
58+
59+
sim = Simulation()
60+
@process test_process_exception(sim, Timeout(sim, value=TestException()))
61+
run(sim)
62+
63+
sim = Simulation()
64+
proc = @process test_interrupted(sim)
65+
@process test_interrupter(sim, proc)
66+
try
67+
run(sim)
68+
catch exc
69+
println("$exc has been thrown")
70+
end

test/runtests.jl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ for test_file in [
44
"base.jl",
55
"simulations.jl",
66
"events.jl",
7-
"operators.jl",]
7+
"operators.jl",
8+
"tasks/base.jl",
9+
"processes.jl",]
810
include(testpath(test_file))
911
end

test/tasks/base.jl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using SimJulia
2+
3+
function fibonnaci(sim::Simulation)
4+
a = 0.0
5+
b = 1.0
6+
for i = 1:10
7+
println("Fibonnaci value equals ", a, " at time ", now(sim))
8+
yield(Timeout(sim, 1.0))
9+
a, b = b, a+b
10+
end
11+
a
12+
end
13+
14+
function wait_for_process(sim::Simulation, process::Process)
15+
val = yield(process)
16+
println("Fibonnaci ended with ", val, " at time ", now(sim))
17+
end
18+
19+
sim = Simulation()
20+
fib = @process fibonnaci(sim)
21+
wait = @process wait_for_process(sim, fib)
22+
run(sim)

0 commit comments

Comments
 (0)