Skip to content

Commit

Permalink
Supported multi thread export
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jan 31, 2025
1 parent dcb89a5 commit 2507d44
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 49 deletions.
29 changes: 19 additions & 10 deletions ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,15 +573,15 @@ class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComput
const auto& key = MakePartitionKey(row);
const auto [keyIt, insertedNew] = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
if (insertedNew || keyIt->second.empty() || keyIt->second.back()->IsFinishing()) {
auto fileWrite = std::make_unique<TS3FileWriteActor>(
TxId,
Gateway,
Credentials,
key,
NS3Util::UrlEscapeRet(Url + Path + key + MakeOutputName() + Extension),
Compression,
RetryPolicy, DirtyWrite, Token);
keyIt->second.emplace_back(fileWrite.get());
auto fileWrite = std::make_unique<TS3FileWriteActor>(
TxId,
Gateway,
Credentials,
key,
NS3Util::UrlEscapeRet(Url + Path + key + MakeOutputName() + Extension),
Compression,
RetryPolicy, DirtyWrite, Token);
keyIt->second.emplace_back(fileWrite.get());
RegisterWithSameMailbox(fileWrite.release());
}

Expand Down Expand Up @@ -619,6 +619,10 @@ class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComput
NDqProto::StatusIds::StatusCode statusCode = result->Get()->StatusCode;
if (statusCode == NDqProto::StatusIds::UNSPECIFIED) {
statusCode = StatusFromS3ErrorCode(result->Get()->S3ErrorCode);
if (statusCode == NDqProto::StatusIds::UNSPECIFIED) {
statusCode = NDqProto::StatusIds::INTERNAL_ERROR;
result->Get()->Issues.AddIssue("Got upload error with unspecified error code.");
}
}

Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode);
Expand All @@ -640,10 +644,15 @@ class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComput
if (const auto ft = std::find_if(it->second.cbegin(), it->second.cend(), [&](TS3FileWriteActor* actor){ return result->Get()->Url == actor->GetUrl(); }); it->second.cend() != ft) {
(*ft)->PassAway();
it->second.erase(ft);
if (it->second.empty())
if (it->second.empty()) {
FileWriteActors.erase(it);
}
}
}
if (!Finished && GetFreeSpace() > 0) {
LOG_D("TS3WriteActor", "Has free space, notify owner");
Callbacks->ResumeExecution();
}
FinishIfNeeded();
}

Expand Down
71 changes: 32 additions & 39 deletions ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,15 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
auto keys = GetPartitionKeys(partBy);

auto sinkSettingsBuilder = Build<TExprList>(ctx, target.Pos());
if (partBy)
if (partBy) {
sinkSettingsBuilder.Add(std::move(partBy));
}

auto compression = GetCompression(settings);
const auto& extension = GetExtension(target.Format().Value(), compression ? compression->Tail().Content() : ""sv);
if (compression)
if (compression) {
sinkSettingsBuilder.Add(std::move(compression));
}

auto sinkOutputSettingsBuilder = Build<TExprList>(ctx, target.Pos());
if (auto csvDelimiter = GetCsvDelimiter(settings)) {
Expand Down Expand Up @@ -199,31 +201,17 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
}
}

if (!FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
if (IsDqPureExpr(input)) {
YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << target.Path().StringValue() << "` as stage with sink.";
auto shouldBePassedAsInput = FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TDqStage::CallableName()); });

auto stageInputs = Build<TExprList>(ctx, writePos);
auto toFlow = Build<TCoToFlow>(ctx, writePos);
TVector<TCoArgument> args;

if (shouldBePassedAsInput) {
auto arg = Build<TCoArgument>(ctx, writePos).Name("in").Done();
stageInputs.Add(input);
args.push_back(arg);
toFlow.Input(arg);
}
else {
toFlow.Input(input);
}

return keys.empty() ?
Build<TDqStage>(ctx, writePos)
.Inputs(stageInputs.Done())
.Inputs().Build()
.Program<TCoLambda>()
.Args(args)
.Args({})
.Body<TS3SinkOutput>()
.Input(toFlow.Done())
.Input<TCoToFlow>()
.Input(input)
.Build()
.Format(target.Format())
.KeyColumns().Build()
.Settings(sinkOutputSettingsBuilder.Done())
Expand Down Expand Up @@ -251,10 +239,12 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
.Add<TDqCnHashShuffle>()
.Output<TDqOutput>()
.Stage<TDqStage>()
.Inputs(stageInputs.Done())
.Inputs().Build()
.Program<TCoLambda>()
.Args(args)
.Body(toFlow.Done())
.Args({})
.Body<TCoToFlow>()
.Input(input)
.Build()
.Build()
.Settings().Build()
.Build()
Expand Down Expand Up @@ -317,23 +307,26 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
.Build()
.Done();

auto outputsBuilder = Build<TDqStageOutputsList>(ctx, target.Pos());
if (inputStage.Outputs() && keys.empty()) {
outputsBuilder.InitFrom(inputStage.Outputs().Cast());
}
outputsBuilder.Add(sink);
auto outputsBuilder = Build<TDqStageOutputsList>(ctx, target.Pos())
.Add(sink);

if (keys.empty()) {
const auto outputBuilder = Build<TS3SinkOutput>(ctx, target.Pos())
.Input(inputStage.Program().Body().Ptr())
.Format(target.Format())
.KeyColumns().Add(std::move(keys)).Build()
.Settings(sinkOutputSettingsBuilder.Done())
.Done();

return Build<TDqStage>(ctx, writePos)
.InitFrom(inputStage)
.Program(ctx.DeepCopyLambda(inputStage.Program().Ref(), outputBuilder.Ptr()))
.Inputs()
.Add<TDqCnMap>()
.Output(dqUnion.Output())
.Build()
.Build()
.Program<TCoLambda>()
.Args({"in"})
.Body<TS3SinkOutput>()
.Input("in")
.Format(target.Format())
.KeyColumns().Add(std::move(keys)).Build()
.Settings(sinkOutputSettingsBuilder.Done())
.Build()
.Build()
.Settings().Build()
.Outputs(outputsBuilder.Done())
.Done();
} else {
Expand Down

0 comments on commit 2507d44

Please sign in to comment.