lib/openai/stream.rb
module OpenAI
  # Adapts a user-supplied callable so it can consume a chunked event stream.
  #
  # Every raw chunk passed to {#call} is fed to an incremental event-stream
  # parser. Each event the parser yields is JSON-decoded and forwarded to the
  # user's callable, except events whose data equals the "[DONE]" sentinel,
  # which are skipped.
  class Stream
    # Event data value that is skipped instead of being JSON-parsed.
    DONE = "[DONE]".freeze
    private_constant :DONE

    # @param user_proc [#call] callable invoked once per parsed event. It is
    #   given the decoded JSON data and the event name, truncated to the number
    #   of arguments it declares.
    # @param parser [#feed] object whose +feed(chunk)+ yields +(event, data)+
    #   pairs; defaults to a new EventStreamParser::Parser.
    def initialize(user_proc:, parser: EventStreamParser::Parser.new)
      @user_proc = user_proc
      @parser = parser

      # To be backwards compatible, we need to check how many arguments the user_proc takes.
      # Arity is negative when optional/splat parameters are present, hence `abs`.
      @user_proc_arity =
        case user_proc
        when Proc
          user_proc.arity.abs
        else
          user_proc.method(:call).arity.abs
        end
    end

    # Processes one chunk of the response body.
    #
    # @param chunk [String] raw chunk of the response body.
    # @param _bytes [Object] ignored; accepted only to match the caller's
    #   callback signature.
    # @param env [#status, #merge, nil] response environment, when the caller
    #   provides one. A status other than 200 is routed to the HTTP error
    #   handler before any parsing happens.
    # @raise [JSON::ParserError] if an event's data is not valid JSON.
    def call(chunk, _bytes, env = nil)
      handle_http_error(chunk: chunk, env: env) if env && env.status != 200

      parser.feed(chunk) do |event, data|
        next if data == DONE

        # Only pass as many arguments as the callable declares: data alone, or data plus event.
        args = [JSON.parse(data), event].first(user_proc_arity)
        user_proc.call(*args)
      end
    end

    # @return [Proc] a proc wrapping {#call}, so the stream can be handed to
    #   APIs that expect a block or proc.
    def to_proc
      method(:call).to_proc
    end

    private

    attr_reader :user_proc, :parser, :user_proc_arity

    # Hands a non-200 response to Faraday's RaiseError middleware, with the
    # chunk (JSON-decoded when possible) substituted as the response body.
    # NOTE(review): RaiseError#on_complete is expected to raise for error
    # statuses; if it returns, the caller carries on parsing the chunk.
    #
    # @param chunk [String] raw chunk received alongside the failing status.
    # @param env [#merge] response environment to merge the body into.
    def handle_http_error(chunk:, env:)
      raise_error = Faraday::Response::RaiseError.new
      raise_error.on_complete(env.merge(body: try_parse_json(chunk)))
    end

    # @param maybe_json [String] text that may or may not be JSON.
    # @return [Object, String] the decoded JSON value, or the input unchanged
    #   when it cannot be parsed as JSON.
    def try_parse_json(maybe_json)
      JSON.parse(maybe_json)
    rescue JSON::ParserError
      maybe_json
    end
  end
end