diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index a29b556675b3..0047b978d833 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -995,10 +995,11 @@ OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& return res; } -void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) { VLOG(1) << "Del " << ArgS(args, 0); atomic_uint32_t result{0}; + auto* builder = cmd_cntx.rb; bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE); auto cb = [&result](const Transaction* t, EngineShard* shard) { @@ -1009,10 +1010,10 @@ void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil return OpStatus::OK; }; - OpStatus status = tx->ScheduleSingleHop(std::move(cb)); + OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); CHECK_EQ(OpStatus::OK, status); - DVLOG(2) << "Del ts " << tx->txid(); + DVLOG(2) << "Del ts " << cmd_cntx.tx->txid(); uint32_t del_cnt = result.load(memory_order_relaxed); if (is_mc) { @@ -1029,18 +1030,16 @@ void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil } } -void GenericFamily::Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void GenericFamily::Ping(CmdArgList args, const CommandContext& cmd_cntx) { + auto* rb = static_cast(cmd_cntx.rb); if (args.size() > 1) { - return builder->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType); + return rb->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType); } string_view msg; - auto* rb = static_cast(builder); - // If a client in the subscribe state and in resp2 mode, it returns an array for some reason. - if (cntx->conn_state.subscribe_info && !rb->IsResp3()) { + if (cmd_cntx.conn_cntx->conn_state.subscribe_info && !rb->IsResp3()) { if (args.size() == 1) { msg = ArgS(args, 0); } @@ -1050,16 +1049,16 @@ void GenericFamily::Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui } if (args.size() == 0) { - return builder->SendSimpleString("PONG"); - } else { - msg = ArgS(args, 0); - DVLOG(2) << "Ping " << msg; - - return rb->SendBulkString(msg); + return rb->SendSimpleString("PONG"); } + + msg = ArgS(args, 0); + DVLOG(2) << "Ping " << msg; + + return rb->SendBulkString(msg); } -void GenericFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Exists(CmdArgList args, const CommandContext& cmd_cntx) { VLOG(1) << "Exists " << ArgS(args, 0); atomic_uint32_t result{0}; @@ -1072,28 +1071,28 @@ void GenericFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* b return OpStatus::OK; }; - OpStatus status = tx->ScheduleSingleHop(std::move(cb)); + OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); CHECK_EQ(OpStatus::OK, status); - return builder->SendLong(result.load(memory_order_acquire)); + return cmd_cntx.rb->SendLong(result.load(memory_order_acquire)); } -void GenericFamily::Persist(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Persist(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) { return OpPersist(t->GetOpArgs(shard), key); }; - OpStatus status = tx->ScheduleSingleHop(std::move(cb)); - builder->SendLong(status == OpStatus::OK); + OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); + cmd_cntx.rb->SendLong(status == OpStatus::OK); } -void GenericFamily::Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Expire(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); string_view sec = ArgS(args, 1); int64_t int_arg; if (!absl::SimpleAtoi(sec, &int_arg)) { - return builder->SendError(kInvalidIntErr); + return cmd_cntx.rb->SendError(kInvalidIntErr); } int_arg = std::max(int_arg, -1); @@ -1103,7 +1102,7 @@ void GenericFamily::Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* b int_arg = kMaxExpireDeadlineSec; } - auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); + auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cmd_cntx.rb); if (!expire_options) { return; } @@ -1113,21 +1112,21 @@ void GenericFamily::Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* b return OpExpire(t->GetOpArgs(shard), key, params); }; - OpStatus status = tx->ScheduleSingleHop(std::move(cb)); - builder->SendLong(status == OpStatus::OK); + OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); + cmd_cntx.rb->SendLong(status == OpStatus::OK); } -void GenericFamily::ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::ExpireAt(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); string_view sec = ArgS(args, 1); int64_t int_arg; if (!absl::SimpleAtoi(sec, &int_arg)) { - return builder->SendError(kInvalidIntErr); + return cmd_cntx.rb->SendError(kInvalidIntErr); } int_arg = std::max(int_arg, 0L); - auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); + auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cmd_cntx.rb); if (!expire_options) { return; } @@ -1137,17 +1136,16 @@ void GenericFamily::ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpire(t->GetOpArgs(shard), key, params); }; - OpStatus status = tx->ScheduleSingleHop(std::move(cb)); + OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); if (status == OpStatus::OUT_OF_RANGE) { - return builder->SendError(kExpiryOutOfRange); + return cmd_cntx.rb->SendError(kExpiryOutOfRange); } - builder->SendLong(status == OpStatus::OK); + cmd_cntx.rb->SendLong(status == OpStatus::OK); } -void GenericFamily::Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void GenericFamily::Keys(CmdArgList args, const CommandContext& cmd_cntx) { string_view pattern(ArgS(args, 0)); uint64_t cursor = 0; @@ -1162,27 +1160,27 @@ void GenericFamily::Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui auto output_limit = absl::GetFlag(FLAGS_keys_output_limit); do { - cursor = ScanGeneric(cursor, scan_opts, &keys, cntx); + cursor = ScanGeneric(cursor, scan_opts, &keys, cmd_cntx.conn_cntx); } while (cursor != 0 && keys.size() < output_limit); - auto* rb = static_cast(builder); + auto* rb = static_cast(cmd_cntx.rb); rb->StartArray(keys.size()); for (const auto& k : keys) { rb->SendBulkString(k); } } -void GenericFamily::PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::PexpireAt(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); string_view msec = ArgS(args, 1); int64_t int_arg; if (!absl::SimpleAtoi(msec, &int_arg)) { - return builder->SendError(kInvalidIntErr); + return cmd_cntx.rb->SendError(kInvalidIntErr); } int_arg = std::max(int_arg, 0L); - auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); + auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cmd_cntx.rb); if (!expire_options) { return; } @@ -1194,22 +1192,22 @@ void GenericFamily::PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpire(t->GetOpArgs(shard), key, params); }; - OpStatus status = tx->ScheduleSingleHop(std::move(cb)); + OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); if (status == OpStatus::OUT_OF_RANGE) { - return builder->SendError(kExpiryOutOfRange); + return cmd_cntx.rb->SendError(kExpiryOutOfRange); } else { - builder->SendLong(status == OpStatus::OK); + cmd_cntx.rb->SendLong(status == OpStatus::OK); } } -void GenericFamily::Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Pexpire(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); string_view msec = ArgS(args, 1); int64_t int_arg; if (!absl::SimpleAtoi(msec, &int_arg)) { - return builder->SendError(kInvalidIntErr); + return cmd_cntx.rb->SendError(kInvalidIntErr); } int_arg = std::max(int_arg, -1); @@ -1218,7 +1216,7 @@ void GenericFamily::Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* int_arg = kMaxExpireDeadlineMs; } - auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); + auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cmd_cntx.rb); if (!expire_options) { return; } @@ -1228,16 +1226,16 @@ void GenericFamily::Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpire(t->GetOpArgs(shard), key, params); }; - OpStatus status = tx->ScheduleSingleHop(std::move(cb)); + OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); if (status == OpStatus::OUT_OF_RANGE) { - return builder->SendError(kExpiryOutOfRange); + return cmd_cntx.rb->SendError(kExpiryOutOfRange); } - builder->SendLong(status == OpStatus::OK); + cmd_cntx.rb->SendLong(status == OpStatus::OK); } -void GenericFamily::Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - Transaction* transaction = tx; +void GenericFamily::Stick(CmdArgList args, const CommandContext& cmd_cntx) { + Transaction* transaction = cmd_cntx.tx; VLOG(1) << "Stick " << ArgS(args, 0); atomic_uint32_t result{0}; @@ -1256,7 +1254,7 @@ void GenericFamily::Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu DVLOG(2) << "Stick ts " << transaction->txid(); uint32_t match_cnt = result.load(memory_order_relaxed); - builder->SendLong(match_cnt); + cmd_cntx.rb->SendLong(match_cnt); } // Used to conditionally store double score @@ -1377,12 +1375,12 @@ OpResultTyped OpFetchSortEntries(const OpArgs& op_args, std::stri return success ? res : OpStatus::INVALID_NUMERIC_RESULT; } -void GenericFamily::Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Sort(CmdArgList args, const CommandContext& cmd_cntx) { std::string_view key = ArgS(args, 0); bool alpha = false; bool reversed = false; std::optional> bounds; - + auto* builder = cmd_cntx.rb; for (size_t i = 1; i < args.size(); i++) { string arg = absl::AsciiStrToUpper(ArgS(args, i)); if (arg == "ALPHA") { @@ -1409,7 +1407,7 @@ void GenericFamily::Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui } OpResultTyped fetch_result = - tx->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) { + cmd_cntx.tx->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) { return OpFetchSortEntries(t->GetOpArgs(shard), key, alpha); }); @@ -1454,11 +1452,12 @@ void GenericFamily::Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui std::visit(std::move(sort_call), fetch_result.value()); } -void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Restore(CmdArgList args, const CommandContext& cmd_cntx) { std::string_view key = ArgS(args, 0); std::string_view serialized_value = ArgS(args, 2); auto rdb_version = GetRdbVersion(serialized_value); + auto* builder = cmd_cntx.rb; if (!rdb_version) { return builder->SendError(kInvalidDumpValueErr); } @@ -1477,7 +1476,7 @@ void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* rdb_version.value()); }; - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (result) { if (result.value()) { @@ -1492,13 +1491,14 @@ void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* } } -void GenericFamily::FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::FieldExpire(CmdArgList args, const CommandContext& cmd_cntx) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view ttl_str = parser.Next(); uint32_t ttl_sec; + auto* rb = static_cast(cmd_cntx.rb); if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { - return builder->SendError(kInvalidIntErr); + return rb->SendError(kInvalidIntErr); } CmdArgList fields = parser.Tail(); @@ -1506,8 +1506,8 @@ void GenericFamily::FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuild return OpFieldExpire(t->GetOpArgs(shard), key, ttl_sec, fields); }; - OpResult> result = tx->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(builder); + OpResult> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); + if (result) { rb->StartArray(result->size()); const auto& array = result.value(); @@ -1515,33 +1515,33 @@ void GenericFamily::FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuild rb->SendLong(v); } } else { - builder->SendError(result.status()); + rb->SendError(result.status()); } } // Returns -2 if key not found, WRONG_TYPE if key is not a set or hash // -1 if the field does not have associated TTL on it, and -3 if field is not found. -void GenericFamily::FieldTtl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::FieldTtl(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); string_view field = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { return OpFieldTtl(t, shard, key, field); }; - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (result) { - builder->SendLong(*result); + cmd_cntx.rb->SendLong(*result); return; } - builder->SendError(result.status()); + cmd_cntx.rb->SendError(result.status()); } -void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Move(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); string_view target_db_sv = ArgS(args, 1); int32_t target_db; - + auto* builder = cmd_cntx.rb; if (!absl::SimpleAtoi(target_db_sv, &target_db)) { return builder->SendError(kInvalidIntErr); } @@ -1550,7 +1550,7 @@ void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui return builder->SendError(kDbIndOutOfRangeErr); } - if (target_db == tx->GetDbIndex()) { + if (target_db == cmd_cntx.tx->GetDbIndex()) { return builder->SendError("source and destination objects are the same"); } @@ -1570,20 +1570,20 @@ void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui return OpStatus::OK; }; - tx->ScheduleSingleHop(std::move(cb)); + cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); // Exactly one shard will call OpMove. DCHECK(res != OpStatus::SKIPPED); builder->SendLong(res == OpStatus::OK); } -void GenericFamily::Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - auto reply = RenameGeneric(args, false, tx); - builder->SendError(reply); +void GenericFamily::Rename(CmdArgList args, const CommandContext& cmd_cntx) { + auto reply = RenameGeneric(args, false, cmd_cntx.tx); + cmd_cntx.rb->SendError(reply); } -void GenericFamily::RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - auto reply = RenameGeneric(args, true, tx); - +void GenericFamily::RenameNx(CmdArgList args, const CommandContext& cmd_cntx) { + auto reply = RenameGeneric(args, true, cmd_cntx.tx); + auto* builder = cmd_cntx.rb; if (!reply.status) { builder->SendError(reply); return; @@ -1599,26 +1599,26 @@ void GenericFamily::RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* } } -void GenericFamily::ExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - ExpireTimeGeneric(args, TimeUnit::SEC, tx, builder); +void GenericFamily::ExpireTime(CmdArgList args, const CommandContext& cmd_cntx) { + ExpireTimeGeneric(args, TimeUnit::SEC, cmd_cntx.tx, cmd_cntx.rb); } -void GenericFamily::PExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - ExpireTimeGeneric(args, TimeUnit::MSEC, tx, builder); +void GenericFamily::PExpireTime(CmdArgList args, const CommandContext& cmd_cntx) { + ExpireTimeGeneric(args, TimeUnit::MSEC, cmd_cntx.tx, cmd_cntx.rb); } -void GenericFamily::Ttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - TtlGeneric(args, TimeUnit::SEC, tx, builder); +void GenericFamily::Ttl(CmdArgList args, const CommandContext& cmd_cntx) { + TtlGeneric(args, TimeUnit::SEC, cmd_cntx.tx, cmd_cntx.rb); } -void GenericFamily::Pttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - TtlGeneric(args, TimeUnit::MSEC, tx, builder); +void GenericFamily::Pttl(CmdArgList args, const CommandContext& cmd_cntx) { + TtlGeneric(args, TimeUnit::MSEC, cmd_cntx.tx, cmd_cntx.rb); } -void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void GenericFamily::Select(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); int64_t index; + auto* builder = cmd_cntx.rb; if (!absl::SimpleAtoi(key, &index)) { return builder->SendError(kInvalidDbIndErr); } @@ -1628,7 +1628,7 @@ void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* buil if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) { return builder->SendError(kDbIndOutOfRangeErr); } - + auto* cntx = cmd_cntx.conn_cntx; if (cntx->conn_state.db_index == index) { // accept a noop. return builder->SendOk(); @@ -1649,22 +1649,23 @@ void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* buil return builder->SendOk(); } -void GenericFamily::Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Dump(CmdArgList args, const CommandContext& cmd_cntx) { std::string_view key = ArgS(args, 0); DVLOG(1) << "Dumping before ::ScheduleSingleHopT " << key; auto cb = [&](Transaction* t, EngineShard* shard) { return OpDump(t->GetOpArgs(shard), key); }; + auto* rb = static_cast(cmd_cntx.rb); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(builder); if (result) { - DVLOG(1) << "Dump " << tx->DebugId() << ": " << key << ", dump size " << result.value().size(); + DVLOG(1) << "Dump " << cmd_cntx.tx->DebugId() << ": " << key << ", dump size " + << result.value().size(); rb->SendBulkString(*result); } else { rb->SendNull(); } } -void GenericFamily::Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Type(CmdArgList args, const CommandContext& cmd_cntx) { std::string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { @@ -1676,40 +1677,39 @@ void GenericFamily::Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui return OpStatus::KEY_NOTFOUND; } }; - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (!result) { - builder->SendSimpleString("none"); + cmd_cntx.rb->SendSimpleString("none"); } else { - builder->SendSimpleString(ObjTypeToString(result.value())); + cmd_cntx.rb->SendSimpleString(ObjTypeToString(result.value())); } } -void GenericFamily::Time(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void GenericFamily::Time(CmdArgList args, const CommandContext& cmd_cntx) { uint64_t now_usec; - if (tx) { - now_usec = tx->GetDbContext().time_now_ms * 1000; + if (cmd_cntx.tx) { + now_usec = cmd_cntx.tx->GetDbContext().time_now_ms * 1000; } else { now_usec = absl::GetCurrentTimeNanos() / 1000; } - auto* rb = static_cast(builder); + auto* rb = static_cast(cmd_cntx.rb); rb->StartArray(2); rb->SendLong(now_usec / 1000000); rb->SendLong(now_usec % 1000000); } -void GenericFamily::Echo(CmdArgList args, Transaction*, SinkReplyBuilder* builder) { +void GenericFamily::Echo(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); - auto* rb = static_cast(builder); + auto* rb = static_cast(cmd_cntx.rb); return rb->SendBulkString(key); } // SCAN cursor [MATCH ] [TYPE ] [COUNT ] [BUCKET ] -void GenericFamily::Scan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void GenericFamily::Scan(CmdArgList args, const CommandContext& cmd_cntx) { string_view token = ArgS(args, 0); uint64_t cursor = 0; - + auto* builder = static_cast(cmd_cntx.rb); if (!absl::SimpleAtoi(token, &cursor)) { return builder->SendError("invalid cursor"); } @@ -1723,14 +1723,13 @@ void GenericFamily::Scan(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui ScanOpts scan_op = ops.value(); StringVec keys; - cursor = ScanGeneric(cursor, scan_op, &keys, cntx); + cursor = ScanGeneric(cursor, scan_op, &keys, cmd_cntx.conn_cntx); - auto* rb = static_cast(builder); - rb->StartArray(2); - rb->SendBulkString(absl::StrCat(cursor)); - rb->StartArray(keys.size()); + builder->StartArray(2); + builder->SendBulkString(absl::StrCat(cursor)); + builder->StartArray(keys.size()); for (const auto& k : keys) { - rb->SendBulkString(k); + builder->SendBulkString(k); } } @@ -1746,12 +1745,12 @@ OpResult GenericFamily::OpExists(const OpArgs& op_args, const ShardArg return res; } -void GenericFamily::RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void GenericFamily::RandomKey(CmdArgList args, const CommandContext& cmd_cntx) { const static size_t kMaxAttempts = 3; absl::BitGen bitgen; atomic_size_t candidates_counter{0}; + auto* cntx = cmd_cntx.conn_cntx; DbContext db_cntx{cntx->ns, cntx->conn_state.db_index, GetCurrentTimeMs()}; ScanOpts scan_opts; scan_opts.limit = 3; // number of entries per shard @@ -1785,7 +1784,7 @@ void GenericFamily::RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* b auto candidates_count = candidates_counter.load(memory_order_relaxed); std::optional random_key = std::nullopt; auto random_idx = absl::Uniform(bitgen, 0, candidates_count); - auto* rb = static_cast(builder); + auto* rb = static_cast(cmd_cntx.rb); for (const auto& candidate : candidates_collection) { if (random_idx >= candidate.size()) { random_idx -= candidate.size(); diff --git a/src/server/generic_family.h b/src/server/generic_family.h index c74e485bddf6..1af6ddd8dacd 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -22,6 +22,7 @@ using facade::OpResult; class ConnectionContext; class CommandRegistry; class Transaction; +struct CommandContext; class GenericFamily { public: @@ -34,41 +35,36 @@ class GenericFamily { private: using SinkReplyBuilder = facade::SinkReplyBuilder; - static void Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Persist(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Del(CmdArgList args, const CommandContext& cmd_cntx); + static void Ping(CmdArgList args, const CommandContext& cmd_cntx); + static void Exists(CmdArgList args, const CommandContext& cmd_cntx); + static void Expire(CmdArgList args, const CommandContext& cmd_cntx); + static void ExpireAt(CmdArgList args, const CommandContext& cmd_cntx); + static void Persist(CmdArgList args, const CommandContext& cmd_cntx); + static void Keys(CmdArgList args, const CommandContext& cmd_cntx); + static void PexpireAt(CmdArgList args, const CommandContext& cmd_cntx); + static void Pexpire(CmdArgList args, const CommandContext& cmd_cntx); + static void Stick(CmdArgList args, const CommandContext& cmd_cntx); + static void Sort(CmdArgList args, const CommandContext& cmd_cntx); + static void Move(CmdArgList args, const CommandContext& cmd_cntx); - static void Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void ExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void PExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Ttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Pttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Rename(CmdArgList args, const CommandContext& cmd_cntx); + static void RenameNx(CmdArgList args, const CommandContext& cmd_cntx); + static void ExpireTime(CmdArgList args, const CommandContext& cmd_cntx); + static void PExpireTime(CmdArgList args, const CommandContext& cmd_cntx); + static void Ttl(CmdArgList args, const CommandContext& cmd_cntx); + static void Pttl(CmdArgList args, const CommandContext& cmd_cntx); - static void Echo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Select(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void Scan(CmdArgList args, Transaction*, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void Time(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void FieldTtl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Echo(CmdArgList args, const CommandContext& cmd_cntx); + static void Select(CmdArgList args, const CommandContext& cmd_cntx); + static void Scan(CmdArgList args, const CommandContext& cmd_cntx); + static void Time(CmdArgList args, const CommandContext& cmd_cntx); + static void Type(CmdArgList args, const CommandContext& cmd_cntx); + static void Dump(CmdArgList args, const CommandContext& cmd_cntx); + static void Restore(CmdArgList args, const CommandContext& cmd_cntx); + static void RandomKey(CmdArgList args, const CommandContext& cmd_cntx); + static void FieldTtl(CmdArgList args, const CommandContext& cmd_cntx); + static void FieldExpire(CmdArgList args, const CommandContext& cmd_cntx); }; } // namespace dfly diff --git a/src/server/hll_family.cc b/src/server/hll_family.cc index c342855b9a89..b007e6fd61e0 100644 --- a/src/server/hll_family.cc +++ b/src/server/hll_family.cc @@ -125,7 +125,7 @@ OpResult AddToHll(const OpArgs& op_args, string_view key, CmdArgList values return std::min(updated, 1); } -void PFAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void PFAdd(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); args.remove_prefix(1); @@ -133,8 +133,8 @@ void PFAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { return AddToHll(t->GetOpArgs(shard), key, args); }; - OpResult res = tx->ScheduleSingleHopT(std::move(cb)); - HandleOpValueResult(res, builder); + OpResult res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); + HandleOpValueResult(res, cmd_cntx.rb); } OpResult CountHllsSingle(const OpArgs& op_args, string_view key) { @@ -204,7 +204,7 @@ vector ConvertShardVector(const vector>& hlls) { return ptrs; } -OpResult PFCountMulti(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +OpResult PFCountMulti(CmdArgList args, const CommandContext& cmd_cntx) { vector> hlls; hlls.resize(shard_set->size()); @@ -218,7 +218,7 @@ OpResult PFCountMulti(CmdArgList args, Transaction* tx, SinkReplyBuilde return result.status(); }; - tx->ScheduleSingleHop(std::move(cb)); + cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); vector ptrs = ConvertShardVector(hlls); int64_t pf_count = pfcountMulti(ptrs.data(), ptrs.size()); @@ -229,17 +229,17 @@ OpResult PFCountMulti(CmdArgList args, Transaction* tx, SinkReplyBuilde } } -void PFCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void PFCount(CmdArgList args, const CommandContext& cmd_cntx) { if (args.size() == 1) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) { return CountHllsSingle(t->GetOpArgs(shard), key); }; - OpResult res = tx->ScheduleSingleHopT(std::move(cb)); - HandleOpValueResult(res, builder); + OpResult res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); + HandleOpValueResult(res, cmd_cntx.rb); } else { - HandleOpValueResult(PFCountMulti(args, tx, builder), builder); + HandleOpValueResult(PFCountMulti(args, cmd_cntx), cmd_cntx.rb); } } diff --git a/src/server/list_family.cc b/src/server/list_family.cc index a2f7eb78f1ef..7a8a61a4447b 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -909,19 +909,18 @@ void MoveGeneric(string_view src, string_view dest, ListDir src_dir, ListDir des } } -void RPopLPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void RPopLPush(CmdArgList args, const CommandContext& cmd_cntx) { string_view src = ArgS(args, 0); string_view dest = ArgS(args, 1); - MoveGeneric(src, dest, ListDir::RIGHT, ListDir::LEFT, tx, builder); + MoveGeneric(src, dest, ListDir::RIGHT, ListDir::LEFT, cmd_cntx.tx, cmd_cntx.rb); } -void BRPopLPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void BRPopLPush(CmdArgList args, const CommandContext& cmd_cntx) { facade::CmdArgParser parser{args}; auto [src, dest] = parser.Next(); float timeout = parser.Next(); - + auto* builder = static_cast(cmd_cntx.rb); if (auto err = parser.Error(); err) return builder->SendError(err->MakeReply()); @@ -929,17 +928,17 @@ void BRPopLPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, return builder->SendError("timeout is negative"); BPopPusher bpop_pusher(src, dest, ListDir::RIGHT, ListDir::LEFT); - OpResult op_res = bpop_pusher.Run(unsigned(timeout * 1000), tx, cntx); + OpResult op_res = + bpop_pusher.Run(unsigned(timeout * 1000), cmd_cntx.tx, cmd_cntx.conn_cntx); - auto* rb = static_cast(builder); if (op_res) { - return rb->SendBulkString(*op_res); + return builder->SendBulkString(*op_res); } switch (op_res.status()) { case OpStatus::CANCELLED: case OpStatus::TIMED_OUT: - return rb->SendNull(); + return builder->SendNull(); break; default: @@ -948,13 +947,13 @@ void BRPopLPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, } } -void BLMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { +void BLMove(CmdArgList args, const CommandContext& cmd_cntx) { facade::CmdArgParser parser{args}; auto [src, dest] = parser.Next(); ListDir src_dir = ParseDir(&parser); ListDir dest_dir = ParseDir(&parser); float timeout = parser.Next(); - + auto* builder = static_cast(cmd_cntx.rb); if (auto err = parser.Error(); err) return builder->SendError(err->MakeReply()); @@ -962,17 +961,17 @@ void BLMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, Connect return builder->SendError("timeout is negative"); BPopPusher bpop_pusher(src, dest, src_dir, dest_dir); - OpResult op_res = bpop_pusher.Run(unsigned(timeout * 1000), tx, cntx); + OpResult op_res = + bpop_pusher.Run(unsigned(timeout * 1000), cmd_cntx.tx, cmd_cntx.conn_cntx); - auto* rb = static_cast(builder); if (op_res) { - return rb->SendBulkString(*op_res); + return builder->SendBulkString(*op_res); } switch (op_res.status()) { case OpStatus::CANCELLED: case OpStatus::TIMED_OUT: - return rb->SendNull(); + return builder->SendNull(); break; default: @@ -1176,44 +1175,44 @@ void BPopGeneric(ListDir dir, CmdArgList args, Transaction* tx, SinkReplyBuilder } // namespace -void ListFamily::LPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - return PushGeneric(ListDir::LEFT, false, std::move(args), tx, builder); +void ListFamily::LPush(CmdArgList args, const CommandContext& cmd_cntx) { + return PushGeneric(ListDir::LEFT, false, std::move(args), cmd_cntx.tx, cmd_cntx.rb); } -void ListFamily::LPushX(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - return PushGeneric(ListDir::LEFT, true, std::move(args), tx, builder); +void ListFamily::LPushX(CmdArgList args, const CommandContext& cmd_cntx) { + return PushGeneric(ListDir::LEFT, true, std::move(args), cmd_cntx.tx, cmd_cntx.rb); } -void ListFamily::LPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - return PopGeneric(ListDir::LEFT, std::move(args), tx, builder); +void ListFamily::LPop(CmdArgList args, const CommandContext& cmd_cntx) { + return PopGeneric(ListDir::LEFT, std::move(args), cmd_cntx.tx, cmd_cntx.rb); } -void ListFamily::RPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - return PushGeneric(ListDir::RIGHT, false, std::move(args), tx, builder); +void ListFamily::RPush(CmdArgList args, const CommandContext& cmd_cntx) { + return PushGeneric(ListDir::RIGHT, false, std::move(args), cmd_cntx.tx, cmd_cntx.rb); } -void ListFamily::RPushX(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - return PushGeneric(ListDir::RIGHT, true, std::move(args), tx, builder); +void ListFamily::RPushX(CmdArgList args, const CommandContext& cmd_cntx) { + return PushGeneric(ListDir::RIGHT, true, std::move(args), cmd_cntx.tx, cmd_cntx.rb); } -void ListFamily::RPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - return PopGeneric(ListDir::RIGHT, std::move(args), tx, builder); +void ListFamily::RPop(CmdArgList args, const CommandContext& cmd_cntx) { + return PopGeneric(ListDir::RIGHT, std::move(args), cmd_cntx.tx, cmd_cntx.rb); } -void ListFamily::LLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ListFamily::LLen(CmdArgList args, const CommandContext& cmd_cntx) { auto key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); }; - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (result) { - builder->SendLong(result.value()); + cmd_cntx.rb->SendLong(result.value()); } else if (result.status() == OpStatus::KEY_NOTFOUND) { - builder->SendLong(0); + cmd_cntx.rb->SendLong(0); } else { - builder->SendError(result.status()); + cmd_cntx.rb->SendError(result.status()); } } -void ListFamily::LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ListFamily::LPos(CmdArgList args, const CommandContext& cmd_cntx) { facade::CmdArgParser parser{args}; auto [key, elem] = parser.Next(); @@ -1242,26 +1241,26 @@ void ListFamily::LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde parser.Skip(1); } + auto* rb = static_cast(cmd_cntx.rb); if (rank == 0) - return builder->SendError(kInvalidIntErr); + return rb->SendError(kInvalidIntErr); if (auto err = parser.Error(); err) - return builder->SendError(err->MakeReply()); + return rb->SendError(err->MakeReply()); auto cb = [&](Transaction* t, EngineShard* shard) { return OpPos(t->GetOpArgs(shard), key, elem, rank, count, max_len); }; - Transaction* trans = tx; + Transaction* trans = cmd_cntx.tx; OpResult> result = trans->ScheduleSingleHopT(std::move(cb)); if (result.status() == OpStatus::WRONG_TYPE) { - return builder->SendError(result.status()); + return rb->SendError(result.status()); } else if (result.status() == OpStatus::INVALID_VALUE) { - return builder->SendError(result.status()); + return rb->SendError(result.status()); } - auto* rb = static_cast(builder); if (skip_count) { if (result->empty()) { rb->SendNull(); @@ -1269,7 +1268,7 @@ void ListFamily::LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde rb->SendLong((*result)[0]); } } else { - SinkReplyBuilder::ReplyAggregator agg(builder); + SinkReplyBuilder::ReplyAggregator agg(rb); rb->StartArray(result->size()); const auto& array = result.value(); for (const auto& v : array) { @@ -1278,12 +1277,14 @@ void ListFamily::LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde } } -void ListFamily::LIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ListFamily::LIndex(CmdArgList args, const CommandContext& cmd_cntx) { std::string_view key = ArgS(args, 0); std::string_view index_str = ArgS(args, 1); int32_t index; + auto* rb = static_cast(cmd_cntx.rb); + if (!absl::SimpleAtoi(index_str, &index)) { - builder->SendError(kInvalidIntErr); + rb->SendError(kInvalidIntErr); return; } @@ -1291,26 +1292,25 @@ void ListFamily::LIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil return OpIndex(t->GetOpArgs(shard), key, index); }; - auto* rb = static_cast(builder); - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (result) { rb->SendBulkString(result.value()); } else if (result.status() == OpStatus::WRONG_TYPE) { - builder->SendError(result.status()); + rb->SendError(result.status()); } else { rb->SendNull(); } } /* LINSERT (BEFORE|AFTER) */ -void ListFamily::LInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ListFamily::LInsert(CmdArgList args, const CommandContext& cmd_cntx) { facade::CmdArgParser parser{args}; string_view key = parser.Next(); InsertParam where = parser.MapNext("AFTER", INSERT_AFTER, "BEFORE", INSERT_BEFORE); auto [pivot, elem] = parser.Next(); - + auto* rb = static_cast(cmd_cntx.rb); if (auto err = parser.Error(); err) - return builder->SendError(err->MakeReply()); + return rb->SendError(err->MakeReply()); DCHECK(pivot.data() && elem.data()); @@ -1318,42 +1318,43 @@ void ListFamily::LInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui return OpInsert(t->GetOpArgs(shard), key, pivot, elem, where); }; - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (result || result == OpStatus::KEY_NOTFOUND) { - return builder->SendLong(result.value_or(0)); + return rb->SendLong(result.value_or(0)); } - builder->SendError(result.status()); + rb->SendError(result.status()); } -void ListFamily::LTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ListFamily::LTrim(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); string_view s_str = ArgS(args, 1); string_view e_str = ArgS(args, 2); int32_t start, end; if (!absl::SimpleAtoi(s_str, &start) || !absl::SimpleAtoi(e_str, &end)) { - builder->SendError(kInvalidIntErr); + cmd_cntx.rb->SendError(kInvalidIntErr); return; } auto cb = [&](Transaction* t, EngineShard* shard) { return OpTrim(t->GetOpArgs(shard), key, start, end); }; - OpStatus st = tx->ScheduleSingleHop(std::move(cb)); + OpStatus st = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); if (st == OpStatus::KEY_NOTFOUND) st = OpStatus::OK; - builder->SendError(st); + cmd_cntx.rb->SendError(st); } -void ListFamily::LRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ListFamily::LRange(CmdArgList args, const CommandContext& cmd_cntx) { std::string_view key = ArgS(args, 0); std::string_view s_str = ArgS(args, 1); std::string_view e_str = ArgS(args, 2); int32_t start, end; + auto* rb = static_cast(cmd_cntx.rb); if (!absl::SimpleAtoi(s_str, &start) || !absl::SimpleAtoi(e_str, &end)) { - builder->SendError(kInvalidIntErr); + rb->SendError(kInvalidIntErr); return; } @@ -1361,79 +1362,76 @@ void ListFamily::LRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil return OpRange(t->GetOpArgs(shard), key, start, end); }; - auto res = tx->ScheduleSingleHopT(std::move(cb)); + auto res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (!res && res.status() != OpStatus::KEY_NOTFOUND) { - return builder->SendError(res.status()); + return rb->SendError(res.status()); } - auto* rb = static_cast(builder); rb->SendBulkStrArr(*res); } // lrem key 5 foo, will remove foo elements from the list if exists at most 5 times. -void ListFamily::LRem(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ListFamily::LRem(CmdArgList args, const CommandContext& cmd_cntx) { std::string_view key = ArgS(args, 0); std::string_view index_str = ArgS(args, 1); std::string_view elem = ArgS(args, 2); int32_t count; if (!absl::SimpleAtoi(index_str, &count)) { - builder->SendError(kInvalidIntErr); + cmd_cntx.rb->SendError(kInvalidIntErr); return; } auto cb = [&](Transaction* t, EngineShard* shard) { return OpRem(t->GetOpArgs(shard), key, elem, count); }; - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (result || result == OpStatus::KEY_NOTFOUND) { - return builder->SendLong(result.value_or(0)); + return cmd_cntx.rb->SendLong(result.value_or(0)); } - builder->SendError(result.status()); + cmd_cntx.rb->SendError(result.status()); } -void ListFamily::LSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ListFamily::LSet(CmdArgList args, const CommandContext& cmd_cntx) { std::string_view key = ArgS(args, 0); std::string_view index_str = ArgS(args, 1); std::string_view elem = ArgS(args, 2); int32_t count; if (!absl::SimpleAtoi(index_str, &count)) { - builder->SendError(kInvalidIntErr); + cmd_cntx.rb->SendError(kInvalidIntErr); return; } auto cb = [&](Transaction* t, EngineShard* shard) { return OpSet(t->GetOpArgs(shard), key, elem, count); }; - OpResult result = tx->ScheduleSingleHop(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); if (result) { - builder->SendOk(); + cmd_cntx.rb->SendOk(); } else { - builder->SendError(result.status()); + cmd_cntx.rb->SendError(result.status()); } } -void ListFamily::BLPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - BPopGeneric(ListDir::LEFT, std::move(args), tx, builder, cntx); +void ListFamily::BLPop(CmdArgList args, const CommandContext& cmd_cntx) { + BPopGeneric(ListDir::LEFT, std::move(args), cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); } -void ListFamily::BRPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - BPopGeneric(ListDir::RIGHT, std::move(args), tx, builder, cntx); +void ListFamily::BRPop(CmdArgList args, const CommandContext& cmd_cntx) { + BPopGeneric(ListDir::RIGHT, std::move(args), cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); } -void ListFamily::LMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ListFamily::LMove(CmdArgList args, const CommandContext& cmd_cntx) { facade::CmdArgParser parser{args}; auto [src, dest] = parser.Next(); ListDir src_dir = ParseDir(&parser); ListDir dest_dir = ParseDir(&parser); if (auto err = parser.Error(); err) - return builder->SendError(err->MakeReply()); + return cmd_cntx.rb->SendError(err->MakeReply()); - MoveGeneric(src, dest, src_dir, dest_dir, tx, builder); + MoveGeneric(src, dest, src_dir, dest_dir, cmd_cntx.tx, cmd_cntx.rb); } using CI = CommandId; diff --git a/src/server/list_family.h b/src/server/list_family.h index f282390fcf62..73518a438816 100644 --- a/src/server/list_family.h +++ b/src/server/list_family.h @@ -15,9 +15,8 @@ namespace dfly { using facade::OpResult; -class ConnectionContext; class CommandRegistry; -class Transaction; +struct CommandContext; class ListFamily { public: @@ -26,25 +25,23 @@ class ListFamily { private: using SinkReplyBuilder = facade::SinkReplyBuilder; - static void LPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LPushX(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void RPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void RPushX(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void RPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void BLPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void BRPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void LLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LRem(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void LMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void LPush(CmdArgList args, const CommandContext& cmd_cntx); + static void LPushX(CmdArgList args, const CommandContext& cmd_cntx); + static void RPush(CmdArgList args, const CommandContext& cmd_cntx); + static void RPushX(CmdArgList args, const CommandContext& cmd_cntx); + static void LPop(CmdArgList args, const CommandContext& cmd_cntx); + static void RPop(CmdArgList args, const CommandContext& cmd_cntx); + static void BLPop(CmdArgList args, const CommandContext& cmd_cntx); + static void BRPop(CmdArgList args, const CommandContext& cmd_cntx); + static void LLen(CmdArgList args, const CommandContext& cmd_cntx); + static void LPos(CmdArgList args, const CommandContext& cmd_cntx); + static void LIndex(CmdArgList args, const CommandContext& cmd_cntx); + static void LInsert(CmdArgList args, const CommandContext& cmd_cntx); + static void LTrim(CmdArgList args, const CommandContext& cmd_cntx); + static void LRange(CmdArgList args, const CommandContext& cmd_cntx); + static void LRem(CmdArgList args, const CommandContext& cmd_cntx); + static void LSet(CmdArgList args, const CommandContext& cmd_cntx); + static void LMove(CmdArgList args, const CommandContext& cmd_cntx); }; } // namespace dfly