Skip to content

Commit

Permalink
Support MCQA 2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxin-tech committed Oct 24, 2024
1 parent 8a3914f commit c68d7d8
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static com.aliyun.openservices.odps.console.constants.ODPSConsoleConstants.ODPS_NAMESPACE_SCHEMA;
import static com.aliyun.openservices.odps.console.constants.ODPSConsoleConstants.ODPS_RUNNING_CLUSTER;
import static com.aliyun.openservices.odps.console.constants.ODPSConsoleConstants.ODPS_SQL_TIMEZONE;
import static com.aliyun.openservices.odps.console.constants.ODPSConsoleConstants.ODPS_TASK_WLM_QUOTA;
import static com.aliyun.openservices.odps.console.constants.ODPSConsoleConstants.PROJECT_PROTECTION;

import java.io.PrintStream;
Expand Down Expand Up @@ -121,13 +122,21 @@ public void run() throws OdpsException, ODPSConsoleException {
getContext().setUserSetSqlTimezone(true);
}

if (ENABLE_INTERACTIVE_MODE.equals(key)) {
if (ENABLE_INTERACTIVE_MODE.equalsIgnoreCase(key)) {
// change interactive mode temporarily
getContext().setInteractiveQuery(Boolean.parseBoolean(value));
getWriter().writeError("OK");
return;
}

if (ODPS_TASK_WLM_QUOTA.equalsIgnoreCase(key)) {
UseQuotaCommand useQuotaCommand =
new UseQuotaCommand(getCommandText(), getContext(), getContext().getQuotaRegionId(),
value);
useQuotaCommand.run();
return;
}

if (key.startsWith(ODPSConsoleConstants.FALLBACK_PREFIX)) {
switch (key) {
case ODPSConsoleConstants.FALLBACK_RESOURCE_NOT_ENOUGH:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.aliyun.openservices.odps.console.ExecutionContext;
import com.aliyun.openservices.odps.console.ODPSConsoleException;
import com.aliyun.openservices.odps.console.utils.ODPSConsoleUtils;
import com.aliyun.openservices.odps.console.utils.SessionUtils;

public class UseQuotaCommand extends AbstractCommand {

Expand Down Expand Up @@ -47,7 +48,7 @@ public UseQuotaCommand(
}

@Override
protected void run() throws ODPSConsoleException, OdpsException {
public void run() throws ODPSConsoleException, OdpsException {

if (this.quotaName == null || this.quotaName.isEmpty()) {
throw new InvalidParameterException("Invalid parameter: Quota name is empty.");
Expand All @@ -73,10 +74,22 @@ protected void run() throws ODPSConsoleException, OdpsException {
throw new InvalidParameterException("Level 1 quota is not allowed to use. Please use a Level 2 quota.");
}
if (quota.getResourceSystemType() != null
&& quota.getResourceSystemType().equalsIgnoreCase("FUXI_ONLINE")) {
&& "FUXI_ONLINE".equalsIgnoreCase(quota.getResourceSystemType())) {
throw new InvalidParameterException("Online quota is not allowed to use manually. " +
"It can only be used automatically by entering interactive mode.");
}

// fuxi_vw means enable query by mcqa v2
if (quota.getResourceSystemType() != null
&& "FUXI_VW".equalsIgnoreCase(quota.getResourceSystemType())) {
getContext().setInteractiveQuery(true);
SessionUtils.resetSQLExecutor(null, null, getContext(), getCurrentOdps(), false,
quota.getNickname(), true, regionId);
} else {
// mcqa v2 no need to set hints
String value = String.format("%s@%s", quota.getNickname(), quota.getRegionId());
SetCommand.setMap.put("odps.task.wlm.quota", value);
}
} catch (NoSuchObjectException e) {
String errMsg = "Quota " + quotaName + " is not found in region " + regionId
+ ". It may be in another region or not exist at all.";
Expand All @@ -91,14 +104,12 @@ protected void run() throws ODPSConsoleException, OdpsException {
ee.setRequestId(e.getRequestId());
throw ee;
}

String value = String.format("%s@%s", quota.getNickname(), quota.getRegionId());
SetCommand.setMap.put("odps.task.wlm.quota", value);
getContext().setQuotaName(quota.getNickname());
getContext().setQuotaRegionId(quota.getRegionId());
}

public static UseQuotaCommand parse(List<String> optionList, ExecutionContext sessionContext) {
public static UseQuotaCommand parse(List<String> optionList, ExecutionContext sessionContext)
throws ODPSConsoleException, OdpsException {
String regionId = ODPSConsoleUtils.shiftOption(optionList, OPTION_REGION_ID);
String quotaName = ODPSConsoleUtils.shiftOption(optionList, OPTION_QUOTA_NAME);
if (!StringUtils.isNullOrEmpty(quotaName)) {
Expand All @@ -107,7 +118,8 @@ public static UseQuotaCommand parse(List<String> optionList, ExecutionContext se
return null;
}

public static UseQuotaCommand parse(String commandString, ExecutionContext sessionContext) {
public static UseQuotaCommand parse(String commandString, ExecutionContext sessionContext)
throws ODPSConsoleException, OdpsException {
Matcher matcher = PATTERN.matcher(commandString);
if (matcher.matches()) {
return new UseQuotaCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class ODPSConsoleConstants {
public static final String ODPS_RUNNING_CLUSTER = "odps.running.cluster";
public static final String CONSOLE_SQL_RESULT_INSTANCETUNNEL = "console.sql.result.instancetunnel";
public static final String TASK_MAJOR_VERSION = "odps.task.major.version";
public static final String ODPS_TASK_WLM_QUOTA = "odps.task.wlm.quota";

public static final String tablePattern = "";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,8 @@ public State run(InstanceStateContext context) throws OdpsException {
throw e;
}

// 若是同步 instance, 则结束状态机
if (context.getInstance().isSync()) {
return State.END;
}

// 若进度打印没有完成,则强行刷新进度至 100% 状态
if (!context.isProgressReportFinish()) {
// 若进度打印没有完成,则强行刷新进度至 100% 状态。同步 instance 不打印 progress 信息
if (!context.getInstance().isSync() && !context.isProgressReportFinish()) {
InstanceProgressReporter reporter = new InstanceProgressReporter(context);
reporter.printProgress(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.Quota;
import com.aliyun.odps.sqa.SQLExecutor;
import com.aliyun.odps.sqa.SQLExecutorBuilder;
import com.aliyun.odps.utils.StringUtils;
Expand All @@ -13,7 +12,6 @@
import com.aliyun.openservices.odps.console.constants.ODPSConsoleConstants;

import java.util.Map;
import java.util.TimeZone;

/**
* Created by dongxiao on 2019/12/23.
Expand Down Expand Up @@ -57,6 +55,10 @@ public static String recoverSQLExecutor(Instance instance, ExecutionContext cont
}

public static String resetSQLExecutor(String sessionName, Instance instance, ExecutionContext context, Odps odps, boolean autoReattach, String quotaName) throws OdpsException {
return resetSQLExecutor(sessionName, instance, context, odps, autoReattach, quotaName, false, null);
}

public static String resetSQLExecutor(String sessionName, Instance instance, ExecutionContext context, Odps odps, boolean autoReattach, String quotaName, boolean mcqaV2, String regionId) throws OdpsException {
SQLExecutorBuilder builder = new SQLExecutorBuilder();
builder.odps(odps)
.quotaName(quotaName)
Expand All @@ -69,9 +71,11 @@ public static String resetSQLExecutor(String sessionName, Instance instance, Exe
.enableReattach(autoReattach)
.taskName(ODPSConsoleConstants.SESSION_DEFAULT_TASK_NAME)
.attachTimeout(context.getAttachTimeout())
.recoverFrom(instance);
.recoverFrom(instance)
.enableMcqaV2(mcqaV2)
.regionId(regionId);
SQLExecutor executor = builder.build();
String currentId = executor.getInstance().getId();
String currentId = mcqaV2 ? null : executor.getInstance().getId();
resetSessionContext(executor, odps, context);
return currentId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ public void run() throws OdpsException, ODPSConsoleException {
Odps odps = getCurrentOdps();
ExecutionContext context = getContext();
if (id != null) {
Instance instance = odps.instances().get(id);
if (id.endsWith("_mcqa") && context.getQuotaName() == null) {
throw new ODPSConsoleException("Quota name is required for MCQA 2.0 instance, "
+ "'use quota xxx;' to set quota name.");
}
Instance instance =
odps.instances().get(odps.getDefaultProject(), id, context.getQuotaName(),
context.getQuotaRegionId());
InstanceRunner runner = new InstanceRunner(odps, instance, context);
runner.waitForCompletion();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ private void getQueryResult(long startTime) throws ODPSConsoleException {

waitLogviewGenerated();
finishReporter(reporterThread);
String postMessage = "Session sub-query" + " cost time: "
+ String.valueOf(System.currentTimeMillis() - startTime) + " ms.";
String postMessage = "Query cost time: "
+ (System.currentTimeMillis() - startTime) + " ms.";

// print summary in compatible output mode
if (this.getContext().isInteractiveOutputCompatible()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ protected String getTaskName(boolean isDryRun) {
public void run() throws OdpsException, ODPSConsoleException {
ExecutionContext context = getContext();

if (context.isInteractiveQuery()) {
getWriter().writeError("Query will be executed in Interactive Mode.");
new InteractiveQueryCommand(getCommandText(), getContext()).run();
return;
}

if ("true".equalsIgnoreCase(SetCommand.setMap.getOrDefault(PMC_TASK_CONSOLE_KEY, "false"))) {
context.setPMCMode(true);
} else {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
</licenses>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<odps.sdk.version>0.48.6-public</odps.sdk.version>
<odps.sdk.version>0.50.3-public</odps.sdk.version>
<hadoop.version>3.3.3</hadoop.version>
</properties>

Expand Down

0 comments on commit c68d7d8

Please sign in to comment.