From 88cbd1821fbf9777de9e446188041440c0749ffa Mon Sep 17 00:00:00 2001
From: peichins
Date: Mon, 2 Dec 2024 22:30:30 +1000
Subject: [PATCH] Transcode embeddings (#15)

* added scripts to transcode from parquet to tfrecord files
---
 .../transcode_embeddings.py       | 53 +++++++++++++++++++
 tests/app_tests/test_transcode.py | 18 ++++++
 2 files changed, 71 insertions(+)
 create mode 100644 src/transcode_embeddings/transcode_embeddings.py
 create mode 100644 tests/app_tests/test_transcode.py

diff --git a/src/transcode_embeddings/transcode_embeddings.py b/src/transcode_embeddings/transcode_embeddings.py
new file mode 100644
index 0000000..9fad634
--- /dev/null
+++ b/src/transcode_embeddings/transcode_embeddings.py
@@ -0,0 +1,53 @@
+# Takes embeddings from parquet files and writes them to TFRecord files
+
+import tensorflow as tf
+import pandas as pd
+import numpy as np
+
+from chirp.inference.tf_examples import EmbeddingsTFRecordMultiWriter, bytes_feature, int_feature, float_feature, serialize_tensor
+
+from src.data_frames import df_to_embeddings
+
+
+def get_parquet_file_list(parquet_folder):
+    """Recursively find all parquet files under parquet_folder (a pathlib.Path)."""
+    return list(parquet_folder.rglob('*.parquet'))
+
+
+def transcode_from_parquet(parquet_filepaths, output_path, num_files=10):
+    """Transcode parquet embedding files into a sharded TFRecord dataset.
+
+    Args:
+        parquet_filepaths: sequence of paths to parquet embedding files.
+        output_path: directory the TFRecord shards are written to.
+        num_files: number of TFRecord shards to split the output across.
+    """
+    print(f"transcoding {len(parquet_filepaths)} parquet files to {output_path}")
+
+    with EmbeddingsTFRecordMultiWriter(output_path, num_files=num_files) as writer:
+        for i, fp in enumerate(parquet_filepaths):
+            # Progress: a dot every 10th file, "i of total" every 100th.
+            if i % 10 == 0:
+                if i % 100 == 0:
+                    print(f"\n{i} of {len(parquet_filepaths)}")
+                else:
+                    print('.', end='', flush=True)
+
+            # Columns 2:1282 are assumed to hold the 1280-dim embedding vector,
+            # with metadata (filename first) in the leading columns -- TODO confirm schema.
+            embeddings_table = df_to_embeddings(pd.read_parquet(fp))
+            embeddings = np.array(embeddings_table[:, :, 2:1282], dtype=np.float16)
+            embeddings = tf.convert_to_tensor(embeddings, dtype=tf.float16)
+            features = {
+                'filename': bytes_feature(embeddings_table[0][0][0].encode()),
+                'timestamp_s': float_feature(0.0),
+                'embedding': bytes_feature(serialize_tensor(embeddings, tf.float16)),
+                'embedding_shape': int_feature(tuple(embeddings.shape)),
+            }
+            ex = tf.train.Example(features=tf.train.Features(feature=features))
+            writer.write(ex.SerializeToString())
+
+
+# TODO(peichins): add filename_to_url(filename, domain) mapping a parquet
+# filename (datetime_site_filenum.parquet, datetime formatted YYYYMMDDTHHmmssZ)
+# to its public storage URL.
diff --git a/tests/app_tests/test_transcode.py b/tests/app_tests/test_transcode.py
new file mode 100644
index 0000000..0e8b86c
--- /dev/null
+++ b/tests/app_tests/test_transcode.py
@@ -0,0 +1,18 @@
+from pathlib import Path
+
+from src.transcode_embeddings.transcode_embeddings import transcode_from_parquet, get_parquet_file_list
+
+
+def test_transcode_from_parquet():
+    """End-to-end check: transcode the fixture parquet files into 256 TFRecord shards."""
+    input_folder = Path("./tests/files/embeddings")
+    output_folder = Path("./tests/output/")
+
+    parquet_files = get_parquet_file_list(input_folder)
+
+    transcode_from_parquet(parquet_files, output_folder, num_files=256)
+
+    # The writer names shards embeddings-[date]-[file_num]-of-00256, where
+    # file_num runs 000..255 with leading zeros; expect exactly 256 shards.
+    output_files = list(output_folder.rglob('embeddings-*-*-of-00256'))
+    assert len(output_files) == 256