Commit f8d2572: local write pass

qiaojialin committed Jun 26, 2017
1 parent 7284637 commit f8d2572
Showing 15 changed files with 241 additions and 104 deletions.
42 changes: 6 additions & 36 deletions README.md
@@ -1,43 +1,16 @@
# tsfile-spark-connector

Used to read tsfile in spark.
Used to read and write (write support in development) tsfile in spark.

Exposes one or more TsFiles as a table in SparkSQL. The path may be a single directory, or a wildcard pattern matching several directories. When multiple TsFiles are loaded, the table schema keeps the union of the sensors in the individual TsFiles.


## Example
## dependency

src/test/scala/cn.edu.thu.tsfile.spark.TSFileSuit
https://github.com/thulab/tsfile.git


## Path specification


basefolder/key=1/file1.tsfile

basefolder/key=2/file2.tsfile
If basefolder is given as the path, an extra column key is added to the table, taking the value 1 or 2.

For example:
path=basefolder


If the path is given with wildcards, the matched directories are not treated as partitions.

For example:
path=basefolder/\*/\*.tsfile


basefolder/file1.tsfile
basefolder/file2.tsfile

Specifying basefolder merges the schemas of the multiple tsfiles, keeping the union of their sensors.

For example:
path=basefolder
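
A hedged read sketch following these path rules (the data source name `cn.edu.thu.tsfile.spark` and the spark-shell `sqlContext` entry point are assumptions for illustration, not confirmed by this commit):

```scala
// Pointing at the base folder: the key=1 / key=2 directories
// become a partition column named "key".
val partitioned = sqlContext.read
  .format("cn.edu.thu.tsfile.spark")   // assumed data source name
  .load("basefolder")

// A wildcard path matches files directly: no partition column is
// added, and the schema is the union of the sensors in the files.
val flat = sqlContext.read
  .format("cn.edu.thu.tsfile.spark")
  .load("basefolder/*/*.tsfile")
```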


## Version requirements
## versions

The versions required for Spark and Java are as follows:

@@ -47,7 +20,7 @@ The versions required for Spark and Java are as follows:



## Data type conversion
## TsFile Type <=> SparkSQL type

This library uses the following mapping from TsFile data types to SparkSQL data types:

@@ -153,15 +126,12 @@ The SparkSQL Table Structure is as follows:

##### spark-shell

The project can be packaged and used in `spark-shell`. To build the package:

```
mvn clean scala:compile compile package
```

The jar is produced at:
target/tsfile-spark-connector-0.1.0.jar


```
$ bin/spark-shell --jars tsfile-spark-connector-0.1.0.jar,tsfile-0.1.0.jar
```
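
Once the shell is up, the connector can be queried through SparkSQL; a minimal sketch, again assuming the `cn.edu.thu.tsfile.spark` source name (data/test1.tsfile is the sample file added in this commit):

```scala
val df = sqlContext.read
  .format("cn.edu.thu.tsfile.spark")   // assumed data source name
  .load("data/test1.tsfile")

// Spark 1.x API; on Spark 2.x use createOrReplaceTempView.
df.registerTempTable("tsfile_table")
sqlContext.sql("SELECT * FROM tsfile_table WHERE time < 10").show()
```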
Binary file added data/test1.tsfile
6 changes: 1 addition & 5 deletions src/main/java/cn/edu/thu/tsfile/io/CreateTSFile.java
@@ -28,11 +28,7 @@ public void createTSFile1(String tsfilePath) throws Exception {
TSRandomAccessFileWriter output = new RandomAccessOutputStream(new File(tsfilePath));
TsFile tsFile = new TsFile(output, jsonSchema);

// for( int i = 1; i < 30000000; i++) {
// tsFile.writeLine("root.car.d1," + i + ", s1, " + i + ", s2, 10, s3, 100");
// }
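// Each writeLine call below encodes one row: deltaObject, timestamp, then sensor/value pairs.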

tsFile.writeLine("root.car.d1,1, s1, 1, s2, 10, s3, 100.1, s4, 0.1");
tsFile.writeLine("root.car.d1,1, s1, 1, s2, 10, s3, 100.1");
tsFile.writeLine("root.car.d1,2, s1, 2, s2, 20, s3, 200.2, s4, 0.2");
tsFile.writeLine("root.car.d1,3, s1, 3, s2, 30, s3, 200.3, s4, 0.3");
tsFile.writeLine("root.car.d1,4, s1, 4, s2, 40, s3, 200.4, s4, 0.4");
4 changes: 0 additions & 4 deletions src/main/java/cn/edu/thu/tsfile/io/HDFSOutputStream.java
@@ -5,8 +5,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
@@ -20,8 +18,6 @@
*/
public class HDFSOutputStream implements TSRandomAccessFileWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(HDFSOutputStream.class);

private FSDataOutputStream fsDataOutputStream;

public HDFSOutputStream(String filePath, boolean overwriter) throws IOException {
33 changes: 33 additions & 0 deletions src/main/java/cn/edu/thu/tsfile/io/TsFileOutputFormat.java
@@ -0,0 +1,33 @@
package cn.edu.thu.tsfile.io;

import cn.edu.thu.tsfile.timeseries.write.exception.WriteProcessException;
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord;
import cn.edu.thu.tsfile.timeseries.write.schema.FileSchema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class TsFileOutputFormat extends FileOutputFormat<NullWritable, TSRecord> {

    private FileSchema fileSchema;

    public TsFileOutputFormat(FileSchema fileSchema) {
        this.fileSchema = fileSchema;
    }

    @Override
    public RecordWriter<NullWritable, TSRecord> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Path path = getDefaultWorkFile(job, "");
        try {
            return new TsFileRecordWriter(job, path, fileSchema);
        } catch (WriteProcessException e) {
            // Propagate the failure instead of printing the stack trace and
            // raising an unrelated InterruptedException.
            throw new IOException("failed to construct TsFileRecordWriter", e);
        }
    }

}
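
A note on the design: Hadoop normally instantiates an OutputFormat reflectively through a no-argument constructor (via job.setOutputFormatClass), so a format whose only constructor takes a FileSchema is presumably constructed directly by the connector's own write path rather than registered on a Job.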
37 changes: 37 additions & 0 deletions src/main/java/cn/edu/thu/tsfile/io/TsFileRecordWriter.java
@@ -0,0 +1,37 @@
package cn.edu.thu.tsfile.io;

import cn.edu.thu.tsfile.timeseries.FileFormat.TsFile;
import cn.edu.thu.tsfile.timeseries.write.exception.WriteProcessException;
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord;
import cn.edu.thu.tsfile.timeseries.write.schema.FileSchema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class TsFileRecordWriter extends RecordWriter<NullWritable, TSRecord> {

    private TsFile tsFile = null;

    public TsFileRecordWriter(TaskAttemptContext job, Path file, FileSchema fileSchema)
            throws IOException, WriteProcessException {
        HDFSOutputStream hdfsOutputStream =
                new HDFSOutputStream(file.toString(), job.getConfiguration(), false);
        tsFile = new TsFile(hdfsOutputStream, fileSchema);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        tsFile.close();
    }

    @Override
    public synchronized void write(NullWritable arg0, TSRecord tsRecord) throws IOException {
        try {
            tsFile.writeLine(tsRecord);
        } catch (WriteProcessException e) {
            // Propagate instead of swallowing: a failed record write should
            // fail the task, not pass silently.
            throw new IOException("failed to write TSRecord", e);
        }
    }

}
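
A minimal sketch of driving this writer by hand, using only the signatures visible in this commit; the FileSchema, TaskAttemptContext, and TSRecord values are stubbed with ??? because their construction is not shown here:

```scala
import cn.edu.thu.tsfile.io.TsFileRecordWriter
import cn.edu.thu.tsfile.timeseries.write.record.TSRecord
import cn.edu.thu.tsfile.timeseries.write.schema.FileSchema
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.TaskAttemptContext

val schema: FileSchema = ???            // construction not shown in this diff
val context: TaskAttemptContext = ???   // supplied by the framework at task setup
val writer = new TsFileRecordWriter(context, new Path("/tmp/example.tsfile"), schema)

val record: TSRecord = ???              // one time-series row
writer.write(NullWritable.get(), record)
writer.close(context)                   // closes the underlying TsFile
```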
23 changes: 6 additions & 17 deletions src/main/java/cn/edu/thu/tsfile/qp/common/SQLConstant.java
@@ -1,7 +1,5 @@
package cn.edu.thu.tsfile.qp.common;

import cn.edu.thu.tsfile.timeseries.read.qp.Path;

import java.util.HashMap;
import java.util.Map;

@@ -12,21 +10,15 @@
*
*/
public class SQLConstant {
public static final String MREGE_EXTENSION = "merge";
public static final String ERR_EXTENSION = "err";
public static final String PATH_SEPARATOR = ".";
public static final String PATH_SEPARATER_NO_REGEX = "\\.";

public static final String DEFAULT_DELTA_OBJECT_TYPE = "defalut_delta_object_type";

public static final String RESERVED_TIME = "time";
public static final String RESERVED_FREQ = "freq";
public static final String RESERVED_DELTA_OBJECT = "delta_object";
public static final String IS_AGGREGATION = "IS_AGGREGATION";

public static final String lineFeedSignal = "\n";
public static final String ROOT = "root";
public static final String METADATA_PARAM_EQUAL = "=";
public static final String INT32 = "INT32";
public static final String INT64 = "INT64";
public static final String FLOAT = "FLOAT";
public static final String DOUBLE = "DOUBLE";
public static final String DEFAULT_ENCODING = "RLE";

public static final int KW_AND = 1;
public static final int KW_OR = 2;
@@ -47,9 +39,6 @@ public class SQLConstant {
public static final int TOK_DELETE = 25;
public static final int TOK_UPDATE = 26;
public static final int TOK_QUERY = 27;
// public static final int TOK_VIRTUAL_TABLE = 30;
// public static final int TOK_TABNAME = 31;
// public static final int TOK_TABREF = 32;

public static final int TOK_AUTHOR_CREATE = 41;
public static final int TOK_AUTHOR_DROP = 42;
@@ -134,7 +123,7 @@ public class SQLConstant {
reverseWords.put(GREATERTHAN, LESSTHANOREQUALTO);
}

public static boolean isReservedPath(Path pathStr) {
public static boolean isReservedPath(String pathStr) {
return pathStr.equals(SQLConstant.RESERVED_TIME)
|| pathStr.equals(SQLConstant.RESERVED_FREQ)
|| pathStr.equals(SQLConstant.RESERVED_DELTA_OBJECT);
