1623: ODP-1286 | druid version move to 28.0.1 (#27)
(cherry picked from commit c0b7e51)
basapuram-kumar authored and prabhjyotsingh committed Sep 27, 2024
1 parent 4bd3303 commit 70c1256
Showing 16 changed files with 84 additions and 50 deletions.
2 changes: 1 addition & 1 deletion druid-handler/pom.xml
@@ -25,7 +25,7 @@
 <name>Hive Druid Handler</name>
 <properties>
 <hive.path.to.root>..</hive.path.to.root>
-<druid.guava.version>16.0.1</druid.guava.version>
+<druid.guava.version>31.1-jre</druid.guava.version>
 </properties>
 <dependencies>
 <!-- dependencies are always listed in sorted order by groupId, artifactId -->
---- next changed file (path not shown) ----
@@ -191,7 +191,7 @@ static InputRowParser getInputRowParser(Table table, TimestampSpec timestampSpec

 // Default case JSON
 if ((parseSpecFormat == null) || "json".equalsIgnoreCase(parseSpecFormat)) {
-return new StringInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec, null, null), "UTF-8");
+return new StringInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec), "UTF-8");
 } else if ("csv".equalsIgnoreCase(parseSpecFormat)) {
 return new StringInputRowParser(new CSVParseSpec(timestampSpec,
 dimensionsSpec,
---- next changed file (path not shown) ----
@@ -47,7 +47,8 @@
 import org.apache.druid.metadata.storage.derby.DerbyConnector;
 import org.apache.druid.metadata.storage.derby.DerbyMetadataStorage;
 import org.apache.druid.metadata.storage.mysql.MySQLConnector;
-import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig;
+import org.apache.druid.metadata.storage.mysql.MySQLConnectorDriverConfig;
+import org.apache.druid.metadata.storage.mysql.MySQLConnectorSslConfig;
 import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnector;
 import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnectorConfig;
 import org.apache.druid.metadata.storage.postgresql.PostgreSQLTablesConfig;
@@ -319,7 +320,7 @@ private void updateKafkaIngestion(Table table) {
 + columnNames);
 }

-DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensionsAndAggregates.lhs, null, null);
+DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensionsAndAggregates.lhs);
 String timestampFormat = DruidStorageHandlerUtils
 .getTableProperty(table, DruidConstants.DRUID_TIMESTAMP_FORMAT);
 String timestampColumnName = DruidStorageHandlerUtils
@@ -884,7 +885,8 @@ private SQLMetadataConnector buildConnector() {
 connector =
 new MySQLConnector(storageConnectorConfigSupplier,
 Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()),
-new MySQLConnectorConfig());
+new MySQLConnectorSslConfig(),
+new MySQLConnectorDriverConfig());
 break;
 case "postgresql":
 connector =
---- next changed file (path not shown) ----
@@ -89,6 +89,7 @@
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
@@ -217,7 +218,8 @@ private DruidStorageHandlerUtils() {
 /**
 * Mapper to use to serialize/deserialize Druid objects (SMILE).
 */
-public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
+public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(
+String.valueOf(new SmileFactory()));
 private static final int DEFAULT_MAX_TRIES = 10;

 static {
@@ -797,7 +799,7 @@ private static ShardSpec getNextPartitionShardSpec(ShardSpec shardSpec) {
 if (shardSpec instanceof LinearShardSpec) {
 return new LinearShardSpec(shardSpec.getPartitionNum() + 1);
 } else if (shardSpec instanceof NumberedShardSpec) {
-return new NumberedShardSpec(shardSpec.getPartitionNum(), ((NumberedShardSpec) shardSpec).getPartitions());
+return new NumberedShardSpec(shardSpec.getPartitionNum(), ((NumberedShardSpec) shardSpec).getNumCorePartitions());
 } else {
 // Druid only support appending more partitions to Linear and Numbered ShardSpecs.
 throw new IllegalStateException(String.format("Cannot expand shard spec [%s]", shardSpec));
@@ -832,12 +834,16 @@ public static IndexSpec getIndexSpec(Configuration jc) {
 if ("concise".equals(HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BITMAP_FACTORY_TYPE))) {
 bitmapSerdeFactory = new ConciseBitmapSerdeFactory();
 } else {
-bitmapSerdeFactory = new RoaringBitmapSerdeFactory(true);
+bitmapSerdeFactory = RoaringBitmapSerdeFactory.getInstance();
 }
 return new IndexSpec(bitmapSerdeFactory,
-IndexSpec.DEFAULT_DIMENSION_COMPRESSION,
-IndexSpec.DEFAULT_METRIC_COMPRESSION,
-IndexSpec.DEFAULT_LONG_ENCODING);
+IndexSpec.DEFAULT.getDimensionCompression(),
+IndexSpec.DEFAULT.getStringDictionaryEncoding(),
+IndexSpec.DEFAULT.getMetricCompression(),
+IndexSpec.DEFAULT.getLongEncoding(),
+IndexSpec.DEFAULT.getJsonCompression(),
+IndexSpec.DEFAULT.getSegmentLoader()
+);
 }

 public static Pair<List<DimensionSchema>, AggregatorFactory[]> getDimensionsAndAggregates(List<String> columnNames,
@@ -1054,18 +1060,18 @@ private static BloomKFilter evaluateBloomFilter(ExprNodeDesc desc, Configuration
 return null;
 }
 String columnName = ((ExprNodeColumnDesc) (funcDesc.getChildren().get(0))).getColumn();
-ValueType targetType = null;
+ColumnType targetType = null;
 if (udf instanceof GenericUDFBridge) {
 Class<? extends UDF> udfClass = ((GenericUDFBridge) udf).getUdfClass();
 if (udfClass.equals(UDFToDouble.class)) {
-targetType = ValueType.DOUBLE;
+targetType = ColumnType.DOUBLE;
 } else if (udfClass.equals(UDFToFloat.class)) {
-targetType = ValueType.FLOAT;
+targetType = ColumnType.FLOAT;
 } else if (udfClass.equals(UDFToLong.class)) {
-targetType = ValueType.LONG;
+targetType = ColumnType.LONG;
 }
 } else if (udf instanceof GenericUDFToString) {
-targetType = ValueType.STRING;
+targetType = ColumnType.STRING;
 }

 if (targetType == null) {
---- next changed file (path not shown) ----
@@ -121,11 +121,7 @@ public FileSinkOperator.RecordWriter getHiveRecordWriter(
 .getDimensionsAndAggregates(columnNames, columnTypes);
 final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
 new TimestampSpec(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
-new DimensionsSpec(dimensionsAndAggregates.lhs, Lists
-.newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
-Constants.DRUID_SHARD_KEY_COL_NAME
-), null
-)
+new DimensionsSpec(dimensionsAndAggregates.lhs)
 ));

 Map<String, Object>
@@ -152,10 +148,9 @@ public FileSinkOperator.RecordWriter getHiveRecordWriter(
 Integer maxRowInMemory = HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_ROW_IN_MEMORY);

 IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(jc);
-RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory, null, null, null,
-new File(basePersistDirectory, dataSource), new CustomVersioningPolicy(version), null, null, null, indexSpec,
-null, true, 0, 0, true, null, 0L, null, null);
-
+RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(null, maxRowInMemory, null, null, null, null,
+new File(basePersistDirectory, dataSource), new CustomVersioningPolicy(version), null, null, null, indexSpec,
+null, 0, 0, true, null, 0L, null, null);
 LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
 return new DruidRecordWriter(dataSchema, realtimeTuningConfig,
 DruidStorageHandlerUtils.createSegmentPusherForDirectory(segmentDirectory, jc),
---- next changed file (path not shown) ----
@@ -198,7 +198,7 @@ private static List<LocatedSegmentDescriptor> fetchLocatedSegmentDescriptors(Str
 request =
 String.format("http://%s/druid/v2/datasources/%s/candidates?intervals=%s",
 address,
-query.getDataSource().getNames().get(0),
+query.getDataSource().getTableNames().stream().findFirst(),
 URLEncoder.encode(intervals, "UTF-8"));
 LOG.debug("sending request {} to query for segments", request);
 final InputStream response;
---- next changed file (path not shown) ----
@@ -28,6 +28,9 @@
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -36,7 +39,7 @@
 import org.apache.druid.segment.realtime.appenderator.Appenderators;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.segment.realtime.appenderator.SegmentNotWritableException;
-import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
 import org.apache.druid.segment.realtime.plumber.Committers;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -102,11 +105,19 @@ public DruidRecordWriter(DataSchema dataSchema,
 Preconditions.checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir),
 "realtimeTuningConfig is null");
 this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
+RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+rowIngestionMeters,
+false,
+Integer.MAX_VALUE,
+0
+);

 appenderator = Appenderators
-.createOffline("hive-offline-appenderator", this.dataSchema, tuningConfig, false, new FireDepartmentMetrics(),
-dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.INDEX_IO,
-DruidStorageHandlerUtils.INDEX_MERGER_V9);
+.createOffline("hive-offline-appenderator", this.dataSchema, tuningConfig, new FireDepartmentMetrics(),
+dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.INDEX_IO,
+DruidStorageHandlerUtils.INDEX_MERGER_V9,
+rowIngestionMeters, parseExceptionHandler, false);
 this.maxPartitionSize = maxPartitionSize;
 appenderator.startJob();
 this.segmentsDescriptorDir = Preconditions.checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
@@ -170,7 +181,7 @@ private SegmentIdWithShardSpec getSegmentIdentifierAndMaybePush(long truncatedTi

 private void pushSegments(List<SegmentIdWithShardSpec> segmentsToPush) {
 try {
-SegmentsAndMetadata segmentsAndMetadata = appenderator.push(segmentsToPush, committerSupplier.get(), false).get();
+SegmentsAndCommitMetadata segmentsAndMetadata = appenderator.push(segmentsToPush, committerSupplier.get(), false).get();
 final Set<String> pushedSegmentIdentifierHashSet = new HashSet<>();

 for (DataSegment pushedSegment : segmentsAndMetadata.getSegments()) {
---- next changed file (path not shown) ----
@@ -41,7 +41,7 @@ public class AvroParseSpec extends ParseSpec {
 @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
 @JsonProperty("flattenSpec") JSONPathSpec flattenSpec) {
 super(timestampSpec != null ? timestampSpec : new TimestampSpec(null, null, null),
-dimensionsSpec != null ? dimensionsSpec : new DimensionsSpec(null, null, null));
+dimensionsSpec != null ? dimensionsSpec : new DimensionsSpec(null));

 this.flattenSpec = flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT;
 }
---- next changed file (path not shown) ----
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;

@@ -77,6 +78,11 @@ public KafkaIndexTaskTuningConfig(
 );
 }

+@Override
+public boolean isSkipBytesInMemoryOverheadCheck() {
+return false;
+}
+
 @Override
 public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) {
 return new KafkaIndexTaskTuningConfig(
@@ -124,5 +130,8 @@ public String toString() {
 ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
 '}';
 }
+
+@Override
+public AppendableIndexSpec getAppendableIndexSpec() {
+return null;
+}
 }
---- next changed file (path not shown) ----
@@ -22,7 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.indexing.TuningConfigs;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -136,8 +136,8 @@ public Duration getOffsetFetchPeriod() {
 @Override
 public String toString() {
 return "KafkaSupervisorTuningConfig{" + "maxRowsInMemory=" + getMaxRowsInMemory() + ", maxRowsPerSegment="
-+ getMaxRowsPerSegment() + ", maxTotalRows=" + getMaxTotalRows() + ", maxBytesInMemory=" + TuningConfigs
-.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) + ", intermediatePersistPeriod="
++ getMaxRowsPerSegment() + ", maxTotalRows=" + getMaxTotalRows() + ", maxBytesInMemory=" +
+getMaxBytesInMemoryOrDefault() + ", intermediatePersistPeriod="
 + getIntermediatePersistPeriod() + ", basePersistDirectory=" + getBasePersistDirectory()
 + ", maxPendingPersists=" + getMaxPendingPersists() + ", indexSpec=" + getIndexSpec()
 + ", reportParseExceptions=" + isReportParseExceptions() + ", handoffConditionTimeout="
@@ -158,4 +158,14 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() {
 getHandoffConditionTimeout(), isResetOffsetAutomatically(), getSegmentWriteOutMediumFactory(),
 getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), getMaxSavedParseExceptions());
 }
+
+@Override
+public boolean isSkipBytesInMemoryOverheadCheck() {
+return false;
+}
+
+@Override
+public AppendableIndexSpec getAppendableIndexSpec() {
+return null;
+}
 }
---- next changed file (path not shown) ----
@@ -26,12 +26,12 @@
 import com.google.common.base.Throwables;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.RE;
-import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.utils.CloseableUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
@@ -126,7 +126,7 @@ public JsonParserIterator<R> createQueryResultsIterator() {
 } catch (Exception e) {
 if (iterator != null) {
 // We got exception while querying results from this host.
-CloseQuietly.close(iterator);
+CloseableUtils.closeAndWrapExceptions(iterator);
 }
 LOG.error("Failure getting results for query[{}] from host[{}] because of [{}]",
 query, address, e.getMessage());
@@ -200,7 +200,7 @@ public void initialize(InputSplit split, Configuration conf) throws IOException

 @Override public void close() {
 if (queryResultsIterator != null) {
-CloseQuietly.close(queryResultsIterator);
+CloseableUtils.closeAndWrapExceptions(queryResultsIterator);
 }
 }

@@ -248,7 +248,7 @@ public void initialize(InputSplit split, Configuration conf) throws IOException
 return false;
 }
 if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
-CloseQuietly.close(jp);
+CloseableUtils.closeAndWrapExceptions(jp);
 return false;
 }

@@ -296,7 +296,7 @@ private void init() {
 }

 @Override public void close() throws IOException {
-CloseQuietly.close(jp);
+CloseableUtils.closeAndWrapExceptions(jp);
 }
 }

---- next changed file (path not shown) ----
@@ -130,7 +130,7 @@
 inputRowParser =
 new MapInputRowParser(new TimeAndDimsParseSpec(new TimestampSpec(DruidConstants.DEFAULT_TIMESTAMP_COLUMN,
 "auto",
-null), new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("host")), null, null)));
+null), new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("host")))));
 final Map<String, Object>
 parserMap =
 objectMapper.convertValue(inputRowParser, new TypeReference<Map<String, Object>>() {
@@ -146,11 +146,12 @@
 null,
 objectMapper);

-IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
+IndexSpec indexSpec = new IndexSpec(RoaringBitmapSerdeFactory.getInstance(), null, null, null, null, null, null);
 RealtimeTuningConfig
 tuningConfig =
-new RealtimeTuningConfig(null,
-null, null, null, temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, null, 0, 0, null,
+new RealtimeTuningConfig(null, null,
+null, null, null, null, temporaryFolder.newFolder(), null, null, null, null, indexSpec, null,
+0, 0, null,
 null, 0L, null, null);
 LocalFileSystem localFileSystem = FileSystem.getLocal(config);
 DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig() {
2 changes: 1 addition & 1 deletion itests/qtest-druid/pom.xml
@@ -35,7 +35,7 @@
 <druid.jersey.version>1.19.3</druid.jersey.version>
 <druid.jetty.version>9.4.45.v20220203</druid.jetty.version>
 <druid.derby.version>10.11.1.1</druid.derby.version>
-<druid.guava.version>16.0.1</druid.guava.version>
+<druid.guava.version>31.1-jre</druid.guava.version>
 <druid.guice.version>4.1.0</druid.guice.version>
 <kafka.test.version>2.5.0</kafka.test.version>
 <druid.guice.version>4.1.0</druid.guice.version>
4 changes: 2 additions & 2 deletions pom.xml
@@ -146,10 +146,10 @@
 <derby.version>10.14.2.0</derby.version>
 <dropwizard.version>3.1.0</dropwizard.version>
 <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
-<druid.version>0.17.1</druid.version>
+<druid.version>28.0.1</druid.version>
 <esri.version>2.2.4</esri.version>
 <flatbuffers.version>1.12.0</flatbuffers.version>
-<guava.version>22.0</guava.version>
+<guava.version>31.1-jre</guava.version>
 <groovy.version>2.4.21</groovy.version>
 <h2database.version>2.2.220</h2database.version>
 <hadoop.version>3.3.6</hadoop.version>
2 changes: 1 addition & 1 deletion standalone-metastore/pom.xml
@@ -90,7 +90,7 @@
 <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2
 </dropwizard-metrics-hadoop-metrics2-reporter.version>
 <dropwizard.version>3.1.0</dropwizard.version>
-<guava.version>22.0</guava.version>
+<guava.version>31.1-jre</guava.version>
 <hadoop.version>3.3.6</hadoop.version>
 <hikaricp.version>4.0.3</hikaricp.version>
 <jackson.version>2.16.1</jackson.version>
2 changes: 1 addition & 1 deletion storage-api/pom.xml
@@ -29,7 +29,7 @@
 <maven.compiler.source>11</maven.compiler.source>
 <maven.compiler.target>11</maven.compiler.target>
 <commons-logging.version>1.1.3</commons-logging.version>
-<guava.version>22.0</guava.version>
+<guava.version>31.1-jre</guava.version>
 <hadoop.version>3.3.6</hadoop.version>
 <junit.version>4.13.2</junit.version>
 <junit.jupiter.version>5.6.3</junit.jupiter.version>
