diff --git a/src/payload.jl b/src/payload.jl index 6c4ac39..9730852 100644 --- a/src/payload.jl +++ b/src/payload.jl @@ -35,7 +35,7 @@ function _reenqueue_ordered!(queue::Channel{T}, waiting_room::Vector{T}, payload nrows = length(payload) payload = first(waiting_room) if payload.row_num == (nrows + row) - put!(queue, popfirst!(waiting_room)) + _put_buffered_nolock!(queue, popfirst!(waiting_room)) else break end @@ -52,21 +52,63 @@ function _reorder!(queue::Channel{T}, waiting_room::Vector{T}, payload::T, expec row = payload.row_num if row == expected_row _reenqueue_ordered!(queue, waiting_room, payload) - return false + return false # doesn't need to wait end insertsorted!(waiting_room, payload, x->x.row_num) - return true + return true # needs to wait end Base.put!(o::PayloadOrderer{B,C}, x::ParsedPayload{B,C}) where {B,C} = put!(o.queue, x) Base.close(o::PayloadOrderer, excp::Exception=Base.closed_exception()) = close(o.queue, excp) Base.isopen(o::PayloadOrderer) = isopen(o.queue) +# This is `take_buffered` from channels.jl, with the enclosing lock/unlock calls removed +function _take_buffered_nolock!(c::Channel) + while isempty(c.data) + Base.check_channel_state(c) + wait(c.cond_take) + end + v = popfirst!(c.data) + Base._increment_n_avail(c, -1) + notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!. + return v +end + +# This is `put_buffered` from channels.jl, with the enclosing lock/unlock calls removed +function _put_buffered_nolock!(c::Channel, v) + did_buffer = false + try + # Increment channel n_avail eagerly (before push!) to count data in the + # buffer as well as offers from tasks which are blocked in wait(). + Base._increment_n_avail(c, 1) + while length(c.data) == c.sz_max + Base.check_channel_state(c) + wait(c.cond_put) + end + Base.check_channel_state(c) + push!(c.data, v) + did_buffer = true + # notify all, since some of the waiters may be on a "fetch" call. + notify(c.cond_take, nothing, true, false) + finally + # Decrement the available items if this task had an exception before pushing the + # item to the buffer (e.g., during `wait(c.cond_put)`): + did_buffer || Base._increment_n_avail(c, -1) + end + return v +end + +# TODO: since we own our put and take methods, we can ditch the waiting room and just use the channls `data` field function Base.take!(o::PayloadOrderer) # Blocks until a the next payload in order arrives - payload = take!(o.queue) - while _reorder!(o.queue, o.waititng_room, payload, o.expected_row) - payload = take!(o.queue) + lock(o.queue) + try + payload = _take_buffered_nolock!(o.queue) + while _reorder!(o.queue, o.waititng_room, payload, o.expected_row) + payload = _take_buffered_nolock!(o.queue) + end + o.expected_row = last_row(payload) + 1 + return payload + finally + unlock(o.queue) end - o.expected_row = last_row(payload) + 1 - return payload end