class Concurrent::BufferedChannel

def buffer_queue_size

def buffer_queue_size
  @mutex.synchronize { @buffer.count }
end

def initialize(size)

def initialize(size)
  @mutex = Mutex.new
  @condition = Condition.new
  @buffer_condition = Condition.new
  @probe_set = WaitableList.new
  @buffer = RingBuffer.new(size)
end

def peek_buffer

def peek_buffer
  @buffer_condition.wait(@mutex) while @buffer.empty?
  @buffer.peek
end

def pop

def pop
  probe = Channel::Probe.new
  select(probe)
  probe.value
end

def probe_set_size

def probe_set_size
  @probe_set.size
end

def push(value)

def push(value)
  until set_probe_or_push_into_buffer(value)
  end
end

def push_into_buffer(value)

def push_into_buffer(value)
  @buffer_condition.wait(@mutex) while @buffer.full?
  @buffer.offer value
  @buffer_condition.broadcast
end

def remove_probe(probe)

def remove_probe(probe)
  @probe_set.delete(probe)
end

def select(probe)

def select(probe)
  @mutex.synchronize do
    if @buffer.empty?
      @probe_set.put(probe)
      true
    else
      shift_buffer if probe.set_unless_assigned(peek_buffer, self)
    end
  end
end

def set_probe_or_push_into_buffer(value)

def set_probe_or_push_into_buffer(value)
  @mutex.synchronize do
    if @probe_set.empty?
      push_into_buffer(value)
      true
    else
      @probe_set.take.set_unless_assigned(value, self)
    end
  end
end

def shift_buffer

def shift_buffer
  @buffer_condition.wait(@mutex) while @buffer.empty?
  result = @buffer.poll
  @buffer_condition.broadcast
  result
end