module Redis::Commands::Streams

def _xread(args, keys, ids, blocking_timeout_msec)

# Appends the STREAMS clause to +args+ (mutating it) and dispatches the read.
#
# +keys+ and +ids+ may each be a single value or an Array; a single value is
# wrapped in a one-element Array before being appended.
#
# A nil +blocking_timeout_msec+ sends a regular command. Any other value sends
# a blocking command whose timeout is the millisecond value divided by 1000;
# a zero timeout is forwarded as the integer 0.
def _xread(args, keys, ids, blocking_timeout_msec)
  stream_keys = keys.is_a?(Array) ? keys : [keys]
  stream_ids = ids.is_a?(Array) ? ids : [ids]
  args.push('STREAMS').concat(stream_keys).concat(stream_ids)

  return send_command(args, &HashifyStreams) if blocking_timeout_msec.nil?

  timeout_msec = blocking_timeout_msec.to_f
  timeout = timeout_msec.zero? ? 0 : timeout_msec / 1_000
  send_blocking_command(args, timeout, &HashifyStreams)
end

def xack(key, group, *ids)

Returns:
  • (Integer) - the number of entries successfully acknowledged

Parameters:
  • ids (Array) -- one or multiple entry ids
  • group (String) -- the consumer group name
  • key (String) -- the stream key

Other tags:
    Example: With arrayed entry ids -
    Example: With splatted entry ids -
    Example: With an entry id -
# Sends XACK for +key+ and +group+ followed by the given entry ids.
# Ids may be passed splatted or as (nested) arrays; they are flattened.
def xack(key, group, *ids)
  send_command([:xack, key, group, *ids.flatten])
end

def xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: '*')

Returns:
  • (String) - the entry id

Options Hash: (**opts)
  • :nomkstream (Boolean) -- whether to add NOMKSTREAM, default is not to add
  • :approximate (Boolean) -- whether to add `~` modifier of maxlen/minid or not
  • :minid (Integer) -- min id of entries to keep
  • :maxlen (Integer) -- max length of entries to keep
  • :id (String) -- the entry id, default value is `*`, it means auto generation

Parameters:
  • opts (Hash) -- several options for `XADD` command
  • entry (Hash) -- one or multiple field-value pairs
  • key (String) -- the stream key

Other tags:
    Example: With options -
    Example: Without options -
# Builds and sends an XADD command.
#
# Argument order is: optional NOMKSTREAM, optional trimming clause
# (MAXLEN or MINID, with `~` inserted when +approximate+ is truthy),
# the entry +id+, then the flattened field/value pairs of +entry+.
#
# Raises ArgumentError when both +maxlen+ and +minid+ are supplied.
def xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: '*')
  raise ArgumentError, "can't supply both maxlen and minid" if maxlen && minid

  # maxlen takes precedence; at most one trimming strategy is ever emitted.
  trim_clause =
    if maxlen
      ['MAXLEN', maxlen]
    elsif minid
      ['MINID', minid]
    end
  trim_clause.insert(1, '~') if trim_clause && approximate

  command = [:xadd, key]
  command << 'NOMKSTREAM' if nomkstream
  command.concat(trim_clause) if trim_clause
  command << id
  command.concat(entry.flatten)
  send_command(command)
end

def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)

Returns:
  • (Array) - the entry ids successfully claimed if justid option is `true`
  • (Hash{String => Hash}) - the entries successfully claimed

Parameters:
  • justid (Boolean) -- whether to fetch just an array of entry ids or not.
  • count (Integer) -- maximum number of messages to claim (when omitted, the Redis server default of 100 applies)
  • start (String) -- entry id to start scanning from or 0-0 for everything
  • min_idle_time (Integer) -- the number of milliseconds
  • consumer (String) -- the consumer name
  • group (String) -- the consumer group name
  • key (String) -- the stream key

Other tags:
    Example: Claim next pending message after this id stuck > 5 minutes and mark as retry -
    Example: Claim next pending message stuck > 5 minutes and don't mark as retry -
    Example: Claim 50 next pending messages stuck > 5 minutes and mark as retry -
    Example: Claim next pending message stuck > 5 minutes and mark as retry -
# Builds and sends an XAUTOCLAIM command.
#
# COUNT is appended (with the count converted to a String) only when +count+
# is given. When +justid+ is truthy, JUSTID is appended and the reply is
# passed through HashifyStreamAutoclaimJustId; otherwise through
# HashifyStreamAutoclaim.
def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)
  command = [:xautoclaim, key, group, consumer, min_idle_time, start]
  command.push('COUNT', count.to_s) if count

  if justid
    command << 'JUSTID'
    send_command(command, &HashifyStreamAutoclaimJustId)
  else
    send_command(command, &HashifyStreamAutoclaim)
  end
end

def xclaim(key, group, consumer, min_idle_time, *ids, **opts)

Returns:
  • (Array) - the entry ids successfully claimed if justid option is `true`
  • (Hash{String => Hash}) - the entries successfully claimed

Options Hash: (**opts)
  • :justid (Boolean) -- whether to fetch just an array of entry ids or not
  • :force (Boolean) -- whether to create the entry in the pending entries list or not
  • :retrycount (Integer) -- the value to set the retry counter to
  • :time (Integer) -- the number of milliseconds as a specific Unix Epoch time
  • :idle (Integer) -- the idle time of the entry in milliseconds (time since it was last delivered)

Parameters:
  • opts (Hash) -- several options for `XCLAIM` command
  • ids (Array) -- one or multiple entry ids
  • min_idle_time (Integer) -- the number of milliseconds
  • consumer (String) -- the consumer name
  • group (String) -- the consumer group name
  • key (String) -- the stream key

Other tags:
    Example: With justid option -
    Example: With force option -
    Example: With retrycount option -
    Example: With time option -
    Example: With idle option -
    Example: With arrayed entry ids -
    Example: With splatted entry ids -
# Builds and sends an XCLAIM command.
#
# Entry ids may be splatted or arrayed; they are flattened. Optional clauses
# are appended in the order IDLE, TIME, RETRYCOUNT, FORCE, JUSTID, each only
# when the matching key in +opts+ is truthy. IDLE and TIME values are
# converted with #to_i.
#
# With the :justid option the reply goes through Noop; otherwise through
# HashifyStreamEntries.
def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
  command = [:xclaim, key, group, consumer, min_idle_time, *ids.flatten]
  command.push('IDLE', opts[:idle].to_i) if opts[:idle]
  command.push('TIME', opts[:time].to_i) if opts[:time]
  command.push('RETRYCOUNT', opts[:retrycount]) if opts[:retrycount]
  command.push('FORCE') if opts[:force]

  return send_command(command, &HashifyStreamEntries) unless opts[:justid]

  command.push('JUSTID')
  send_command(command, &Noop)
end

def xdel(key, *ids)

Returns:
  • (Integer) - the number of entries actually deleted

Parameters:
  • ids (Array) -- one or multiple entry ids
  • key (String) -- the stream key

Other tags:
    Example: With arrayed entry ids -
    Example: With splatted entry ids -
# Sends XDEL for +key+ with the given entry ids.
# Ids may be passed splatted or as (nested) arrays; they are flattened.
def xdel(key, *ids)
  send_command([:xdel, key, *ids.flatten])
end

def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)

Returns:
  • (Integer) - affected count if subcommand is `destroy` or `delconsumer`
  • (String) - `OK` if subcommand is `create` or `setid`

Parameters:
  • mkstream (Boolean) -- whether to create an empty stream automatically or not
  • id_or_consumer (String) -- the entry id for `create`/`setid`, or the consumer name for `delconsumer`; omit for `destroy`
  • group (String) -- the consumer group name
  • key (String) -- the stream key
  • subcommand (String) -- `create` `setid` `destroy` `delconsumer`

Other tags:
    Example: With `delconsumer` subcommand -
    Example: With `destroy` subcommand -
    Example: With `setid` subcommand -
    Example: With `create` subcommand -
# Builds and sends an XGROUP command.
#
# MKSTREAM is appended when +mkstream+ is truthy. Any nil element (for
# example an omitted +id_or_consumer+) is dropped before sending.
def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
  command = [:xgroup, subcommand, key, group, id_or_consumer]
  command << 'MKSTREAM' if mkstream
  send_command(command.compact)
end

def xinfo(subcommand, key, group = nil)

Returns:
  • (Array) - information of the consumers if subcommand is `consumers`
  • (Array) - information of the consumer groups if subcommand is `groups`
  • (Hash) - information of the stream if subcommand is `stream`

Parameters:
  • group (String) -- the consumer group name, required if subcommand is `consumers`
  • key (String) -- the stream key
  • subcommand (String) -- e.g. `stream` `groups` `consumers`

Other tags:
    Example: consumers -
    Example: groups -
    Example: stream -
# Builds and sends an XINFO command; a nil +group+ is omitted.
#
# The reply formatter is chosen from the case-insensitive subcommand name:
# `stream` uses Hashify on the whole reply, `groups` and `consumers` apply
# Hashify to each element, and any other subcommand sends no formatter.
def xinfo(subcommand, key, group = nil)
  command = [:xinfo, subcommand, key, group].compact
  kind = subcommand.to_s.downcase

  formatter =
    if kind == 'stream'
      Hashify
    elsif %w[groups consumers].include?(kind)
      proc { |rows| rows.map(&Hashify) }
    end

  send_command(command, &formatter)
end

def xlen(key)

Returns:
  • (Integer) - the number of entries

Parameters:
  • key (String) -- the stream key

Other tags:
    Example: With key -
# Sends XLEN for the stream stored at +key+.
def xlen(key)
  command = [:xlen, key]
  send_command(command)
end

def xpending(key, group, *args, idle: nil)

Returns:
  • (Array) - the pending entries details if options were specified
  • (Hash) - the summary of pending entries

Options Hash: (**opts)
  • :idle (Integer) -- pending message minimum idle time in milliseconds

Parameters:
  • consumer (String) -- the consumer name
  • count (Integer) -- the maximum number of entries to return
  • end (String) -- last entry id of range
  • start (String) -- first entry id of range
  • group (String) -- the consumer group name
  • key (String) -- the stream key

Other tags:
    Example: With range and consumer options -
    Example: With range and idle time options -
    Example: With range options -
    Example: With key and group -
# Builds and sends an XPENDING command.
#
# IDLE (converted with Integer()) is appended when +idle+ is given. The extra
# positional +args+ must number 0, 3 or 4 and are appended verbatim; any other
# count raises ArgumentError.
#
# With no extra args the reply goes through HashifyStreamPendings; otherwise
# through HashifyStreamPendingDetails.
def xpending(key, group, *args, idle: nil)
  command = [:xpending, key, group]
  # Convert idle first so an invalid value is reported before the arity check.
  command.push('IDLE', Integer(idle)) if idle

  unless [0, 3, 4].include?(args.size)
    raise ArgumentError, "wrong number of arguments (given #{args.size + 2}, expected 2, 5 or 6)"
  end

  command.concat(args)
  formatter = args.empty? ? HashifyStreamPendings : HashifyStreamPendingDetails
  send_command(command, &formatter)
end

def xrange(key, start = '-', range_end = '+', count: nil)

Returns:
  • (Array<Array<String, Hash>>) - the ids and entries pairs

Parameters:
  • count (Integer) -- the number of entries as limit
  • end (String) -- last entry id of range, default value is `+`
  • start (String) -- first entry id of range, default value is `-`
  • key (String) -- the stream key

Other tags:
    Example: With count options -
    Example: With a specific start and end -
    Example: With a specific start -
    Example: Without options -
# Sends XRANGE for +key+ from +start+ to +range_end+, appending COUNT when
# +count+ is given. The reply is passed through HashifyStreamEntries.
def xrange(key, start = '-', range_end = '+', count: nil)
  count_clause = count ? ['COUNT', count] : []
  send_command([:xrange, key, start, range_end, *count_clause], &HashifyStreamEntries)
end

def xread(keys, ids, count: nil, block: nil)

Returns:
  • (Hash{String => Hash{String => Hash}}) - the entries

Parameters:
  • block (Integer) -- the number of milliseconds as blocking timeout
  • count (Integer) -- the number of entries as limit per stream
  • ids (Array) -- one or multiple entry ids
  • keys (Array) -- one or multiple stream keys

Other tags:
    Example: With block option -
    Example: With count option -
    Example: With multiple keys -
    Example: With a key -
# Builds the XREAD prefix (optional COUNT, optional BLOCK with the timeout
# converted via #to_i) and delegates to _xread, forwarding the original
# +block+ value as the blocking timeout.
def xread(keys, ids, count: nil, block: nil)
  command = [:xread]
  command.push('COUNT', count) if count
  command.push('BLOCK', block.to_i) if block
  _xread(command, keys, ids, block)
end

def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil)

Returns:
  • (Hash{String => Hash{String => Hash}}) - the entries

Options Hash: (**opts)
  • :noack (Boolean) -- whether message loss is acceptable or not
  • :block (Integer) -- the number of milliseconds as blocking timeout
  • :count (Integer) -- the number of entries as limit

Parameters:
  • opts (Hash) -- several options for `XREADGROUP` command
  • ids (Array) -- one or multiple entry ids
  • keys (Array) -- one or multiple stream keys
  • consumer (String) -- the consumer name
  • group (String) -- the consumer group name

Other tags:
    Example: With noack option -
    Example: With block option -
    Example: With count option -
    Example: With multiple keys -
    Example: With a key -
# Builds the XREADGROUP prefix (GROUP clause, then optional COUNT, optional
# BLOCK with the timeout converted via #to_i, optional NOACK) and delegates
# to _xread, forwarding the original +block+ value as the blocking timeout.
def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil)
  command = [:xreadgroup, 'GROUP', group, consumer]
  command.push('COUNT', count) if count
  command.push('BLOCK', block.to_i) if block
  command.push('NOACK') if noack
  _xread(command, keys, ids, block)
end

def xrevrange(key, range_end = '+', start = '-', count: nil)

Returns:
  • (Array<Array<String, Hash>>) - the ids and entries pairs

Parameters:
  • start (String) -- last entry id of range, default value is `-`
  • end (String) -- first entry id of range, default value is `+`
  • key (String) -- the stream key

Other tags:
    Example: With count options -
    Example: With a specific end and start -
    Example: With a specific end -
    Example: Without options -
# Sends XREVRANGE for +key+ from +range_end+ down to +start+, appending COUNT
# when +count+ is given. The reply is passed through HashifyStreamEntries.
def xrevrange(key, range_end = '+', start = '-', count: nil)
  count_clause = count ? ['COUNT', count] : []
  send_command([:xrevrange, key, range_end, start, *count_clause], &HashifyStreamEntries)
end

def xtrim(key, len_or_id, strategy: 'MAXLEN', approximate: false, limit: nil)

Returns:
  • (Integer) - the number of entries actually deleted

Parameters:
  • limit (Integer) -- maximum count of entries to be evicted
  • approximate (Boolean) -- whether to add `~` modifier of minid or not
  • strategy (String) -- the limit strategy, must be MINID
  • minid (String) -- minimum id of entries
  • key (String) -- the stream key
  • limit (Integer) -- maximum count of entries to be evicted
  • approximate (Boolean) -- whether to add `~` modifier of maxlen or not
  • strategy (String) -- the limit strategy, must be MAXLEN
  • maxlen (Integer) -- max length of entries
  • key (String) -- the stream key

Overloads:
  • xtrim(key, minid, strategy: 'MINID', approximate: true)
  • xtrim(key, maxlen, strategy: 'MAXLEN', approximate: true)

Other tags:
    Example: With strategy -
    Example: With options -
    Example: Without options -
# Builds and sends an XTRIM command.
#
# +strategy+ is stringified and upcased. `~` is inserted before the threshold
# when +approximate+ is truthy, and a LIMIT clause is appended when +limit+
# is given.
def xtrim(key, len_or_id, strategy: 'MAXLEN', approximate: false, limit: nil)
  command = [
    :xtrim,
    key,
    strategy.to_s.upcase,
    *('~' if approximate),            # splatting nil contributes nothing
    len_or_id,
    *(['LIMIT', limit] if limit)
  ]
  send_command(command)
end