Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-26703: Refactor the tap #113

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_chargebee'],
install_requires=[
'tap-framework==0.1.1'
'singer-python==6.0.0',
'backoff==2.2.1',
'requests==2.31.0'
],
entry_points='''
[console_scripts]
Expand Down
152 changes: 105 additions & 47 deletions tap_chargebee/__init__.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,122 @@
import singer
import tap_framework

import tap_chargebee.client
import tap_chargebee.streams
import sys
import json
from singer import metadata, Catalog
from tap_chargebee.client import ChargebeeClient
import tap_chargebee.streams as streams

LOGGER = singer.get_logger()


class ChargebeeRunner(tap_framework.Runner):
pass
def stream_is_selected(mdata):
"""
Check if the stream is selected
"""
return mdata.get((), {}).get("selected", False)


def get_available_streams(config: dict, cb_client: ChargebeeClient):
"""
Prepare the available streams based on the product catalog version
"""
site_name = config.get("site")
LOGGER.info("Site Name %s", site_name)
configuration_url = f"https://{site_name}.chargebee.com/api/v2/configurations"

try:
# Make a request to the configurations API
response = cb_client.make_request(url=configuration_url, method="GET")
site_configurations = response["configurations"]
except Exception as e:
LOGGER.error("Failed to fetch configurations: %s", e)
raise e

# Fetch the product catalog version
product_catalog_version = next(
iter(
config["product_catalog_version"]
for config in site_configurations
if config["domain"].lower() == site_name.lower()
),
None,
)

if product_catalog_version == "v2":
available_streams = streams.ITEM_MODEL_AVAILABLE_STREAMS
config["item_model"] = True
LOGGER.info("Found product catalog version v2")
elif product_catalog_version == "v1":
available_streams = streams.PLAN_MODEL_AVAILABLE_STREAMS
config["item_model"] = False
LOGGER.info("Found product catalog version v1")
else:
LOGGER.error(
"Incorrect Product Catalog version {}".format(product_catalog_version)
)
raise RuntimeError("Incorrect Product Catalog version")

return available_streams


def do_discover(config: dict, state: dict, available_streams: list):
"""
Generate the catalog
"""
LOGGER.info("Starting discovery.")
catalog = []

# Generate catalog for each stream based on the product catalog version
for available_stream in available_streams:
stream = available_stream(config, state, None, None)
catalog += stream.generate_catalog()

json.dump({"streams": catalog}, sys.stdout, indent=4)
LOGGER.info("Finished discover mode")


def do_sync(config: dict, catalog: Catalog, state: dict, client: ChargebeeClient):
"""
Sync data from Chargebee.
"""

last_stream = singer.get_currently_syncing(state)
LOGGER.info("last/currently syncing stream: %s", last_stream)

# Resume sync from the last stream
for catalog_entry in catalog.get_selected_streams(state):
mdata = metadata.to_map(catalog_entry.metadata)
replication_key = metadata.get(mdata, (), "replication-key")
key_properties = metadata.get(mdata, (), "table-key-properties")
stream_name = catalog_entry.tap_stream_id

singer.set_currently_syncing(state, stream_name)
singer.write_state(state)
singer.write_schema(
stream_name, catalog_entry.schema.to_dict(), key_properties, replication_key
)

LOGGER.info("%s: Starting sync", stream_name)
instance = streams.STREAMS[stream_name](config, state, catalog_entry, client)
counter_value = instance.sync()
singer.set_currently_syncing(state, None)
singer.write_state(state)
LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter_value)


@singer.utils.handle_top_exception(LOGGER)
def main():
args = singer.utils.parse_args(
required_config_keys=['api_key', 'start_date', 'site'])

client = tap_chargebee.client.ChargebeeClient(args.config)

runner = ChargebeeRunner(
args, client, get_available_streams(args, client)
required_config_keys=["api_key", "start_date", "site"]
)

try:
# Verify start date format
singer.utils.strptime(args.config.get("start_date"))
except ValueError:
raise ValueError("start_date must be in 'YYYY-mm-ddTHH:MM:SSZ' format") from None
client = ChargebeeClient(args.config)
available_streams = get_available_streams(args.config, client)

if args.discover:
runner.do_discover()
else:
runner.do_sync()
do_discover(args.config, args.state, available_streams)
elif args.catalog:
do_sync(args.config, args.catalog, args.state, client)


if __name__ == '__main__':
if __name__ == "__main__":
main()


def get_available_streams(self, cb_client):
site_name = self.config.get('site')
LOGGER.info("Site Name {}".format(site_name))
configuration_url = 'https://{}.chargebee.com/api/v2/configurations'.format(site_name)
response = cb_client.make_request(
url=configuration_url,
method='GET')
site_configurations = response['configurations']
LOGGER.info("Configurations API response {}".format(response))
product_catalog_version = next(iter(config['product_catalog_version'] for config in site_configurations if
config['domain'].lower() == site_name.lower()),
None)
if product_catalog_version == 'v2':
available_streams = tap_chargebee.streams.ITEM_MODEL_AVAILABLE_STREAMS
self.config['item_model'] = True
LOGGER.info('Item Model')
elif product_catalog_version == 'v1':
available_streams = tap_chargebee.streams.PLAN_MODEL_AVAILABLE_STREAMS
self.config['item_model'] = False
LOGGER.info('Plan Model')
else:
LOGGER.error("Incorrect Product Catalog version {}".format(product_catalog_version))
raise RuntimeError("Incorrect Product Catalog version")
return available_streams
Loading