Skip to content

Commit

Permalink
xadd/xack/xread/xreadgroup/xlen/xdel do not retry by default
Browse files Browse the repository at this point in the history
  • Loading branch information
eyjian committed Sep 6, 2022
1 parent 766c53d commit 7a271cf
Showing 1 changed file with 34 additions and 30 deletions.
64 changes: 34 additions & 30 deletions r3c.h
Original file line number Diff line number Diff line change
Expand Up @@ -929,8 +929,8 @@ class CRedisClient
public: // STREAM (key like kafka's topic), available since 5.0.0.
// Removes one or multiple messages from the pending entries list (PEL) of a stream consumer group.
// The command returns the number of messages successfully acknowledged.
int xack(const std::string& key, const std::string& groupname, const std::vector<std::string>& ids, Node* which=NULL, int num_retries=NUM_RETRIES);
int xack(const std::string& key, const std::string& groupname, const std::string& id, Node* which=NULL, int num_retries=NUM_RETRIES);
int xack(const std::string& key, const std::string& groupname, const std::vector<std::string>& ids, Node* which=NULL, int num_retries=0);
int xack(const std::string& key, const std::string& groupname, const std::string& id, Node* which=NULL, int num_retries=0);

// Returns the ID of the added entry. The ID is the one auto-
// generated if * is passed as ID argument, otherwise the command just
Expand All @@ -939,10 +939,10 @@ class CRedisClient
// c '~' or '='
std::string xadd(const std::string& key, const std::string& id,
const std::vector<FVPair>& values, int64_t maxlen, char c,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
std::string xadd(const std::string& key, const std::string& id,
const std::vector<FVPair>& values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);

// Create a new consumer group associated with a stream.
// There are no hard limits to the number of consumer groups you can associate to a given stream.
Expand Down Expand Up @@ -972,22 +972,22 @@ class CRedisClient
// NOTICE: The size of keys should be equal to the size of ids.
void xread(const std::vector<std::string>& keys, const std::vector<std::string>& ids,
int64_t count, int64_t block_milliseconds, std::vector<Stream>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xread(const std::vector<std::string>& keys, const std::vector<std::string>& ids,
int64_t count, std::vector<Stream>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xread(const std::vector<std::string>& keys, const std::vector<std::string>& ids,
std::vector<Stream>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);

// Only read one key
void xread(const std::string& key, const std::string& id,
int64_t count, int64_t block_milliseconds, std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
// Use '>' as id
void xread(const std::string& key,
int64_t count, int64_t block_milliseconds, std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);

// The consumer name is the string that is used by the client to identify itself inside the group.
// The consumer is auto created inside the consumer group the first time it is saw.
Expand Down Expand Up @@ -1016,53 +1016,55 @@ class CRedisClient
void xreadgroup(const std::string& groupname, const std::string& consumername,
const std::vector<std::string>& keys, const std::vector<std::string>& ids,
int64_t count, int64_t block_milliseconds, bool noack, std::vector<Stream>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xreadgroup(const std::string& groupname,
const std::string& consumername, const std::vector<std::string>& keys,
const std::vector<std::string>& ids, int64_t count, bool noack, std::vector<Stream>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xreadgroup(const std::string& groupname, const std::string& consumername,
const std::vector<std::string>& keys, const std::vector<std::string>& ids,
bool noack, std::vector<Stream>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xreadgroup(const std::string& groupname, const std::string& consumername,
const std::string& key, const std::string& id,
int64_t count, int64_t block_milliseconds, bool noack, std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
// Use '>' as id
void xreadgroup(const std::string& groupname, const std::string& consumername,
const std::string& key, int64_t count, int64_t block_milliseconds,
bool noack, std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);

// Removes the specified entries from a stream, and returns the number of
// entries deleted, that may be different from the number of IDs passed to the
// command in case certain IDs do not exist.
int xdel(const std::string& key, const std::vector<std::string>& ids, Node* which=NULL, int num_retries=NUM_RETRIES);
int xdel(const std::string& key, const std::string& id, Node* which=NULL, int num_retries=NUM_RETRIES);
int xdel(const std::string& key, const std::vector<std::string>& ids, Node* which=NULL, int num_retries=0);
int xdel(const std::string& key, const std::string& id, Node* which=NULL, int num_retries=0);

// Trims the stream to a given number of items, evicting older items (items with lower IDs) if needed
// Returns the number of entries deleted from the stream
int64_t xtrim(const std::string& key, int64_t maxlen, char c, Node* which=NULL, int num_retries=NUM_RETRIES);
int64_t xtrim(const std::string& key, int64_t maxlen, Node* which=NULL, int num_retries=NUM_RETRIES);
int64_t xtrim(const std::string& key, int64_t maxlen, char c, Node* which=NULL, int num_retries=0);
int64_t xtrim(const std::string& key, int64_t maxlen, Node* which=NULL, int num_retries=0);

// Returns the number of entries inside a stream. If the specified key does not
// exist the command returns zero, as if the stream was empty.
int64_t xlen(const std::string& key, Node* which=NULL, int num_retries=NUM_RETRIES);
int64_t xlen(const std::string& key, Node* which=NULL, int num_retries=0);

// Returns the stream entries matching a given range of IDs
// start The '-' special ID mean respectively the minimum ID possible inside a stream
// end The '+' special ID mean respectively the maximum ID possible inside a stream
void xrange(const std::string& key,
const std::string& start, const std::string& end, int64_t count, std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
void xrange(const std::string& key, const std::string& start, const std::string& end, std::vector<StreamEntry>* values, Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xrange(const std::string& key,
const std::string& start, const std::string& end, std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=0);
void xrevrange(const std::string& key,
const std::string& end, const std::string& start, int64_t count, std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xrevrange(const std::string& key,
const std::string& end, const std::string& start, std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);

// Fetching data from a stream via a consumer group,
// and not acknowledging such data, has the effect of creating pending entries.
Expand All @@ -1071,19 +1073,21 @@ class CRedisClient
int xpending(const std::string& key, const std::string& groupname,
const std::string& start, const std::string& end, int count, const std::string& consumer,
std::vector<struct DetailedPending>* pendings,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
int xpending(const std::string& key, const std::string& groupname,
const std::string& start, const std::string& end, int count,
std::vector<struct DetailedPending>* pendings,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);

// start the smallest ID among the pending messages
// end the greatest ID among the pending messages
// count the total number of pending messages for this consumer group
// consumers the list of consumers in the consumer group with at least one pending message
//
// Returns the total number of pending messages for this consumer group
int xpending(const std::string& key, const std::string& groupname, struct GroupPending* groups, Node* which=NULL, int num_retries=NUM_RETRIES);
int xpending(const std::string& key,
const std::string& groupname, struct GroupPending* groups,
Node* which=NULL, int num_retries=0);

// Gets ownership of one or multiple messages in the Pending Entries List
// of a given stream consumer group.
Expand All @@ -1092,23 +1096,23 @@ class CRedisClient
int64_t minidle, const std::vector<std::string>& ids,
int64_t idletime, int64_t unixtime, int64_t retrycount, bool force,
std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xclaim(
const std::string& key, const std::string& groupname, const std::string& consumer,
int64_t minidle, const std::vector<std::string>& ids,
std::vector<StreamEntry>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xclaim(
const std::string& key, const std::string& groupname, const std::string& consumer,
int64_t minidle, const std::vector<std::string>& ids,
int64_t idletime, int64_t unixtime, int64_t retrycount, bool force,
std::vector<std::string>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);
void xclaim(
const std::string& key, const std::string& groupname, const std::string& consumer,
int64_t minidle, const std::vector<std::string>& ids,
std::vector<std::string>* values,
Node* which=NULL, int num_retries=NUM_RETRIES);
Node* which=NULL, int num_retries=0);

// Get the list of every consumer in a specific consumer group
int xinfo_consumers(const std::string& key, const std::string& groupname, std::vector<struct ConsumerInfo>* infos, Node* which=NULL, int num_retries=NUM_RETRIES);
Expand Down

0 comments on commit 7a271cf

Please sign in to comment.