-
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: export Open Beauty Facts data as parquet file (#15)
* refactor: create openfoodfacts_exports.parquet module * feat: add Open Beauty Facts Parquet export
- Loading branch information
1 parent
a2996b7
commit 095e0b1
Showing
8 changed files
with
1,010 additions
and
710 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
import logging | ||
import shutil | ||
import tempfile | ||
from pathlib import Path | ||
|
||
import pyarrow as pa | ||
import pyarrow.parquet as pq | ||
import tqdm | ||
from huggingface_hub import HfApi | ||
from more_itertools import chunked | ||
from openfoodfacts import Flavor | ||
from openfoodfacts.utils import jsonl_iter | ||
|
||
from openfoodfacts_exports import settings | ||
|
||
from .beauty import BEAUTY_DTYPE_MAP, BEAUTY_PRODUCT_SCHEMA, BeautyProduct | ||
from .common import Product | ||
from .food import FOOD_DTYPE_MAP, FOOD_PRODUCT_SCHEMA, FoodProduct | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
# Destination path of the Parquet export for each supported dataset flavor:
# Open Food Facts (off) -> food.parquet, Open Beauty Facts (obf) -> beauty.parquet.
PARQUET_DATASET_PATH = {
    Flavor.off: settings.DATASET_DIR / "food.parquet",
    Flavor.obf: settings.DATASET_DIR / "beauty.parquet",
}
|
||
|
||
def export_parquet(
    dataset_path: Path, output_path: Path, flavor: Flavor, use_tqdm: bool = False
) -> None:
    """Convert a JSONL dataset to Parquet format and push it to Hugging Face
    Hub.

    Args:
        dataset_path (Path): The path to the JSONL dataset.
        output_path (Path): The path where the Parquet file will be saved.
        flavor (Flavor): The flavor of the dataset.
        use_tqdm (bool, optional): Whether to use tqdm to display a progress
            bar. Defaults to False.
    """
    logger.info("Start JSONL export to Parquet.")

    # Per-flavor export configuration: (validation model, Arrow schema,
    # explicit dtype overrides for complex columns).
    export_config: dict[Flavor, tuple[type[Product], pa.Schema, dict]] = {
        Flavor.off: (FoodProduct, FOOD_PRODUCT_SCHEMA, FOOD_DTYPE_MAP),
        Flavor.obf: (BeautyProduct, BEAUTY_PRODUCT_SCHEMA, BEAUTY_DTYPE_MAP),
    }
    if flavor not in export_config:
        raise ValueError(f"Unsupported flavor: {flavor}")
    pydantic_cls, schema, dtype_map = export_config[flavor]

    with tempfile.TemporaryDirectory() as tmp_dir:
        # Convert into a scratch file first, then move into place, so a
        # failed conversion never clobbers an existing export.
        converted_path = Path(tmp_dir) / "converted_data.parquet"
        convert_jsonl_to_parquet(
            output_file_path=converted_path,
            dataset_path=dataset_path,
            pydantic_cls=pydantic_cls,
            schema=schema,
            dtype_map=dtype_map,
            use_tqdm=use_tqdm,
        )
        shutil.move(converted_path, output_path)

    if not settings.ENABLE_HF_PUSH:
        logger.info("Hugging Face push is disabled.")
    else:
        push_parquet_file_to_hf(data_path=output_path)
    logger.info("JSONL to Parquet conversion and postprocessing completed.")
|
||
|
||
def convert_jsonl_to_parquet(
    output_file_path: Path,
    dataset_path: Path,
    pydantic_cls: type[Product],
    schema: pa.Schema,
    dtype_map: dict[str, pa.DataType] | None = None,
    batch_size: int = 1024,
    row_group_size: int = 122_880,  # DuckDB default row group size
    use_tqdm: bool = False,
) -> None:
    """Convert the Open Food Facts JSONL dataset to Parquet format.

    Args:
        output_file_path (Path): The path where the Parquet file will be saved.
        dataset_path (Path): The path to the Open Food Facts JSONL dataset.
        pydantic_cls: The Pydantic class used to validate the JSONL items.
        schema (pa.Schema): The schema of the Parquet file.
        dtype_map (dict[str, pa.DataType], optional): A mapping of field names
            to PyArrow data types. Defaults to None.
        batch_size (int, optional): The size of the batches used to convert the
            dataset. Defaults to 1024.
        row_group_size (int, optional): The size of the row groups in the
            Parquet file. Defaults to 122_880.
        use_tqdm (bool, optional): Whether to use tqdm to display a progress
            bar. Defaults to False.
    """
    if dtype_map is None:
        dtype_map = {}
    item_iter = jsonl_iter(dataset_path)
    if use_tqdm:
        item_iter = tqdm.tqdm(item_iter, desc="JSONL")

    writer: pq.ParquetWriter | None = None
    try:
        for batch in chunked(item_iter, batch_size):
            # We use by_alias=True because some fields start with a digit
            # (ex: nutriments.100g), and we cannot declare the schema with
            # Pydantic without an alias.
            products = [
                pydantic_cls(**item).model_dump(by_alias=True) for item in batch
            ]
            # chunked() never yields an empty batch, so products[0] is safe.
            keys = products[0].keys()
            data = {
                key: pa.array(
                    [product[key] for product in products],
                    # Don't let pyarrow guess type for complex types
                    type=dtype_map.get(key, None),
                )
                for key in keys
            }
            record_batch = pa.record_batch(data, schema=schema)
            if writer is None:
                # Created lazily so that an empty dataset produces no file.
                writer = pq.ParquetWriter(
                    output_file_path, schema=record_batch.schema
                )
            writer.write_batch(record_batch, row_group_size=row_group_size)
    finally:
        # Close on both success and failure: the original only closed the
        # writer on the success path, leaking the file handle (and leaving
        # the Parquet footer unwritten) when validation or conversion raised
        # mid-loop.
        if writer is not None:
            writer.close()
|
||
|
||
def push_parquet_file_to_hf(
    data_path: Path,
    repo_id: str = "openfoodfacts/product-database",
    revision: str = "main",
    commit_message: str = "Database updated",
) -> None:
    """Push a Parquet file to Hugging Face Hub.

    Args:
        data_path (Path): The path to the Parquet file to push. The name of the
            file will be used as the path in the repository.
        repo_id (str, optional): The repository ID on Hugging Face Hub.
            Defaults to "openfoodfacts/product-database".
        revision (str, optional): The revision to push the data to. Defaults to
            "main".
        commit_message (str, optional): The commit message. Defaults to
            "Database updated".

    Raises:
        FileNotFoundError: If ``data_path`` does not exist.
        ValueError: If ``data_path`` is not a ``.parquet`` file.
    """
    logger.info("Start pushing data to Hugging Face at %s", repo_id)
    if not data_path.exists():
        raise FileNotFoundError(f"Data is missing: {data_path}")
    if data_path.suffix != ".parquet":
        raise ValueError(f"A parquet file is expected. Got {data_path.suffix} instead.")
    # We use the HF_Hub api since it gives us way more flexibility than
    # push_to_hub()
    HfApi().upload_file(
        path_or_fileobj=data_path,
        repo_id=repo_id,
        revision=revision,
        repo_type="dataset",
        path_in_repo=data_path.name,
        commit_message=commit_message,
    )
    # Fixed typo in the log message: "succesfully" -> "successfully".
    logger.info("Data successfully pushed to Hugging Face at %s", repo_id)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
import orjson | ||
import pyarrow as pa | ||
from pydantic import Field, field_serializer | ||
|
||
from .common import ( | ||
PA_IMAGES_DATATYPE, | ||
PA_LANGUAGE_FIELD_DATATYPE, | ||
PA_OWNER_FIELD_DATATYPE, | ||
PA_PACKAGING_FIELD_DATATYPE, | ||
Ingredient, | ||
LanguageField, | ||
Product, | ||
) | ||
|
||
|
||
class BeautyProduct(Product):
    """Pydantic model for a single Open Beauty Facts product.

    Extends the shared ``Product`` base with the beauty-specific fields that
    end up as columns in the Parquet export. Every field defaults to None so
    that partially-filled JSONL items still validate.
    """

    additives_n: int | None = None
    additives_tags: list[str] | None = None
    allergens_tags: list[str] | None = None
    emb_codes_tags: list[str] | None = None
    emb_codes: str | None = None
    ingredients_analysis_tags: list[str] | None = None
    ingredients_from_palm_oil_n: int | None = None
    ingredients_n: int | None = None
    ingredients_original_tags: list[str] | None = None
    ingredients_percent_analysis: int | None = None
    ingredients_tags: list[str] | None = None
    ingredients_text: list[LanguageField] | None = None
    ingredients_with_specified_percent_n: int | None = None
    ingredients_with_unspecified_percent_n: int | None = None
    ingredients: list[Ingredient] | None = None
    known_ingredients_n: int | None = None
    minerals_tags: list[str] | None = None
    nucleotides_tags: list[str] | None = None
    nutrient_levels_tags: list[str] | None = None
    nutrition_data_per: str | None = None
    # Source data may carry this as a number; coerce to str to keep one type.
    serving_quantity: str | None = Field(default=None, coerce_numbers_to_str=True)
    serving_size: str | None = None
    traces_tags: list[str] | None = None
    unknown_ingredients_n: int | None = None
    unknown_nutrients_tags: list[str] | None = None
    vitamins_tags: list[str] | None = None

    @classmethod
    def get_language_fields(cls) -> list[str]:
        """Return the names of fields holding per-language text variants."""
        return [
            "ingredients_text",
            "product_name",
            "packaging_text",
            "generic_name",
        ]

    @field_serializer("ingredients")
    def serialize_ingredients(
        self, ingredients: list[Ingredient] | None, _info
    ) -> str | None:
        """Ingredients can be nested, which seems difficult to implement as an
        Arrow struct.

        To alleviate this, we serialize the ingredients as a JSON string."""
        if ingredients is None:
            return None
        return orjson.dumps([ing.model_dump() for ing in ingredients]).decode("utf-8")
|
||
|
||
# Arrow schema of the Open Beauty Facts Parquet export. Field names must
# match the keys produced by BeautyProduct.model_dump(by_alias=True); fields
# are kept in alphabetical order. Only "code" and "obsolete" are non-nullable.
BEAUTY_PRODUCT_SCHEMA = pa.schema(
    [
        pa.field("additives_n", pa.int32(), nullable=True),
        pa.field("additives_tags", pa.list_(pa.string()), nullable=True),
        pa.field("allergens_tags", pa.list_(pa.string()), nullable=True),
        pa.field("brands_tags", pa.list_(pa.string()), nullable=True),
        pa.field("brands", pa.string(), nullable=True),
        pa.field("categories", pa.string(), nullable=True),
        pa.field("categories_tags", pa.list_(pa.string()), nullable=True),
        pa.field("checkers_tags", pa.list_(pa.string()), nullable=True),
        pa.field("cities_tags", pa.list_(pa.string()), nullable=True),
        pa.field("code", pa.string()),
        pa.field("complete", pa.int32(), nullable=True),
        pa.field("completeness", pa.float32(), nullable=True),
        pa.field("correctors_tags", pa.list_(pa.string()), nullable=True),
        pa.field("countries_tags", pa.list_(pa.string()), nullable=True),
        pa.field("created_t", pa.int64(), nullable=True),
        pa.field("creator", pa.string(), nullable=True),
        pa.field("data_quality_errors_tags", pa.list_(pa.string()), nullable=True),
        pa.field("data_quality_info_tags", pa.list_(pa.string()), nullable=True),
        pa.field("data_quality_warnings_tags", pa.list_(pa.string()), nullable=True),
        pa.field("data_sources_tags", pa.list_(pa.string()), nullable=True),
        pa.field("editors", pa.list_(pa.string()), nullable=True),
        pa.field("emb_codes_tags", pa.list_(pa.string()), nullable=True),
        pa.field("emb_codes", pa.string(), nullable=True),
        pa.field("entry_dates_tags", pa.list_(pa.string()), nullable=True),
        pa.field("generic_name", PA_LANGUAGE_FIELD_DATATYPE, nullable=True),
        pa.field("images", PA_IMAGES_DATATYPE, nullable=True),
        pa.field("informers_tags", pa.list_(pa.string()), nullable=True),
        pa.field("ingredients_analysis_tags", pa.list_(pa.string()), nullable=True),
        pa.field("ingredients_from_palm_oil_n", pa.int32(), nullable=True),
        pa.field("ingredients_n", pa.int32(), nullable=True),
        pa.field("ingredients_original_tags", pa.list_(pa.string()), nullable=True),
        pa.field("ingredients_percent_analysis", pa.int32(), nullable=True),
        pa.field("ingredients_tags", pa.list_(pa.string()), nullable=True),
        pa.field("ingredients_text", PA_LANGUAGE_FIELD_DATATYPE, nullable=True),
        pa.field("ingredients_with_specified_percent_n", pa.int32(), nullable=True),
        pa.field("ingredients_with_unspecified_percent_n", pa.int32(), nullable=True),
        # Stored as a JSON string: BeautyProduct.serialize_ingredients dumps
        # the (possibly nested) ingredient list with orjson.
        pa.field("ingredients", pa.string(), nullable=True),
        pa.field("known_ingredients_n", pa.int32(), nullable=True),
        pa.field("labels_tags", pa.list_(pa.string()), nullable=True),
        pa.field("labels", pa.string(), nullable=True),
        pa.field("lang", pa.string(), nullable=True),
        pa.field("languages_tags", pa.list_(pa.string()), nullable=True),
        pa.field("last_edit_dates_tags", pa.list_(pa.string()), nullable=True),
        pa.field("last_editor", pa.string(), nullable=True),
        pa.field("last_image_t", pa.int64(), nullable=True),
        pa.field("last_modified_by", pa.string(), nullable=True),
        pa.field("last_modified_t", pa.int64(), nullable=True),
        pa.field("last_updated_t", pa.int64(), nullable=True),
        pa.field("link", pa.string(), nullable=True),
        pa.field("main_countries_tags", pa.list_(pa.string()), nullable=True),
        pa.field("manufacturing_places_tags", pa.list_(pa.string()), nullable=True),
        pa.field("manufacturing_places", pa.string(), nullable=True),
        pa.field("max_imgid", pa.int32(), nullable=True),
        pa.field("minerals_tags", pa.list_(pa.string()), nullable=True),
        pa.field("misc_tags", pa.list_(pa.string()), nullable=True),
        pa.field("nucleotides_tags", pa.list_(pa.string()), nullable=True),
        pa.field("nutrient_levels_tags", pa.list_(pa.string()), nullable=True),
        pa.field("nutrition_data_per", pa.string(), nullable=True),
        pa.field("obsolete", pa.bool_()),
        pa.field("origins_tags", pa.list_(pa.string()), nullable=True),
        pa.field("origins", pa.string(), nullable=True),
        pa.field("owner_fields", PA_OWNER_FIELD_DATATYPE, nullable=True),
        pa.field("owner", pa.string(), nullable=True),
        pa.field("packagings_complete", pa.bool_(), nullable=True),
        pa.field("packaging_recycling_tags", pa.list_(pa.string()), nullable=True),
        pa.field("packaging_shapes_tags", pa.list_(pa.string()), nullable=True),
        pa.field("packaging_tags", pa.list_(pa.string()), nullable=True),
        pa.field("packaging_text", PA_LANGUAGE_FIELD_DATATYPE, nullable=True),
        pa.field("packaging", pa.string(), nullable=True),
        pa.field("packagings", PA_PACKAGING_FIELD_DATATYPE, nullable=True),
        pa.field("photographers", pa.list_(pa.string()), nullable=True),
        pa.field("popularity_key", pa.int64(), nullable=True),
        pa.field("popularity_tags", pa.list_(pa.string()), nullable=True),
        pa.field("product_name", PA_LANGUAGE_FIELD_DATATYPE, nullable=True),
        pa.field("product_quantity_unit", pa.string(), nullable=True),
        pa.field("product_quantity", pa.string(), nullable=True),
        pa.field("purchase_places_tags", pa.list_(pa.string()), nullable=True),
        pa.field("quantity", pa.string(), nullable=True),
        pa.field("rev", pa.int32(), nullable=True),
        pa.field("scans_n", pa.int32(), nullable=True),
        # String, not numeric: the model coerces serving_quantity to str.
        pa.field("serving_quantity", pa.string(), nullable=True),
        pa.field("serving_size", pa.string(), nullable=True),
        pa.field("states_tags", pa.list_(pa.string()), nullable=True),
        pa.field("stores_tags", pa.list_(pa.string()), nullable=True),
        pa.field("stores", pa.string(), nullable=True),
        pa.field("traces_tags", pa.list_(pa.string()), nullable=True),
        pa.field("unique_scans_n", pa.int32(), nullable=True),
        pa.field("unknown_ingredients_n", pa.int32(), nullable=True),
        pa.field("unknown_nutrients_tags", pa.list_(pa.string()), nullable=True),
        pa.field("vitamins_tags", pa.list_(pa.string()), nullable=True),
    ]
)
|
||
|
||
# Columns whose Arrow type must be stated explicitly when building arrays,
# instead of letting pyarrow infer it (used via dtype_map in the converter);
# both are complex nested struct/list types.
BEAUTY_DTYPE_MAP = {
    "images": PA_IMAGES_DATATYPE,
    "packagings": PA_PACKAGING_FIELD_DATATYPE,
}
Oops, something went wrong.