Skip to content

Commit d035f79

Browse files
committed
Both Coroutines as Processes are supported
1 parent b5e7569 commit d035f79

14 files changed

Lines changed: 282 additions & 50 deletions

src/SimJulia.jl

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module SimJulia
1818
export Operator
1919
export (&), (|)
2020
export FiniteStateMachine, @stateful, @yield
21+
export Coroutine, @Coroutine
2122
export Process, @Process
2223
export interrupt
2324
export Simulation, StopSimulation
@@ -28,12 +29,14 @@ module SimJulia
2829
include("base.jl")
2930
include("events.jl")
3031
include("operators.jl")
31-
include("coroutine/utils.jl")
32-
include("coroutine/transforms.jl")
33-
include("coroutine/macro.jl")
34-
include("process.jl")
3532
include("time.jl")
3633
include("simulation.jl")
34+
include("coroutines/utils.jl")
35+
include("coroutines/transforms.jl")
36+
include("coroutines/macro.jl")
37+
include("coroutines.jl")
38+
include("processes/base.jl")
39+
include("processes.jl")
3740
include("resources/base.jl")
3841
include("resources/containers.jl")
3942
include("resources/stores.jl")

src/coroutines.jl

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""
2+
A `Coroutine` is an abstraction for an event yielding function, i.e. a process function.
3+
4+
The process function can suspend its execution by yielding an instance of `AbstractEvent`. The `Environment` will take care of resuming the process function with the value of that event once it has happened. The exception of failed events is also thrown into the process function.
5+
6+
A `Coroutine` is a subtype of `AbstractEvent`. It is triggered, once the process functions returns or raises an exception. The value of the process is the return value of the process function or the exception, respectively.
7+
8+
**Signature**:
9+
10+
Coroutine{E<:Environment} <: AbstractEvent{E}
11+
12+
**Fields**:
13+
14+
- bev :: BaseEvent{E}
15+
- task :: Task
16+
- target :: AbstractEvent{E}
17+
- resume :: Function
18+
19+
**Constructor**:
20+
21+
Coroutine{E<:Environment}(func::Function, env::E, args::Any...)
22+
"""
23+
type Coroutine{E<:Environment} <: AbstractProcess{E}
24+
bev :: BaseEvent{E}
25+
fsm :: FiniteStateMachine
26+
target :: AbstractEvent{E}
27+
resume :: Function
28+
function Coroutine{E}(func::Function, env::E, args::Any...) where E<:Environment
29+
proc = new()
30+
proc.bev = BaseEvent(env)
31+
proc.fsm = func(env, args...)
32+
proc.target = Timeout(env)
33+
proc.resume = append_callback(execute, proc.target, proc)
34+
return proc
35+
end
36+
end
37+
38+
function Coroutine{E<:Environment}(func::Function, env::E, args::Any...)
39+
Coroutine{E}(func, env, args...)
40+
end
41+
42+
"""
43+
Creates a `Coroutine` with process function `func` having a required argument `env`, i.e. an instance of a subtype of `Environment`, and a variable number of arguments `args...`.
44+
45+
**Signature**:
46+
47+
@Coroutine func(env, args...)
48+
"""
49+
macro Coroutine(ex)
50+
if ex.head == :call
51+
func = esc(ex.args[1])
52+
args = [esc(ex.args[n]) for n in 2:length(ex.args)]
53+
return :(Coroutine($(func), $(args...)))
54+
end
55+
end
56+
57+
function execute{E<:Environment}(ev::AbstractEvent{E}, proc::Coroutine{E})
58+
try
59+
env = environment(ev)
60+
set_active_process(env, proc)
61+
target = proc.fsm(value(ev))
62+
if iscoroutinedone(proc.fsm)
63+
schedule(proc.bev, value=target)
64+
else
65+
if state(target) == triggered
66+
proc.target = Timeout(env, value=value(target))
67+
else
68+
proc.target = target
69+
end
70+
proc.resume = append_callback(execute, proc.target, proc)
71+
end
72+
set_active_process(env)
73+
catch exc
74+
rethrow(exc)
75+
end
76+
end
77+
78+
function interrupt{E<:Environment}(proc::Coroutine{E}, cause::Any=nothing)
79+
if !iscoroutinedone(proc.fsm)
80+
remove_callback(proc.resume, proc.target)
81+
proc.target = Timeout(environment(proc), priority=true, value=InterruptException(proc, cause))
82+
proc.resume = append_callback(execute, proc.target, proc)
83+
end
84+
end

src/process.jl renamed to src/processes.jl

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@ Process{E<:Environment} <: AbstractEvent{E}
2020
2121
Process{E<:Environment}(func::Function, env::E, args::Any...)
2222
"""
23-
type Process{E<:Environment} <: AbstractEvent{E}
23+
type Process{E<:Environment} <: AbstractProcess{E}
2424
bev :: BaseEvent{E}
25-
fsm :: FiniteStateMachine
25+
task :: Task
2626
target :: AbstractEvent{E}
2727
resume :: Function
2828
function Process{E}(func::Function, env::E, args::Any...) where E<:Environment
2929
proc = new()
3030
proc.bev = BaseEvent(env)
31-
proc.fsm = func(env, args...)
31+
proc.task = @task func(env, args...)
3232
proc.target = Timeout(env)
3333
proc.resume = append_callback(execute, proc.target, proc)
3434
return proc
@@ -54,34 +54,38 @@ macro Process(ex)
5454
end
5555
end
5656

57+
function yield{E<:Environment}(target::AbstractEvent{E})
58+
env = environment(target)
59+
proc = active_process(env)
60+
if state(target) == triggered
61+
proc.target = Timeout(env, value=value(target))
62+
else
63+
proc.target = target
64+
end
65+
proc.resume = append_callback(execute, proc.target, proc)
66+
ret = SimJulia.produce(nothing)
67+
if isa(ret, Exception)
68+
throw(ret)
69+
end
70+
return ret
71+
end
72+
5773
function execute{E<:Environment}(ev::AbstractEvent{E}, proc::Process{E})
5874
try
5975
env = environment(ev)
6076
set_active_process(env, proc)
61-
target = proc.fsm(value(ev))
62-
if iscoroutinedone(proc.fsm)
63-
schedule(proc.bev, value=target)
64-
else
65-
if state(target) == triggered
66-
proc.target = Timeout(env, value=value(target))
67-
else
68-
proc.target = target
69-
end
70-
proc.resume = append_callback(execute, proc.target, proc)
71-
end
77+
ret = SimJulia.consume(proc.task, value(ev))
7278
set_active_process(env)
79+
if istaskdone(proc.task)
80+
schedule(proc.bev, value=ret)
81+
end
7382
catch exc
7483
rethrow(exc)
7584
end
7685
end
7786

78-
struct InterruptException{E<:Environment} <: Exception
79-
by :: Process{E}
80-
cause :: Any
81-
end
82-
8387
function interrupt{E<:Environment}(proc::Process{E}, cause::Any=nothing)
84-
if !iscoroutinedone(proc.fsm)
88+
if !istaskdone(proc.fsm)
8589
remove_callback(proc.resume, proc.target)
8690
proc.target = Timeout(environment(proc), priority=true, value=InterruptException(proc, cause))
8791
proc.resume = append_callback(execute, proc.target, proc)

src/processes/base.jl

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
function produce(consumer::Task, values...)
2+
ct = current_task()
3+
ct.result = length(values)==1 ? values[1] : values
4+
Base.schedule_and_wait(consumer)
5+
return consumer.result
6+
end
7+
8+
function produce(v)
9+
ct = current_task()
10+
consumer = ct.consumers
11+
ct.consumers = nothing
12+
Base.schedule_and_wait(consumer, v)
13+
return consumer.result
14+
end
15+
produce(v...) = produce(v)
16+
17+
function consume(producer::Task, values...)
18+
istaskdone(producer) && return wait(producer)
19+
ct = current_task()
20+
ct.result = length(values)==1 ? values[1] : values
21+
producer.consumers = ct
22+
producer.state == :runnable ? Base.schedule_and_wait(producer) : wait()
23+
end

src/resources/containers.jl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,19 @@ macro Request(res, req, expr)
5252
end)
5353
end
5454

55+
function Request{E<:Environment}(func::Function, res::Resource{E}; priority::Int=0)
56+
req = Request(res, priority=priority)
57+
try
58+
func(req)
59+
finally
60+
if state(req) == triggered
61+
yield(Release(res, priority=priority))
62+
else
63+
cancel(res, req)
64+
end
65+
end
66+
end
67+
5568
function Get{N<:Number, E<:Environment}(con::Container{N, E}, amount::N; priority::Int=0) :: Get{E}
5669
get_ev = Get{E}(con.env)
5770
con.Get_queue[get_ev] = ContainerKey(priority, con.seid+=one(UInt), amount)

src/simulation.jl

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ function isless{T<:TimeType}(a::EventKey{T}, b::EventKey{T}) :: Bool
88
(a.time < b.time) || (a.time == b.time && a.priority > b.priority) || (a.time == b.time && a.priority == b.priority && a.id < b.id)
99
end
1010

11+
abstract type AbstractProcess{E<:Environment} <: AbstractEvent{E} end
12+
1113
"""
1214
Execution environment for a simulation.
1315
@@ -22,7 +24,7 @@ Simulation{T<:TimeType} <: Environment
2224
- heap :: PriorityQueue{BaseEvent{Simulation{T}}, EventKey{T}}
2325
- eid :: UInt
2426
- sid :: UInt
25-
- active_proc :: Nullable{Process}
27+
- active_proc :: Nullable{AbstractProcess}
2628
2729
**Constructors**:
2830
@@ -36,9 +38,9 @@ mutable struct Simulation{T<:TimeType} <: Environment
3638
heap :: DataStructures.PriorityQueue{BaseEvent{Simulation{T}}, EventKey{T}}
3739
eid :: UInt
3840
sid :: UInt
39-
active_proc :: Nullable{Process{Simulation{T}}}
41+
active_proc :: Nullable{AbstractProcess{Simulation{T}}}
4042
function Simulation{T}(initial_time::T) where T<:TimeType
41-
new(initial_time, DataStructures.PriorityQueue(BaseEvent{Simulation{T}}, EventKey{T}), zero(UInt), zero(UInt), Nullable{Process{Simulation{T}}}())
43+
new(initial_time, DataStructures.PriorityQueue(BaseEvent{Simulation{T}}, EventKey{T}), zero(UInt), zero(UInt), Nullable{AbstractProcess{Simulation{T}}}())
4244
end
4345
end
4446

@@ -60,15 +62,15 @@ function now{T<:TimeType}(sim::Simulation{T}) :: T
6062
sim.time
6163
end
6264

63-
function active_process{T<:TimeType}(sim::Simulation{T}) :: Process{Simulation{T}}
65+
function active_process{T<:TimeType}(sim::Simulation{T}) :: AbstractProcess{Simulation{T}}
6466
get(sim.active_proc)
6567
end
6668

6769
function set_active_process{T<:TimeType}(sim::Simulation{T})
68-
sim.active_proc = Nullable{Process{Simulation{T}}}()
70+
sim.active_proc = Nullable{AbstractProcess{Simulation{T}}}()
6971
end
7072

71-
function set_active_process{T<:TimeType}(sim::Simulation{T}, proc::Process{Simulation{T}})
73+
function set_active_process{T<:TimeType}(sim::Simulation{T}, proc::AbstractProcess{Simulation{T}})
7274
sim.active_proc = Nullable(proc)
7375
end
7476

@@ -162,3 +164,8 @@ function schedule{T<:TimeType}(bev::BaseEvent{Simulation{T}}, delay::Number=0; p
162164
P = typeof(eps(bev.env.time))
163165
schedule(bev, P(delay), priority=priority, value=value)
164166
end
167+
168+
struct InterruptException{E<:Environment} <: Exception
169+
by :: AbstractProcess{E}
170+
cause :: Any
171+
end

test/runtests.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ for test_file in [
55
"test_events.jl",
66
"test_operators.jl",
77
"test_simulation.jl",
8-
"test_process.jl",
8+
"test_processes.jl",
9+
"test_coroutines.jl",
910
"test_containers.jl",
1011
"test_stores.jl",
1112
]

0 commit comments

Comments
 (0)