diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java index a84384fe17bf..7604004ff0cc 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java @@ -18,12 +18,14 @@ */ package org.apache.iceberg.flink.source.reader; +import java.util.stream.Stream; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.conversion.DataStructureConverter; import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; @@ -42,8 +44,11 @@ private RowConverter(RowType rowType, TypeInformation rowTypeInfo) { public static RowConverter fromIcebergSchema(org.apache.iceberg.Schema icebergSchema) { RowType rowType = FlinkSchemaUtil.convert(icebergSchema); TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema); - RowTypeInfo rowTypeInfo = - new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()); + TypeInformation[] typeInformations = + Stream.of(tableSchema.getFieldDataTypes()) + .map(ExternalTypeInfo::of) + .toArray(TypeInformation[]::new); + RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, tableSchema.getFieldNames()); return new RowConverter(rowType, rowTypeInfo); }