module Redis::Commands::Streams
def _xread(args, keys, ids, blocking_timeout_msec)
def _xread(args, keys, ids, blocking_timeout_msec) keys = keys.is_a?(Array) ? keys : [keys] ids = ids.is_a?(Array) ? ids : [ids] args << 'STREAMS' args.concat(keys) args.concat(ids) if blocking_timeout_msec.nil? send_command(args, &HashifyStreams) elsif blocking_timeout_msec.to_f.zero? send_blocking_command(args, 0, &HashifyStreams) else send_blocking_command(args, blocking_timeout_msec.to_f / 1_000, &HashifyStreams) end end
def xack(key, group, *ids)
-
(Integer)
- the number of entries successfully acknowledged
Parameters:
-
ids
(Array
) -- one or multiple entry ids -
group
(String
) -- the consumer group name -
key
(String
) -- the stream key
Other tags:
- Example: With arrayed entry ids -
Example: With splatted entry ids -
Example: With a entry id -
def xack(key, group, *ids) args = [:xack, key, group].concat(ids.flatten) send_command(args) end
def xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: '*')
-
(String)
- the entry id
Options Hash:
(**opts)
-
:nomkstream
(Boolean
) -- whether to add NOMKSTREAM, default is not to add -
:approximate
(Boolean
) -- whether to add `~` modifier of maxlen/minid or not -
:minid
(Integer
) -- min id of entries to keep -
:maxlen
(Integer
) -- max length of entries to keep -
:id
(String
) -- the entry id, default value is `*`, it means auto generation
Parameters:
-
opts
(Hash
) -- several options for `XADD` command -
entry
(Hash
) -- one or multiple field-value pairs -
key
(String
) -- the stream key
Other tags:
- Example: With options -
Example: Without options -
def xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: '*') args = [:xadd, key] args << 'NOMKSTREAM' if nomkstream if maxlen raise ArgumentError, "can't supply both maxlen and minid" if minid args << "MAXLEN" args << "~" if approximate args << maxlen elsif minid args << "MINID" args << "~" if approximate args << minid end args << id args.concat(entry.flatten) send_command(args) end
def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)
-
(Array
- the entry ids successfully claimed if justid option is `true`) -
(Hash{String => Hash})
- the entries successfully claimed
Parameters:
-
justid
(Boolean
) -- whether to fetch just an array of entry ids or not. -
count
(Integer
) -- number of messages to claim (default 1) -
start
(String
) -- entry id to start scanning from or 0-0 for everything -
min_idle_time
(Integer
) -- the number of milliseconds -
consumer
(String
) -- the consumer name -
group
(String
) -- the consumer group name -
key
(String
) -- the stream key
Other tags:
- Example: Claim next pending message after this id stuck > 5 minutes and mark as retry -
Example: Claim next pending message stuck > 5 minutes and don't mark as retry -
Example: Claim 50 next pending messages stuck > 5 minutes and mark as retry -
Example: Claim next pending message stuck > 5 minutes and mark as retry -
def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false) args = [:xautoclaim, key, group, consumer, min_idle_time, start] if count args << 'COUNT' << count.to_s end args << 'JUSTID' if justid blk = justid ? HashifyStreamAutoclaimJustId : HashifyStreamAutoclaim send_command(args, &blk) end
def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
-
(Array
- the entry ids successfully claimed if justid option is `true`) -
(Hash{String => Hash})
- the entries successfully claimed
Options Hash:
(**opts)
-
:justid
(Boolean
) -- whether to fetch just an array of entry ids or not -
:force
(Boolean
) -- whether to create the pending entry to the pending entries list or not -
:retrycount
(Integer
) -- the number of retry counter -
:time
(Integer
) -- the number of milliseconds as a specific Unix Epoch time -
:idle
(Integer
) -- the number of milliseconds as last time it was delivered of the entry
Parameters:
-
opts
(Hash
) -- several options for `XCLAIM` command -
ids
(Array
) -- one or multiple entry ids -
min_idle_time
(Integer
) -- the number of milliseconds -
consumer
(String
) -- the consumer name -
group
(String
) -- the consumer group name -
key
(String
) -- the stream key
Other tags:
- Example: With justid option -
Example: With force option -
Example: With retrycount option -
Example: With time option -
Example: With idle option -
Example: With arrayed entry ids -
Example: With splatted entry ids -
def xclaim(key, group, consumer, min_idle_time, *ids, **opts) args = [:xclaim, key, group, consumer, min_idle_time].concat(ids.flatten) args.concat(['IDLE', opts[:idle].to_i]) if opts[:idle] args.concat(['TIME', opts[:time].to_i]) if opts[:time] args.concat(['RETRYCOUNT', opts[:retrycount]]) if opts[:retrycount] args << 'FORCE' if opts[:force] args << 'JUSTID' if opts[:justid] blk = opts[:justid] ? Noop : HashifyStreamEntries send_command(args, &blk) end
def xdel(key, *ids)
-
(Integer)
- the number of entries actually deleted
Parameters:
-
ids
(Array
) -- one or multiple entry ids -
key
(String
) -- the stream key
Other tags:
- Example: With arrayed entry ids -
Example: With splatted entry ids -
def xdel(key, *ids) args = [:xdel, key].concat(ids.flatten) send_command(args) end
def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
-
(Integer)
- effected count if subcommand is `destroy` or `delconsumer` -
(String)
- `OK` if subcommand is `create` or `setid`
Parameters:
-
mkstream
(Boolean
) -- whether to create an empty stream automatically or not -
id_or_consumer
(String
) -- -
group
(String
) -- the consumer group name -
key
(String
) -- the stream key -
subcommand
(String
) -- `create` `setid` `destroy` `delconsumer`
Other tags:
- Example: With `delconsumer` subcommand -
Example: With `destroy` subcommand -
Example: With `setid` subcommand -
Example: With `create` subcommand -
def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false) args = [:xgroup, subcommand, key, group, id_or_consumer, (mkstream ? 'MKSTREAM' : nil)].compact send_command(args) end
def xinfo(subcommand, key, group = nil)
-
(Array
- information of the consumers if subcommand is `consumers`) -
(Array
- information of the consumer groups if subcommand is `groups`) -
(Hash)
- information of the stream if subcommand is `stream`
Parameters:
-
group
(String
) -- the consumer group name, required if subcommand is `consumers` -
key
(String
) -- the stream key -
subcommand
(String
) -- e.g. `stream` `groups` `consumers`
Other tags:
- Example: consumers -
Example: groups -
Example: stream -
def xinfo(subcommand, key, group = nil) args = [:xinfo, subcommand, key, group].compact block = case subcommand.to_s.downcase when 'stream' then Hashify when 'groups', 'consumers' then proc { |r| r.map(&Hashify) } end send_command(args, &block) end
def xlen(key)
-
(Integer)
- the number of entries
Parameters:
-
key
(String
) -- the stream key
Other tags:
- Example: With key -
def xlen(key) send_command([:xlen, key]) end
def xpending(key, group, *args, idle: nil)
-
(Array
- the pending entries details if options were specified) -
(Hash)
- the summary of pending entries
Options Hash:
(**opts)
-
:idle
(Integer
) -- pending message minimum idle time in milliseconds
Parameters:
-
consumer
(String
) -- the consumer name -
count
(Integer
) -- count the number of entries as limit -
end
(String
) -- end last entry id of range -
start
(String
) -- start first entry id of range -
group
(String
) -- the consumer group name -
key
(String
) -- the stream key
Other tags:
- Example: With range and consumer options -
Example: With range and idle time options -
Example: With range options -
Example: With key and group -
def xpending(key, group, *args, idle: nil) command_args = [:xpending, key, group] command_args << 'IDLE' << Integer(idle) if idle case args.size when 0, 3, 4 command_args.concat(args) else raise ArgumentError, "wrong number of arguments (given #{args.size + 2}, expected 2, 5 or 6)" end summary_needed = args.empty? blk = summary_needed ? HashifyStreamPendings : HashifyStreamPendingDetails send_command(command_args, &blk) end
def xrange(key, start = '-', range_end = '+', count: nil)
-
(Array
- the ids and entries pairs>)
Parameters:
-
count
(Integer
) -- the number of entries as limit -
end
(String
) -- last entry id of range, default value is `+` -
start
(String
) -- first entry id of range, default value is `-` -
key
(String
) -- the stream key
Other tags:
- Example: With count options -
Example: With a specific start and end -
Example: With a specific start -
Example: Without options -
def xrange(key, start = '-', range_end = '+', count: nil) args = [:xrange, key, start, range_end] args.concat(['COUNT', count]) if count send_command(args, &HashifyStreamEntries) end
def xread(keys, ids, count: nil, block: nil)
-
(Hash{String => Hash{String => Hash}})
- the entries
Parameters:
-
block
(Integer
) -- the number of milliseconds as blocking timeout -
count
(Integer
) -- the number of entries as limit per stream -
ids
(Array
) -- one or multiple entry ids -
keys
(Array
) -- one or multiple stream keys
Other tags:
- Example: With block option -
Example: With count option -
Example: With multiple keys -
Example: With a key -
def xread(keys, ids, count: nil, block: nil) args = [:xread] args << 'COUNT' << count if count args << 'BLOCK' << block.to_i if block _xread(args, keys, ids, block) end
def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil)
-
(Hash{String => Hash{String => Hash}})
- the entries
Options Hash:
(**opts)
-
:noack
(Boolean
) -- whether message loss is acceptable or not -
:block
(Integer
) -- the number of milliseconds as blocking timeout -
:count
(Integer
) -- the number of entries as limit
Parameters:
-
opts
(Hash
) -- several options for `XREADGROUP` command -
ids
(Array
) -- one or multiple entry ids -
keys
(Array
) -- one or multiple stream keys -
consumer
(String
) -- the consumer name -
group
(String
) -- the consumer group name
Other tags:
- Example: With noack option -
Example: With block option -
Example: With count option -
Example: With multiple keys -
Example: With a key -
def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil) args = [:xreadgroup, 'GROUP', group, consumer] args << 'COUNT' << count if count args << 'BLOCK' << block.to_i if block args << 'NOACK' if noack _xread(args, keys, ids, block) end
def xrevrange(key, range_end = '+', start = '-', count: nil)
-
(Array
- the ids and entries pairs>)
Parameters:
-
start
(String
) -- last entry id of range, default value is `-` -
end
(String
) -- first entry id of range, default value is `+` -
key
(String
) -- the stream key
Other tags:
- Example: With count options -
Example: With a specific end and start -
Example: With a specific end -
Example: Without options -
def xrevrange(key, range_end = '+', start = '-', count: nil) args = [:xrevrange, key, range_end, start] args.concat(['COUNT', count]) if count send_command(args, &HashifyStreamEntries) end
def xtrim(key, len_or_id, strategy: 'MAXLEN', approximate: false, limit: nil)
-
(Integer)
- the number of entries actually deleted
Parameters:
-
limit
(Integer
) -- maximum count of entries to be evicted -
approximate
(Boolean
) -- whether to add `~` modifier of minid or not -
strategy
(String
) -- the limit strategy, must be MINID -
minid
(String
) -- minimum id of entries -
key
(String
) -- the stream key -
limit
(Integer
) -- maximum count of entries to be evicted -
approximate
(Boolean
) -- whether to add `~` modifier of maxlen or not -
strategy
(String
) -- the limit strategy, must be MAXLEN -
maxlen
(Integer
) -- max length of entries -
key
(String
) -- the stream key
Overloads:
-
xtrim(key, minid, strategy: 'MINID', approximate: true)
-
xtrim(key, maxlen, strategy: 'MAXLEN', approximate: true)
Other tags:
- Example: With strategy -
Example: With options -
Example: Without options -
def xtrim(key, len_or_id, strategy: 'MAXLEN', approximate: false, limit: nil) strategy = strategy.to_s.upcase args = [:xtrim, key, strategy] args << '~' if approximate args << len_or_id args.concat(['LIMIT', limit]) if limit send_command(args) end