Skip to content

Commit

Permalink
upgrade iotdb to 0.7.0 and spark to 2.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
qiaojialin committed Jun 16, 2018
1 parent 44ea1f2 commit 3be3914
Show file tree
Hide file tree
Showing 14 changed files with 395 additions and 419 deletions.
424 changes: 311 additions & 113 deletions .idea/workspace.xml

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions delta.spark.iml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
<orderEntry type="library" name="Maven: org.json:json:20170516" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" name="Maven: cn.edu.tsinghua:tsfile:0.5.0" level="project" />
<orderEntry type="library" name="Maven: cn.edu.tsinghua:tsfile:0.7.0" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.1.3" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.1.3" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.7" level="project" />
<orderEntry type="library" name="Maven: org.apache.thrift:libthrift:0.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
<orderEntry type="library" name="Maven: org.xerial.snappy:snappy-java:1.0.5-M1" level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.5" level="project" />
<orderEntry type="library" name="Maven: cn.edu.tsinghua:iotdb-jdbc:0.5.0" level="project" />
<orderEntry type="library" name="Maven: cn.edu.tsinghua:iotdb-jdbc:0.7.0" level="project" />
<orderEntry type="library" name="Maven: joda-time:joda-time:2.9.9" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-client:2.7.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-common:2.7.3" level="project" />
Expand Down
9 changes: 5 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@

<groupId>cn.edu.tsinghua</groupId>
<artifactId>spark-iotdb-connector</artifactId>
<version>0.5.0</version>
<version>0.7.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.7.3</hadoop.version>
<compile.version>1.8</compile.version>
<spark.version>2.0.0</spark.version>
<spark.version>2.1.0</spark.version>
<junit.version>4.12</junit.version>
<iotdb.version>0.7.0</iotdb.version>
</properties>

<dependencies>
Expand All @@ -30,12 +31,12 @@
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>tsfile</artifactId>
<version>0.5.0</version>
<version>${iotdb.version}</version>
</dependency>
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>iotdb-jdbc</artifactId>
<version>0.5.0</version>
<version>${iotdb.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/cn/edu/tsinghua/iotdb/SQLConstant.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package cn.edu.tsinghua.iotdb;

/**
 * String constants shared by the IoTDB SQL/JDBC result-handling code.
 *
 * <p>Collects the magic strings used when interpreting query results so they
 * are defined in exactly one place.
 */
public class SQLConstant {

    // Marker in a result set indicating an aggregation query, whose rows
    // carry no meaningful timestamp column to print.
    public static final String NEED_NOT_TO_PRINT_TIMESTAMP = "AGGREGATION";
    // Reserved column name for the time dimension as written in SQL.
    public static final String RESERVED_TIME = "time";
    // Display label of the timestamp column in returned results.
    public static final String TIMESTAMP_STR = "Time";
    // Textual representation used for absent (null) values in results.
    public static final String NULL_STR = "null";

}
136 changes: 0 additions & 136 deletions src/main/java/cn/edu/tsinghua/tsfile/qp/common/SQLConstant.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cn.edu.tsinghua.tsfile
package cn.edu.tsinghua.iotdb

import java.sql._
import cn.edu.tsinghua.tsfile.qp.common.SQLConstant

import org.apache.spark.sql.types._
import org.slf4j.LoggerFactory
import java.sql.Statement
Expand Down Expand Up @@ -32,7 +32,7 @@ object Converter {
r
}

def toSparkSchema(options: TSFileOptions): StructType = {
def toSparkSchema(options: IoTDBOptions): StructType = {

Class.forName("cn.edu.tsinghua.iotdb.jdbc.TsfileDriver")
val sqlConn: Connection = DriverManager.getConnection(options.url, options.user, options.password)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package cn.edu.tsinghua.tsfile
package cn.edu.tsinghua.iotdb

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.slf4j.LoggerFactory

private[tsfile] class DefaultSource extends RelationProvider with DataSourceRegister {
private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegister {
private final val logger = LoggerFactory.getLogger(classOf[DefaultSource])

override def shortName(): String = "tsfile"
Expand All @@ -13,12 +13,12 @@ private[tsfile] class DefaultSource extends RelationProvider with DataSourceRegi
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {

val tsfileOptions = new TSFileOptions(parameters)
val iotdbOptions = new IoTDBOptions(parameters)

if (tsfileOptions.url == null || tsfileOptions.sql == null) {
if (iotdbOptions.url == null || iotdbOptions.sql == null) {
sys.error("TSFile node or sql not specified")
}
new TSFileRelation(tsfileOptions)(sqlContext.sparkSession)
new IoTDBRelation(iotdbOptions)(sqlContext.sparkSession)

}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package cn.edu.tsinghua.tsfile
package cn.edu.tsinghua.iotdb

/**
* Created by qjl on 16-11-4.
*/
class TSFileOptions (
class IoTDBOptions(
@transient private val parameters: Map[String, String])
extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cn.edu.tsinghua.tsfile
package cn.edu.tsinghua.iotdb

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
Expand All @@ -9,16 +9,12 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types._


/**
* Created by qjl on 16-11-3.
*/

//TSFile partition
case class TSFilePartition(where: String, id: Int, start: java.lang.Long, end:java.lang.Long) extends Partition {
//IoTDB data partition
case class IoTDBPartition(where: String, id: Int, start: java.lang.Long, end:java.lang.Long) extends Partition {
override def index: Int = id
}

object TSFileRDD {
object IoTDBRDD {

private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
val fieldMap = Map(schema.fields.map(x => x.name -> x): _*)
Expand All @@ -27,13 +23,13 @@ object TSFileRDD {

}

class TSFileRDD private[tsfile](
sc: SparkContext,
options: TSFileOptions,
schema : StructType,
requiredColumns: Array[String],
filters: Array[Filter],
partitions: Array[Partition])
class IoTDBRDD private[iotdb](
sc: SparkContext,
options: IoTDBOptions,
schema : StructType,
requiredColumns: Array[String],
filters: Array[Filter],
partitions: Array[Partition])
extends RDD[Row](sc, Nil) {

override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {
Expand All @@ -42,9 +38,8 @@ class TSFileRDD private[tsfile](
var nextValue: Row = null
val inputMetrics = context.taskMetrics().inputMetrics

val part = split.asInstanceOf[TSFilePartition]
val part = split.asInstanceOf[IoTDBPartition]

println(">>>>>>>@@@@@@@@@@@@")
var taskInfo: String = _
Option(TaskContext.get()).foreach { taskContext => {
taskContext.addTaskCompletionListener { _ => conn.close()}
Expand All @@ -59,7 +54,7 @@ class TSFileRDD private[tsfile](

var rs : ResultSet = stmt.executeQuery(options.sql)
println(options.sql)
val prunedSchema = TSFileRDD.pruneSchema(schema, requiredColumns)
val prunedSchema = IoTDBRDD.pruneSchema(schema, requiredColumns)
private val rowBuffer = Array.fill[Any](prunedSchema.length)(null)

def getNext(): Row = {
Expand Down
Loading

0 comments on commit 3be3914

Please sign in to comment.