Adds Python script for incremental partition insertion. #76
Merged: jayehwhyehentee merged 97 commits into GoogleCloudDataproc:main from prashastia:nightly-tests-unbounded-read-2-3 on Jan 8, 2024.
Commits (97, all by prashastia)
a4d0d25 Adds a new module for nightly tests.
f674638 Modifies the docstring.
6ec22dc Modifies the docstring.
f0cd825 Adds simple e2e test, Adds parse_logs.py, Adds table_read.sh
c814bd9 Adds simple e2e test, Adds parse_logs.py, Adds table_read.sh
f5d3f31 Adds simple e2e test, Adds parse_logs.py, Adds table_read.sh
b9cad97 Adds simple e2e test, Adds parse_logs.py, Adds table_read.sh
b3ce92b Adds simple e2e test, Adds parse_logs.py, Adds table_read.sh
4e6ef67 Adds simple e2e test, Adds parse_logs.py, Adds table_read.sh
dd8e5be Modifies IntegrationTest to check query correctness.
137030d Adds spotless:apply.
957ebf0 Renames the table_read file to bounded_table_read.sh
2ff4032 Addresses review comments.
394132d Creates separate shell script for bounded jobs.
5425806 Adds a new common shell script for common actions performed for both …
ea79601 Addresses review comments.
c6a564f Addresses review comments.
a2a49b3 Formats the file.
3da1b14 Fixes checkstyle violations.
0f34c03 Fixes checkstyle violations.
0822681 Fixes checkstyle violations.
1770526 Fixes checkstyle violations.
e416b1f Changes test name, adds a new e2e test for checking table read for ta…
471aca2 Changes test name, adds a new e2e test for checking table read for ta…
68d48d0 Adds a new e2e test for checking query read.
44f39f6 Fixes cause of error in query read execution.
b8a5883 Fixes cause of error in query read execution.
46ac1e6 Adds a new e2e test for checking large table ~200GBs read.
74afc54 Adds utils.py - a class containing implementations for writing recor…
ea0cba5 Addresses review comments in parse_logs.py.
05bbd95 Addresses review comments in nightly.sh and modifies nightly.yaml for…
b731be7 Addresses review comments in requirements.txt
be260e4 Addresses review comments in table_read.sh
f172b67 Fixes checkstyle violations, addresses review comments.
37bcd09 Update pom.xml
485d4b7 Addresses review comments in table_read.sh
61e4709 Fixes metric value regex to capture digits.
8138578 Addresses review comments on parse_logs.py
ecf38d0 Fixes indentation problems in pom.xml
bd4d02c Fixes indentation problems in pom.xml
926a9d4 Adds utils.py containing helper function for dynamic record addition.
e9a0526 Merge remote-tracking branch 'origin/nightly-tests' into nightly-test…
dce318f undo.
77a3994 Merge remote-tracking branch 'origin/nightly-tests-large-table-read' …
8adfb9b Adapting parse_log to use the utils argument input class.
3dd668d Adds a new abstract class table_type.py contains records creation and…
b456523 Adds a python script to create a partitioned table and add initial va…
2c2875e insert_dynamic_partitions.py A python script to insert partitions inc…
18bc65f fixes error.
045cfd9 Fixes error in parse_logs.py to accept the argument.
d36e17c Merge remote-tracking branch 'origin/nightly-tests-unbounded-read-1' …
d822421 Merge remote-tracking branch 'origin/nightly-tests-unbounded-read-1-2…
695f629 Merge remote-tracking branch 'origin/nightly-tests-unbounded-read-2-3…
d2bc679 Fixes error in the script regarding string size.
b53de8e Merge remote-tracking branch 'origin/nightly-tests-unbounded-read-1' …
1b153ed Merge remote-tracking branch 'origin/nightly-tests-unbounded-read-1-2…
faae51a Merge remote-tracking branch 'dataproc/main' into nightly-tests-unbou…
6ad3fbc Modifies utils.py to remove redundant error messages.
7498bd7 Modifies parse_logs.py to input on the utils.
f4bb759 Merge remote-tracking branch 'origin/nightly-tests-unbounded-read-1' …
8c10500 Adds table_type.py. An abstract class containing implementations of w…
5aada3c Addresses a few review comments,
e56ed19 Addresses a few review comments,
442a8af Addresses review comments.
a6a4f68 Moves the avro file identifier to the file which uses it.
ceb531e removes table_type.py since it is not used in the e2e tests.
af215bd Merge remote-tracking branch 'origin/nightly-tests-unbouned-read-1' i…
dd0cf39 Works on create_partitioned_table.py to accommodate for utils.py rest…
b3b3dbf Reformats utils.py
fcd91fb Addresses review comments.
ab856aa Addresses review comments.
f344f5d Adds argparse - for trial
446dbd6 Adds argparse - for trial
3f86339 Adds argparse - for trial
321b3f3 Adds argparse. Removes ArgumentInputUtils from utils.py corrects argu…
f5231f6 Adds argparse. Removes ArgumentInputUtils from utils.py corrects argu…
227319c Merge remote-tracking branch 'origin/nightly-tests-unbouned-read-1' i…
06fe799 Adds argparse. Fixes formatting.
3830028 Merge remote-tracking branch 'origin/nightly-tests-unbounded-read-1-2…
361e148 Adds argparse. Fixes formatting.
4127553 Merge remote-tracking branch 'dataproc/main' into nightly-tests-unbou…
6078ebe Changes insert_dynamic_partitions.py to account for increased rows. N…
54170ed Addresses review comments.
f2f21d5 Addresses review comments.
543817e Addresses review comments.
b814589 Addresses review comments.
b68ea5b Addresses review comments.
820dcc8 Addresses review comments.
2bd9fad Reformats the file.
a02570f Addresses review comments.
2c7cab2 Addresses review comments.
d48c345 Addresses review comments.
4118b97 Addresses review comments.
5eee53a Addresses review comments. Takes number of rows per partition as an a…
3f5367e Addresses review comments.
a8fbfb8 Addresses review comments.
393a33f Reduces the wait time - an experiment
@@ -0,0 +1,144 @@
"""Python script to dynamically add partitions to a BigQuery partitioned table."""

import argparse
from collections.abc import Sequence
import datetime
import logging
import threading
import time
from absl import app
from utils import utils


def sleep_for_seconds(duration):
    logging.info(
        'Going to sleep, waiting for connector to read existing, Time: %s',
        datetime.datetime.now()
    )
    # Buffer time to ensure that new partitions are created
    # after the previous read session and before the next split discovery.
    time.sleep(duration)


def main(argv: Sequence[str]) -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--refresh_interval',
        dest='refresh_interval',
        help='Minutes between checking new data',
        type=int,
        required=True,
    )
    parser.add_argument(
        '--project_name',
        dest='project_name',
        help='Project Id which contains the table to be read.',
        type=str,
        required=True,
    )
    parser.add_argument(
        '--dataset_name',
        dest='dataset_name',
        help='Dataset Name which contains the table to be read.',
        type=str,
        required=True,
    )
    parser.add_argument(
        '--table_name',
        dest='table_name',
        help='Table Name of the table which is read in the test.',
        type=str,
        required=True,
    )
    parser.add_argument(
        '-n',
        '--number_of_rows_per_partition',
        dest='number_of_rows_per_partition',
        help='Number of rows to insert per partition.',
        type=int,
        required=False,
        default=30000,
    )

    args = parser.parse_args(argv[1:])

    # Read the parsed argument values.
    project_name = args.project_name
    dataset_name = args.dataset_name
    table_name = args.table_name
    number_of_rows_per_partition = args.number_of_rows_per_partition

    execution_timestamp = datetime.datetime.now(tz=datetime.timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    refresh_interval = int(args.refresh_interval)

    # Set the partitioned table.
    table_id = f'{project_name}.{dataset_name}.{table_name}'

    # Now add the partitions to the table.
    # Hardcoded schema. Needs to be the same as that of the pre-created table.
    simple_avro_schema_fields_string = (
        '"fields": [{"name": "name", "type": "string"},{"name": "number",'
        '"type": "long"},{"name" : "ts", "type" : {"type" :'
        '"long","logicalType": "timestamp-micros"}}]'
    )
    simple_avro_schema_string = (
        '{"namespace": "project.dataset","type": "record","name":'
        ' "table","doc": "Avro Schema for project.dataset.table",'
        f'{simple_avro_schema_fields_string}'
        '}'
    )

    # Hardcoded for the e2e test:
    # partitions[i] * number_of_rows_per_partition rows are inserted in phase i.
    partitions = [2, 1, 2]
    # Two threads only; with more, the BQ rate limit is exceeded due to the
    # large number of rows.
    number_of_threads = 2
    number_of_rows_per_thread = number_of_rows_per_partition // number_of_threads

    avro_file_local = 'mockData.avro'
    table_creation_utils = utils.TableCreationUtils(
        simple_avro_schema_string,
        number_of_rows_per_thread,
        table_id,
    )

    # Insert iteratively.
    prev_partitions_offset = 0
    for number_of_partitions in partitions:
        start_time = time.time()
        # Wait for read stream formation.
        sleep_for_seconds(2.5 * 60)

        # This represents one iteration (phase).
        for partition_number in range(number_of_partitions):
            threads = []
            # Insert via concurrent threads, each writing its own local Avro file.
            for thread_number in range(number_of_threads):
                avro_file_local_identifier = avro_file_local.replace(
                    '.', '_' + str(thread_number) + '.'
                )
                thread = threading.Thread(
                    target=table_creation_utils.avro_to_bq_with_cleanup,
                    kwargs={
                        'avro_file_local_identifier': avro_file_local_identifier,
                        'partition_number': partition_number + prev_partitions_offset,
                        'current_timestamp': execution_timestamp,
                    },
                )
                threads.append(thread)
                thread.start()
            for thread in threads:
                thread.join()

        time_elapsed = time.time() - start_time
        prev_partitions_offset += number_of_partitions

        # Wait until the read streams are formed again,
        # so that the records just created can be read.
        sleep_for_seconds(float(60 * refresh_interval) - time_elapsed)


if __name__ == '__main__':
    app.run(main)
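For reference, a typical invocation would look like the following. The script name comes from the commit history (insert_dynamic_partitions.py); the project, dataset, and table values are placeholders, not values taken from this PR.

python3 insert_dynamic_partitions.py \
    --refresh_interval 10 \
    --project_name my-project \
    --dataset_name my_dataset \
    --table_name my_partitioned_table \
    --number_of_rows_per_partition 30000

--refresh_interval is in minutes and drives the sleep between insertion phases; --number_of_rows_per_partition is optional and defaults to 30000.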
Conversations
Why is prev_partitions_offset being incremented multiple times in the same iteration?
Within the same iteration we are adding rows spread across one or more partitions, so that on a new read we make sure that multiple partitions are being read from.
Let's take an example.
First iteration:
Second iteration:
Third iteration:
So, we've skipped partition offsets 3 and 5. If that is intentional, then why?
Yeah, this is correct.
To maintain time consistency, I took the current time at UTC, which would be 18:30 hrs.
So the range (18:30 + 2 = 20:30) to (18:30 + 3 = 21:30) generates values in the 20:00 and 21:00 hourly partitions.
If the next phase then generates for (18:30 + 3) to (18:30 + 4), it will clash on the 21:00 partition.
I think this is getting too confusing.
I'll fix this.
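To make the clash concrete, here is a small illustration, assuming the 18:30 UTC base time from the comment above and the hour-offset scheme it describes; the date is arbitrary and the untruncated base time reflects the earlier revision of the script, not the merged one.

import datetime

# Hypothetical untruncated base time from the comment: 18:30 UTC.
base = datetime.datetime(2024, 1, 8, 18, 30, tzinfo=datetime.timezone.utc)
for offset in (2, 3):
    start = base + datetime.timedelta(hours=offset)
    end = base + datetime.timedelta(hours=offset + 1)
    print(
        f'offset {offset}: rows in [{start:%H:%M}, {end:%H:%M}) '
        f'-> hourly partitions {start:%H} and {end:%H}'
    )
# offset 2: rows in [20:30, 21:30) -> hourly partitions 20 and 21
# offset 3: rows in [21:30, 22:30) -> hourly partitions 21 and 22
# Both offsets touch partition 21: that is the clash. The merged script
# avoids it by truncating execution_timestamp to midnight UTC, so each
# offset maps to exactly one hourly partition.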
This is fixed now.
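A minimal sketch of the fixed bookkeeping, as it appears in the merged code above: the offset advances exactly once per phase, after all of that phase's partitions are written, so the written offsets come out contiguous.

partitions = [2, 1, 2]  # same phase sizes as in the script above
prev_partitions_offset = 0
for number_of_partitions in partitions:
    # Partition offsets written during this phase.
    written = [
        partition_number + prev_partitions_offset
        for partition_number in range(number_of_partitions)
    ]
    print('phase writes partition offsets:', written)
    prev_partitions_offset += number_of_partitions
# Prints [0, 1], then [2], then [3, 4]: no offsets are skipped.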