class OpenAI::Stream
# Streaming callback: consumes one chunk of the response body.
#
# When an env is supplied and its status is anything other than 200, the
# chunk is handed to handle_http_error before any parsing happens.
# Otherwise (or if that call returns) the chunk is fed to the parser, and
# every event it yields is JSON-decoded and forwarded to the user proc.
#
# @param chunk the piece of response body to feed to the parser
# @param _bytes unused; accepted only so the signature matches the caller
# @param env optional response environment exposing `status`
# @return whatever `parser.feed` returns
def call(chunk, _bytes, env = nil)
  if env && env.status != 200
    handle_http_error(chunk: chunk, env: env)
  end

  parser.feed(chunk) do |event, data|
    # Events whose data equals the DONE sentinel are skipped entirely.
    unless data == DONE
      payload = JSON.parse(data)
      # Pass only as many arguments as the user proc was found to accept:
      # the decoded payload first, then the event.
      user_proc.call(*[payload, event].first(user_proc_arity))
    end
  end
end
# Hands a non-200 response over to Faraday's RaiseError middleware.
#
# The chunk is decoded as JSON when possible (via try_parse_json) and
# installed as the `body` of a merged copy of the env, which is then passed
# to `on_complete`.
# NOTE(review): the middleware is presumably what raises the matching
# Faraday error for the status -- confirm against the Faraday version in use.
#
# @param chunk the response body received with the error status
# @param env the response environment; must respond to `merge`
# @return whatever `on_complete` returns
def handle_http_error(chunk:, env:)
  middleware = Faraday::Response::RaiseError.new
  response_env = env.merge(body: try_parse_json(chunk))
  middleware.on_complete(response_env)
end
# Builds a stream handler around a user-supplied callable.
#
# @param user_proc a Proc, or any object exposing a `call` method
# @param parser the event parser to store; defaults to a new
#   EventStreamParser::Parser
def initialize(user_proc:, parser: EventStreamParser::Parser.new)
  @parser = parser
  @user_proc = user_proc

  # For backwards compatibility, record how many arguments the user's
  # callable takes. A Proc reports its own arity; any other object is
  # inspected through its `call` method.
  arity_source =
    case user_proc
    when Proc then user_proc
    else user_proc.method(:call)
    end

  # Arity can be negative (optional/splat arguments), so keep the magnitude.
  @user_proc_arity = arity_source.arity.abs
end
# Exposes this object's `call` method as a Proc, so the instance can be
# supplied wherever a block or proc is expected.
#
# @return [Proc] a proc bound to this instance's `call`
def to_proc
  bound_call = method(:call)
  bound_call.to_proc
end
# Best-effort JSON decoding.
#
# Returns the parsed value when the input is valid JSON; otherwise returns
# the input untouched. This never raises for unparseable input, so callers
# can always use the result as a response body.
#
# @param maybe_json the value to decode (normally a String)
# @return the decoded JSON value, or `maybe_json` itself when it cannot be
#   parsed
def try_parse_json(maybe_json)
  JSON.parse(maybe_json)
rescue JSON::ParserError, TypeError
  # JSON::ParserError covers malformed text. TypeError is what JSON.parse
  # raises for nil or other non-String input; without rescuing it the
  # fallback would be bypassed and the error would escape.
  maybe_json
end