From d1212431d593eb90ce15d0310e11506fcaf2a887 Mon Sep 17 00:00:00 2001 From: sfsf9797 Date: Sun, 24 Jul 2022 22:14:58 +0800 Subject: [PATCH 1/7] feat: enumeration --- cli/solanaetl/enumeration/__init__.py | 0 cli/solanaetl/enumeration/entity_type.py | 10 ++++++++++ 2 files changed, 10 insertions(+) create mode 100644 cli/solanaetl/enumeration/__init__.py create mode 100644 cli/solanaetl/enumeration/entity_type.py diff --git a/cli/solanaetl/enumeration/__init__.py b/cli/solanaetl/enumeration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cli/solanaetl/enumeration/entity_type.py b/cli/solanaetl/enumeration/entity_type.py new file mode 100644 index 0000000..5d569bb --- /dev/null +++ b/cli/solanaetl/enumeration/entity_type.py @@ -0,0 +1,10 @@ +class EntityType: + ACCOUNT = 'account' + BLOCK = 'block' + INSTRUCTION = 'instruction' + TOKEN_TRANSFER = 'token_transfer' + TOKEN = 'token' + TRANSACTION = 'transaction' + + + ALL_FOR_STREAMING = [ACCOUNT, BLOCK, INSTRUCTION, TOKEN_TRANSFER, TOKEN, TRANSACTION] From 5689a07b4af26e4da29f1110689705e464d01534 Mon Sep 17 00:00:00 2001 From: sfsf9797 Date: Sun, 24 Jul 2022 22:15:58 +0800 Subject: [PATCH 2/7] feat: get last block json rpc --- cli/solanaetl/json_rpc_requests.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cli/solanaetl/json_rpc_requests.py b/cli/solanaetl/json_rpc_requests.py index b7dfcf5..80c49f5 100644 --- a/cli/solanaetl/json_rpc_requests.py +++ b/cli/solanaetl/json_rpc_requests.py @@ -48,6 +48,12 @@ def generate_get_multiple_accounts_json_rpc(accounts, encoding='jsonParsed'): request_id=idx ) +def generate_get_latest_block_json_rpc(request_id=1): + return { + 'jsonrpc': '2.0', + 'method': "getBlockHeight", + 'id': request_id, + } def generate_json_rpc(method, params, request_id=1): return { From 413e8d64030949e179d91561c6900bf1a0944037 Mon Sep 17 00:00:00 2001 From: sfsf9797 Date: Sun, 24 Jul 2022 22:16:17 +0800 Subject: [PATCH 3/7] feat: streaming feature --- cli/solanaetl/streaming/__init__.py | 0 .../streaming/eth_item_id_calculator.py | 54 ++++++ .../streaming/item_exporter_creator.py | 91 ++++++++++ .../streaming/solana_streamer_adapter.py | 161 ++++++++++++++++++ 4 files changed, 306 insertions(+) create mode 100644 cli/solanaetl/streaming/__init__.py create mode 100644 cli/solanaetl/streaming/eth_item_id_calculator.py create mode 100644 cli/solanaetl/streaming/item_exporter_creator.py create mode 100644 cli/solanaetl/streaming/solana_streamer_adapter.py diff --git a/cli/solanaetl/streaming/__init__.py b/cli/solanaetl/streaming/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cli/solanaetl/streaming/eth_item_id_calculator.py b/cli/solanaetl/streaming/eth_item_id_calculator.py new file mode 100644 index 0000000..5e73809 --- /dev/null +++ b/cli/solanaetl/streaming/eth_item_id_calculator.py @@ -0,0 +1,54 @@ +# MIT License +# +# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import json +import logging + + +class SolanaItemIdCalculator: + + def calculate(self, item): + if item is None or not isinstance(item, dict): + return None + + item_type = item.get('type') + + if item_type == 'block' and item.get('hash') is not None: + return concat(item_type, item.get('hash')) + elif item_type == 'transaction' and item.get('block_hash') is not None: + return concat(item_type, item.get('block_hash')) + elif item_type == 'account' and item.get('tx_signature') is not None: + return concat(item_type, item.get('tx_signature')) + elif item_type == 'token_transfer' and item.get('tx_signature') is not None: + return concat(item_type, item.get('tx_signature')) + elif item_type == 'instruction' and item.get('tx_signature') is not None: + return concat(item_type, item.get('tx_signature')) + elif item_type == 'token' and item.get('tx_signature'): + return concat(item_type, item.get('tx_signature')) + + logging.warning('item_id for item {} is None'.format(json.dumps(item))) + + return None + + +def concat(*elements): + return '_'.join([str(elem) for elem in elements]) \ No newline at end of file diff --git a/cli/solanaetl/streaming/item_exporter_creator.py b/cli/solanaetl/streaming/item_exporter_creator.py new file mode 100644 index 0000000..12e6781 --- /dev/null +++ b/cli/solanaetl/streaming/item_exporter_creator.py @@ -0,0 +1,91 @@ +# MIT License +# +# Copyright (c) 2020 Evgeny Medvedev, evge.medvedev@gmail.com +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from blockchainetl_common.jobs.exporters.console_item_exporter import ConsoleItemExporter +from blockchainetl_common.jobs.exporters.multi_item_exporter import MultiItemExporter + + +def create_item_exporters(outputs): + split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console'] + + item_exporters = [create_item_exporter(output) for output in split_outputs] + return MultiItemExporter(item_exporters) + + +def create_item_exporter(output): + item_exporter_type = determine_item_exporter_type(output) + if item_exporter_type == ItemExporterType.PUBSUB: + from blockchainetl_common.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter + enable_message_ordering = 'sorted' in output or 'ordered' in output + item_exporter = GooglePubSubItemExporter( + item_type_to_topic_mapping={ + 'block': output + '.blocks', + 'transaction': output + '.transactions', + 'instruction': output + '.instructions', + 'token_transfer': output + '.token_transfers', + 'account': output + '.accounts', + 'token': output + '.tokens', + }, + message_attributes=('item_id'), + batch_max_bytes=1024 * 1024 * 5, + batch_max_latency=2, + batch_max_messages=1000, + enable_message_ordering=enable_message_ordering) + elif item_exporter_type == ItemExporterType.GCS: + from blockchainetl_common.jobs.exporters.gcs_item_exporter import GcsItemExporter + bucket, path = get_bucket_and_path_from_gcs_output(output) + item_exporter = GcsItemExporter(bucket=bucket, path=path) + elif item_exporter_type == ItemExporterType.CONSOLE: + item_exporter = ConsoleItemExporter() + else: + raise ValueError('Unable to determine item exporter type for output ' + output) + + return item_exporter + + +def get_bucket_and_path_from_gcs_output(output): + output = output.replace('gs://', '') + bucket_and_path = output.split('/', 1) + bucket = bucket_and_path[0] + if len(bucket_and_path) > 1: + path = bucket_and_path[1] + else: + path = '' + return bucket, path + + +def determine_item_exporter_type(output): + if output is not None and output.startswith('projects'): + return ItemExporterType.PUBSUB + elif output is not None and output.startswith('gs://'): + return ItemExporterType.GCS + elif output is None or output == 'console': + return ItemExporterType.CONSOLE + else: + return ItemExporterType.UNKNOWN + + +class ItemExporterType: + PUBSUB = 'pubsub' + GCS = 'gcs' + CONSOLE = 'console' + UNKNOWN = 'unknown' \ No newline at end of file diff --git a/cli/solanaetl/streaming/solana_streamer_adapter.py b/cli/solanaetl/streaming/solana_streamer_adapter.py new file mode 100644 index 0000000..0d3aaeb --- /dev/null +++ b/cli/solanaetl/streaming/solana_streamer_adapter.py @@ -0,0 +1,161 @@ +import logging + +from blockchainetl_common.jobs.exporters.console_item_exporter import ConsoleItemExporter +from blockchainetl_common.jobs.exporters.in_memory_item_exporter import InMemoryItemExporter +from solanaetl.enumeration.entity_type import EntityType +from solanaetl.jobs.export_blocks_job import ExportBlocksJob +from solanaetl.jobs.extract_accounts_job import ExtractAccountsJob +from solanaetl.jobs.extract_token_transfers_job import ExtractTokenTransfersJob +from solanaetl.jobs.extract_tokens_job import ExtractTokensJob + +from solanaetl.streaming.eth_item_id_calculator import SolanaItemIdCalculator +from solanaetl.json_rpc_requests import generate_get_latest_block_json_rpc +import json + +class SolanaStreamerAdapter: + def __init__( + self, + batch_web3_provider, + item_exporter=ConsoleItemExporter(), + batch_size=100, + max_workers=5, + entity_types=tuple(EntityType.ALL_FOR_STREAMING)): + self.batch_web3_provider = batch_web3_provider + self.item_exporter = item_exporter + self.batch_size = batch_size + self.max_workers = max_workers + self.entity_types = entity_types + self.item_id_calculator = SolanaItemIdCalculator() + + def open(self): + self.item_exporter.open() + + def get_current_block_number(self): + blocks_rpc = generate_get_latest_block_json_rpc() + response = self.batch_web3_provider.make_batch_request(json.dumps(blocks_rpc)) + return response['result'] + + def export_all(self, start_block, end_block): + # Export blocks, transactions and instructions + blocks, transactions, instructions = [], [], [] + if self._should_export(EntityType.BLOCK) or self._should_export(EntityType.TRANSACTION) or self._should_export(EntityType.INSTRUCTION): + blocks, transactions, instructions = self._export_blocks_transactions_instructions(start_block, end_block) + + # Export receipts and logs + accounts = [] + if self._should_export(EntityType.ACCOUNT): + accounts = self._export_accounts(instructions) + + # Extract token transfers + token_transfers = [] + if self._should_export(EntityType.TOKEN_TRANSFER): + token_transfers = self._extract_token_transfers(instructions) + + # Export tokens + tokens = [] + if self._should_export(EntityType.TOKEN): + tokens = self._extract_tokens(accounts) + + logging.info('Exporting with ' + type(self.item_exporter).__name__) + + all_items = \ + blocks + \ + transactions + \ + instructions + \ + token_transfers + \ + accounts + \ + tokens + + self.calculate_item_ids(all_items) + + self.item_exporter.export_items(all_items) + + def _export_blocks_transactions_instructions(self, start_block, end_block): + blocks_transactions_instructions_item_exporter = InMemoryItemExporter(item_types=['block', 'transaction', 'instruction']) + blocks_and_transactions_job = ExportBlocksJob( + start_block=start_block, + end_block=end_block, + batch_size=self.batch_size, + batch_web3_provider=self.batch_web3_provider, + max_workers=self.max_workers, + item_exporter=blocks_transactions_instructions_item_exporter, + export_blocks=self._should_export(EntityType.BLOCK), + export_transactions=self._should_export(EntityType.TRANSACTION), + export_instructions=self._should_export(EntityType.INSTRUCTION) + ) + blocks_and_transactions_job.run() + blocks = blocks_transactions_instructions_item_exporter.get_items('block') + transactions = blocks_transactions_instructions_item_exporter.get_items('transaction') + instructions = blocks_transactions_instructions_item_exporter.get_items('instruction') + + return blocks, transactions, instructions + + def _export_accounts(self, instructions): + exporter = InMemoryItemExporter(item_types=['account']) + job = ExtractAccountsJob( + instructions_iterable=instructions, + batch_size=self.batch_size, + batch_web3_provider=self.batch_web3_provider, + max_workers=self.max_workers, + item_exporter=exporter, + ) + job.run() + accounts = exporter.get_items('account') + return accounts + + def _extract_token_transfers(self, instructions): + exporter = InMemoryItemExporter(item_types=['token_transfer']) + job = ExtractTokenTransfersJob( + instructions_iterable=instructions, + batch_size=self.batch_size, + max_workers=self.max_workers, + item_exporter=exporter) + job.run() + token_transfers = exporter.get_items('token_transfer') + return token_transfers + + def _extract_tokens(self, accounts): + exporter = InMemoryItemExporter(item_types=['token']) + job = ExtractTokensJob( + accounts_iterable=accounts, + batch_size=self.batch_size, + batch_web3_provider= self.batch_web3_provider, + max_workers=self.max_workers, + item_exporter=exporter + ) + job.run() + tokens = exporter.get_items('token') + return tokens + + def _should_export(self, entity_type): + if entity_type == EntityType.BLOCK: + return True + + if entity_type == EntityType.TRANSACTION: + return EntityType.TRANSACTION in self.entity_types + + if entity_type == EntityType.INSTRUCTION: + return EntityType.INSTRUCTION in self.entity_types or self._should_export(EntityType.ACCOUNT) + + if entity_type == EntityType.ACCOUNT: + return EntityType.ACCOUNT in self.entity_types or self._should_export(EntityType.TOKEN) + + if entity_type == EntityType.TOKEN_TRANSFER: + return EntityType.TOKEN_TRANSFER in self.entity_types + + if entity_type == EntityType.TOKEN: + return EntityType.TOKEN in self.entity_types + + raise ValueError('Unexpected entity type ' + entity_type) + + def calculate_item_ids(self, items): + for item in items: + item['item_id'] = self.item_id_calculator.calculate(item) + + def close(self): + self.item_exporter.close() + +def sort_by(arr, fields): + if isinstance(fields, tuple): + fields = tuple(fields) + return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields)) \ No newline at end of file From c30f9c9f3b449bd2ff6c3855d05d42be56ac2ef5 Mon Sep 17 00:00:00 2001 From: sfsf9797 Date: Sun, 24 Jul 2022 22:16:29 +0800 Subject: [PATCH 4/7] feat: streaming command --- cli/setup.py | 1 + cli/solanaetl/cli/__init__.py | 5 ++ cli/solanaetl/cli/stream.py | 93 +++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 cli/solanaetl/cli/stream.py diff --git a/cli/setup.py b/cli/setup.py index 15346e8..6f3a727 100644 --- a/cli/setup.py +++ b/cli/setup.py @@ -36,6 +36,7 @@ def read(fname): python_requires=">=3.6,<3.10", install_requires=[ "base58", + "eth-hash==0.3.3", "blockchain-etl-common==1.6.1", "click==8.1.3", "web3==6.0.0b3", diff --git a/cli/solanaetl/cli/__init__.py b/cli/solanaetl/cli/__init__.py index e48a1fc..2ac3a39 100644 --- a/cli/solanaetl/cli/__init__.py +++ b/cli/solanaetl/cli/__init__.py @@ -23,6 +23,8 @@ from solanaetl.cli.extract_field import extract_field from solanaetl.cli.extract_token_transfers import extract_token_transfers from solanaetl.cli.extract_tokens import extract_tokens +from solanaetl.cli.stream import stream + @click.group() @@ -44,3 +46,6 @@ def cli(ctx): # utils cli.add_command(extract_field, "extract_field") + +# streaming +cli.add_command(stream, "stream") diff --git a/cli/solanaetl/cli/stream.py b/cli/solanaetl/cli/stream.py new file mode 100644 index 0000000..bb0cd50 --- /dev/null +++ b/cli/solanaetl/cli/stream.py @@ -0,0 +1,93 @@ +# MIT License +# +# Copyright (c) 2022 Tan Sek Fook, sekfook97@gmail.com +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import logging + +import click +from blockchainetl_common.streaming.streaming_utils import configure_signals, configure_logging +from solanaetl.enumeration.entity_type import EntityType + +from solanaetl.providers.auto import get_provider_from_uri +from solanaetl.streaming.item_exporter_creator import create_item_exporters +from solanaetl.thread_local_proxy import ThreadLocalProxy + + +@click.command(context_settings=dict(help_option_names=['-h', '--help'])) +@click.option('-l', '--last-synced-block-file', default='last_synced_block.txt', show_default=True, type=str, help='') +@click.option('--lag', default=0, show_default=True, type=int, help='The number of blocks to lag behind the network.') +@click.option('-p', '--provider-uri', default='https://api.mainnet-beta.solana.com', show_default=True, type=str, + help='The URI of the web3 provider e.g. ' + 'https://api.mainnet-beta.solana.com') +@click.option('-o', '--output', type=str, + help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; ' + 'or GCS bucket e.g. gs://your-bucket-name; ' + 'If not specified will print to console') +@click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block') +@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_STREAMING), show_default=True, type=str, + help='The list of entity types to export.') +@click.option('--period-seconds', default=10, show_default=True, type=int, help='How many seconds to sleep between syncs') +@click.option('-b', '--batch-size', default=10, show_default=True, type=int, help='How many blocks to batch in single request') +@click.option('-B', '--block-batch-size', default=1, show_default=True, type=int, help='How many blocks to batch in single sync round') +@click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers') +@click.option('--log-file', default=None, show_default=True, type=str, help='Log file') +@click.option('--pid-file', default=None, show_default=True, type=str, help='pid file') +def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types, + period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None): + """Streams all data types to console or Google Pub/Sub.""" + configure_logging(log_file) + configure_signals() + entity_types = parse_entity_types(entity_types) + + from solanaetl.streaming.solana_streamer_adapter import SolanaStreamerAdapter + from blockchainetl_common.streaming.streamer import Streamer + + logging.info('Using ' + provider_uri) + + streamer_adapter = SolanaStreamerAdapter( + batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), + item_exporter=create_item_exporters(output), + batch_size=batch_size, + max_workers=max_workers, + entity_types=entity_types + ) + streamer = Streamer( + blockchain_streamer_adapter=streamer_adapter, + last_synced_block_file=last_synced_block_file, + lag=lag, + start_block=start_block, + period_seconds=period_seconds, + block_batch_size=block_batch_size, + pid_file=pid_file + ) + streamer.stream() + + +def parse_entity_types(entity_types): + entity_types = [c.strip() for c in entity_types.split(',')] + + # validate passed types + for entity_type in entity_types: + if entity_type not in EntityType.ALL_FOR_STREAMING: + raise click.BadOptionUsage( + '--entity-type', '{} is not an available entity type. Supply a comma separated list of types from {}' + .format(entity_type, ','.join(EntityType.ALL_FOR_STREAMING))) + + return entity_types From fa4ef8a20e1c877cefeef0a0b1c2e86bf8aeff61 Mon Sep 17 00:00:00 2001 From: sfsf9797 Date: Sun, 24 Jul 2022 22:20:33 +0800 Subject: [PATCH 5/7] chores: add newline at end of file --- cli/setup.py | 1 - cli/solanaetl/streaming/eth_item_id_calculator.py | 2 +- cli/solanaetl/streaming/item_exporter_creator.py | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cli/setup.py b/cli/setup.py index 6f3a727..15346e8 100644 --- a/cli/setup.py +++ b/cli/setup.py @@ -36,7 +36,6 @@ def read(fname): python_requires=">=3.6,<3.10", install_requires=[ "base58", - "eth-hash==0.3.3", "blockchain-etl-common==1.6.1", "click==8.1.3", "web3==6.0.0b3", diff --git a/cli/solanaetl/streaming/eth_item_id_calculator.py b/cli/solanaetl/streaming/eth_item_id_calculator.py index 5e73809..79d39e3 100644 --- a/cli/solanaetl/streaming/eth_item_id_calculator.py +++ b/cli/solanaetl/streaming/eth_item_id_calculator.py @@ -51,4 +51,4 @@ def calculate(self, item): def concat(*elements): - return '_'.join([str(elem) for elem in elements]) \ No newline at end of file + return '_'.join([str(elem) for elem in elements]) diff --git a/cli/solanaetl/streaming/item_exporter_creator.py b/cli/solanaetl/streaming/item_exporter_creator.py index 12e6781..d3b8178 100644 --- a/cli/solanaetl/streaming/item_exporter_creator.py +++ b/cli/solanaetl/streaming/item_exporter_creator.py @@ -88,4 +88,4 @@ class ItemExporterType: PUBSUB = 'pubsub' GCS = 'gcs' CONSOLE = 'console' - UNKNOWN = 'unknown' \ No newline at end of file + UNKNOWN = 'unknown' From 932e47a382ef668426f29b356ae755752b57e33e Mon Sep 17 00:00:00 2001 From: sfsf9797 Date: Fri, 7 Oct 2022 22:32:08 +0800 Subject: [PATCH 6/7] feat: ignore skipped block --- cli/solanaetl/jobs/export_blocks_job.py | 2 +- cli/solanaetl/jobs/export_instructions_job.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/solanaetl/jobs/export_blocks_job.py b/cli/solanaetl/jobs/export_blocks_job.py index b615199..9005765 100644 --- a/cli/solanaetl/jobs/export_blocks_job.py +++ b/cli/solanaetl/jobs/export_blocks_job.py @@ -89,7 +89,7 @@ def _export_batch(self, block_number_batch: List[int]): json.dumps(blocks_rpc)) results = rpc_response_batch_to_results(response) blocks = [self.block_mapper.from_json_dict( - result) for result in results] + result) for result in results if result is not None] for block in blocks: self._export_block(block) diff --git a/cli/solanaetl/jobs/export_instructions_job.py b/cli/solanaetl/jobs/export_instructions_job.py index e9c495c..e765737 100644 --- a/cli/solanaetl/jobs/export_instructions_job.py +++ b/cli/solanaetl/jobs/export_instructions_job.py @@ -60,7 +60,7 @@ def _export_instructions(self, transaction_addresses): json.dumps(transactions_rpc)) results = rpc_response_batch_to_results(response) transactions = [self.transaction_mapper.from_json_dict( - result) for result in results] + result) for result in results if result is not None] for transaction in transactions: self._export_instructions_in_transaction(transaction) From 53909aa325a295903a3752cad582dff71ed40a03 Mon Sep 17 00:00:00 2001 From: Sek Fook Date: Fri, 7 Oct 2022 22:32:52 +0800 Subject: [PATCH 7/7] chore: add newline Co-authored-by: Jensen Yap --- cli/solanaetl/streaming/solana_streamer_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/solanaetl/streaming/solana_streamer_adapter.py b/cli/solanaetl/streaming/solana_streamer_adapter.py index 0d3aaeb..48249ef 100644 --- a/cli/solanaetl/streaming/solana_streamer_adapter.py +++ b/cli/solanaetl/streaming/solana_streamer_adapter.py @@ -158,4 +158,4 @@ def close(self): def sort_by(arr, fields): if isinstance(fields, tuple): fields = tuple(fields) - return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields)) \ No newline at end of file + return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields))