Skip to content

Commit

Permalink
Multi table bulk upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
fexolm committed Dec 3, 2024
1 parent 9f93101 commit 08f8eba
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 67 deletions.
86 changes: 70 additions & 16 deletions ydb/core/grpc_services/rpc_load_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,34 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T
return Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()));
}

const TString& GetTable() override {
return GetProtoRequest(Request.get())->table();
const TString &GetTable(ui32 idx) override {
auto *request = GetProtoRequest(Request.get());
if(request->has_multi_table()) {
return request->multi_table().tables(idx);
} else {
Y_ABORT_UNLESS(request->has_table());
Y_ABORT_UNLESS(idx == 0);
return request->table();
}
}

ui32 GetNumTables() override {
auto *request = GetProtoRequest(Request.get());
if(request->has_multi_table()) {
return request->multi_table().tables().size();
}
Y_ABORT_UNLESS(request->has_table());
return 1;
}

ui64 GetTableSize(ui64 idx) override {
auto *request = GetProtoRequest(Request.get());

if(request->has_multi_table()) {
return request->multi_table().numrows(idx);
}
Y_ABORT_UNLESS(request->has_table());
return Batch->num_rows();
}

const TVector<std::pair<TSerializedCellVec, TString>>& GetRows() const override {
Expand All @@ -176,6 +202,7 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T
}

bool CheckAccess(TString& errorMessage) override {
// TODO: !!!!
if (Request->GetSerializedToken().empty())
return true;

Expand All @@ -184,9 +211,9 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T
auto resolveResult = GetResolveNameResult();
if (!resolveResult) {
TStringStream explanation;
explanation << "Access denied for " << userToken.GetUserSID()
<< " table '" << GetProtoRequest(Request.get())->table()
<< "' has not been resolved yet";
// explanation << "Access denied for " << userToken.GetUserSID()
// << " table '" << GetProtoRequest(Request.get())->table() // TODO
// << "' has not been resolved yet";

errorMessage = explanation.Str();
return false;
Expand All @@ -197,9 +224,9 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T
&& !entry.SecurityObject->CheckAccess(access, userToken))
{
TStringStream explanation;
explanation << "Access denied for " << userToken.GetUserSID()
<< " with access " << NACLib::AccessRightsToString(access)
<< " to table '" << GetProtoRequest(Request.get())->table() << "'";
// explanation << "Access denied for " << userToken.GetUserSID()
// << " with access " << NACLib::AccessRightsToString(access)
// << " to table '" << GetProtoRequest(Request.get())->table() << "'"; // TODO

errorMessage = explanation.Str();
return false;
Expand Down Expand Up @@ -329,8 +356,34 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices
return Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()));
}

const TString& GetTable() override {
return GetProtoRequest(Request.get())->table();
const TString &GetTable(ui32 idx) override {
auto *request = GetProtoRequest(Request.get());
if(request->has_multi_table()) {
return request->multi_table().tables(idx);
} else {
Y_ABORT_UNLESS(request->has_table());
Y_ABORT_UNLESS(idx == 0);
return request->table();
}
}

ui32 GetNumTables() override {
auto *request = GetProtoRequest(Request.get());
if(request->has_multi_table()) {
return request->multi_table().tables().size();
}
Y_ABORT_UNLESS(request->has_table());
return 1;
}

ui64 GetTableSize(ui64 idx) override {
auto *request = GetProtoRequest(Request.get());

if(request->has_multi_table()) {
return request->multi_table().numrows(idx);
}
Y_ABORT_UNLESS(request->has_table());
return Batch->num_rows();
}

const TVector<std::pair<TSerializedCellVec, TString>>& GetRows() const override {
Expand Down Expand Up @@ -366,6 +419,7 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices
}

bool CheckAccess(TString& errorMessage) override {
// TODO!!!!!!
if (Request->GetSerializedToken().empty())
return true;

Expand All @@ -374,9 +428,9 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices
auto resolveResult = GetResolveNameResult();
if (!resolveResult) {
TStringStream explanation;
explanation << "Access denied for " << userToken.GetUserSID()
<< " table '" << GetProtoRequest(Request.get())->table()
<< "' has not been resolved yet";
// explanation << "Access denied for " << userToken.GetUserSID()
// << " table '" << GetProtoRequest(Request.get())->table() //TODO
// << "' has not been resolved yet";

errorMessage = explanation.Str();
return false;
Expand All @@ -387,9 +441,9 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices
&& !entry.SecurityObject->CheckAccess(access, userToken))
{
TStringStream explanation;
explanation << "Access denied for " << userToken.GetUserSID()
<< " with access " << NACLib::AccessRightsToString(access)
<< " to table '" << GetProtoRequest(Request.get())->table() << "'";
// explanation << "Access denied for " << userToken.GetUserSID()
// << " with access " << NACLib::AccessRightsToString(access)
// << " to table '" << GetProtoRequest(Request.get())->table() << "'"; // TODO

errorMessage = explanation.Str();
return false;
Expand Down
35 changes: 16 additions & 19 deletions ydb/core/tx/tx_proxy/rpc_long_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,8 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl>,
}

protected:
void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate& resp) {
void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry) {
NWilson::TProfileSpan pSpan = ActorSpan.BuildChildrenSpan("ProceedWithSchema");
if (resp.ErrorCount > 0) {
// TODO: map to a correct error
return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "There was an error during table query");
}

auto& entry = resp.ResultSet[0];

if (UserToken && entry.SecurityObject) {
const ui32 access = NACLib::UpdateRow;
Expand Down Expand Up @@ -234,20 +228,22 @@ class TLongTxWriteInternal: public TLongTxWriteBase<TLongTxWriteInternal> {

public:
explicit TLongTxWriteInternal(const TActorId& replyTo, const TLongTxId& longTxId, const TString& dedupId, const TString& databaseName,
const TString& path, std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult, std::shared_ptr<arrow::RecordBatch> batch,
std::shared_ptr<NYql::TIssues> issues, const bool noTxWrite)
const TString& path, const NSchemeCache::TSchemeCacheNavigate::TEntry &entry, std::shared_ptr<arrow::RecordBatch> batch,
std::shared_ptr<NYql::TIssues> issues, const bool noTxWrite, const ui32 cookie)
: TBase(databaseName, path, TString(), longTxId, dedupId, noTxWrite)
, ReplyTo(replyTo)
, NavigateResult(navigateResult)
, Entry(entry)
, Batch(batch)
, Issues(issues) {
, Issues(issues)
, Cookie(cookie) {
Y_ABORT_UNLESS(Issues);
DataAccessor = std::make_unique<TParsedBatchData>(Batch);
}

void Bootstrap() {
Y_ABORT_UNLESS(NavigateResult);
ProceedWithSchema(*NavigateResult);
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("TLongTxWriteInternal", "")("path", Path)("batch size", Batch->num_rows());
// Y_ABORT_UNLESS(NavigateResult);
ProceedWithSchema(Entry);
}

protected:
Expand All @@ -264,28 +260,29 @@ class TLongTxWriteInternal: public TLongTxWriteBase<TLongTxWriteInternal> {
if (!message.empty()) {
Issues->AddIssue(NYql::TIssue(message));
}
this->Send(ReplyTo, new TEvents::TEvCompleted(0, status));
this->Send(ReplyTo, new TEvents::TEvCompleted(0, status), 0, Cookie);
PassAway();
}

void ReplySuccess() override {
this->Send(ReplyTo, new TEvents::TEvCompleted(0, Ydb::StatusIds::SUCCESS));
this->Send(ReplyTo, new TEvents::TEvCompleted(0, Ydb::StatusIds::SUCCESS), 0, Cookie);
PassAway();
}

private:
const TActorId ReplyTo;
std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> NavigateResult;
NSchemeCache::TSchemeCacheNavigate::TEntry Entry;
std::shared_ptr<arrow::RecordBatch> Batch;
std::shared_ptr<NYql::TIssues> Issues;
const ui32 Cookie;
};

TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo, const NLongTxService::TLongTxId& longTxId,
const TString& dedupId, const TString& databaseName, const TString& path,
std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> navigateResult, std::shared_ptr<arrow::RecordBatch> batch,
std::shared_ptr<NYql::TIssues> issues, const bool noTxWrite) {
const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, std::shared_ptr<arrow::RecordBatch> batch,
std::shared_ptr<NYql::TIssues> issues, const bool noTxWrite, const ui32 cookie) {
return ctx.RegisterWithSameMailbox(
new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, navigateResult, batch, issues, noTxWrite));
new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, entry, batch, issues, noTxWrite, cookie));
}

//
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/tx_proxy/upload_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,20 @@ class TUploadRowsInternal : public TUploadRowsBase<NKikimrServices::TActivity::U
return TString();
}

const TString& GetTable() override {
const TString &GetTable(ui32 idx) override {
Y_ABORT_UNLESS(idx == 0);
return Table;
}

ui64 GetTableSize(ui64 idx) override {
Y_ABORT_UNLESS(idx == 0);
return Rows->size();
}

ui32 GetNumTables() override {
return 1;
}

const TVector<std::pair<TSerializedCellVec, TString>>& GetRows() const override {
return *Rows;
}
Expand Down
Loading

0 comments on commit 08f8eba

Please sign in to comment.