This repository has been archived by the owner on Nov 19, 2024. It is now read-only.
forked from singer-io/tap-exacttarget
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path list_subscribers.py
151 lines (119 loc) · 5.01 KB
/
list_subscribers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import FuelSDK
import singer
from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.endpoints.subscribers import SubscriberDataAccessObject
from tap_exacttarget.pagination import get_date_page, before_now, \
increment_date
from tap_exacttarget.schemas import ID_FIELD, CUSTOM_PROPERTY_LIST, \
CREATED_DATE_FIELD, OBJECT_ID_FIELD, MODIFIED_DATE_FIELD, \
SUBSCRIBER_KEY_FIELD, with_properties
from tap_exacttarget.state import incorporate, save_state, \
get_last_record_value_for_table
from tap_exacttarget.util import partition_all, sudsobj_to_dict
# Module-level logger obtained from the Singer library.
LOGGER = singer.get_logger()
def _get_subscriber_key(list_subscriber):
    """Extract the subscriber key from a raw ListSubscriber API object.

    The object is read via attribute access (not dict lookup), so this
    expects the un-parsed object as yielded by the SOAP request, before
    any conversion to a dict.
    """
    return getattr(list_subscriber, 'SubscriberKey')
def _get_list_subscriber_filter(_list, start, unit):
    """Build the SOAP filter for one page of ListSubscriber rows.

    The filter ANDs two clauses:
      * ``ListID`` equals the ``ID`` of ``_list`` (a dict), and
      * the ``ModifiedDate`` window produced by ``get_date_page`` for
        ``start`` and ``unit``.
    """
    list_id_clause = {
        'Property': 'ListID',
        'SimpleOperator': 'equals',
        'Value': _list.get('ID'),
    }
    modified_date_clause = get_date_page('ModifiedDate', start, unit)

    return {
        'LogicalOperator': 'AND',
        'LeftOperand': list_id_clause,
        'RightOperand': modified_date_clause,
    }
class ListSubscriberDataAccessObject(DataAccessObject):
    """Replicate ``ListSubscriber`` rows of the 'All Subscribers' list.

    Rows are requested one ``ModifiedDate`` window at a time, starting from
    the stored bookmark (or ``start_date``) and advancing until the window
    start is no longer before now.

    For each row, the stream:
      * writes it as a Singer record,
      * advances the ``ModifiedDate`` bookmark, and
      * saves state once the whole window has been processed.

    When ``replicate_subscriber`` is set, the matching Subscriber records
    are pulled in batches as well.
    """

    SCHEMA = with_properties({
        'ID': ID_FIELD,
        'CreatedDate': CREATED_DATE_FIELD,
        'ModifiedDate': MODIFIED_DATE_FIELD,
        'ObjectID': OBJECT_ID_FIELD,
        'PartnerProperties': CUSTOM_PROPERTY_LIST,
        'ListID': {
            'type': ['null', 'integer'],
            'description': ('Defines identification for a list the '
                            'subscriber resides on.'),
        },
        'Status': {
            'type': ['null', 'string'],
            'description': ('Defines status of object. Status of '
                            'an address.'),
        },
        'SubscriberKey': SUBSCRIBER_KEY_FIELD,
    })

    TABLE = 'list_subscriber'
    KEY_PROPERTIES = ['SubscriberKey', 'ListID']

    def __init__(self, config, state, auth_stub, catalog):
        super(ListSubscriberDataAccessObject, self).__init__(
            config, state, auth_stub, catalog)

        # Toggled externally when the Subscriber stream should be pulled
        # alongside this one; subscriber_catalog is then passed to it.
        self.replicate_subscriber = False
        self.subscriber_catalog = None

    def _get_all_subscribers_list(self):
        """Find the 'All Subscribers' list via the SOAP API and return it.

        Returns:
            The single matching list, converted with ``sudsobj_to_dict``.

        Raises:
            RuntimeError: if the query does not return exactly one list.
        """
        result = request('List', FuelSDK.ET_List, self.auth_stub, {
            'Property': 'ListName',
            'SimpleOperator': 'equals',
            'Value': 'All Subscribers',
        })

        lists = list(result)

        if len(lists) != 1:
            msg = ('Found {} all subscriber lists, expected one!'
                   .format(len(lists)))
            raise RuntimeError(msg)

        return sudsobj_to_dict(lists[0])

    def _get_pagination_unit(self):
        """Return the pagination window size as a ``{unit: quantity}`` dict.

        ``pagination__list_subscriber_interval_unit`` defaults to ``'days'``.
        ``pagination__list_subscriber_interval_quantity`` defaults to ``1``.

        Raises:
            ValueError: if the quantity is not a positive integer.
                ``increment_date`` is expected to move the window start
                forward by this amount. A zero or negative step would keep
                the window start before now, so the sync loop would never
                finish.
        """
        pagination_unit = self.config.get(
            'pagination__list_subscriber_interval_unit', 'days')

        pagination_quantity = self.config.get(
            'pagination__list_subscriber_interval_quantity')
        if pagination_quantity is None:
            # This key was historically read under a misspelled name; keep
            # honouring it so existing configs behave exactly as before.
            legacy_quantity = self.config.get(
                'pagination__list_subsctiber_interval_quantity')
            if legacy_quantity is not None:
                LOGGER.warning(
                    'Config key "pagination__list_subsctiber_interval_'
                    'quantity" is misspelled; use "pagination__list_'
                    'subscriber_interval_quantity" instead.')
                pagination_quantity = legacy_quantity
            else:
                pagination_quantity = 1

        quantity = int(pagination_quantity)
        if quantity < 1:
            raise ValueError(
                'pagination__list_subscriber_interval_quantity must be a '
                'positive integer, got {!r}'.format(pagination_quantity))

        return {pagination_unit: quantity}

    def sync_data(self):
        """Replicate list subscribers window by window up to the present.

        For every window:
          1. Write each row as a record and advance the ``ModifiedDate``
             bookmark.
          2. Optionally pull the matching Subscriber records, per batch.
          3. Save state.
        """
        table = self.__class__.TABLE
        batch_size = 100

        subscriber_dao = SubscriberDataAccessObject(
            self.config,
            self.state,
            self.auth_stub,
            self.subscriber_catalog)

        start = get_last_record_value_for_table(
            self.state,
            table,
            self.config.get('start_date'),
            self.config.get('offset_start_date', None),
            self.is_full_table_mode()
        )

        if start is None:
            start = self.config.get('start_date')

        # Validate pagination config before issuing any API requests.
        unit = self._get_pagination_unit()

        all_subscribers_list = self._get_all_subscribers_list()

        while before_now(start):
            stream = request('ListSubscriber',
                             FuelSDK.ET_List_Subscriber,
                             self.auth_stub,
                             _get_list_subscriber_filter(
                                 all_subscribers_list,
                                 start, unit))

            if self.replicate_subscriber:
                subscriber_dao.write_schema()

            for list_subscribers_batch in partition_all(stream, batch_size):
                for raw_list_subscriber in list_subscribers_batch:
                    list_subscriber = self.filter_keys_and_parse(
                        raw_list_subscriber)

                    modified_date = list_subscriber.get('ModifiedDate')
                    if modified_date:
                        self.state = incorporate(
                            self.state,
                            table,
                            'ModifiedDate',
                            modified_date)

                    list_subscriber = self.remove_sensitive_data(
                        list_subscriber)
                    singer.write_records(table, [list_subscriber])

                if self.replicate_subscriber:
                    # Keys come from the raw API objects in the batch, not
                    # from the parsed dicts written above.
                    subscriber_keys = [
                        _get_subscriber_key(item)
                        for item in list_subscribers_batch
                    ]
                    subscriber_dao.pull_subscribers_batch(subscriber_keys)

            save_state(self.state)

            start = increment_date(start, unit)