diff --git a/bq-syncer/sync_parquet.py b/bq-syncer/sync_parquet.py
index 6ec4376..4212265 100644
--- a/bq-syncer/sync_parquet.py
+++ b/bq-syncer/sync_parquet.py
@@ -71,6 +71,53 @@ def map_bq_type_to_python(bq_type):
     return type_mapping.get(bq_type, pl.Utf8)
 
 
+def get_file_size(file_path):
+    """Return the size of the file at ``file_path`` in bytes."""
+    return os.path.getsize(file_path)
+
+
+def merge_files(table_directory, table_id, max_size_bytes=1e9):
+    """Merge the small parquet files of a table into a single parquet file.
+
+    Every ``{table_id}_*.parquet`` file in ``table_directory`` that is smaller
+    than ``max_size_bytes`` (1 GB by default) is concatenated into one new
+    ``{table_id}_<timestamp>_merged.parquet`` file, after which the source
+    files are deleted. Nothing happens when fewer than two small files exist.
+
+    Returns the path of the merged file, or None when nothing was merged.
+    """
+    pattern = os.path.join(table_directory, f"{table_id}_*.parquet")
+    # Sort so the row order of the merged file does not depend on the
+    # arbitrary order in which glob lists the directory.
+    small_files = sorted(
+        file for file in glob.glob(pattern) if get_file_size(file) < max_size_bytes
+    )
+
+    # If there is one or no file, no need to merge
+    if len(small_files) <= 1:
+        return None
+
+    merged_file_name = os.path.join(
+        table_directory,
+        f"{table_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}_merged.parquet",
+    )
+    # Write under a name the glob pattern above does not match and rename only
+    # once the write has finished, so an interrupted merge never leaves a
+    # truncated file that a later run would pick up as input.
+    temp_file_name = f"{merged_file_name}.tmp"
+    df = pl.concat([pl.scan_parquet(file) for file in small_files]).collect()
+    df.write_parquet(temp_file_name)
+    os.replace(temp_file_name, merged_file_name)
+
+    # Delete the old files, but never the file that was just written
+    for file in small_files:
+        if file != merged_file_name:
+            os.remove(file)
+
+    print(f"[{datetime.now()}] Merged {len(small_files)} files into {merged_file_name}")
+    return merged_file_name
+
+
 def sync_table(table_item, index, total_tables):
     try:
         print(
@@ -81,6 +128,9 @@ def sync_table(table_item, index, total_tables):
         table_directory = os.path.join(output_directory, table_id)
         os.makedirs(table_directory, exist_ok=True)  # Ensure the directory exists
 
+        # Check if parquet files in the table directory need to be merged
+        merge_files(table_directory, table_id)
+
         table_ref = dataset_ref.table(table_id)
         table = bqclient.get_table(table_ref)  # get table object
 
@@ -111,7 +161,7 @@ def sync_table(table_item, index, total_tables):
             else field
             for field, _ in fields
         ]
-        query = f"SELECT {', '.join(field_names_for_query)} FROM `{table_ref}` WHERE datastream_metadata.source_timestamp > {last_timestamp}"
+        query = f"SELECT {', '.join(field_names_for_query)} FROM `{table_ref}` WHERE datastream_metadata.source_timestamp > {last_timestamp} ORDER BY datastream_metadata.source_timestamp ASC"
 
         # Modify the query if --sample is set
         if args.sample:
diff --git a/dashboard/app/apps/page.tsx b/dashboard/app/apps/page.tsx
index 5bcef7b..58f9197 100644
--- a/dashboard/app/apps/page.tsx
+++ b/dashboard/app/apps/page.tsx
@@ -1,7 +1,7 @@
 import AppsSummary from "@/components/apps-summary"
 import TopApps from "@/components/charts/top-apps"
 
-export const revalidate = 3600
+export const revalidate = 60
 
 export default function Page() {
   return (
diff --git a/dashboard/components/top-profiles.tsx b/dashboard/components/top-profiles.tsx
index 18f7414..b074ac2 100644
--- a/dashboard/components/top-profiles.tsx
+++ b/dashboard/components/top-profiles.tsx
@@ -7,7 +7,7 @@ import { Card, CardContent, CardHeader, CardTitle } from "./ui/card"
 import { ScrollArea, ScrollBar } from "./ui/scroll-area"
 
 export default async function TopProfiles() {
-  const topProfiles = await getTopProfiles("1M")
+  const topProfiles = await getTopProfiles("ALL")
 
   return (