
fix: stream_json #2697

Merged: 2 commits merged into main from fix/etl_stream_json on Dec 13, 2024

Conversation

@Datayama38 (Collaborator) commented Nov 22, 2024

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced data loading processes with improved SQL query handling and type safety.
    • Introduced a new JsonOptions interface for better configuration of JSON streaming.
  • Bug Fixes

    • Improved error handling in data transformation and loading methods to ensure robust transaction management.
  • Refactor

    • Streamlined import statements and consolidated code for better readability and maintainability.
    • Updated type definitions to enhance type safety across various functions.
  • Chores

    • Removed unused exports and updated dependency versions for better performance and stability.
    • Enhanced SQL commands to improve robustness in database migrations.


coderabbitai bot commented Nov 22, 2024

Walkthrough

The changes in this pull request involve multiple updates across various files, primarily focusing on modifying import and export statements, enhancing type safety in SQL queries, and refining data transformation methods. Notable adjustments include commenting out certain exports, updating type imports for various modules, and introducing new interfaces. The streamJson function underwent a significant rewrite to change its data handling approach. Overall, the modifications aim to streamline dependencies, improve code clarity, and ensure better handling of data processing.
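
For orientation, here is a minimal TypeScript sketch of the rewritten approach described above (read the whole file, optionally pick a filtered key, then yield fixed-size chunks). It is a paraphrase for illustration, not the exact project code, which appears in the diffs reviewed below; the local JsonOptions shape is assumed from the PR description.

// Sketch only: whole-file read followed by chunked yields, as described in the walkthrough.
// The real JsonOptions lives in api/src/etl/interfaces/StreamDataOptions.ts; this local copy
// is only here to keep the sketch self-contained.
interface JsonOptions {
  filter?: string;
}

export async function* streamJson<T>(
  filepath: string,
  sheetOptions: JsonOptions,
  chunkSize = 100,
): AsyncIterable<T[]> {
  // Read and parse the entire file in one go (the memory trade-off debated in the review).
  const parsed = JSON.parse(await Deno.readTextFile(filepath));
  // If a filter key is set, stream that sub-array instead of the root value.
  const data = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;
  if (!Array.isArray(data)) {
    throw new Error("The JSON data must be an array");
  }
  for (let i = 0; i < data.length; i += chunkSize) {
    yield data.slice(i, i + chunkSize) as T[];
  }
}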

Changes

File Path · Change Summary

  • api/src/deps.ts: Commented out exports for StreamJsonOptions, jsonStreamArrayUrl, and jsonPickUrl; updated type imports for pg and pg-cursor to newer versions; added new imports for booleanPointInPolygon and distance; retained exports for S3Client, PgClient, and Mail types.
  • api/src/etl/datasets/eurostat/countries/2020/EurostatCountries2020.ts: Streamlined import statements; updated SQL query in load method to cast $1 to JSON and simplified values array handling.
  • api/src/etl/datasets/eurostat/countries/2020/EurostatSimplifiedCountries2020.ts: Streamlined import statements; updated SQL query in load method to cast input to JSON and simplified values array handling.
  • api/src/etl/datasets/eurostat/countries/2024/EurostatCountries2024.ts: Streamlined import statements; updated SQL query in load method to cast input to JSON and simplified values handling.
  • api/src/etl/datasets/eurostat/countries/2024/EurostatSimplifiedCountries2024.ts: Streamlined import statements; updated SQL query in load method to cast input to JSON and simplified values handling.
  • api/src/etl/datasets/ign/common/IgnDataset.ts: Updated transform method to handle multiple simplifications and refined load method SQL queries to cast input to JSON.
  • api/src/etl/helpers/data/streamCsv.ts: Updated type definition for header in the columns function to a tuple type for enhanced type safety.
  • api/src/etl/helpers/data/streamJson.ts: Significant rewrite of the streamJson function to read entire file content into memory; added error handling and filtering capabilities.
  • api/src/etl/interfaces/StreamDataOptions.ts: Removed import of StreamJsonOptions and defined a new JsonOptions interface locally, updating StreamDataOptions to include it.
  • api/src/db/migrations/20241122000000-alter-fraudcheck_label.sql: Modified SQL command to include IF EXISTS and CASCADE in the DROP COLUMN statement for the geo_code column.

Possibly related PRs

  • fix: arr in geo2024 #2678: The changes in the IgnDataset class involve updates to SQL queries and handling of fields, which may relate to the overall data handling and type safety improvements seen in the main PR's updates to type imports and SQL query handling.

🐰 In the code, we hop and play,
With imports and types, we clear the way.
SQL queries now cast with care,
Data flows smoothly, everywhere!
So let’s celebrate this tidy feat,
Our code is now a tasty treat! 🥕✨



coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (14)
api/src/etl/interfaces/StreamDataOptions.ts (2)

8-10: Consider enhancing the JsonOptions interface definition

The interface could benefit from the following improvements:

  1. Add JSDoc comments to document the purpose and expected format of the filter
  2. Consider making the filter property optional to maintain backward compatibility
  3. Consider using a more specific type than string (e.g., a union type of allowed filters)
+/**
+ * Options for JSON stream processing
+ * @property filter - Specifies the filter criteria for JSON data
+ */
 export interface JsonOptions {
-  filter: string;
+  filter?: string; // Consider using a more specific type
 }

Making filter required in JsonOptions is safe

The implementation in streamJson.ts already handles both cases where filter is present or not through the line:

const data = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;

Making filter required in the interface would be inconsistent with the actual implementation which treats it as optional. The review comment's concern about breaking changes is valid, but in the opposite direction - the interface should keep filter as optional to match the implementation.

Locations needing attention:

  • api/src/etl/interfaces/StreamDataOptions.ts: The filter property in JsonOptions interface should be marked as optional with ?
🔗 Analysis chain

Line range hint 1-15: Verify the impact of JsonOptions changes

Since this introduces a required filter property, we should verify all existing usages of JsonOptions are updated accordingly.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all usages of streamJson and JsonOptions
echo "Searching for streamJson usage patterns..."
rg -l "streamJson|JsonOptions" --type typescript

# Look for potential interface implementations
echo "Checking for JsonOptions interface implementations..."
ast-grep --pattern 'const $VAR: JsonOptions = { $$$' --type typescript

# Look for direct object literals passed to streamJson
echo "Checking for direct object literals passed to streamJson..."
ast-grep --pattern 'streamJson($$$, { $$$' --type typescript

Length of output: 922


Script:

#!/bin/bash
# Search for all usages of streamJson and JsonOptions
echo "Searching for streamJson usage patterns..."
rg "streamJson|JsonOptions"

# Look for potential interface implementations
echo "Checking for JsonOptions interface implementations..."
ast-grep --pattern 'const $VAR: JsonOptions = { $$$'

# Look for direct object literals passed to streamJson
echo "Checking for direct object literals passed to streamJson..."
ast-grep --pattern 'streamJson($$$, { $$$'

Length of output: 1166


Script:

#!/bin/bash
# Check the implementation of streamJson to understand how filter is used
echo "Checking streamJson implementation..."
cat api/src/etl/helpers/data/streamJson.ts

# Check streamData.ts for any direct usage
echo "Checking streamData implementation..."
cat api/src/etl/helpers/data/streamData.ts

Length of output: 2232

api/src/etl/helpers/data/streamCsv.ts (1)

Line range hint 4-20: Add error handling and resource cleanup

While the streaming implementation is efficient, it lacks error handling for file operations, parsing, and pipe events. This could lead to unhandled errors and resource leaks.

Consider implementing these improvements:

 export async function* streamCsv<T>(
   filepath: string,
   sheetOptions: CsvOptions,
   chunkSize = 100,
 ): AsyncIterable<T[]> {
-  const fsStream = createReadStream(filepath, { encoding: "utf-8" });
-  const parser = parse({
-    columns: (header: string[]) => header.map((k) => k.toLowerCase()),
-    ...sheetOptions,
-  });
-  fsStream.pipe(parser);
+  const fsStream = createReadStream(filepath, { encoding: "utf-8" })
+    .on('error', (error) => {
+      throw new Error(`Failed to read CSV file: ${error.message}`);
+    });
+
+  const parser = parse({
+    columns: (header: string[]) => header.map((k) => k.toLowerCase()),
+    ...sheetOptions,
+  });
+
+  fsStream.pipe(parser)
+    .on('error', (error) => {
+      throw new Error(`Failed to parse CSV: ${error.message}`);
+    });
+
   let chunk: T[] = [];
-  for await (const line of parser) {
-    if (chunk.length === chunkSize) {
-      yield chunk;
-      chunk = [];
+  try {
+    for await (const line of parser) {
+      if (chunk.length === chunkSize) {
+        yield chunk;
+        chunk = [];
+      }
+      chunk.push(line);
     }
-    chunk.push(line);
+    if (chunk.length > 0) {
+      yield chunk;
+    }
+  } finally {
+    fsStream.destroy();
   }
-  yield chunk;
-  return;
 }
api/src/etl/datasets/eurostat/countries/2024/EurostatCountries2024.ts (3)

Line range hint 40-48: Consider adding JSON validation and error handling

The current implementation might be vulnerable to malformed GeoJSON data. Consider adding validation before insertion.

 WITH tmp as(
   SELECT * FROM
   json_to_recordset($1::json)
   as tmp(type varchar, properties json,geometry json)
 )
 SELECT
+  CASE
+    WHEN properties IS NULL OR geometry IS NULL THEN
+      NULL
+    ELSE
   (properties->>'ISO3_CODE')::varchar as codeiso3,
   ST_SetSRID(st_multi(st_geomfromgeojson(geometry)),4326) as geom
+  END
 FROM tmp
+WHERE properties IS NOT NULL
+  AND geometry IS NOT NULL
 ON CONFLICT DO NOTHING

Line range hint 31-52: Consider implementing batch processing for large datasets

The current implementation processes records one at a time, which might not be optimal for large datasets. Consider implementing batch processing to improve performance.

 let done = false;
+let batch = [];
+const BATCH_SIZE = 1000;
 do {
   const results = await cursor.next();
   done = !!results.done;
   if (results.value) {
-    const query = {
-      text: `
-        INSERT INTO ${this.tableWithSchema} ...
-      `,
-      values: [JSON.stringify(results.value)],
-    };
-    await connection.query(query);
+    batch.push(results.value);
+    if (batch.length >= BATCH_SIZE || done) {
+      const query = {
+        text: `
+          INSERT INTO ${this.tableWithSchema} ...
+        `,
+        values: [JSON.stringify(batch)],
+      };
+      await connection.query(query);
+      batch = [];
+    }
   }
 } while (!done);

Line range hint 22-62: Consider adding logging for ETL process monitoring

The load method would benefit from logging to track progress and help with debugging.

 override async load(): Promise<void> {
+  console.log(`Starting load for ${this.constructor.name}`);
   const connection = await this.connection.connect();
   await connection.query("BEGIN TRANSACTION");
   try {
     for (const filepath of this.filepaths) {
+      console.log(`Processing file: ${filepath}`);
       const cursor = streamData(filepath, this.fileType, this.sheetOptions);
       // ... existing code ...
+      console.log(`Completed processing file: ${filepath}`);
     }
     await connection.query("COMMIT");
     connection.release();
+    console.log(`Successfully completed load for ${this.constructor.name}`);
   } catch (e) {
+    console.error(`Error during load: ${e.message}`);
     await connection.query("ROLLBACK");
     connection.release();
     throw e;
   }
 }
api/src/etl/datasets/eurostat/countries/2020/EurostatSimplifiedCountries2020.ts (1)

Line range hint 40-49: Consider implementing bulk inserts for better performance.

The current implementation processes records one at a time. For large datasets, this could be inefficient. Consider implementing batch processing to improve performance.

Here's a suggested approach:

 let done = false;
+const batchSize = 1000;
+let batch = [];
 do {
   const results = await cursor.next();
   done = !!results.done;
   if (results.value) {
-    const query = {
-      text: `
-        INSERT INTO ${this.tableWithSchema} (
-            ${[...this.rows.keys()].join(", \n")}
-        )
-        WITH tmp as(
-          SELECT * FROM
-          json_to_recordset($1::json)
-          as tmp(type varchar, properties json,geometry json)
-        )
-        SELECT
-          (properties->>'ISO3_CODE')::varchar as codeiso3,
-          ST_SetSRID(st_multi(st_geomfromgeojson(geometry)),4326) as geom
-        FROM tmp
-        ON CONFLICT DO NOTHING
-      `,
-      values: [JSON.stringify(results.value)],
-    };
-    await connection.query(query);
+    batch.push(results.value);
+    if (batch.length >= batchSize) {
+      await processBatch(batch);
+      batch = [];
+    }
   }
 } while (!done);
+if (batch.length > 0) {
+  await processBatch(batch);
+}

+async function processBatch(records: any[]) {
+  const query = {
+    text: `
+      INSERT INTO ${this.tableWithSchema} (
+          ${[...this.rows.keys()].join(", \n")}
+      )
+      WITH tmp as(
+        SELECT * FROM
+        json_to_recordset($1::json)
+        as tmp(type varchar, properties json,geometry json)
+      )
+      SELECT
+        (properties->>'ISO3_CODE')::varchar as codeiso3,
+        ST_SetSRID(st_multi(st_geomfromgeojson(geometry)),4326) as geom
+      FROM tmp
+      ON CONFLICT DO NOTHING
+    `,
+    values: [JSON.stringify(records)],
+  };
+  await connection.query(query);
+}
api/src/etl/datasets/eurostat/countries/2024/EurostatSimplifiedCountries2024.ts (3)

10-10: Consider moving the URL to a configuration file.

The hardcoded URL could be problematic for maintenance. Consider moving it to a configuration file (see the sketch after this list) to:

  • Centralize URL management
  • Enable environment-specific configurations
  • Facilitate URL updates without code changes
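
A possible shape for that change, as a sketch (the module path and environment variable name are hypothetical; the default URL is the one this dataset already uses):

// Hypothetical config module, e.g. api/src/etl/config.ts (name is illustrative).
// Reading an environment variable first keeps the URL overridable per environment.
export const EUROSTAT_SIMPLIFIED_COUNTRIES_2024_URL =
  Deno.env.get("EUROSTAT_SIMPLIFIED_COUNTRIES_2024_URL") ??
  "https://gisco-services.ec.europa.eu/distribution/v2/countries/geojson/CNTR_RG_60M_2024_4326.geojson";

The dataset class would then import this constant instead of embedding the literal string.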

Line range hint 40-48: Consider adding batch processing for better performance.

The current implementation processes records one at a time. For large datasets, this could be inefficient. Consider implementing batch processing to reduce database round-trips.

Example approach:

const batchSize = 1000;
let batch = [];
// ... in your loop
batch.push(results.value);
if (batch.length >= batchSize) {
  await processBatch(batch);
  batch = [];
}
// ... after loop
if (batch.length > 0) {
  await processBatch(batch);
}

47-47: Add logging for skipped records.

The ON CONFLICT DO NOTHING clause silently ignores duplicates. Consider adding logging to track skipped records for better observability.

-ON CONFLICT DO NOTHING
+ON CONFLICT DO UPDATE
+SET updated_at = NOW()
+RETURNING id, 'skipped' as status
api/src/etl/datasets/ign/common/IgnDataset.ts (3)

Line range hint 116-124: Consider adding JSON validation error handling

While the explicit ::json casting improves type safety, consider adding error handling for malformed JSON data to prevent silent failures.

 json_to_recordset($1::json)
 as t(type varchar, properties json,geometry json)
+WHERE json_typeof($1) = 'array'

Line range hint 138-143: Consider adding indexes for performance optimization

The WHERE clause conditions on arr and com columns could benefit from indexes to improve update performance, especially for large datasets.

Consider adding the following indexes:

CREATE INDEX IF NOT EXISTS idx_ign_arr ON your_table(arr) WHERE arr IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_ign_com ON your_table(com) WHERE arr IS NULL;

Line range hint 154-159: Consider refactoring to reduce SQL query duplication

The SQL queries for 'geom_simple' and 'centroid' cases share identical structure and differ only in the geometry transformation. Consider extracting the common parts into a reusable function.

private async updateGeometry(
  connection: PoolClient,
  values: any[],
  key: string,
  geometryTransformation: string
): Promise<void> {
  const arrField = this.rows.get("arr") || [];
  const comField = this.rows.get("com") || [];
  
  await connection.query({
    text: `
      UPDATE ${this.tableWithSchema}
      SET ${key} = ${geometryTransformation}
      FROM (
        SELECT * FROM json_to_recordset($1::json)
        as t(type varchar, properties json,geometry json)
      ) AS tt
      WHERE (arr is not null AND arr = (tt.properties->>'${arrField[0]}')::${arrField[1]})
      OR (arr is null AND com = (tt.properties->>'${comField[0]}')::${comField[1]})
    `,
    values,
  });
}
api/src/etl/helpers/data/streamJson.ts (1)

1-39: Standardize comments and error messages to English for consistency

The comments and error messages in the code are currently in French. For consistency and to facilitate collaboration among team members who may not speak French, it's recommended to use English throughout the codebase.

Here's the diff to update the comments and error messages:

-import { JsonOptions } from "../../interfaces/index.ts";

-export async function* streamJson<T>(
-  filepath: string,
-  sheetOptions: JsonOptions,
-  chunkSize = 100,
-): AsyncIterable<T[]> {
-  // Lire tout le fichier comme une seule chaîne
+  // Read the entire file as a single string
   const file = await Deno.readTextFile(filepath);
   try {
-    // Parse tout le fichier
+    // Parse the entire file
     const parsed = JSON.parse(file);
-    // Si un filtre est défini, extraire le sous-ensemble correspondant
+    // If a filter is defined, extract the corresponding subset
     const data = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;
-    // Vérifiez si c'est un tableau JSON
+    // Check if it's a JSON array
     if (Array.isArray(data)) {
       let chunk: T[] = [];
       for (const line of data) {
         chunk.push(line);
         if (chunk.length === chunkSize) {
           yield chunk;
           chunk = [];
         }
       }
-      // Renvoyer le dernier chunk s'il reste des éléments
+      // Return the last chunk if there are remaining elements
       if (chunk.length > 0) {
         yield chunk;
       }
     } else {
-      console.error("Le fichier JSON n'est pas un tableau !");
+      console.error("The JSON file is not an array!");
-      throw new Error("Le fichier JSON doit être un tableau.");
+      throw new Error("The JSON file must be an array.");
     }
     return;
   } catch (error) {
-    console.error("Erreur lors du parsing du fichier JSON :", error);
+    console.error("Error parsing the JSON file:", error);
     throw error;
   }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 5e4aa33 and 16173c5.

⛔ Files ignored due to path filters (1)
  • api/deno.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
  • api/src/deps.ts (0 hunks)
  • api/src/etl/datasets/eurostat/countries/2020/EurostatCountries2020.ts (3 hunks)
  • api/src/etl/datasets/eurostat/countries/2020/EurostatSimplifiedCountries2020.ts (3 hunks)
  • api/src/etl/datasets/eurostat/countries/2024/EurostatCountries2024.ts (3 hunks)
  • api/src/etl/datasets/eurostat/countries/2024/EurostatSimplifiedCountries2024.ts (3 hunks)
  • api/src/etl/datasets/ign/common/IgnDataset.ts (4 hunks)
  • api/src/etl/helpers/data/streamCsv.ts (1 hunks)
  • api/src/etl/helpers/data/streamJson.ts (1 hunks)
  • api/src/etl/interfaces/StreamDataOptions.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • api/src/deps.ts
🔇 Additional comments (11)
api/src/etl/interfaces/StreamDataOptions.ts (1)

Line range hint 12-15: LGTM: StreamDataOptions type union is well-defined

The type union appropriately includes all option types and maintains flexibility with the undefined option.

api/src/etl/datasets/eurostat/countries/2020/EurostatCountries2020.ts (3)

3-3: LGTM! Import statement consolidation improves readability.


10-10: LGTM! URL formatting is consistent with the codebase style.


Line range hint 40-49: Type safety improvement looks good, but consider performance monitoring.

The explicit ::json cast and simplified JSON handling improve type safety and code clarity. However, for large datasets, monitor the memory usage and performance impact of stringifying the entire results.value.

Let's check for potential large datasets in the codebase:

api/src/etl/datasets/eurostat/countries/2024/EurostatCountries2024.ts (2)

40-41: Good addition of explicit type casting

The ::json type casting in json_to_recordset improves type safety at the database level.


10-10: Verify the Eurostat GeoJSON endpoint accessibility and format

The URL points to a specific version (v2) and year (2024) of the country boundaries dataset. Let's ensure it's accessible and returns the expected GeoJSON format.

✅ Verification successful

URL endpoint is accessible and returns valid GeoJSON format

The Eurostat GeoJSON endpoint is:

  • Returning HTTP 200 status code
  • Serving content-type: application/geo+json
  • CORS-enabled with appropriate headers
  • Last modified on Oct 8, 2024
  • Properly configured with cache control (ETag)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the Eurostat GeoJSON endpoint
# Expected: 200 OK response and valid GeoJSON content type

curl -I "https://gisco-services.ec.europa.eu/distribution/v2/countries/geojson/CNTR_RG_01M_2024_4326.geojson"

Length of output: 946

api/src/etl/datasets/eurostat/countries/2020/EurostatSimplifiedCountries2020.ts (2)

3-3: LGTM! Clean import consolidation and URL formatting.

The changes improve code organization while maintaining secure HTTPS access to the official EC data source.

Also applies to: 10-10


Line range hint 40-46: LGTM! Enhanced type safety with explicit JSON casting.

The addition of ::json type casting improves type safety during JSON processing, preventing potential runtime errors.

api/src/etl/datasets/eurostat/countries/2024/EurostatSimplifiedCountries2024.ts (1)

49-49: Verify the impact of direct JSON stringification.

The change to use JSON.stringify(results.value) directly assumes the structure of results.value matches the expected format.

✅ Verification successful

Direct JSON stringification is safe in this context

The JSON stringification of results.value is a standard pattern used across the ETL datasets in this codebase. The implementation in AbstractDataset shows that:

  1. The data is being processed in batches through a cursor
  2. The stringified data is used with json_to_recordset in a prepared SQL statement
  3. The schema is strictly defined through the rows property
  4. There's proper error handling and transaction management
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check the structure of the GeoJSON data to ensure it matches the expected format
curl -s "https://gisco-services.ec.europa.eu/distribution/v2/countries/geojson/CNTR_RG_60M_2024_4326.geojson" | \
jq -r 'select(.features != null) | .features[0] | {type, properties: .properties | {ISO3_CODE}, geometry: .geometry.type}'

Length of output: 329


Script:

#!/bin/bash
# Let's check how results.value is being processed in the codebase
ast-grep --pattern 'results.value'

# Also check the type definition or interface for the results object
fd "\.ts$" --exec rg -l "interface.*Result|type.*Result" {}

Length of output: 15331


Script:

#!/bin/bash
# Let's check the context around the JSON.stringify usage in the Eurostat dataset files
rg -B 5 -A 5 "JSON\.stringify\(results\.value\)" api/src/etl/datasets/eurostat/countries/2024/

# Also check the parent class implementation
cat api/src/etl/common/AbstractDataset.ts

Length of output: 8264

api/src/etl/datasets/ign/common/IgnDataset.ts (2)

104-104: LGTM: Proper JSON serialization

The explicit JSON stringification ensures type safety when passing data to SQL queries.


Line range hint 104-159: Verify JSON streaming implementation

Let's verify the JSON streaming functionality and error handling across the codebase.

✅ Verification successful

JSON streaming and type casting implementation is consistent

The verification shows that:

  • All instances of json_to_recordset consistently use explicit ::json type casting across IGN and Eurostat datasets
  • The streaming implementation follows a consistent pattern using the streamData helper with proper cursor handling and error checks (sketched after this list):
    • Initialization with streamData(filepath, fileType, sheetOptions)
    • Iteration using cursor.next() with done flag
    • Value checking before processing
    • Proper error boundaries in processing blocks
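
To make that pattern concrete, here is a minimal, self-contained sketch of the consumer loop; cursor and query are stand-ins for the project's streamData iterator and pg client call, and the SQL text is illustrative only:

// Illustrative consumer of a chunked async iterator, following the pattern summarized above.
async function consumeChunks<T>(
  cursor: AsyncIterator<T[]>,
  query: (text: string, values: unknown[]) => Promise<void>,
): Promise<void> {
  let done = false;
  do {
    const results = await cursor.next();
    done = !!results.done;
    if (results.value) {
      // Each chunk is stringified once and cast to json inside the SQL statement.
      await query(
        "SELECT * FROM json_to_recordset($1::json) AS t(type varchar, properties json, geometry json)",
        [JSON.stringify(results.value)],
      );
    }
  } while (!done);
}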
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other instances of json_to_recordset to ensure consistent type casting
echo "Checking json_to_recordset usage patterns:"
rg "json_to_recordset\(" --type ts

# Look for error handling patterns in streaming operations
echo "Checking error handling in streaming operations:"
rg "streamData|cursor\.next\(\)" --type ts -A 5

Length of output: 16342

@@ -8,7 +8,7 @@ export async function* streamCsv<T>(
 ): AsyncIterable<T[]> {
   const fsStream = createReadStream(filepath, { encoding: "utf-8" });
   const parser = parse({
-    columns: (header: any) => Object.keys(header).map((k) => k.toLowerCase()),
+    columns: (header: [x: string]) => Object.keys(header).map((k) => k.toLowerCase()),

💡 Codebase verification

⚠️ Potential issue

The type definition is incorrect and should be fixed

The current type [x: string] is incorrect as it's importing the Options type from csv-parse@^5.5 which expects the header parameter to be of type string[]. The tuple type is too restrictive and doesn't match the library's type definition. Here's the fix:

-    columns: (header: [x: string]) => Object.keys(header).map((k) => k.toLowerCase()),
+    columns: (header: string[]) => header.map((k) => k.toLowerCase()),

Additionally:

  • Using Object.keys() on an array is unnecessary as we can directly map over the header array
  • The csv-parse library's type definition confirms that header parameter should be string[]
🔗 Analysis chain

Fix incorrect type definition for CSV header parameter

The type [x: string] represents a tuple with a single string element, which is too restrictive for CSV headers that typically contain multiple columns. This could cause type errors when processing CSV files with multiple columns.

Apply this fix to correctly type the header parameter:

-    columns: (header: [x: string]) => Object.keys(header).map((k) => k.toLowerCase()),
+    columns: (header: string[]) => header.map((k) => k.toLowerCase()),

Let's verify the CSV structure in the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find CSV files and check their structure to ensure they have multiple columns

# Find all CSV files in the repository
echo "Searching for CSV files..."
fd -e csv --exec head -n 1 {} \; 2>/dev/null

# Look for CSV processing patterns to understand expected column structure
echo "Checking CSV processing patterns..."
rg -A 5 "createReadStream.*\.csv" 

Length of output: 5635


Script:

#!/bin/bash
# Let's analyze the CSV streaming implementation and its usage

# Find the streamCsv implementation and its type definitions
echo "Checking streamCsv implementation and types:"
rg -A 10 "streamCsv" 

# Look for imports and usage of streamCsv
echo -e "\nChecking streamCsv usage:"
rg -l "streamCsv"

# Check for CSV parsing patterns and header handling
echo -e "\nChecking CSV parsing patterns:"
ast-grep --pattern 'columns: (header: $_) => $_'

Length of output: 2688


Script:

#!/bin/bash
# Let's check the CSV parser library usage and type definitions

# Check for CSV parser imports and configuration
echo "Checking CSV parser imports and usage:"
rg -A 5 "from 'csv-parse'" 
rg -A 5 "from \"csv-parse\""

# Look for CsvOptions type definition
echo -e "\nChecking CsvOptions type definition:"
rg -A 10 "type CsvOptions"
rg -A 10 "interface CsvOptions"

# Look for actual CSV parsing in action
echo -e "\nChecking CSV parsing implementations:"
rg -A 10 "parse\({" 

Length of output: 495


Script:

#!/bin/bash
# Let's check the CSV parser library usage with fixed patterns

# Check for CSV parser imports and configuration
echo "Checking CSV parser imports and usage:"
rg -A 5 "import.*csv-parse"

# Look for CsvOptions type definition
echo -e "\nChecking CsvOptions type definition:"
rg -A 10 "CsvOptions"

# Look for actual CSV parsing in action (fixed regex)
echo -e "\nChecking CSV parsing implementations:"
rg -A 10 "parse\(" 

# Check the actual header parameter type from csv-parse
echo -e "\nChecking csv-parse types:"
fd -e d.ts -x cat {} \; 2>/dev/null | rg -A 5 "columns.*header"

Length of output: 26876


Script:

#!/bin/bash
# Let's check the csv-parse package imports and type definitions

# Find the csv-parse import statement
echo "Checking csv-parse import:"
rg "from.*csv-parse.*" -A 2

# Check where CsvOptions is imported from deps.ts
echo -e "\nChecking CsvOptions type definition source:"
rg "export.*CsvOptions.*from.*" 

# Look for the actual parse function import
echo -e "\nChecking parse function import:"
rg "import.*\{.*parse.*\}.*from.*csv-parse.*"

Length of output: 716

Comment on lines +14 to +16
const data = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;
// Vérifiez si c'est un tableau JSON
if (Array.isArray(data)) {

⚠️ Potential issue

Add error handling for missing filter keys in the JSON data

When sheetOptions.filter is provided, parsed[sheetOptions.filter] may result in undefined if the key does not exist in the parsed JSON object. This would cause Array.isArray(data) to return false, and the code would throw an error stating that the JSON must be an array, which might be misleading. Consider adding a check to verify that the filter key exists and provide a clearer error message if it doesn't.

Here's the diff to include the additional check:

 const data = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;

+if (data === undefined) {
+  console.error(`The key '${sheetOptions.filter}' was not found in the JSON data.`);
+  throw new Error(`The key '${sheetOptions.filter}' does not exist in the JSON data.`);
+}

 // Check if it's a JSON array
 if (Array.isArray(data)) {

This way, if the specified filter key is missing, the error message will accurately reflect the issue, aiding in quicker debugging and resolution.


Comment on lines +9 to +14
const file = await Deno.readTextFile(filepath);
try {
// Parse tout le fichier
const parsed = JSON.parse(file);
// Si un filtre est défini, extraire le sous-ensemble correspondant
const data = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;
coderabbitai bot commented Nov 22, 2024

⚠️ Potential issue

Avoid loading the entire JSON file into memory; consider streaming for large files

Reading the entire JSON file into memory using Deno.readTextFile and JSON.parse can lead to high memory consumption and potential crashes when handling large files. To improve performance and scalability, consider using a streaming JSON parser that processes the file incrementally without loading it entirely into memory.

Here's a refactored version using a streaming approach:

-import { JsonOptions } from "../../interfaces/index.ts";

-export async function* streamJson<T>(
-  filepath: string,
-  sheetOptions: JsonOptions,
-  chunkSize = 100,
-): AsyncIterable<T[]> {
-  // Lire tout le fichier comme une seule chaîne
-  const file = await Deno.readTextFile(filepath);
-  try {
-    // Parse tout le fichier
-    const parsed = JSON.parse(file);
-    // Si un filtre est défini, extraire le sous-ensemble correspondant
-    const data = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;
-    // Vérifiez si c'est un tableau JSON
-    if (Array.isArray(data)) {
-      let chunk: T[] = [];
-      for (const line of data) {
-        chunk.push(line);
-        if (chunk.length === chunkSize) {
-          yield chunk;
-          chunk = [];
-        }
-      }
-      // Renvoyer le dernier chunk s'il reste des éléments
-      if (chunk.length > 0) {
-        yield chunk;
-      }
-    } else {
-      console.error("Le fichier JSON n'est pas un tableau !");
-      throw new Error("Le fichier JSON doit être un tableau.");
-    }
-    return;
-  } catch (error) {
-    console.error("Erreur lors du parsing du fichier JSON :", error);
-    throw error;
-  }
-}

+import { JsonOptions } from "../../interfaces/index.ts";
+import { TextLineStream } from "https://deno.land/std/streams/mod.ts";
+import { readLines } from "https://deno.land/std/io/mod.ts";

+export async function* streamJson<T>(
+  filepath: string,
+  sheetOptions: JsonOptions,
+  chunkSize = 100,
+): AsyncIterable<T[]> {
+  try {
+    const file = await Deno.open(filepath);
+    const decoder = new TextDecoderStream();
+    const lines = file.readable
+      .pipeThrough(decoder)
+      .pipeThrough(new TextLineStream());
+
+    let buffer = "";
+    let chunk: T[] = [];
+
+    for await (const line of lines) {
+      buffer += line;
+      try {
+        const parsedLine = JSON.parse(buffer);
+        buffer = "";
+
+        const data = sheetOptions.filter ? parsedLine[sheetOptions.filter] : parsedLine;
+
+        if (Array.isArray(data)) {
+          for (const item of data) {
+            chunk.push(item);
+            if (chunk.length === chunkSize) {
+              yield chunk;
+              chunk = [];
+            }
+          }
+        } else {
+          console.error("The JSON data is not an array.");
+          throw new Error("The JSON data must be an array.");
+        }
+      } catch (e) {
+        // Incomplete JSON, continue accumulating
+        buffer += "\n";
+      }
+    }
+
+    if (chunk.length > 0) {
+      yield chunk;
+    }
+
+    file.close();
+  } catch (error) {
+    console.error("Error parsing the JSON file:", error);
+    throw error;
+  }
+}

Note: The above code uses TextLineStream and readLines to read the file line by line, parsing each complete JSON object individually. This approach reduces memory usage by not loading the entire file into memory. Depending on the JSON structure, you may need to adjust the parsing logic or use a streaming JSON parser library compatible with Deno.


@jonathanfallon (Member) commented Nov 23, 2024

I think this is the most important point to refactor before we can merge, all the more so because this functionality is used in several places.

coderabbitai bot replied:

@jonathanfallon, I'm glad you share this concern. If I can help during the refactoring, don't hesitate to reach out.

(_/)
(•_•)
/ 💖

@Datayama38 (Collaborator, Author) replied:

Yes, I agree. I patched it quickly so I could run it on the incentive campaigns, but we need to dig into json-nl to improve all of this, because CodeRabbit's suggestion doesn't work.

coderabbitai bot replied:

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (4)
api/src/etl/helpers/data/streamJson.ts (4)

8-13: Translate French comments to English for better maintainability.

Current comments are in French. For international collaboration and maintainability, comments should be in English:

-  // Lire tout le fichier comme une seule chaîne
+  // Read the entire file as a string
-  // Parse tout le fichier
+  // Parse the entire file
-  // Si un filtre est défini, extraire le sous-ensemble correspondant
+  // If a filter is defined, extract the corresponding subset

31-32: Translate error messages to English.

Error messages should be in English for consistency:

-      console.error("Le fichier JSON n'est pas un tableau !");
-      throw new Error("Le fichier JSON doit être un tableau.");
+      console.error("The JSON file is not an array!");
+      throw new Error("The JSON file must be an array.");

17-29: Consider using array methods for more efficient chunking.

The current chunking implementation could be simplified using array methods.

-      let chunk: T[] = [];
-      for (const line of data) {
-        chunk.push(line);
-        if (chunk.length === chunkSize) {
-          yield chunk;
-          chunk = [];
-        }
-      }
-      // Renvoyer le dernier chunk s'il reste des éléments
-      if (chunk.length > 0) {
-        yield chunk;
-      }
+      for (let i = 0; i < data.length; i += chunkSize) {
+        yield data.slice(i, i + chunkSize);
+      }

36-36: Translate error message to English.

Error message should be in English:

-    console.error("Erreur lors du parsing du fichier JSON :", error);
+    console.error("Error parsing JSON file:", error);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 034b19b and f130c6f.

⛔ Files ignored due to path filters (1)
  • api/deno.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
  • api/src/deps.ts (0 hunks)
  • api/src/etl/datasets/eurostat/countries/2020/EurostatCountries2020.ts (3 hunks)
  • api/src/etl/datasets/eurostat/countries/2020/EurostatSimplifiedCountries2020.ts (3 hunks)
  • api/src/etl/datasets/eurostat/countries/2024/EurostatCountries2024.ts (3 hunks)
  • api/src/etl/datasets/eurostat/countries/2024/EurostatSimplifiedCountries2024.ts (3 hunks)
  • api/src/etl/datasets/ign/common/IgnDataset.ts (4 hunks)
  • api/src/etl/helpers/data/streamCsv.ts (1 hunks)
  • api/src/etl/helpers/data/streamJson.ts (1 hunks)
  • api/src/etl/interfaces/StreamDataOptions.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • api/src/deps.ts
🚧 Files skipped from review as they are similar to previous changes (7)
  • api/src/etl/interfaces/StreamDataOptions.ts
  • api/src/etl/datasets/eurostat/countries/2024/EurostatSimplifiedCountries2024.ts
  • api/src/etl/datasets/eurostat/countries/2024/EurostatCountries2024.ts
  • api/src/etl/datasets/eurostat/countries/2020/EurostatCountries2020.ts
  • api/src/etl/datasets/eurostat/countries/2020/EurostatSimplifiedCountries2020.ts
  • api/src/etl/datasets/ign/common/IgnDataset.ts
  • api/src/etl/helpers/data/streamCsv.ts
🔇 Additional comments (4)
api/src/etl/helpers/data/streamJson.ts (4)

1-7: LGTM! Well-typed function signature with async generator.

The function signature is well-designed with proper TypeScript generics and async generator syntax.


8-14: ⚠️ Potential issue

Switch to streaming approach for better memory efficiency.

Reading the entire file into memory using Deno.readTextFile can lead to out-of-memory errors with large files.


14-14: ⚠️ Potential issue

Add error handling for missing filter keys.

When using sheetOptions.filter, the code should verify that the key exists in the parsed data.


1-39: Verify the impact of in-memory processing on the system.

This implementation reads entire files into memory, which is a significant change from the previous streaming approach. Please verify:

  1. Typical file sizes being processed
  2. Available system memory in production
  3. Concurrent processing requirements
✅ Verification successful

Based on the results, I can now generate a final response as we have sufficient information about the usage patterns and context.

Memory-based JSON processing appears acceptable for the current use case

The implementation change from streaming to in-memory processing is justified based on the following findings:

  1. The JSON files in the codebase are relatively small:

    • Most JSON files are under 2KB
    • The largest JSON files are package-lock.json files (~850KB max), which aren't processed by this utility
  2. Usage analysis shows streamJson is only used within the ETL pipeline:

    • Only imported in streamData.ts as part of the ETL helpers
    • Used as one of several data processing options (alongside CSV and XLSX)
  3. The chunking mechanism (100 items per chunk) still provides controlled memory usage for array processing after the initial file load.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for file size patterns and usage in the codebase

# Find all JSON files and their sizes
echo "JSON file sizes:"
fd -e json -x du -h {}

# Find calls to streamJson to understand usage patterns
echo -e "\nUsage of streamJson:"
rg -A 2 "streamJson" --type typescript

Length of output: 2243


Script:

#!/bin/bash
# Let's try a different approach to find streamJson usage

# Search for streamJson usage with ripgrep without type filtering
echo "Usage of streamJson:"
rg "streamJson" -A 3

# Look for potential file processing patterns
echo -e "\nFile processing patterns:"
rg "Deno.readTextFile|readFile" -A 2

# Search for test files that might indicate expected file sizes
echo -e "\nTest files containing JSON processing:"
rg "\.json" "test|spec" -A 2

Length of output: 5852

fix: deactivate datasets in etl

fix type
coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (1)
api/src/etl/helpers/data/streamJson.ts (1)

17-29: Consider using array methods for more concise chunking

The chunking logic could be simplified and made more robust.

Here's a more concise implementation:

-      let chunk: T[] = [];
-      for (const line of data) {
-        chunk.push(line);
-        if (chunk.length === chunkSize) {
-          yield chunk;
-          chunk = [];
-        }
-      }
-      if (chunk.length > 0) {
-        yield chunk;
-      }
+      for (let i = 0; i < data.length; i += chunkSize) {
+        yield data.slice(i, i + chunkSize) as T[];
+      }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f130c6f and b7b3552.

⛔ Files ignored due to path filters (1)
  • api/deno.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
  • api/src/deps.ts (0 hunks)
  • api/src/etl/datasets/eurostat/countries/2020/EurostatCountries2020.ts (3 hunks)
  • api/src/etl/datasets/eurostat/countries/2020/EurostatSimplifiedCountries2020.ts (3 hunks)
  • api/src/etl/datasets/eurostat/countries/2024/EurostatCountries2024.ts (3 hunks)
  • api/src/etl/datasets/eurostat/countries/2024/EurostatSimplifiedCountries2024.ts (3 hunks)
  • api/src/etl/datasets/ign/common/IgnDataset.ts (4 hunks)
  • api/src/etl/helpers/data/streamCsv.ts (1 hunks)
  • api/src/etl/helpers/data/streamJson.ts (1 hunks)
  • api/src/etl/interfaces/StreamDataOptions.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • api/src/deps.ts
🚧 Files skipped from review as they are similar to previous changes (7)
  • api/src/etl/datasets/eurostat/countries/2024/EurostatSimplifiedCountries2024.ts
  • api/src/etl/interfaces/StreamDataOptions.ts
  • api/src/etl/helpers/data/streamCsv.ts
  • api/src/etl/datasets/eurostat/countries/2024/EurostatCountries2024.ts
  • api/src/etl/datasets/eurostat/countries/2020/EurostatCountries2020.ts
  • api/src/etl/datasets/eurostat/countries/2020/EurostatSimplifiedCountries2020.ts
  • api/src/etl/datasets/ign/common/IgnDataset.ts
🔇 Additional comments (3)
api/src/etl/helpers/data/streamJson.ts (3)

1-7: LGTM: Function signature and type definitions are well-structured.

The generic type parameter and AsyncIterable return type are appropriate for the streaming use case.


8-12: ⚠️ Potential issue

Critical: Replace in-memory parsing with streaming implementation

Loading the entire file into memory using Deno.readTextFile can cause out-of-memory errors with large files. This is a regression from the previous streaming implementation.

Consider using the ndjson (Newline Delimited JSON) format and implement streaming as follows:

-import { JsonOptions } from "../../interfaces/index.ts";
+import { JsonOptions } from "../../interfaces/index.ts";
+import { TextLineStream } from "https://deno.land/std/streams/mod.ts";

 export async function* streamJson<T>(
   filepath: string,
   sheetOptions: JsonOptions,
   chunkSize = 100,
 ): AsyncIterable<T[]> {
   try {
-    const file = await Deno.readTextFile(filepath);
-    const parsed = JSON.parse(file);
+    const file = await Deno.open(filepath);
+    const lines = file.readable
+      .pipeThrough(new TextDecoderStream())
+      .pipeThrough(new TextLineStream());
+    
+    let chunk: T[] = [];
+    for await (const line of lines) {
+      if (!line.trim()) continue;
+      
+      const parsed = JSON.parse(line);
+      const item = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;
+      
+      if (item === undefined) {
+        console.error(`Filter key '${sheetOptions.filter}' not found in line: ${line}`);
+        continue;
+      }
+      
+      chunk.push(item);
+      if (chunk.length === chunkSize) {
+        yield chunk;
+        chunk = [];
+      }
+    }
+    
+    if (chunk.length > 0) {
+      yield chunk;
+    }
+    
+    file.close();

13-16: ⚠️ Potential issue

Add validation for missing filter key

When accessing parsed[sheetOptions.filter], there's no check if the filter key exists, which could lead to misleading error messages.

Add explicit validation:

-    const data = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;
+    if (sheetOptions.filter && !(sheetOptions.filter in parsed)) {
+      throw new Error(`Filter key '${sheetOptions.filter}' not found in JSON data`);
+    }
+    const data = sheetOptions.filter ? parsed[sheetOptions.filter] : parsed;

Comment on lines +30 to +38
} else {
console.error("Le fichier JSON n'est pas un tableau !");
throw new Error("Le fichier JSON doit être un tableau.");
}
return;
} catch (error) {
console.error("Erreur lors du parsing du fichier JSON :", error);
throw error;
}

🛠️ Refactor suggestion

Standardize error messages and improve error handling

The error messages are inconsistent (mixing French and English) and the error handling could be improved.

Suggested improvements:

-      console.error("Le fichier JSON n'est pas un tableau !");
-      throw new Error("Le fichier JSON doit être un tableau.");
+      throw new Error("JSON data must be an array");
     }
     return;
   } catch (error) {
-    console.error("Erreur lors du parsing du fichier JSON :", error);
-    throw error;
+    throw error instanceof Error 
+      ? error 
+      : new Error("Failed to parse JSON file");
   }

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
api/src/db/migrations/20241122000000-alter-fraudcheck_label.sql (1)

1-1: Add rollback section to the migration.

For safety, consider adding a rollback section that would restore the column if needed.

Add this section to the migration:

 ALTER TABLE FRAUDCHECK.LABELS DROP COLUMN IF EXISTS geo_code CASCADE;
+
+-- Rollback SQL (add as a comment for documentation):
+-- ALTER TABLE FRAUDCHECK.LABELS ADD COLUMN geo_code TEXT;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b7b3552 and 15b2e09.

📒 Files selected for processing (1)
  • api/src/db/migrations/20241122000000-alter-fraudcheck_label.sql (1 hunks)
🔇 Additional comments (1)
api/src/db/migrations/20241122000000-alter-fraudcheck_label.sql (1)

1-1: Verify dependent objects before dropping the column.

The addition of IF EXISTS and CASCADE improves safety, but CASCADE will automatically drop all dependent objects. Please ensure this is intended.

Let's check for dependencies before proceeding:

✅ Verification successful

No dependencies found on FRAUDCHECK.LABELS.geo_code column

The search results show that while there are other tables using geo_code columns (in carpool-related schemas), there are no dependencies specifically on the FRAUDCHECK.LABELS.geo_code column. The only reference found is in the migration file that's dropping it. This indicates it's safe to proceed with the CASCADE operation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for dependencies on the geo_code column
# Look for views, foreign keys, or other code that might reference this column

# Search for potential view definitions
rg -l "geo_code.*FRAUDCHECK\.LABELS" --type sql

# Search for potential references in TypeScript/JavaScript files
rg -l "geo_code.*FRAUDCHECK\.LABELS" --type ts --type js

# Search for other migrations that might have created dependencies
fd -e sql | rg ".*migration.*\.sql$" | xargs rg "geo_code"

Length of output: 2153

@jonathanfallon merged commit 99e4fca into main on Dec 13, 2024
8 checks passed
@jonathanfallon deleted the fix/etl_stream_json branch on December 13, 2024 at 14:11