class Concurrent::BufferedChannel
# Reports how many items are currently held in the buffer.
#
# The count is read while holding @mutex, so it is consistent with the
# other methods that change the buffer under that same lock.
#
# @return the value of `@buffer.count`
def buffer_queue_size
  @mutex.synchronize do
    @buffer.count
  end
end
# Builds the channel's internal state.
#
# @param size passed straight to `RingBuffer.new`
#   (presumably the buffer capacity -- confirm against RingBuffer)
def initialize(size)
  # Lock shared by every method that touches @buffer.
  @mutex = Mutex.new

  # NOTE(review): @condition is not referenced by any method visible in
  # this part of the file; kept because other code may rely on it.
  @condition = Condition.new

  # Signalled whenever an item is added to or removed from @buffer.
  @buffer_condition = Condition.new

  # Probes registered by `select` while the buffer was empty.
  @probe_set = WaitableList.new

  # Storage for values that have been pushed but not yet received.
  @buffer = RingBuffer.new(size)
end
# Returns the item at the head of the buffer without removing it.
#
# While the buffer is empty this waits on @buffer_condition, handing it
# @mutex; the caller is therefore expected to already hold @mutex.
#
# @return the value of `@buffer.peek`
def peek_buffer
  while @buffer.empty?
    @buffer_condition.wait(@mutex)
  end

  @buffer.peek
end
# Receives one value from the channel.
#
# A fresh Channel::Probe is handed to `select`, and the probe's `value`
# is returned afterwards.
# NOTE(review): presumably `value` blocks until the probe has been
# assigned -- confirm against Channel::Probe.
#
# @return the probe's value
def pop
  Channel::Probe.new.tap { |receiver| select(receiver) }.value
end
# Reports how many probes are currently registered in @probe_set.
#
# Unlike `buffer_queue_size`, this read does not take @mutex.
#
# @return the value of `@probe_set.size`
def probe_set_size
  registered = @probe_set
  registered.size
end
# Sends +value+ into the channel.
#
# Keeps calling `set_probe_or_push_into_buffer` until it returns a truthy
# result, i.e. until the value was either accepted by a probe or stored
# in the buffer.
#
# @param value the item to send
# @return [nil]
def push(value)
  delivered = false
  delivered = set_probe_or_push_into_buffer(value) until delivered
end
# Appends +value+ to the buffer, waiting for free space if necessary.
#
# While the buffer is full this waits on @buffer_condition, handing it
# @mutex; the caller is therefore expected to already hold @mutex.
# After the value has been offered, every waiter on @buffer_condition is
# woken.
#
# @param value the item to store
# @return the result of `@buffer_condition.broadcast`
def push_into_buffer(value)
  while @buffer.full?
    @buffer_condition.wait(@mutex)
  end

  @buffer.offer(value)
  @buffer_condition.broadcast
end
# Unregisters +probe+ from the set of waiting probes.
#
# This does not take @mutex.
#
# @param probe the probe to remove
# @return whatever `@probe_set.delete` returns
def remove_probe(probe)
  registered = @probe_set
  registered.delete(probe)
end
# Tries to satisfy +probe+ from the buffer, or registers it for later.
#
# Runs entirely under @mutex:
# * buffer empty  -> the probe is added to @probe_set and `true` is
#   returned.
# * buffer not empty -> the head item is offered to the probe through
#   `set_unless_assigned`. If the probe accepts it, the item is removed
#   from the buffer and returned; otherwise the buffer is left untouched
#   and the result is nil.
#
# @param probe object responding to `set_unless_assigned(value, channel)`
# @return true, the removed item, or nil (see above)
def select(probe)
  @mutex.synchronize do
    if @buffer.empty?
      @probe_set.put(probe)
      true
    elsif probe.set_unless_assigned(peek_buffer, self)
      # Only consume the head once the probe has actually taken it.
      shift_buffer
    end
  end
end
# Delivers +value+ either to a waiting probe or into the buffer.
#
# Runs under @mutex:
# * If a probe is registered, it is taken from @probe_set and offered the
#   value; the result of `set_unless_assigned` is returned (a falsy
#   result tells `push` to retry).
# * Otherwise the value is stored in the buffer and `true` is returned.
#
# When no probe is registered and the buffer is full, this waits on
# @buffer_condition. The probe set is re-checked after every wake-up:
# the wait releases @mutex, so while this thread sleeps the buffer may be
# drained and a receiver may register a probe. Storing the value in the
# buffer in that situation would leave the registered probe unserved
# until some later push, so the probe is preferred instead.
#
# @param value the item to deliver
# @return true when buffered, otherwise the result of
#   `set_unless_assigned`
def set_probe_or_push_into_buffer(value)
  @mutex.synchronize do
    # Wait here (not inside push_into_buffer) so that both conditions are
    # evaluated again once the lock has been re-acquired.
    @buffer_condition.wait(@mutex) while @probe_set.empty? && @buffer.full?

    if @probe_set.empty?
      # The loop above guarantees there is room, so this does not block.
      push_into_buffer(value)
      true
    else
      @probe_set.take.set_unless_assigned(value, self)
    end
  end
end
# Removes and returns the item at the head of the buffer.
#
# While the buffer is empty this waits on @buffer_condition, handing it
# @mutex; the caller is therefore expected to already hold @mutex.
# After the item has been removed, every waiter on @buffer_condition is
# woken.
#
# @return the value of `@buffer.poll`
def shift_buffer
  while @buffer.empty?
    @buffer_condition.wait(@mutex)
  end

  # `tap` keeps the polled item as the return value while still
  # broadcasting after the removal.
  @buffer.poll.tap { @buffer_condition.broadcast }
end