Skip to content

Commit

Permalink
feat: add file merging for small parquet files
Browse files Browse the repository at this point in the history
  • Loading branch information
daoleno committed Dec 25, 2023
1 parent d951590 commit af57ae7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 3 deletions.
37 changes: 36 additions & 1 deletion bq-syncer/sync_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,38 @@ def map_bq_type_to_python(bq_type):
return type_mapping.get(bq_type, pl.Utf8)


def get_file_size(file_path):
    """Report how many bytes the file at ``file_path`` occupies."""
    file_stats = os.stat(file_path)
    return file_stats.st_size


def merge_files(table_directory, table_id, max_file_size=1e9):
    """Merge the small parquet files of one table into a single parquet file.

    Files in ``table_directory`` named ``{table_id}_*.parquet`` whose size is
    strictly below ``max_file_size`` are concatenated into one new file named
    ``{table_id}_<YYYYmmddHHMMSS>_merged.parquet``; the inputs are then
    deleted. Nothing happens when fewer than two files qualify.

    Args:
        table_directory: Directory that holds the table's parquet files.
        table_id: Table identifier used as the file-name prefix.
        max_file_size: Size limit in bytes; only files smaller than this are
            merged. Defaults to 1e9 (1 GB).

    Returns:
        None.

    Raises:
        Exception: Whatever the parquet read/write raises is propagated. In
            that case the partial temporary output is removed and every
            input file is left untouched.
    """
    # Escape the literal part of the pattern so that glob metacharacters in
    # the directory or table id are not interpreted as wildcards.
    pattern = (
        glob.escape(os.path.join(table_directory, f"{table_id}_")) + "*.parquet"
    )

    # glob returns files in arbitrary order; sort so that the row order of
    # the merged file is deterministic (lexicographic by file name).
    small_files = sorted(
        file for file in glob.glob(pattern) if os.path.getsize(file) < max_file_size
    )

    # If there is one or no file, no need to merge
    if len(small_files) <= 1:
        return

    # NOTE(review): collect() materialises all inputs in memory at once.
    df = pl.concat([pl.scan_parquet(file) for file in small_files]).collect()

    merged_file_name = os.path.join(
        table_directory,
        f"{table_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}_merged.parquet",
    )

    # Write under a name the glob pattern does not match, then move it into
    # place, so an interrupted write never leaves a truncated file that a
    # later run would treat as a valid input.
    temp_file_name = merged_file_name + ".tmp"
    try:
        df.write_parquet(temp_file_name)
        os.replace(temp_file_name, merged_file_name)
    except Exception:
        if os.path.exists(temp_file_name):
            os.remove(temp_file_name)
        raise

    # Delete the old files, but never the file that was just written.
    for file in small_files:
        if file != merged_file_name:
            os.remove(file)

    print(f"[{datetime.now()}] Merged {len(small_files)} files into {merged_file_name}")


def sync_table(table_item, index, total_tables):
try:
print(
Expand All @@ -81,6 +113,9 @@ def sync_table(table_item, index, total_tables):
table_directory = os.path.join(output_directory, table_id)
os.makedirs(table_directory, exist_ok=True) # Ensure the directory exists

# Check if parquet files in the table directory need to be merged
merge_files(table_directory, table_id)

table_ref = dataset_ref.table(table_id)
table = bqclient.get_table(table_ref) # get table object

Expand Down Expand Up @@ -111,7 +146,7 @@ def sync_table(table_item, index, total_tables):
else field
for field, _ in fields
]
query = f"SELECT {', '.join(field_names_for_query)} FROM `{table_ref}` WHERE datastream_metadata.source_timestamp > {last_timestamp}"
query = f"SELECT {', '.join(field_names_for_query)} FROM `{table_ref}` WHERE datastream_metadata.source_timestamp > {last_timestamp} ORDER BY datastream_metadata.source_timestamp ASC"

# Modify the query if --sample is set
if args.sample:
Expand Down
2 changes: 1 addition & 1 deletion dashboard/app/apps/page.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import AppsSummary from "@/components/apps-summary"
import TopApps from "@/components/charts/top-apps"

export const revalidate = 3600
export const revalidate = 60

export default function Page() {
return (
Expand Down
2 changes: 1 addition & 1 deletion dashboard/components/top-profiles.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Card, CardContent, CardHeader, CardTitle } from "./ui/card"
import { ScrollArea, ScrollBar } from "./ui/scroll-area"

export default async function TopProfiles() {
const topProfiles = await getTopProfiles("1M")
const topProfiles = await getTopProfiles("ALL")

return (
<Card>
Expand Down

0 comments on commit af57ae7

Please sign in to comment.