Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-19835: Adding new streams and discovery mode #38

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_braintree'],
install_requires=[
'singer-python==5.5.0',
'singer-python==5.12.2',
'requests==2.20.0',
'braintree==3.53.0',

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the latest version of braintree i.e. 4.16.0

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated version of the Braintree

],
Expand Down
257 changes: 46 additions & 211 deletions tap_braintree/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,230 +3,65 @@
from datetime import datetime, timedelta
import os
import pytz

import json
import sys

import braintree
import singer

from singer import utils
from .transform import transform_row


CONFIG = {}
STATE = {}
TRAILING_DAYS = timedelta(days=30)
DEFAULT_TIMESTAMP = "1970-01-01T00:00:00Z"

logger = singer.get_logger()


def get_abs_path(path):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)


def load_schema(entity):
return utils.load_json(get_abs_path("schemas/{}.json".format(entity)))


def get_start(entity):
if entity not in STATE:
STATE[entity] = CONFIG["start_date"]

return STATE[entity]


def to_utc(dt):
return dt.replace(tzinfo=pytz.UTC)


def daterange(start_date, end_date):
"""
Generator function that produces an iterable list of days between the two
dates start_date and end_date as a tuple pair of datetimes.

Note:
All times are set to 0:00. Designed to be used in date query where query
logic would be record_date >= 2019-01-01 0:00 and record_date < 2019-01-02 0:00

Args:
start_date (datetime): start of period
end_date (datetime): end of period

Yields:
tuple: daily period
* datetime: day within range
* datetime: day within range + 1 day

"""

# set to start of day
start_date = to_utc(
datetime.combine(
start_date.date(),
datetime.min.time() # set to the 0:00 on the day of the start date
)
)

end_date = to_utc(end_date + timedelta(1))

for n in range(int((end_date - start_date).days)):
yield start_date + timedelta(n), start_date + timedelta(n + 1)


def sync_transactions():
schema = load_schema("transactions")

singer.write_schema("transactions", schema, ["id"],
bookmark_properties=['created_at'])

latest_updated_at = utils.strptime_to_utc(STATE.get('latest_updated_at', DEFAULT_TIMESTAMP))

run_maximum_updated_at = latest_updated_at

latest_disbursement_date = utils.strptime_to_utc(STATE.get('latest_disbursment_date', DEFAULT_TIMESTAMP))

run_maximum_disbursement_date = latest_disbursement_date

latest_start_date = utils.strptime_to_utc(get_start("transactions"))

period_start = latest_start_date - TRAILING_DAYS

period_end = utils.now()

logger.info("transactions: Syncing from {}".format(period_start))

logger.info("transactions: latest_updated_at from {}, disbursement_date from {}".format(
latest_updated_at, latest_disbursement_date
))
from tap_braintree.discover import discover
from tap_braintree.sync import sync as _sync

logger.info("transactions: latest_start_date from {}".format(
latest_start_date
))
REQUIRED_CONFIG_KEYS = [
"merchant_id",
"public_key",
"private_key",
"start_date"
]

# increment through each day (20k results max from api)
for start, end in daterange(period_start, period_end):
LOGGER = singer.get_logger()

end = min(end, period_end)
def do_discover():
LOGGER.info("Starting discovery")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOGGER.info("Starting discovery")
LOGGER.info("Starting discover mode")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated logger messages

catalog = discover()
json.dump(catalog.to_dict(), sys.stdout, indent=2)
LOGGER.info("Finished discover")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOGGER.info("Finished discover")
LOGGER.info("Finished discover mode")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated logger messages as per your suggestions


data = braintree.Transaction.search(
braintree.TransactionSearch.created_at.between(start, end))
time_extracted = utils.now()

logger.info("transactions: Fetched {} records from {} - {}".format(
data.maximum_size, start, end
))

row_written_count = 0
row_skipped_count = 0

for row in data:
# Ensure updated_at consistency
if not getattr(row, 'updated_at'):
row.updated_at = row.created_at

transformed = transform_row(row, schema)
updated_at = to_utc(row.updated_at)

# if disbursement is successful, get disbursement date
# set disbursement datetime to min if not found

if row.disbursement_details is None:
disbursement_date = datetime.min

else:
if row.disbursement_details.disbursement_date is None:
row.disbursement_details.disbursement_date = datetime.min

disbursement_date = to_utc(datetime.combine(
row.disbursement_details.disbursement_date,
datetime.min.time()))

# Is this more recent than our past stored value of update_at?
# Is this more recent than our past stored value of disbursement_date?
# Use >= for updated_at due to non monotonic updated_at values
# Use > for disbursement_date - confirming all transactions disbursed
# at the same time
# Update our high water mark for updated_at and disbursement_date
# in this run
if (
updated_at >= latest_updated_at
) or (
disbursement_date >= latest_disbursement_date
):

if updated_at > run_maximum_updated_at:
run_maximum_updated_at = updated_at

if disbursement_date > run_maximum_disbursement_date:
run_maximum_disbursement_date = disbursement_date

singer.write_record("transactions", transformed,
time_extracted=time_extracted)
row_written_count += 1

else:

row_skipped_count += 1

logger.info("transactions: Written {} records from {} - {}".format(
row_written_count, start, end
))

logger.info("transactions: Skipped {} records from {} - {}".format(
row_skipped_count, start, end
))

# End day loop
logger.info("transactions: Complete. Last updated record: {}".format(
run_maximum_updated_at
))

logger.info("transactions: Complete. Last disbursement date: {}".format(
run_maximum_disbursement_date
))

latest_updated_at = run_maximum_updated_at

latest_disbursement_date = run_maximum_disbursement_date

STATE['latest_updated_at'] = utils.strftime(latest_updated_at)

STATE['latest_disbursement_date'] = utils.strftime(
latest_disbursement_date)

utils.update_state(STATE, "transactions", utils.strftime(end))

singer.write_state(STATE)


def do_sync():
logger.info("Starting sync")
sync_transactions()
logger.info("Sync completed")


@utils.handle_top_exception(logger)
@utils.handle_top_exception(LOGGER)
def main():
args = utils.parse_args(
["merchant_id", "public_key", "private_key", "start_date"]
)
config = args.config

environment = getattr(
braintree.Environment, config.pop("environment", "Production")
parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
config = {}
state = {}

if parsed_args.config:
config = parsed_args.config

if parsed_args.state:
state = parsed_args.state

environment = getattr(braintree.Environment, config.pop("environment", "Production"))

gateway = braintree.BraintreeGateway(
braintree.Configuration(
environment,
merchant_id = config['merchant_id'],
public_key= config["public_key"],
private_key=config["private_key"]
)
)

CONFIG['start_date'] = config.pop('start_date')

braintree.Configuration.configure(environment, **config)

if args.state:
STATE.update(args.state)

try:
do_sync()
if parsed_args.discover:
do_discover()
else:
_sync(
gateway,
config,
parsed_args.catalog or discover(),
state
)
except braintree.exceptions.authentication_error.AuthenticationError:
logger.critical('Authentication error occured. '
LOGGER.critical('Authentication error occured. '
'Please check your merchant_id, public_key, and '
'private_key for errors', exc_info=True)

Expand Down
67 changes: 67 additions & 0 deletions tap_braintree/discover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from asyncio.log import logger

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unused import statement.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed unused imports

from singer.catalog import Catalog, CatalogEntry, Schema
import os
import json
import singer
from singer import metadata
from tap_braintree.streams import STREAMS

LOGGER = singer.get_logger()

def get_abs_path(path):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)

def get_schemas():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add function doc string.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added doc strings for all the fuctions.

schemas = {}
field_metadata = {}

for stream_name, stream_metadata in STREAMS.items():

schema_path = get_abs_path("schemas/{}.json".format(stream_name))
with open(schema_path,"r") as file:
schema = json.load(file)
schemas[stream_name] = schema

mdata = metadata.new()

mdata = metadata.get_standard_metadata(
schema=schema,
key_properties=stream_metadata.key_properties,
valid_replication_keys=stream_metadata.replication_keys,
replication_method=stream_metadata.replication_method,
)

mdata = metadata.to_map(mdata)
# Loop through all keys and make replication keys of automatic inclusion
for field_name in schema["properties"].keys():

if stream_metadata.replication_keys and field_name in stream_metadata.replication_keys:
mdata = metadata.write(
mdata, ("properties", field_name), "inclusion", "automatic",
)

mdata = metadata.to_list(mdata)
field_metadata[stream_name] = mdata

return schemas, field_metadata

def discover():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add function doc string for all the functions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added doc strings for all the fuctions.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Divide this file into two files schema.py and discover.py because you are doing full refactoring.
Reference: https://github.com/singer-io/tap-github/pull/168/files

Copy link
Author

@jtilala jtilala Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seperated discover.py's code into schema.py and discover.py

schemas, field_metadata = get_schemas()
catalog = Catalog([])

for stream_name, schema_dict in schemas.items():

schema = Schema.from_dict(schema_dict)
mdata = field_metadata[stream_name]

catalog.streams.append(
CatalogEntry(
stream=stream_name,
tap_stream_id=stream_name,
key_properties=STREAMS[stream_name].key_properties,
schema=schema,
metadata=mdata,
)
)

return catalog
Loading