Skip to content

Commit

Permalink
Support alter sequence (#4180)
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd authored Apr 28, 2024
1 parent 4c31460 commit 4cff3b4
Show file tree
Hide file tree
Showing 14 changed files with 310 additions and 24 deletions.
6 changes: 6 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
break;
}

case NKqpProto::TKqpSchemeOperation::kAlterSequence: {
const auto& modifyScheme = schemeOp.GetAlterSequence();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,14 @@ class TKikimrIcGateway : public IKqpGateway {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> AlterSequence(const TString& cluster,
const NYql::TAlterSequenceSettings& settings, bool missingOk) override {
Y_UNUSED(cluster);
Y_UNUSED(settings);
Y_UNUSED(missingOk);
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override {
try {
if (!CheckCluster(cluster)) {
Expand Down
77 changes: 77 additions & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1468,6 +1468,13 @@ class TKqpGatewayProxy : public IKikimrGateway {
const NYql::TDropSequenceSettings& settings, bool missingOk) override {
CHECK_PREPARED_DDL(DropSequence);

if (!SessionCtx->Config().EnableSequences) {
IKqpGateway::TGenericResult errResult;
errResult.AddIssue(NYql::TIssue("Sequences are not supported yet."));
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(Ydb::StatusIds::UNSUPPORTED));
return MakeFuture(std::move(errResult));
}

try {
if (cluster != SessionCtx->GetCluster()) {
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
Expand Down Expand Up @@ -1507,6 +1514,76 @@ class TKqpGatewayProxy : public IKikimrGateway {
}
}

TFuture<TGenericResult> AlterSequence(const TString& cluster,
const TAlterSequenceSettings& settings, bool missingOk) override {
CHECK_PREPARED_DDL(AlterSequence);

if (!SessionCtx->Config().EnableSequences) {
IKqpGateway::TGenericResult errResult;
errResult.AddIssue(NYql::TIssue("Sequences are not supported yet."));
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(Ydb::StatusIds::UNSUPPORTED));
return MakeFuture(std::move(errResult));
}

try {

if (cluster != SessionCtx->GetCluster()) {
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
}

std::pair<TString, TString> pathPair;
{
TString error;
if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, false)) {
return MakeFuture(ResultFromError<TGenericResult>(error));
}
}

NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetWorkingDir(pathPair.first);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterSequence);
schemeTx.SetSuccessOnNotExist(missingOk);

NKikimrSchemeOp::TSequenceDescription* seqDesc = schemeTx.MutableSequence();
seqDesc->SetName(pathPair.second);

if (settings.SequenceSettings.MinValue) {
seqDesc->SetMinValue(*settings.SequenceSettings.MinValue);
}
if (settings.SequenceSettings.MaxValue) {
seqDesc->SetMaxValue(*settings.SequenceSettings.MaxValue);
}
if (settings.SequenceSettings.Increment) {
seqDesc->SetIncrement(*settings.SequenceSettings.Increment);
}
if (settings.SequenceSettings.StartValue) {
seqDesc->SetStartValue(*settings.SequenceSettings.StartValue);
}
if (settings.SequenceSettings.Cache) {
seqDesc->SetCache(*settings.SequenceSettings.Cache);
}
if (settings.SequenceSettings.Cycle) {
seqDesc->SetCycle(*settings.SequenceSettings.Cycle);
}

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
phyTx.MutableSchemeOperation()->MutableAlterSequence()->Swap(&schemeTx);

TGenericResult result;
result.SetSuccess();
return MakeFuture(result);
} else {
return Gateway->ModifyScheme(std::move(schemeTx));
}
}
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
}

TFuture<TGenericResult> CreateTableStore(const TString& cluster,
const TCreateTableStoreSettings& settings, bool existingOk) override
{
Expand Down
42 changes: 39 additions & 3 deletions ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
return TStatus::Ok;
}

TStatus HandleAlterSequence(NNodes::TKiAlterSequence node, TExprContext& ctx) override {
Y_UNUSED(ctx);
Y_UNUSED(node);
return TStatus::Ok;
}

TStatus HandleModifyPermissions(TKiModifyPermissions node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "ModifyPermissions is not yet implemented for intent determination transformer"));
Expand Down Expand Up @@ -515,7 +521,8 @@ class TKikimrDataSink : public TDataProviderBase
}

if (node.IsCallable(TKiCreateSequence::CallableName())
|| node.IsCallable(TKiDropSequence::CallableName())) {
|| node.IsCallable(TKiDropSequence::CallableName())
|| node.IsCallable(TKiAlterSequence::CallableName())) {
return true;
}

Expand Down Expand Up @@ -707,15 +714,14 @@ class TKikimrDataSink : public TDataProviderBase
const NCommon::TWriteSequenceSettings& settings, const TKikimrKey& key, TExprContext& ctx)
{
YQL_ENSURE(settings.Mode);
auto mode = settings.Mode.Cast();
if (node->Child(3)->Content() != "Void") {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Creating sequence with data is not supported."));
return nullptr;
}

auto valueType = settings.ValueType.IsValid()
? settings.ValueType.Cast()
: Build<TCoAtom>(ctx, node->Pos()).Value("bigint").Done();
: Build<TCoAtom>(ctx, node->Pos()).Value("int8").Done();

auto temporary = settings.Temporary.IsValid()
? settings.Temporary.Cast()
Expand Down Expand Up @@ -755,6 +761,30 @@ class TKikimrDataSink : public TDataProviderBase
.Ptr();
}

static TExprNode::TPtr MakeAlterSequence(const TExprNode::TPtr& node,
const NCommon::TWriteSequenceSettings& settings, const TKikimrKey& key, TExprContext& ctx)
{
YQL_ENSURE(settings.Mode);
bool missingOk = (settings.Mode.Cast().Value() == "alter_if_exists");

if (node->Child(3)->Content() != "Void") {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Alter sequence with data is not supported."));
return nullptr;
}

return Build<TKiAlterSequence>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.Sequence().Build(key.GetPGObjectId())
.SequenceSettings(settings.SequenceSettings.Cast())
.Settings(settings.Other)
.MissingOk<TCoAtom>()
.Value(missingOk)
.Build()
.Done()
.Ptr();
}

bool RewriteIOExternal(const TKikimrKey& key, const TExprNode::TPtr& node, const TCoAtom& mode, TExprContext& ctx, TExprNode::TPtr& resultNode) {
TKiDataSink dataSink(node->ChildPtr(1));
auto& tableDesc = SessionCtx->Tables().GetTable(TString{dataSink.Cluster()}, key.GetTablePath());
Expand Down Expand Up @@ -1305,6 +1335,8 @@ class TKikimrDataSink : public TDataProviderBase
return MakeCreateSequence(node, settings, key, ctx);
} else if (mode == "drop" || mode == "drop_if_exists") {
return MakeDropSequence(node, settings, key, ctx);
} else if (mode == "alter" || mode == "alter_if_exists") {
return MakeAlterSequence(node, settings, key, ctx);
} else {
YQL_ENSURE(false, "unknown Sequence mode \"" << TString(mode) << "\"");
}
Expand Down Expand Up @@ -1544,6 +1576,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleDropSequence(node.Cast(), ctx);
}

if (auto node = callable.Maybe<TKiAlterSequence>()) {
return HandleAlterSequence(node.Cast(), ctx);
}

ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Unsupported function: "
<< callable.CallableName()));
return TStatus::Error;
Expand Down
56 changes: 45 additions & 11 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,27 +285,33 @@ namespace {
};
}

TCreateSequenceSettings ParseCreateSequenceSettings(TKiCreateSequence createSequence) {
TCreateSequenceSettings createSequenceSettings;
createSequenceSettings.Name = TString(createSequence.Sequence());
createSequenceSettings.Temporary = TString(createSequence.Temporary()) == "true" ? true : false;
for (const auto& setting: createSequence.SequenceSettings()) {
TSequenceSettings ParseSequenceSettings(const TCoNameValueTupleList& sequenceSettings) {
TSequenceSettings result;
for (const auto& setting: sequenceSettings) {
auto name = setting.Name().Value();
auto value = TString(setting.Value().template Cast<TCoAtom>().Value());
if (name == "start") {
createSequenceSettings.SequenceSettings.StartValue = FromString<i64>(value);
result.StartValue = FromString<i64>(value);
} else if (name == "maxvalue") {
createSequenceSettings.SequenceSettings.MaxValue = FromString<i64>(value);
result.MaxValue = FromString<i64>(value);
} else if (name == "minvalue") {
createSequenceSettings.SequenceSettings.MinValue = FromString<i64>(value);
result.MinValue = FromString<i64>(value);
} else if (name == "cache") {
createSequenceSettings.SequenceSettings.Cache = FromString<ui64>(value);
result.Cache = FromString<ui64>(value);
} else if (name == "cycle") {
createSequenceSettings.SequenceSettings.Cycle = value == "1" ? true : false;
result.Cycle = value == "1" ? true : false;
} else if (name == "increment") {
createSequenceSettings.SequenceSettings.Increment = FromString<i64>(value);
result.Increment = FromString<i64>(value);
}
}
return result;
}

TCreateSequenceSettings ParseCreateSequenceSettings(TKiCreateSequence createSequence) {
TCreateSequenceSettings createSequenceSettings;
createSequenceSettings.Name = TString(createSequence.Sequence());
createSequenceSettings.Temporary = TString(createSequence.Temporary()) == "true" ? true : false;
createSequenceSettings.SequenceSettings = ParseSequenceSettings(createSequence.SequenceSettings());

return createSequenceSettings;
}
Expand All @@ -316,6 +322,14 @@ namespace {
};
}

TAlterSequenceSettings ParseAlterSequenceSettings(TKiAlterSequence alterSequence) {
TAlterSequenceSettings alterSequenceSettings;
alterSequenceSettings.Name = TString(alterSequence.Sequence());
alterSequenceSettings.SequenceSettings = ParseSequenceSettings(alterSequence.SequenceSettings());

return alterSequenceSettings;
}

[[nodiscard]] TString AddConsumerToTopicRequest(
Ydb::Topic::Consumer* protoConsumer, const TCoTopicConsumer& consumer
) {
Expand Down Expand Up @@ -1722,6 +1736,26 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
}, "Executing DROP SEQUENCE");
}

if (auto maybeAlterSequence = TMaybeNode<TKiAlterSequence>(input)) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
return SyncStatus(requireStatus);
}

auto cluster = TString(maybeAlterSequence.Cast().DataSink().Cluster());
TAlterSequenceSettings alterSequenceSettings = ParseAlterSequenceSettings(maybeAlterSequence.Cast());
bool missingOk = (maybeAlterSequence.MissingOk().Cast().Value() == "1");

auto future = Gateway->AlterSequence(cluster, alterSequenceSettings, missingOk);

return WrapFuture(future,
[](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
Y_UNUSED(res);
auto resultNode = ctx.NewWorld(input->Pos());
return resultNode;
}, "Executing CREATE SEQUENCE");
}

if (auto maybeAlter = TMaybeNode<TKiAlterTopic>(input)) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,19 @@
{"Index": 4, "Name": "MissingOk", "Type": "TCoAtom"}
]
},
{
"Name": "TKiAlterSequence",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "KiAlterSequence!"},
"Children": [
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
{"Index": 2, "Name": "Sequence", "Type": "TCoAtom"},
{"Index": 3, "Name": "SequenceSettings", "Type": "TCoNameValueTupleList"},
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 5, "Name": "MissingOk", "Type": "TCoAtom"}
]
},
{
"Name": "TKiCreateReplication",
"Base": "TCallable",
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,11 @@ struct TDropSequenceSettings {
TString Name;
};

struct TAlterSequenceSettings {
TString Name;
TSequenceSettings SequenceSettings;
};

struct TAlterExternalTableSettings {
TString ExternalTable;
};
Expand Down Expand Up @@ -936,6 +941,8 @@ class IKikimrGateway : public TThrRefBase {
const TCreateSequenceSettings& settings, bool existingOk) = 0;
virtual NThreading::TFuture<TGenericResult> DropSequence(const TString& cluster,
const TDropSequenceSettings& settings, bool missingOk) = 0;
virtual NThreading::TFuture<TGenericResult> AlterSequence(const TString& cluster,
const TAlterSequenceSettings& settings, bool missingOk) = 0;

virtual NThreading::TFuture<TGenericResult> CreateColumnTable(
TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct TKikimrData {
DataSinkNames.insert(TKiReturningList::CallableName());
DataSinkNames.insert(TKiCreateSequence::CallableName());
DataSinkNames.insert(TKiDropSequence::CallableName());
DataSinkNames.insert(TKiAlterSequence::CallableName());

CommitModes.insert(CommitModeFlush);
CommitModes.insert(CommitModeRollback);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TKiSinkVisitorTransformer : public TSyncTransformerBase {

virtual TStatus HandleCreateSequence(NNodes::TKiCreateSequence node, TExprContext& ctx) = 0;
virtual TStatus HandleDropSequence(NNodes::TKiDropSequence node, TExprContext& ctx) = 0;
virtual TStatus HandleAlterSequence(NNodes::TKiAlterSequence node, TExprContext& ctx) = 0;

virtual TStatus HandleModifyPermissions(NNodes::TKiModifyPermissions node, TExprContext& ctx) = 0;

Expand Down
19 changes: 17 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
return true;
}

static bool CheckCreateSequenceSettings(const TCoNameValueTupleList& settings, TExprContext& ctx) {
static bool CheckSequenceSettings(const TCoNameValueTupleList& settings, TExprContext& ctx) {
const static std::unordered_set<TString> sequenceSettingNames =
{"start", "increment", "cache", "minvalue", "maxvalue", "cycle"};
for (const auto& setting : settings) {
Expand Down Expand Up @@ -1509,7 +1509,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
}

virtual TStatus HandleCreateSequence(TKiCreateSequence node, TExprContext& ctx) override {
if(!CheckCreateSequenceSettings(node.SequenceSettings(), ctx)) {
if(!CheckSequenceSettings(node.SequenceSettings(), ctx)) {
return TStatus::Error;
}

Expand Down Expand Up @@ -1548,6 +1548,21 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
return TStatus::Ok;
}

virtual TStatus HandleAlterSequence(TKiAlterSequence node, TExprContext& ctx) override {
if(!CheckSequenceSettings(node.SequenceSettings(), ctx)) {
return TStatus::Error;
}

if (!node.Settings().Empty()) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "Unsupported sequence settings"));
return TStatus::Error;
}

node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
return TStatus::Ok;
}

virtual TStatus HandleAlterTopic(TKiAlterTopic node, TExprContext& ctx) override {
if (!CheckTopicSettings(node.Settings(), ctx)) {
return TStatus::Error;
Expand Down
Loading

0 comments on commit 4cff3b4

Please sign in to comment.