Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/fixSchemaPrefix' into fixSchemaPrefix
Browse files Browse the repository at this point in the history
  • Loading branch information
RemHero committed Jan 19, 2023
2 parents 1092f6b + b39d4a9 commit bd62227
Show file tree
Hide file tree
Showing 31 changed files with 785 additions and 297 deletions.
57 changes: 57 additions & 0 deletions .github/workflows/api-session.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# CI workflow: runs the SessionV2IT integration test against IoTDB on a
# Java 8 / Python 3.7 matrix across ubuntu and macos.
name: "API-Test-SESSION-V2"

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main
env:
  VERSION: 0.6.0-SNAPSHOT

# Cancel in-flight runs for the same workflow/ref to save CI minutes.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  SessionV2-Test:
    strategy:
      fail-fast: false
      #max-parallel: 20
      matrix:
        java: [ 8 ]
        python-version: [ "3.7" ]
        os: [ ubuntu-latest, macos-latest ]
    runs-on: ${{ matrix.os }}
    steps:
      - uses: actions/checkout@v2
      # fixed typo: "Environmet" -> "Environment"
      - name: Environment Dependence
        uses: ./.github/actions/dependence
        with:
          python-version: ${{ matrix.python-version }}
          java: ${{ matrix.java }}

      - name: Run ZooKeeper
        uses: ./.github/actions/zookeeperRunner

      - name: Run IoTDB
        uses: ./.github/actions/iotdbRunner
        with:
          if-CapExp: "false"
          version: iotdb11

      - name: Install with Maven
        run: mvn clean package -DskipTests

      - name: Start IginX
        uses: ./.github/actions/iginxRunner
        with:
          version: ${VERSION}

      - name: A Lame Integration Test with Maven for IoTDB
        run: mvn test -q -Dtest=SessionV2IT -DfailIfNoTests=false
      - uses: codecov/codecov-action@v1
        with:
          file: ./**/target/site/jacoco/jacoco.xml
          name: codecov
6 changes: 3 additions & 3 deletions .github/workflows/capacity-expansion.yml
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ jobs:
run: |
mvn test -q -Dtest=InfluxDBHistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix -DfailIfNoTests=false
DataPrefixWithMultiSchemaPrefix-Test-IotDB:
DataPrefixWithMultiSchemaPrefix_AND_RemoveHistoryDataSource-Test-IotDB:
timeout-minutes: 20
strategy:
fail-fast: false
Expand Down Expand Up @@ -289,7 +289,7 @@ jobs:
- name: data prefix IT
run: |
if [ "${{matrix.DB-name}}" == "iotdb11" ]; then
mvn test -q -Dtest=IoTDB11HistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix -DfailIfNoTests=false
mvn test -q -Dtest=IoTDB11HistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix_AND_testRemoveHistoryDataSource -DfailIfNoTests=false
elif [ "${{matrix.DB-name}}" == "iotdb12" ]; then
mvn test -q -Dtest=IoTDB12HistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix -DfailIfNoTests=false
mvn test -q -Dtest=IoTDB12HistoryDataCapacityExpansionIT#testAddSameDataPrefixWithDiffSchemaPrefix_AND_testRemoveHistoryDataSource -DfailIfNoTests=false
fi
54 changes: 53 additions & 1 deletion core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import cn.edu.tsinghua.iginx.engine.physical.PhysicalEngineImpl;
import cn.edu.tsinghua.iginx.engine.physical.storage.StorageManager;
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
import cn.edu.tsinghua.iginx.exceptions.StatusCode;
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
import cn.edu.tsinghua.iginx.metadata.IMetaManager;
import cn.edu.tsinghua.iginx.metadata.entity.*;
Expand Down Expand Up @@ -183,6 +184,57 @@ public QueryDataResp queryData(QueryDataReq req) {
return ctx.getResult().getQueryDataResp();
}

@Override
public Status removeHistoryDataSource(RemoveHistoryDataSourceReq req) {
    // Removing a history (dummy) data source changes cluster-wide metadata,
    // so it requires cluster-level session privileges.
    if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) {
        return RpcUtils.ACCESS_DENY;
    }
    Status status = RpcUtils.SUCCESS;
    long dummyStorageId = req.getDummyStorageId();
    StorageEngineMeta meta = metaManager.getStorageEngine(dummyStorageId);
    // The engine must exist AND actually carry a dummy fragment/storage unit;
    // otherwise there is no history data source to remove.
    if (meta == null || meta.getDummyFragment() == null || meta.getDummyStorageUnit() == null) {
        status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
        // fixed grammar: was "storage engine is not exists."
        status.setMessage("storage engine does not exist.");
        return status;
    }
    try {
        // Mark the dummy fragment and dummy storage unit invalid so query
        // planning skips them from now on.
        meta.getDummyFragment().setIfValid(false);
        meta.getDummyStorageUnit().setIfValid(false);

        // Rebuild the engine meta with hasData=false and the dummy
        // fragment/unit/data-prefix/schema-prefix cleared (the three nulls).
        // NOTE(review): the original comment said the has_data entry inside
        // extraParams must be updated too, but extraParams is passed through
        // unchanged here — confirm whether that entry matters downstream.
        StorageEngineMeta newMeta = new StorageEngineMeta(
                meta.getId(),
                meta.getIp(),
                meta.getPort(),
                false,
                null,
                null,
                meta.isReadOnly(),
                null,
                null,
                meta.getExtraParams(),
                meta.getStorageEngine(),
                meta.getStorageUnitList(),
                meta.getCreatedBy(),
                meta.isNeedReAllocate()
        );

        // Persist the updated metadata to zk and the local iginx meta cache.
        if (!metaManager.updateStorageEngine(dummyStorageId, newMeta)) {
            status = RpcUtils.FAILURE;
            status.setMessage("unexpected error during storage update");
        }

        return status;
    } catch (Exception e) {
        // fixed copy-paste: log said "storage migration" though this handler
        // removes a history data source
        logger.error("unexpected error during removing history data source: ", e);
        status = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
        status.setMessage("unexpected error during removing history data source: " + e.getMessage());
        return status;
    }
}

@Override
public Status addStorageEngines(AddStorageEnginesReq req) {
if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) {
Expand Down Expand Up @@ -245,7 +297,7 @@ public Status addStorageEngines(AddStorageEnginesReq req) {
int index = 0;
if (meta.isHasData()) {
String dataPrefix = meta.getDataPrefix();
StorageUnitMeta dummyStorageUnit = new StorageUnitMeta(Constants.DUMMY + String.format("%04d", 0), -1);
StorageUnitMeta dummyStorageUnit = new StorageUnitMeta(StorageUnitMeta.generateDummyStorageUnitID(0), -1);
Pair<TimeSeriesRange, TimeInterval> boundary = StorageManager.getBoundaryOfStorage(meta, dataPrefix);
FragmentMeta dummyFragment;
String schemaPrefixTmp = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,11 @@ private Operator mergeRawData(Map<TimeInterval, List<FragmentMeta>> fragments, L
if (!dummyFragments.isEmpty()) {
List<Operator> joinList = new ArrayList<>();
dummyFragments.forEach(meta -> {
String schemaPrefix = meta.getTsInterval().getSchemaPrefix();
joinList.add(new AddSchemaPrefix(new OperatorSource(new Project(new FragmentSource(meta),
if (meta.isValid()) {
String schemaPrefix = meta.getTsInterval().getSchemaPrefix();
joinList.add(new AddSchemaPrefix(new OperatorSource(new Project(new FragmentSource(meta),
pathMatchPrefix(pathList, meta.getTsInterval().getTimeSeries(), schemaPrefix), tagFilter)), schemaPrefix));
}
});
joinList.add(operator);
operator = OperatorUtils.joinOperatorsByTime(joinList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private void filterFragmentByTimeRange(Select selectOperator) {
if (!dummyFragments.isEmpty()) {
List<Operator> joinList = new ArrayList<>();
dummyFragments.forEach(meta -> {
if (hasTimeRangeOverlap(meta, timeRanges)) {
if (meta.isValid() && hasTimeRangeOverlap(meta, timeRanges)) {
joinList.add(new Project(new FragmentSource(meta), pathList, selectOperator.getTagFilter()));
}
});
Expand Down
Loading

0 comments on commit bd62227

Please sign in to comment.