Skip to content

Commit

Permalink
Merge branch 'main' into wip_replace_license_notice_in_jar
Browse files Browse the repository at this point in the history
  • Loading branch information
weiting-chen authored Aug 19, 2024
2 parents f1efa11 + 45d629e commit 227c928
Show file tree
Hide file tree
Showing 50 changed files with 1,288 additions and 597 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/dev_cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ jobs:
process:
name: Process
runs-on: ubuntu-latest
permissions: write-all
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Comment Issues link
if: |
github.event_name == 'pull_request_target' &&
(github.event.action == 'opened' ||
github.event.action == 'edited')
uses: actions/github-script@v3
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
Expand All @@ -47,7 +48,7 @@ jobs:
github.event_name == 'pull_request_target' &&
(github.event.action == 'opened' ||
github.event.action == 'edited')
uses: actions/github-script@v3
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/dev_cron/issues_link.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async function haveComment(github, context, pullRequestNumber, body) {
page: 1
};
while (true) {
const response = await github.issues.listComments(options);
const response = await github.rest.issues.listComments(options);
if (response.data.some(comment => comment.body === body)) {
return true;
}
Expand All @@ -52,7 +52,7 @@ async function commentISSUESURL(github, context, pullRequestNumber, issuesID) {
if (await haveComment(github, context, pullRequestNumber, issuesURL)) {
return;
}
await github.issues.createComment({
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: pullRequestNumber,
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/dev_cron/title_check.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function haveISSUESID(title) {
}

async function commentOpenISSUESIssue(github, context, pullRequestNumber) {
const {data: comments} = await github.issues.listComments({
const {data: comments} = await github.rest.issues.listComments({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: pullRequestNumber,
Expand All @@ -36,7 +36,7 @@ async function commentOpenISSUESIssue(github, context, pullRequestNumber) {
}
const commentPath = ".github/workflows/dev_cron/title_check.md";
const comment = fs.readFileSync(commentPath).toString();
await github.issues.createComment({
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: pullRequestNumber,
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/docker_image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
name: Build and Push Docker Image

on:
pull_request:
push:
branches:
- main
paths:
- '.github/workflows/docker_image.yml'
schedule:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@
import java.util.Set;

public class CHNativeCacheManager {
public static void cacheParts(String table, Set<String> columns, boolean async) {
nativeCacheParts(table, String.join(",", columns), async);
public static String cacheParts(String table, Set<String> columns) {
return nativeCacheParts(table, String.join(",", columns));
}

private static native void nativeCacheParts(String table, String columns, boolean async);
private static native String nativeCacheParts(String table, String columns);

public static CacheResult getCacheStatus(String jobId) {
return nativeGetCacheStatus(jobId);
}

private static native CacheResult nativeGetCacheStatus(String jobId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution;

public class CacheResult {
public enum Status {
RUNNING(0),
SUCCESS(1),
ERROR(2);

private final int value;

Status(int value) {
this.value = value;
}

public int getValue() {
return value;
}

public static Status fromInt(int value) {
for (Status myEnum : Status.values()) {
if (myEnum.getValue() == value) {
return myEnum;
}
}
throw new IllegalArgumentException("No enum constant for value: " + value);
}
}

private final Status status;
private final String message;

public CacheResult(int status, String message) {
this.status = Status.fromInt(status);
this.message = message;
}

public Status getStatus() {
return status;
}

public String getMessage() {
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
hashIds.forEach(
resource_id => CHBroadcastBuildSideCache.invalidateBroadcastHashtable(resource_id))
}
case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
CHNativeCacheManager.cacheParts(mergeTreeTable, columns, true)

case e =>
logError(s"Received unexpected message. $e")
Expand All @@ -74,12 +72,16 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
try {
CHNativeCacheManager.cacheParts(mergeTreeTable, columns, false)
context.reply(CacheLoadResult(true))
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
context.reply(CacheJobInfo(status = true, jobId))
} catch {
case _: Exception =>
context.reply(CacheLoadResult(false, s"executor: $executorId cache data failed."))
context.reply(
CacheJobInfo(status = false, "", s"executor: $executorId cache data failed."))
}
case GlutenMergeTreeCacheLoadStatus(jobId) =>
val status = CHNativeCacheManager.getCacheStatus(jobId)
context.reply(status)
case e =>
logError(s"Received unexpected message. $e")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ object GlutenRpcMessages {
case class GlutenCleanExecutionResource(executionId: String, broadcastHashIds: util.Set[String])
extends GlutenRpcMessage

// for mergetree cache
case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String])
extends GlutenRpcMessage

case class CacheLoadResult(success: Boolean, reason: String = "") extends GlutenRpcMessage
case class GlutenMergeTreeCacheLoadStatus(jobId: String)

case class CacheJobInfo(status: Boolean, jobId: String, reason: String = "")
extends GlutenRpcMessage
}
Loading

0 comments on commit 227c928

Please sign in to comment.