Comparison of esProc composite table, ORC, Parquet
Big data technology has given rise to several columnar storage formats, and a suitable storage solution is the foundation of high-performance computing. This article compares three open-source columnar file formats, the esProc composite table, ORC, and Parquet, in terms of data compression and read performance from an application perspective.
Data structure of the inbound table:
No | Field name | Field type |
---|---|---|
1 | pid | long |
2 | store | string |
3 | product | int |
4 | indate | date |
5 | num | int |
Generate 200 million rows of txt data for the inbound table, store them locally as an esProc composite table, an ORC file, and a Parquet file, then compare the file sizes and test the read performance of each. Following the way Hive tables store dates, the 'indate' field is converted to a numeric value for storage.
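As a minimal sketch of what "numeric storage" of a date can look like, the snippet below encodes a `java.util.Date` as the number of days since 1970-01-01 and decodes it again. The class name `EpochDayCodec` and its methods are illustrative and are not part of the test programs in this article. It assumes the time zone is passed in explicitly, whereas the conversion programs later in this article use the system default zone.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

// Hypothetical helper: encodes a date as days since 1970-01-01 and back.
final class EpochDayCodec {
    // Resolve the instant to a calendar date in the given zone, then count days since the epoch.
    static int toEpochDay(Date date, ZoneId zone) {
        LocalDate local = Instant.ofEpochMilli(date.getTime()).atZone(zone).toLocalDate();
        return Math.toIntExact(local.toEpochDay());
    }

    // Decode the stored day count back into a calendar date.
    static LocalDate fromEpochDay(int days) {
        return LocalDate.ofEpochDay(days);
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        Date d = Date.from(LocalDate.of(2015, 10, 1).atStartOfDay(utc).toInstant());
        int days = toEpochDay(d, utc);
        System.out.println(days + " -> " + fromEpochDay(days));
    }
}
```

Resolving the calendar date through the zone rules for that specific instant avoids applying today's UTC offset to a historical date, which can matter where daylight saving time is in effect.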
The txt data is generated with an esProc SPL script and then converted into the other file formats for testing.
The generated inbound data is unordered and is saved as an esProc composite table, an ORC file, and a Parquet file.
The SPL script for generating TXT data is as follows:
A | B | C | D | |
---|---|---|---|---|
1 | /uuid(Segmented data ) | /Segment Number | /Segment length | /Sequence number in the segment |
2 | [] | 1 | 0 | 0 |
3 | [[300000,400000],[50000,100000],[600000,700000],[100000,200000],[200000,300000],[1,50000],[400000,500000],[900000,1000000],[500000,600000],[800000,900000],[700000,800000]] | 0 | /B3: Starting position of segment | |
4 | for 220 | for A3 | =[B4(1)+(A4-1)*1000000, B4(2)+(A4-1)*1000000] | |
5 | =A2=A2.insert(0, C4) | |||
6 | 1000 | 100 | 1500 | 10000 |
7 | 200000000 | 20 | 60 | 10 |
8 | 0 | =date(2015,10,1) | =date(2016,9,30) | 100 |
9 | =create(pid, store,product,indate,num) | |||
10 | =file("H:/tmp/inbound.txt") | |||
11 | func genOne (store,product) | ="store_id_"+mid(string(store+1000000),2) | ||
12 | =B7+rand(C7-B7) | =to(1000).(rand(interval(B8,C8))).id@u().m(to(B12)).sort() | ||
13 | for B12 | =elapse(B8,C12(B13)) | =D7+rand(D8-D7) | |
14 | >A8 = A8+1 | |||
15 | if D2>=C2 | >B3=A2(B2)(1) | ||
16 | >C2 = A2(B2)(2)-B3 | |||
17 | >B2=B2+1 | |||
18 | =D2=0 | |||
19 | >A9.insert(0,(D2+B3), B11,product,C13,D13) | |||
20 | >D2 = D2+1 | |||
21 | for D6 | for A6 | =func(genOne,A21,B21) | |
22 | if (A8 >= A7) | >A10.export@a(A9.cursor()) | ||
23 | break A21 | |||
24 | >A10.export@at(A9.cursor()) | >A9.reset() |
The script generates 200 million rows and stores them in a txt file of 8448MB. The A2 sequence helps generate unordered, non-repeating numbers to serve as key values.
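The idea behind the A2 sequence can be sketched in Java as follows. This is a simplified illustration and not a translation of the SPL script. It assumes each pair is a half-open range [start, end), that the ranges are listed in shuffled order, and that the same pattern repeats with a fixed offset per block. The class `SegmentedKeys` and its parameters are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: unordered but non-repeating keys built from shuffled segments.
final class SegmentedKeys {
    // Emits every key of each half-open segment [start, end), visiting segments
    // in the given (shuffled) order and shifting them by blockSize per block.
    static List<Long> generate(long[][] segments, int blocks, long blockSize) {
        List<Long> keys = new ArrayList<>();
        for (int block = 0; block < blocks; block++) {
            long offset = block * blockSize;
            for (long[] seg : segments) {
                for (long k = seg[0]; k < seg[1]; k++) {
                    keys.add(k + offset);
                }
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Tiny example: three shuffled segments covering 1..6, repeated for two blocks of 10.
        long[][] segments = { {3, 5}, {1, 3}, {5, 7} };
        System.out.println(generate(segments, 2, 10));
    }
}
```

Because the segments do not overlap and each block is shifted by a distinct offset, every key appears exactly once, while the overall order is not ascending.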
The script for generating esProc composite table data is as follows:
A | B | C | |
---|---|---|---|
1 | =now() | 0 | |
2 | =file("H:/tmp/inbound.txt").cursor@t() | ||
3 | =file("H:/tmp/data3/inbound.ctx").create@y(#pid, store, product, indate,num) | ||
4 | for A2, 50000 | >A3.append(A4) | |
5 | =B1=B1+A4.len() | ||
6 | if B1%5000000==0 | >output(B1) | |
7 | >A3.close() | ||
8 | >output("sum =" / B1) | ||
9 | =interval@ms(A1,now()) |
Converting the txt data produces a composite table (ctx) file of 608MB.
Use a Java program to convert the txt data into an ORC file:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.CompressionKind;
import com.scudata.common.Logger;
import com.scudata.dm.BaseRecord;
import com.scudata.dm.FileObject;
import com.scudata.dm.Sequence;
import com.scudata.dm.cursor.ICursor;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Date;
public class CreateOrcFile {
private TypeDescription m_schema;
Writer m_writer;
public CreateOrcFile (String destFile){
try {
/* Define table structure*/
String struct = "struct<pid:int,store:string,product:int,indate:date,num:int>";
m_schema = TypeDescription.fromString(struct);
File f = new File(destFile);
if (f.exists()) {
f.delete();
}
m_writer = getWriter(destFile);
} catch (IOException e) {
Logger.error("CreateOrcFile:: "+e);
}
}
public static void main(String[] args) throws IOException {
String srcFile = "H:/tmp/data3/inbound.txt";
String destFile = "h:/tmp/data3/out_inbound.orc";
CreateOrcFile cls = new CreateOrcFile(destFile);
cls.writeToOrcFile(srcFile);
}
private void writeToOrcFile(String srcFile)
{
try {
Sequence tab = null;
FileObject oRet = new FileObject(srcFile);
String exp = String.format(";,\",\"");
IParam param = ParamParser.parse(exp, null, null);
ICursor c = CreateCursor.createCursor("", oRet, null,param,"", new Context());
long total = 0;
int unitSize = 10000;
VectorizedRowBatch batch = m_schema.createRowBatch();
LongColumnVector pid = (LongColumnVector) batch.cols[0];
BytesColumnVector store = (BytesColumnVector) batch.cols[1];
LongColumnVector product = (LongColumnVector) batch.cols[2];
DateColumnVector indate = (DateColumnVector) batch.cols[3];
LongColumnVector num = (LongColumnVector) batch.cols[4];
while(null!=(tab = (Sequence) c.fetch(unitSize)) ) {
total += tab.length();
for(int i=0; i<tab.length(); i++) {
BaseRecord sq = (BaseRecord)tab.get(i+1);
int row = batch.size++;
pid.vector[row] = Long.parseLong(sq.getFieldValue(0).toString());
byte[] buffer = ((String)sq.getFieldValue(1)).getBytes(StandardCharsets.UTF_8);
store.setRef(row, buffer, 0, buffer.length);
product.vector[row] = (Integer)sq.getFieldValue(2);
Date d = (Date)sq.getFieldValue(3);
indate.vector[row] = dateToDays(d);
num.vector[row] = Long.parseLong(sq.getFieldValue(4).toString());
if (batch.size == batch.getMaxSize()) {
m_writer.addRowBatch(batch);
batch.reset();
}
}
if (total %500000==0) {
System.out.println("idx = "+total);
}
if (tab.length()<unitSize) {
break;
}
}
if (batch.size != 0) {
m_writer.addRowBatch(batch);
}
m_writer.close();
}catch(Exception e) {
System.out.println("writeToOrcFile: "+e);
}
}
/* Convert date to days*/
public static int dateToDays(Date date) {
int days = 0;
ZoneId zoneId = ZoneId.systemDefault();
int offsetSeconds = zoneId.getRules().getOffset(Instant.now()).getTotalSeconds();
long zonems = offsetSeconds * 1000;
long milliseconds = date.getTime()+zonems;
days = (int)(milliseconds / (1000 * 60 * 60 * 24));
return days;
}
/* ORC write operation object */
private Writer getWriter(String filePath) throws IOException {
return OrcFile.createWriter(new Path(filePath),
OrcFile.writerOptions(new Configuration()).
setSchema(m_schema).
compress(CompressionKind.SNAPPY));
}
}
Converting the txt data produces an ORC file of 513.9MB.
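The ORC program above fills a row batch, hands it to the writer when it is full, and writes any remaining rows before closing. The same buffering pattern, stripped of the ORC classes, might look like the sketch below. `BatchBuffer`, its sink callback, and the batch size used in `main` are illustrative assumptions and do not belong to the ORC API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the fill / flush-when-full / flush-remainder pattern.
final class BatchBuffer<T> {
    private final int maxSize;
    private final Consumer<List<T>> sink;
    private final List<T> rows = new ArrayList<>();

    BatchBuffer(int maxSize, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.sink = sink;
    }

    // Buffer one row and flush as soon as the batch is full.
    void add(T row) {
        rows.add(row);
        if (rows.size() == maxSize) {
            flush();
        }
    }

    // Flush the last, partially filled batch so no rows are lost.
    void close() {
        if (!rows.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        sink.accept(new ArrayList<>(rows)); // hand over a copy, then reuse the buffer
        rows.clear();
    }

    public static void main(String[] args) {
        BatchBuffer<Integer> buf =
            new BatchBuffer<>(1000, batch -> System.out.println("flushed " + batch.size()));
        for (int i = 0; i < 2500; i++) {
            buf.add(i);
        }
        buf.close();
    }
}
```

Forgetting the final flush is the usual mistake with this pattern, which is why the ORC program checks `batch.size != 0` before closing the writer.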
As with ORC, use a Java program to convert the txt data into a Parquet file:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import com.scudata.dm.BaseRecord;
import com.scudata.dm.FileObject;
import com.scudata.dm.Sequence;
import com.scudata.dm.cursor.ICursor;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Date;
public class CreateParquetFile {
private static final MessageType FILE_SCHEMA;
private static final String TABLE_NAME = "inbound_parquet";
ParquetWriter m_writer;
static {
// Define table structure
Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
messageTypeBuilder
.optional(PrimitiveType.PrimitiveTypeName.INT64)
.named("pid")
.optional(PrimitiveType.PrimitiveTypeName.BINARY)
.as(LogicalTypeAnnotation.stringType())
.named("store")
.optional(PrimitiveType.PrimitiveTypeName.INT32)
.named("product")
.optional(PrimitiveType.PrimitiveTypeName.INT32)
.named("indate")
.optional(PrimitiveType.PrimitiveTypeName.INT64)
.named("num");
FILE_SCHEMA = messageTypeBuilder.named(TABLE_NAME);
}
public CreateParquetFile (String outFilePath){
try {
File f = new File(outFilePath);
if (f.exists()) {
f.delete();
}
m_writer = getWriter(outFilePath);
} catch (IOException e) {
e.printStackTrace();
}
}
/* Get write operation object */
private static ParquetWriter getWriter(String filePath) throws IOException {
Path path = new Path(filePath);
return ExampleParquetWriter.builder(path)
.withWriteMode(ParquetFileWriter.Mode.CREATE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withConf(new Configuration())
.withType(FILE_SCHEMA).build();
}
/* Generate ParquetFile*/
private void writeToParquetFile(String sfile)
{
try {
FileObject oRet = new FileObject(sfile);
String exp = String.format(";,\",\"");
IParam param = ParamParser.parse(exp, null, null);
ICursor c = CreateCursor.createCursor("", oRet, null,param,"", new Context());
Sequence tab = null;
long total = 0;
while(null!=(tab = (Sequence) c.fetch(10000)) ) {
total += tab.length();
writeToFile(tab);
if (total %500000==0) {
System.out.println("idx ="+total);
}
if (tab.length()<10000) {
break;
}
}
m_writer.close();
}catch(Exception e) {
System.out.println("writeToParquetFile: "+e);
}
}
/* Convert date to seconds (not called in this test) */
public static int dateToInt(Date date) {
long milliseconds = date.getTime();
return (int) (milliseconds / 1000);
}
/* Data writing */
private void writeToFile(Sequence tab) throws IOException {
SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(FILE_SCHEMA);
Group group = null;
for(int i=0;i<tab.length(); i++) {
BaseRecord sq = (BaseRecord)tab.get(i+1);
try {
group = simpleGroupFactory.newGroup();
group.append("pid", Long.parseLong(sq.getFieldValue(0).toString()) );
group.append("store", (String)sq.getFieldValue(1));
group.append("product", (Integer)sq.getFieldValue(2));
Date d = (Date)sq.getFieldValue(3);
group.append("indate", dateToDays(d));
group.append("num", Long.parseLong(sq.getFieldValue(4).toString()) );
m_writer.write(group);
}catch(Exception e) {
System.out.println("writeToFile:"+e);
}
}
}
/* Convert date to days */
public static int dateToDays(Date date) {
int days = 0;
ZoneId zoneId = ZoneId.systemDefault();
int offsetSeconds = zoneId.getRules().getOffset(Instant.now()).getTotalSeconds();
long zonems = offsetSeconds * 1000;
long milliseconds = date.getTime()+zonems;
days = (int)(milliseconds / (1000 * 60 * 60 * 24));
return days;
}
public static void main(String[] args) throws IOException {
String srcFile = "H:/tmp/data3/inbound.txt";
/* Path of the Parquet file to be written */
String destFile= "h:/tmp/data3/out_inbound.parquet";
CreateParquetFile cls = new CreateParquetFile(destFile);
cls.writeToParquetFile(srcFile);
System.out.println("OK.");
}
}
Converting the txt data produces a Parquet file of 1157MB.
To test reading, traverse each file, convert the fields of every record according to their data types, and store them in an Object[] array. The same method is used to measure the read time of the esProc composite table, ORC, and Parquet files.
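The per-record work described here can be pictured with a small sketch that turns one record into an `Object[]` according to declared field types. This is a simplified illustration, assuming raw values arrive as strings and that dates are stored as epoch days. The actual readers below pull values from ORC column vectors and Parquet groups instead, and `RowDecoder` and `FieldType` are hypothetical names.

```java
import java.time.LocalDate;

// Hypothetical sketch: type-driven conversion of one record into Object[].
final class RowDecoder {
    enum FieldType { LONG, STRING, INT, DATE }

    // Convert each raw value according to its declared type.
    // DATE is assumed to be stored as days since 1970-01-01.
    static Object[] decode(String[] raw, FieldType[] types) {
        Object[] items = new Object[types.length];
        for (int i = 0; i < types.length; i++) {
            switch (types[i]) {
                case LONG:
                    items[i] = Long.parseLong(raw[i]);
                    break;
                case INT:
                    items[i] = Integer.parseInt(raw[i]);
                    break;
                case DATE:
                    items[i] = LocalDate.ofEpochDay(Long.parseLong(raw[i])).toString();
                    break;
                default:
                    items[i] = raw[i]; // STRING is kept as-is
            }
        }
        return items;
    }

    public static void main(String[] args) {
        FieldType[] types = {
            FieldType.LONG, FieldType.STRING, FieldType.INT, FieldType.DATE, FieldType.INT
        };
        String[] raw = { "42", "store_id_000001", "7", "16709", "3" };
        System.out.println(java.util.Arrays.toString(decode(raw, types)));
    }
}
```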
SPL script to read data:
A | B | |
---|---|---|
1 | =now() | |
2 | =file("H:/tmp/data3/inbound.ctx").open() | |
3 | =A2.cursor() | |
4 | for A3, 10000 | >B1=B1+A4.len() |
5 | =A2.close() | |
6 | >output("total=" / B1) | |
7 | =interval@ms(A1,now()) |
Traversing the composite table with a cursor takes 11.053 seconds.
Use a Java program to read the ORC file:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.time.LocalDate;
public class InboundReader {
public static void main(String[] args) throws Exception {
try {
long start = System.currentTimeMillis();
String sfile = "H:/tmp/data3/out_inbound.orc";
Configuration conf = new Configuration();
Reader reader = OrcFile.createReader(new Path(sfile), OrcFile.readerOptions(conf));
TypeDescription readSchema = reader.getSchema();
System.out.println("Row count: " + reader.getNumberOfRows());
VectorizedRowBatch batch = readSchema.createRowBatch(50000);
RecordReader rowIterator = reader.rows(reader.options().schema(readSchema));
LongColumnVector pid = (LongColumnVector) batch.cols[0];
BytesColumnVector store = (BytesColumnVector) batch.cols[1];
LongColumnVector product = (LongColumnVector) batch.cols[2];
DateColumnVector indate = (DateColumnVector) batch.cols[3];
LongColumnVector total = (LongColumnVector) batch.cols[4];
long sum = 0;
String[] cols = new String[]{"pid","store", "product","indate","num"};
Object[] items = new Object[cols.length];
while (rowIterator.nextBatch(batch)) {
for (int row = 0; row < batch.size; ++row) {
int productRow = product.isRepeating ? 0 : row;
int indateRow = indate.isRepeating ? 0 : row;
int totalRow = total.isRepeating ? 0 : row;
items[0] = pid.vector[row];
items[1] = store.toString(row);
items[2] = (product.noNulls || !product.isNull[productRow] ? product.vector[productRow] : 0);
items[3] = (indate.noNulls || !indate.isNull[indateRow] ? LocalDate.ofEpochDay(indate.vector[indateRow]).toString() : null);
items[4] = (total.noNulls || !total.isNull[totalRow] ? total.vector[totalRow] : 0);
}
sum+=batch.size;
}
rowIterator.close();
reader.close();
System.out.println("sum = "+ sum+"; Time = "+(System.currentTimeMillis() - start));
}catch(Exception e) {
System.out.println(e);
}
}
}
Traversing the ORC file takes 19.595 seconds.
Use a Java program to read the Parquet file:
import java.time.LocalDate;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupValueSource;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.GroupType;
import com.scudata.common.Logger;
public class ParquetReaderTest {
public static void main(String[] args) throws Exception {
try {
long nStart = System.currentTimeMillis();
ParquetReaderTest cls = new ParquetReaderTest();
String srcFile = "h:/tmp/data3/out_inbound.parquet";
cls.parquetRead(srcFile);
System.out.println("time = " + (System.currentTimeMillis() - nStart));
} catch (Exception e) {
e.printStackTrace();
}
}
private void parquetRead(String srcFile) throws Exception {
long sum = 0;
Map<String, String> map = new LinkedHashMap<>();
map.put("pid", "long");
map.put("store", "string");
map.put("product", "int");
map.put("indate", "date");
map.put("num", "long");
GroupReadSupport readSupport = new GroupReadSupport();
ParquetReader<Group> reader = new ParquetReader<>(new Path(srcFile), readSupport);
Group line = null;
Object[] items = null;
while ((line = reader.read()) != null) {
items = readGroup(line, map.values());
sum++;
if (sum % 5000000 == 0) {
System.out.println("idx =" + sum);
}
}
reader.close();
System.out.println("sum=" + sum);
}
private Object[] readGroup(Group g, Collection<String> colsType) throws Exception {
Object[] items = new Object[colsType.size()];
try {
String sType = null;
Iterator<String> iter = colsType.iterator();
int n = 0;
while (iter.hasNext()) {
sType = iter.next();
Type colType = g.getType().getFields().get(n);
if (colType.isPrimitive()) {
if (sType.equals("string")) {
items[n] = g.getString(n, 0);
} else if (sType.equals("int")) {
items[n] = g.getInteger(n, 0);
} else if (sType.equals("long")) {
items[n] = g.getLong(n, 0);
} else if (sType.equals("double")) {
items[n] = g.getDouble(n, 0);
} else if (sType.equals("float")) {
items[n] = g.getFloat(n, 0);
} else if (sType.equals("boolean")) {
items[n] = g.getBoolean(n, 0);
} else if (sType.equals("int96")) {
items[n] = g.getInt96(n, 0);
} else if (sType.equals("date")) {
/* Dates are stored as days since 1970-01-01 */
items[n] = LocalDate.ofEpochDay(g.getInteger(n, 0)).toString();
}
} else {
/* Nested group: only report its field count */
GroupValueSource subG = g.getGroup(n, 0);
GroupType pt = subG.getType();
int colSize = pt.getFieldCount();
System.out.println("subColSize =" + colSize);
}
/* Advance the column index for every field, primitive or nested */
n++;
}
} catch (Exception e) {
Logger.error("readGroup::" + e);
}
return items;
}
}
Traversing the Parquet file takes 85.370 seconds. Testing also found that if the date is stored in INT96 binary format, converting it to a date is slower.
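To give a sense of the extra work involved, here is a minimal sketch of decoding an INT96 value into a date. It assumes the layout commonly written by Hive and Impala: 12 bytes in little-endian order, with 8 bytes of nanoseconds within the day followed by 4 bytes holding the Julian day number. The class `Int96Dates` is a hypothetical helper, not part of the Parquet API, and this is only one plausible contributor to the slowdown observed in the test.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalDate;

// Hypothetical helper for the assumed INT96 timestamp layout.
final class Int96Dates {
    // Julian day number of 1970-01-01.
    static final long JULIAN_DAY_OF_EPOCH = 2_440_588L;

    // Decode a 12-byte INT96 value into its calendar date (time of day is ignored).
    static LocalDate toLocalDate(byte[] int96) {
        if (int96.length != 12) {
            throw new IllegalArgumentException("INT96 value must be 12 bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        buf.getLong();                 // skip nanoseconds within the day
        int julianDay = buf.getInt();  // last 4 bytes: Julian day number
        return LocalDate.ofEpochDay(julianDay - JULIAN_DAY_OF_EPOCH);
    }

    // Encode a date at midnight in the same assumed layout (for round-trip checks).
    static byte[] fromLocalDate(LocalDate date) {
        ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
        buf.putLong(0L);
        buf.putInt(Math.toIntExact(date.toEpochDay() + JULIAN_DAY_OF_EPOCH));
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] encoded = fromLocalDate(LocalDate.of(2015, 10, 1));
        System.out.println(toLocalDate(encoded));
    }
}
```

Compared with an INT32 day count, which maps to a date with a single `LocalDate.ofEpochDay` call, each INT96 value needs a 12-byte binary read and an epoch adjustment.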
Item | Composite table | ORC | Parquet | TXT |
---|---|---|---|---|
Compression format | lz4 | snappy | snappy | none |
File size (MB) | 608 | 513.9 | 1157 | 8448 |
Read time, 1st run (seconds) | 15.519 | 28.517 | 93.972 | - |
Read time, 2nd run (seconds) | 10.841 | 18.034 | 76.114 | - |
Read time, 3rd run (seconds) | 11.053 | 19.595 | 85.370 | - |
The table shows that, compared with the uncompressed text data, all three columnar formats compress the data well and save a great deal of storage space. ORC achieves the highest compression; the esProc composite table file is somewhat larger, but the gap is modest (about 20%). Parquet compresses noticeably worse: its file is more than twice the size of the ORC file.
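The size comparison can be checked with a few lines of arithmetic. The sketch below uses only the file sizes reported in this article (8448MB of text, 608MB composite table, 513.9MB ORC, 1157MB Parquet). The class name `SizeRatios` is illustrative.

```java
// Hypothetical helper: ratios between the file sizes reported in this article.
final class SizeRatios {
    static final double TXT_MB = 8448.0;
    static final double CTX_MB = 608.0;
    static final double ORC_MB = 513.9;
    static final double PARQUET_MB = 1157.0;

    // How many times larger the first file is than the second.
    static double ratio(double sizeMb, double baselineMb) {
        return sizeMb / baselineMb;
    }

    public static void main(String[] args) {
        System.out.printf("txt / ctx     = %.2f%n", ratio(TXT_MB, CTX_MB));
        System.out.printf("txt / orc     = %.2f%n", ratio(TXT_MB, ORC_MB));
        System.out.printf("txt / parquet = %.2f%n", ratio(TXT_MB, PARQUET_MB));
        System.out.printf("ctx / orc     = %.2f%n", ratio(CTX_MB, ORC_MB));
        System.out.printf("parquet / orc = %.2f%n", ratio(PARQUET_MB, ORC_MB));
    }
}
```

The composite table file comes out roughly 18% larger than the ORC file, and the Parquet file about 2.25 times the size of the ORC file.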
In terms of read speed, the esProc composite table is the fastest, nearly twice as fast as ORC and far ahead of Parquet. Seen from another angle, this suggests that Parquet may be becoming an outdated file format that is giving way to ORC.
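The speed comparison can be reproduced from the three timed runs of each format. The sketch below averages the runs and expresses the ORC and Parquet averages as multiples of the composite table average. Averaging all three runs is an assumption made here for illustration; the article itself quotes the third run. The class name `ReadTimes` is hypothetical.

```java
// Hypothetical helper: mean read times and relative slowdowns from the reported runs.
final class ReadTimes {
    static final double[] CTX = { 15.519, 10.841, 11.053 };
    static final double[] ORC = { 28.517, 18.034, 19.595 };
    static final double[] PARQUET = { 93.972, 76.114, 85.370 };

    // Arithmetic mean of the measured run times, in seconds.
    static double mean(double[] seconds) {
        double sum = 0.0;
        for (double s : seconds) {
            sum += s;
        }
        return sum / seconds.length;
    }

    // How many times longer the given format takes than the baseline, on average.
    static double slowdown(double[] format, double[] baseline) {
        return mean(format) / mean(baseline);
    }

    public static void main(String[] args) {
        System.out.printf("mean ctx     = %.3f s%n", mean(CTX));
        System.out.printf("mean orc     = %.3f s (%.2fx ctx)%n", mean(ORC), slowdown(ORC, CTX));
        System.out.printf("mean parquet = %.3f s (%.2fx ctx)%n", mean(PARQUET), slowdown(PARQUET, CTX));
    }
}
```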
Considering both compression rate and read performance, the esProc composite table has the greater overall advantage. If the flexible segmentation and ordered positioning features of the composite table are also taken into account, its performance advantage for big data computing becomes even more apparent.