Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First Unbounded Source Implementation #56

Merged
Show file tree
Hide file tree
Changes from 140 commits
Commits
Show all changes
142 commits
Select commit Hold shift + click to select a range
eb9dc8f
cleaning up the main branch for new version.
prodriguezdefino Jun 27, 2023
0695673
updates in ignore and readme files
prodriguezdefino Jun 27, 2023
e36cc44
prepping the pom addition, added parent's compliance tools
prodriguezdefino Jun 27, 2023
e167803
adding parent pom and the connector impl project pom
prodriguezdefino Jun 28, 2023
8043378
adding common functionalities
prodriguezdefino Jun 28, 2023
b455b4e
added the bigquery services wrapper and factories
prodriguezdefino Jun 28, 2023
27cd837
creates the split, its state and the enumerator state
prodriguezdefino Jun 28, 2023
4d4b60f
added configs, split reader and split assignment impls
prodriguezdefino Jun 28, 2023
0567b58
applying recommendations from sonartype-lift
prodriguezdefino Jun 28, 2023
9006714
adding the Datastream source implementation for BigQuery
prodriguezdefino Jun 28, 2023
d5d95bf
added Table API implementation for BigQuery
prodriguezdefino Jun 29, 2023
1263768
adding the example and shaded distro jar, fixes a NPE when the provid…
prodriguezdefino Jun 29, 2023
4e185bd
added covertura analysis through open clover and improved few tests
prodriguezdefino Jun 30, 2023
4e63b02
fixes partition and row restriction orden for push downs
prodriguezdefino Jul 1, 2023
74d5b90
adds a test which make the storage read server stream iterator fail f…
prodriguezdefino Jul 2, 2023
aff1d6a
renamed example's packages and pom's group id to the correct one
prodriguezdefino Jul 2, 2023
57d5a48
resolve locally errors that occur in the split reader
prodriguezdefino Jul 2, 2023
b16392f
added a read from query results example
prodriguezdefino Jul 4, 2023
5743292
merge changes from master (previous pom deletion resolution)
prodriguezdefino Jul 6, 2023
cab9115
fixing the package name for the schema namespace
prodriguezdefino Jul 6, 2023
3375d54
Merge branch 'common_code_source' into bq_services_wrappers
prodriguezdefino Jul 10, 2023
849f769
merged main branch and took care of few lift comments
prodriguezdefino Jul 11, 2023
fd70b95
Merge branch 'bq_services_wrappers' into source_splits
prodriguezdefino Jul 11, 2023
0f18d14
merge from source_split
prodriguezdefino Jul 11, 2023
6b08119
fixing lift recommendations and spotless
prodriguezdefino Jul 11, 2023
5973d26
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Jul 11, 2023
2d9635d
Merge branch 'source_functionality' into table_api
prodriguezdefino Jul 11, 2023
135beeb
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Jul 11, 2023
78aec3e
merge from example and distro branch
prodriguezdefino Jul 11, 2023
3ffb582
fixes namespace error in test and spotless
prodriguezdefino Jul 11, 2023
cc45b2f
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Jul 11, 2023
3472934
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Jul 11, 2023
1dabd70
removed repeated mvn reference from command
prodriguezdefino Jul 11, 2023
27fa94a
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Jul 11, 2023
1d0dafa
adds documentation to enum
prodriguezdefino Jul 11, 2023
289c31c
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Jul 11, 2023
882816e
merge from test_bqstoragereadapi_error branch
prodriguezdefino Jul 11, 2023
161f577
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Jul 11, 2023
d55d12f
fixes presubmit problem
prodriguezdefino Jul 11, 2023
3ce56f0
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Jul 11, 2023
b3426f5
initial work
prodriguezdefino Jul 26, 2023
f473d57
addressing comments from review
prodriguezdefino Jul 27, 2023
c9e22b3
continuing with refactor
prodriguezdefino Aug 1, 2023
c178f83
merge from main
prodriguezdefino Aug 1, 2023
09eaaa4
merge from master
prodriguezdefino Aug 1, 2023
def3cc4
Merge branch 'source_splits' into split_assigner_and_reader
prodriguezdefino Aug 1, 2023
ceabb12
fixed type reference Int -> Long
prodriguezdefino Aug 1, 2023
d48734f
progress
prodriguezdefino Aug 2, 2023
4366aee
improved coverage
prodriguezdefino Aug 3, 2023
0dc8875
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 3, 2023
963c80b
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 3, 2023
5206ef1
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 3, 2023
2b59a7c
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Aug 3, 2023
4eb1137
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Aug 3, 2023
6db9dea
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Aug 3, 2023
237f798
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Aug 3, 2023
8f12843
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 3, 2023
6348b21
fixed reference Int -> Long
prodriguezdefino Aug 3, 2023
9913587
Merge branch 'fix_artifact_groupid_name' into query_example
prodriguezdefino Aug 3, 2023
5919fd8
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Aug 3, 2023
1ad1eb1
merged from previous branches
prodriguezdefino Aug 3, 2023
ea506a1
fix cloudbuild script merge
prodriguezdefino Aug 3, 2023
80cf344
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 3, 2023
1a80c5e
Merge branch 'fix_artifact_groupid_name' into query_example
prodriguezdefino Aug 3, 2023
9a97444
Merge branch 'fix_artifact_groupid_name' into partition_refactor
prodriguezdefino Aug 3, 2023
12264bf
fix directory reference name for codecov
prodriguezdefino Aug 3, 2023
67955dc
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 4, 2023
9eb78a0
Merge branch 'fix_artifact_groupid_name' into query_example
prodriguezdefino Aug 4, 2023
f1ee89e
Merge branch 'fix_artifact_groupid_name' into partition_refactor
prodriguezdefino Aug 4, 2023
6ea14df
craated abstract split assigner and bounded version
prodriguezdefino Aug 4, 2023
ef743c9
progress
prodriguezdefino Aug 4, 2023
bbe805b
first implementation of unbounded source for completed partitions
prodriguezdefino Aug 7, 2023
c2042fd
only sync those collections that are actively added and removed from …
prodriguezdefino Aug 7, 2023
b52afaa
introduced notification of new splits to enumerator
prodriguezdefino Aug 8, 2023
5beafab
added observer capabilities to enumerator for new splits when on unbo…
prodriguezdefino Aug 8, 2023
7fd6539
spotless apply
prodriguezdefino Aug 8, 2023
13b6cab
added logs
prodriguezdefino Aug 8, 2023
1734bac
merge from main
prodriguezdefino Aug 8, 2023
e96ff59
addressing comments from review
prodriguezdefino Aug 8, 2023
3b78492
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 8, 2023
c492f02
improved hashcode and equals readability
prodriguezdefino Aug 8, 2023
6a05498
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 8, 2023
820fb3b
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 8, 2023
dd5be94
improve readibility for hashcode and equals
prodriguezdefino Aug 8, 2023
fbd07c6
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 8, 2023
a44b11d
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Aug 8, 2023
156552f
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Aug 8, 2023
31559d5
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Aug 8, 2023
53c514e
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Aug 8, 2023
656c831
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 8, 2023
bc626cd
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Aug 8, 2023
0ee8345
Merge branch 'query_example' into partition_refactor
prodriguezdefino Aug 8, 2023
327dfd6
merge from prevoius branch
prodriguezdefino Aug 9, 2023
d3c71c7
partial work
prodriguezdefino Aug 9, 2023
9aae0af
changed tests to use google-truth instead of junit or assertj asserti…
prodriguezdefino Aug 9, 2023
61e5644
Merge branch 'split_assigner_and_reader' into source_functionality
prodriguezdefino Aug 9, 2023
517de82
added google-truth to tests
prodriguezdefino Aug 9, 2023
d916a1c
Merge branch 'source_functionality' into table_api
prodriguezdefino Aug 9, 2023
11fce0d
added google-truth to tests
prodriguezdefino Aug 9, 2023
62397e0
simplified partition discovery, improved logging
prodriguezdefino Aug 10, 2023
099078e
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Aug 10, 2023
40db3cb
merge from previous branch
prodriguezdefino Aug 10, 2023
8dbf958
merge from previous branch
prodriguezdefino Aug 10, 2023
d1c56d4
merge from previous branch
prodriguezdefino Aug 10, 2023
12553a4
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Aug 10, 2023
bba0167
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Aug 10, 2023
c860c28
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Aug 10, 2023
62aafbc
Merge branch 'query_example' into partition_refactor
prodriguezdefino Aug 10, 2023
3261811
Merge branch 'partition_refactor' into unboundedsource_completepartit…
prodriguezdefino Aug 10, 2023
7d0f54b
package refactor for assigners and added tests for them
prodriguezdefino Aug 11, 2023
062099f
tested concurrent collections for split assigner state
prodriguezdefino Aug 11, 2023
9a9c9df
added window definition for streaming example
prodriguezdefino Aug 19, 2023
06b038e
added an hybrid source example
prodriguezdefino Aug 21, 2023
cd4e71e
unify non query examples
prodriguezdefino Aug 23, 2023
9da7706
merge from master after #44
prodriguezdefino Oct 31, 2023
a10470c
Merge branch 'source_functionality' into table_api
prodriguezdefino Oct 31, 2023
6b28a0c
removing guava dependency from file
prodriguezdefino Nov 1, 2023
7160ff9
merge from master
prodriguezdefino Nov 1, 2023
87780c5
adding serializable autovalue annotation to avoid storing an Optional…
prodriguezdefino Nov 2, 2023
3f4d1be
addressing comments from review
prodriguezdefino Nov 2, 2023
c8eb789
addressed comments from review
prodriguezdefino Nov 2, 2023
fd79610
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Nov 2, 2023
8008260
merge from #47
prodriguezdefino Nov 2, 2023
c923f31
merge from #48
prodriguezdefino Nov 2, 2023
d135144
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Nov 3, 2023
ad2e4e5
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Nov 3, 2023
ebe9014
merge from previous branch
prodriguezdefino Nov 3, 2023
cce6567
partition list in 1024 chunks
prodriguezdefino Nov 3, 2023
f36f91f
Merge branch 'table_api' into add_example_and_shadedsqljar
prodriguezdefino Nov 3, 2023
59472b7
Merge branch 'add_example_and_shadedsqljar' into use_openclover_coverage
prodriguezdefino Nov 3, 2023
3d994a2
Merge branch 'use_openclover_coverage' into partition_restriction_fixes
prodriguezdefino Nov 3, 2023
d9027ab
Merge branch 'partition_restriction_fixes' into test_bqstoragereadapi…
prodriguezdefino Nov 3, 2023
62c87f0
Merge branch 'test_bqstoragereadapi_error' into fix_artifact_groupid_…
prodriguezdefino Nov 3, 2023
87fd326
Merge branch 'fix_artifact_groupid_name' into locally_retry_splitread…
prodriguezdefino Nov 3, 2023
0ba5ce1
Merge branch 'locally_retry_splitreadererror' into query_example
prodriguezdefino Nov 3, 2023
44d21b1
merge from previous branch
prodriguezdefino Nov 3, 2023
421fdfb
merge from previous branch
prodriguezdefino Nov 3, 2023
b421760
removed references to isBlank() string method
prodriguezdefino Nov 6, 2023
209f6db
Merge branch 'main' into unboundedsource_completepartitions
jayehwhyehentee Dec 11, 2023
f56d571
Allow configuring parition discovery interval
jayehwhyehentee Dec 11, 2023
880c980
Address review comments
jayehwhyehentee Dec 11, 2023
c5c3734
Fix max idleness config description
jayehwhyehentee Dec 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions flink-connector-bigquery-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ under the License.
<artifactId>flink-connector-bigquery</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ interface QueryDataClient extends Serializable {
Optional<TablePartitionInfo> retrievePartitionColumnInfo(
String project, String dataset, String table);

/**
* Returns, in case of being a partitioned table, all the partitions present alongside their
* status.
*
* @param project The GCP project.
* @param dataset The BigQuery dataset.
* @param table The BigQuery table.
* @return The information and status of the table's partitions.
*/
List<PartitionIdWithInfoAndStatus> retrievePartitionsStatus(
String project, String dataset, String table);

/**
* Returns the {@link TableSchema} of the specified BigQuery table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import com.google.cloud.flink.bigquery.common.config.CredentialsOptions;
import com.google.cloud.flink.bigquery.common.utils.BigQueryPartitionUtils;
import com.google.cloud.flink.bigquery.common.utils.SchemaTransform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

import java.io.IOException;
Expand All @@ -64,6 +66,7 @@
/** Implementation of the {@link BigQueryServices} interface that wraps the actual clients. */
@Internal
public class BigQueryServicesImpl implements BigQueryServices {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);

@Override
public StorageReadClient getStorageClient(CredentialsOptions credentialsOptions)
Expand Down Expand Up @@ -190,21 +193,24 @@ public List<String> retrieveTablePartitions(String project, String dataset, Stri
project, dataset),
"WHERE",
" partition_id <> '__STREAMING_UNPARTITIONED__'",
String.format(" table_catalog = '%s'", project),
String.format(" AND table_catalog = '%s'", project),
String.format(" AND table_schema = '%s'", dataset),
String.format(" AND table_name = '%s'", table),
"ORDER BY 1 DESC;")
"ORDER BY 1 ASC;")
.stream()
.collect(Collectors.joining("\n"));

QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();

TableResult results = bigQuery.query(queryConfig);

return StreamSupport.stream(results.iterateAll().spliterator(), false)
.flatMap(row -> row.stream())
.map(fValue -> fValue.getStringValue())
.collect(Collectors.toList());
List<String> result =
StreamSupport.stream(results.iterateAll().spliterator(), false)
.flatMap(row -> row.stream())
.map(fValue -> fValue.getStringValue())
.collect(Collectors.toList());
LOG.info("Table partitions: {}", result);
return result;
} catch (Exception ex) {
throw new RuntimeException(
String.format(
Expand All @@ -215,6 +221,7 @@ public List<String> retrieveTablePartitions(String project, String dataset, Stri
}
}

@Override
public List<PartitionIdWithInfoAndStatus> retrievePartitionsStatus(
String project, String dataset, String table) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ public TablePartitionInfo getInfo() {

@Override
public int hashCode() {
int hash = 5;
hash = 97 * hash + Objects.hashCode(this.getInfo());
hash = 97 * hash + Objects.hashCode(this.getPartitionId());
return hash;
return Objects.hash(this.partitionId, this.info);
}

@Override
Expand All @@ -59,10 +56,8 @@ public boolean equals(Object obj) {
return false;
}
final PartitionIdWithInfo other = (PartitionIdWithInfo) obj;
if (!Objects.equals(this.getPartitionId(), other.getPartitionId())) {
return false;
}
return Objects.equals(this.getInfo(), other.getInfo());
return Objects.equals(this.getPartitionId(), other.getPartitionId())
&& Objects.equals(this.getInfo(), other.getInfo());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ public BigQueryPartitionUtils.PartitionStatus getStatus() {
return status;
}

public Boolean isCompleted() {
return status.equals(BigQueryPartitionUtils.PartitionStatus.COMPLETED);
}

@Override
public int hashCode() {
int hash = 7;
hash = 17 * hash + Objects.hashCode(this.getPartitionId());
hash = 17 * hash + Objects.hashCode(this.getInfo());
hash = 17 * hash + Objects.hashCode(this.getStatus());
return hash;
return Objects.hash(getPartitionId(), getInfo(), getStatus());
}

@Override
Expand All @@ -71,13 +71,9 @@ public boolean equals(Object obj) {
return false;
}
final PartitionIdWithInfoAndStatus other = (PartitionIdWithInfoAndStatus) obj;
if (!Objects.equals(this.getPartitionId(), other.getPartitionId())) {
return false;
}
if (!Objects.equals(this.getInfo(), other.getInfo())) {
return false;
}
return this.getStatus() == other.getStatus();
return Objects.equals(this.getPartitionId(), other.getPartitionId())
&& Objects.equals(this.getInfo(), other.getInfo())
&& this.getStatus() == other.getStatus();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

Expand All @@ -41,11 +42,11 @@ public TablePartitionInfo(
String columnName,
PartitionType partitionType,
StandardSQLTypeName columnType,
Instant sbOldestEntryTime) {
Instant streamingBufferOldestEntryTime) {
this.columnName = columnName;
this.columnType = columnType;
this.partitionType = partitionType;
this.streamingBufferOldestEntryTime = sbOldestEntryTime;
this.streamingBufferOldestEntryTime = streamingBufferOldestEntryTime;
}

public String getColumnName() {
Expand Down Expand Up @@ -74,6 +75,34 @@ public List<PartitionIdWithInfo> toPartitionsWithInfo(List<String> partitionIds)
.orElse(Lists.newArrayList());
}

@Override
public int hashCode() {
return Objects.hash(
this.columnName,
this.columnType,
this.partitionType,
this.streamingBufferOldestEntryTime);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final TablePartitionInfo other = (TablePartitionInfo) obj;
return Objects.equals(this.columnName, other.columnName)
&& this.columnType == other.columnType
&& this.partitionType == other.partitionType
&& Objects.equals(
this.streamingBufferOldestEntryTime, other.streamingBufferOldestEntryTime);
}

@Override
public String toString() {
return "TablePartitionInfo{"
Expand All @@ -83,6 +112,8 @@ public String toString() {
+ columnType
+ ", partitionType="
+ partitionType
+ ", streamingBufferOldestEntryTime="
+ streamingBufferOldestEntryTime
+ '}';
}
}
Loading
Loading