From 44bd45f6cb3ff0966b4e7558285630cbe03caebc Mon Sep 17 00:00:00 2001 From: radazen Date: Thu, 25 Apr 2024 09:26:45 -0400 Subject: [PATCH] Clean more null characters and unicode escapes from strings and JSON (#1505) --- kafka-streamer/main.go | 58 +++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/kafka-streamer/main.go b/kafka-streamer/main.go index b338583b3..bb73d57e3 100644 --- a/kafka-streamer/main.go +++ b/kafka-streamer/main.go @@ -135,9 +135,8 @@ func runStreamer(ctx context.Context, pgx *pgxpool.Pool) { // Creating multiple configs for each topic allows them to process separate partitions in parallel configs := []*streamerConfig{ newEthereumOwnerConfig(deserializer, queries), - newEthereumOwnerConfig(deserializer, queries), - //newEthereumTokenConfig(deserializer, queries), - //newEthereumTokenConfig(deserializer, queries), + newEthereumTokenConfig(deserializer, queries), + newEthereumTokenConfig(deserializer, queries), newBaseOwnerConfig(deserializer, queries), newZoraOwnerConfig(deserializer, queries), } @@ -691,18 +690,18 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ SimplehashNftID: &nft.Nft_id, ContractAddress: &contractAddress, TokenID: tokenID, - Name: removeNullChars(nft.Name), - Description: removeNullChars(nft.Description), + Name: cleanString(nft.Name), + Description: cleanString(nft.Description), Previews: previews, - ImageUrl: removeNullChars(nft.Image_url), - VideoUrl: removeNullChars(nft.Video_url), - AudioUrl: removeNullChars(nft.Audio_url), - ModelUrl: removeNullChars(nft.Model_url), - OtherUrl: removeNullChars(nft.Other_url), - BackgroundColor: removeNullChars(nft.Background_color), - ExternalUrl: removeNullChars(nft.External_url), + ImageUrl: cleanString(nft.Image_url), + VideoUrl: cleanString(nft.Video_url), + AudioUrl: cleanString(nft.Audio_url), + ModelUrl: cleanString(nft.Model_url), + OtherUrl: cleanString(nft.Other_url), + BackgroundColor: cleanString(nft.Background_color), + ExternalUrl: cleanString(nft.External_url), OnChainCreatedDate: onChainCreatedDate, - Status: removeNullChars(nft.Status), + Status: cleanString(nft.Status), TokenCount: tokenCount, OwnerCount: ownerCount, Contract: contract, @@ -710,7 +709,7 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ LastSale: lastSale, FirstCreated: firstCreated, Rarity: rarity, - ExtraMetadata: removeNullChars(nft.Extra_metadata), + ExtraMetadata: cleanString(nft.Extra_metadata), ImageProperties: imageProperties, VideoProperties: videoProperties, AudioProperties: audioProperties, @@ -769,35 +768,36 @@ func toJSONB[T any](data *T) (pgtype.JSONB, error) { return pgtype.JSONB{}, err } - var jsonb pgtype.JSONB - err = jsonb.Set(jsonData) - if err != nil { - return pgtype.JSONB{}, err - } + // Convert jsonData to a string + jsonStr := string(jsonData) - // Remove null bytes from the underlying string - jsonStr := string(jsonb.Bytes) + // Strip out any literal null bytes + jsonStr = strings.ReplaceAll(jsonStr, "\x00", "") - // Remove null characters - cleanedStr := strings.ReplaceAll(jsonStr, "\x00", "") + // Strip out any escaped null characters in JSON + cleanedStr := strings.ReplaceAll(jsonStr, "\\u0000", "") - // Convert the cleaned string back to bytes - jsonb.Bytes = []byte(cleanedStr) - - err = jsonb.Set(jsonb.Bytes) + var jsonb pgtype.JSONB + // Convert the cleaned string back to bytes and set it to jsonb + err = jsonb.Set([]byte(cleanedStr)) if err != nil { return pgtype.JSONB{}, err } - jsonb.Status = pgtype.Present // Explicitly mark the JSONB data as present + jsonb.Status = pgtype.Present return jsonb, nil } -func removeNullChars(s *string) *string { +func cleanString(s *string) *string { if s == nil { return nil } + // Remove null characters cleanedStr := strings.ReplaceAll(*s, "\x00", "") + + // Remove invalid UTF-8 sequences + cleanedStr = strings.ToValidUTF8(cleanedStr, "") + return &cleanedStr }