Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions src/payload.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ function _reenqueue_ordered!(queue::Channel{T}, waiting_room::Vector{T}, payload
nrows = length(payload)
payload = first(waiting_room)
if payload.row_num == (nrows + row)
put!(queue, popfirst!(waiting_room))
_put_buffered_nolock!(queue, popfirst!(waiting_room))
else
break
end
Expand All @@ -52,21 +52,63 @@ function _reorder!(queue::Channel{T}, waiting_room::Vector{T}, payload::T, expec
row = payload.row_num
if row == expected_row
_reenqueue_ordered!(queue, waiting_room, payload)
return false
return false # doesn't need to wait
end
insertsorted!(waiting_room, payload, x->x.row_num)
return true
return true # needs to wait
end

Base.put!(o::PayloadOrderer{B,C}, x::ParsedPayload{B,C}) where {B,C} = put!(o.queue, x)
Base.close(o::PayloadOrderer, excp::Exception=Base.closed_exception()) = close(o.queue, excp)
Base.isopen(o::PayloadOrderer) = isopen(o.queue)

# This is `take_buffered` from channels.jl, with the enclosing lock/unlock calls removed
function _take_buffered_nolock!(c::Channel)
while isempty(c.data)
Base.check_channel_state(c)
wait(c.cond_take)
end
v = popfirst!(c.data)
Base._increment_n_avail(c, -1)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
return v
end

# This is `put_buffered` from channels.jl, with the enclosing lock/unlock calls removed
function _put_buffered_nolock!(c::Channel, v)
did_buffer = false
try
# Increment channel n_avail eagerly (before push!) to count data in the
# buffer as well as offers from tasks which are blocked in wait().
Base._increment_n_avail(c, 1)
while length(c.data) == c.sz_max
Base.check_channel_state(c)
wait(c.cond_put)
end
Base.check_channel_state(c)
push!(c.data, v)
did_buffer = true
# notify all, since some of the waiters may be on a "fetch" call.
notify(c.cond_take, nothing, true, false)
finally
# Decrement the available items if this task had an exception before pushing the
# item to the buffer (e.g., during `wait(c.cond_put)`):
did_buffer || Base._increment_n_avail(c, -1)
end
return v
end

# TODO: since we own our put and take methods, we can ditch the waiting room and just use the channls `data` field
function Base.take!(o::PayloadOrderer) # Blocks until a the next payload in order arrives
payload = take!(o.queue)
while _reorder!(o.queue, o.waititng_room, payload, o.expected_row)
payload = take!(o.queue)
lock(o.queue)
try
payload = _take_buffered_nolock!(o.queue)
while _reorder!(o.queue, o.waititng_room, payload, o.expected_row)
payload = _take_buffered_nolock!(o.queue)
end
o.expected_row = last_row(payload) + 1
return payload
finally
unlock(o.queue)
end
o.expected_row = last_row(payload) + 1
return payload
end