Merge pull request #1 from qiaojialin/master
merge jialin's latest version into this branch
jixuan1989 authored Jul 5, 2017
2 parents d8a6cd6 + d75865c commit 97bc793
Showing 43 changed files with 669 additions and 410 deletions.
71 changes: 24 additions & 47 deletions README.md
@@ -1,41 +1,16 @@
# tsfile-spark-connector

Used to read and write (write support is in development) TsFile in Spark.

Exposes one or more TsFiles as a table in SparkSQL. A single directory can be specified, or wildcards can be used to match several directories. When several TsFiles are loaded, the table schema is the union of the sensors found in the individual TsFiles.

## Example

src/test/scala/cn.edu.thu.tsfile.spark.TSFileSuit

## Specifying the path

basefolder/key=1/file1.tsfile

basefolder/key=2/file2.tsfile

Setting basefolder as the path adds an extra column key to the table, with value 1 or 2, e.g.:

path=basefolder

If the path is given with wildcards, it is not treated as a partition, e.g.:

path=basefolder/\*/\*.tsfile

basefolder/file1.tsfile
basefolder/file2.tsfile

Specifying basefolder here merges the schemas of the TsFiles, keeping the union of their sensors, e.g.:

path=basefolder

## dependency

https://github.com/thulab/tsfile.git
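
As a quick illustration of the two path styles described above, a minimal sketch (the format name `cn.edu.thu.tsfile` follows the examples later in this README; the folder names are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

// Load the base folder directly: key=1/key=2 subfolders surface as an extra "key" column.
val partitioned = spark.read
  .format("cn.edu.thu.tsfile")
  .load("basefolder")

// Load through a wildcard: the matched TsFiles are merged (union of sensors), no partition column.
val merged = spark.read
  .format("cn.edu.thu.tsfile")
  .load("basefolder/*/*.tsfile")
```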


## versions

The versions required for Spark and Java are as follows:

@@ -45,7 +20,7 @@



## TsFile Type <=> SparkSQL type

This library uses the following mapping from TsFile data types to SparkSQL data types:

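The mapping table itself is collapsed in this diff view; the inferred types can also be checked directly on a loaded DataFrame. A minimal sketch, using the format name and file path from the examples below:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

// Print the SparkSQL schema inferred from the TsFile: one field per sensor,
// typed according to the mapping above.
spark.read
  .format("cn.edu.thu.tsfile")
  .load("test.tsfile")
  .printSchema()
```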
@@ -104,7 +79,7 @@ The SparkSQL Table Structure is as follows:

```scala
// import this library and Spark
import cn.edu.thu.tsfile._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()
```

@@ -123,43 +98,45 @@
* **Example 2**

```scala
import cn.edu.thu.tsfile._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.read
  .format("cn.edu.thu.tsfile")
  .load("test.tsfile")
df.filter("time < 10").show()

```

* **Example 3**

```scala
import cn.edu.thu.tsfile._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

//create a table in SparkSQL and build relation with a TsFile
spark.sql("create temporary view TsFile using cn.edu.thu.tsfile.spark options(path = \"test.ts\")")
spark.sql("create temporary view TsFile using cn.edu.thu.tsfile options(path = \"test.ts\")")

spark.sql("select * from TsFile where sensor_1 > 1.2").show()

```

##### spark-shell

The project can be packaged for use in `spark-shell`. Package it with:

```
mvn clean scala:compile compile package
```
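
With the standard Maven layout this should produce target/tsfile-spark-connector-0.1.0.jar (named after the artifactId and version in the pom.xml change below), which is the first jar passed to spark-shell in the next step.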


```
$ bin/spark-shell --jars tsfile-spark-connector-0.1.0.jar,tsfile-0.1.0.jar
scala> sql("CREATE TEMPORARY TABLE TsFile_table USING cn.edu.thu.tsfile OPTIONS (path \"hdfs://localhost:9000/test1.tsfile\")")
scala> sql("select * from TsFile_table").show()
```
Binary file added data/test1.tsfile
151 changes: 96 additions & 55 deletions pom.xml
@@ -1,61 +1,102 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.edu.thu</groupId>
<artifactId>tsfile-spark-connector</artifactId>
<version>0.1.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.7.3</hadoop.version>
<compile.version>1.8</compile.version>
<spark.version>2.0.1</spark.version>
<junit.version>4.12</junit.version>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.edu.thu</groupId>
<artifactId>tsfile</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>2.11.8</scalaVersion>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${compile.version}</source>
<target>${compile.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>

</project>
@@ -1,4 +1,4 @@
package cn.edu.thu.tsfile.io;

import cn.edu.thu.tsfile.common.conf.TSFileConfig;
import cn.edu.thu.tsfile.common.conf.TSFileDescriptor;
@@ -28,7 +28,7 @@
public void createTSFile1(String tsfilePath) throws Exception {
TSRandomAccessFileWriter output = new RandomAccessOutputStream(new File(tsfilePath));
TsFile tsFile = new TsFile(output, jsonSchema);

tsFile.writeLine("root.car.d1,1, s1, 1, s2, 10, s3, 100.1, s4, 0.1");
tsFile.writeLine("root.car.d1,1, s1, 1, s2, 10, s3, 100.1");
tsFile.writeLine("root.car.d1,2, s1, 2, s2, 20, s3, 200.2, s4, 0.2");
tsFile.writeLine("root.car.d1,3, s1, 3, s2, 30, s3, 200.3, s4, 0.3");
tsFile.writeLine("root.car.d1,4, s1, 4, s2, 40, s3, 200.4, s4, 0.4");
@@ -1,11 +1,8 @@
package cn.edu.thu.tsfile.io;

import cn.edu.thu.tsfile.common.utils.TSRandomAccessFileReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
@@ -18,8 +15,8 @@
*/
public class HDFSInputStream implements TSRandomAccessFileReader {

private final FSDataInputStream fsDataInputStream;
private final long length;

public HDFSInputStream(String filePath) throws IOException {

@@ -31,11 +28,9 @@
public HDFSInputStream(String filePath, Configuration configuration) throws IOException {
this(new Path(filePath),configuration);
}

// cache the file length at construction time instead of holding a FileStatus
public HDFSInputStream(Path path, Configuration conf) throws IOException {
length = path.getFileSystem(conf).getFileStatus(path).getLen();
fsDataInputStream = path.getFileSystem(conf).open(path);
}

public void seek(long offset) throws IOException {
@@ -50,7 +45,7 @@ public int read() throws IOException {

public long length() throws IOException {

return length;
}

public int readInt() throws IOException {
@@ -1,12 +1,10 @@
package cn.edu.thu.tsfile.io;

import cn.edu.thu.tsfile.common.utils.TSRandomAccessFileWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.OutputStream;
@@ -20,8 +18,6 @@
*/
public class HDFSOutputStream implements TSRandomAccessFileWriter {

private FSDataOutputStream fsDataOutputStream;

public HDFSOutputStream(String filePath, boolean overwriter) throws IOException {