Skip to content

Commit

Permalink
Merge pull request #244 from datamade/clear-bill-subjects
Browse files Browse the repository at this point in the history
Clear bill subjects when importing data
  • Loading branch information
hancush authored Apr 30, 2019
2 parents cbeee21 + 4deb632 commit 5387ae8
Showing 1 changed file with 43 additions and 145 deletions.
188 changes: 43 additions & 145 deletions councilmatic_core/management/commands/import_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,20 @@ def bills_etl(self,
self.insert_raw_action_related_entity(delete=delete)
self.insert_raw_sponsorships(delete=delete)
self.insert_raw_billdocuments(delete=delete)
self.insert_raw_subjects(delete=delete)
self.insert_raw_relatedbills(delete=delete)

self.update_existing_action_related_entity()
self.update_existing_sponsorships()
self.update_existing_billdocuments()
self.update_existing_subjects()
self.update_existing_relatedbills()

self.add_new_action_related_entity()
self.add_new_sponsorships()
self.add_new_billdocuments()
self.add_new_subjects()
self.add_new_relatedbills()

self.insert_subjects()

self.log_message('Bills Complete!', fancy=True, style='SUCCESS', center=True)

def events_etl(self,
Expand Down Expand Up @@ -1374,56 +1373,6 @@ def insert_raw_billdocuments(self, delete=False):

self.log_message('Inserted {0} raw bill attachments and versions\n'.format(counter), style='SUCCESS')

def insert_raw_subjects(self, delete=False):
pk_cols = ['bill_id', 'subject']

self.setup_raw('subject',
delete=delete,
updated_at=False,
pk_cols=pk_cols)

inserts = []

insert_query = '''
INSERT INTO raw_subject (
subject,
bill_id
) VALUES (
:subject,
:bill_id
)
'''

counter = 0
for bill_json in os.listdir(self.bills_folder):

with open(os.path.join(self.bills_folder, bill_json)) as f:
bill_info = json.loads(f.read())

if 'subject' in bill_info and bill_info['subject']:
for subject in bill_info['subject']:
insert = {
'subject': subject,
'bill_id': bill_info['id'],
}

inserts.append(insert)

if inserts and len(inserts) % 10000 == 0:
self.executeTransaction(sa.text(insert_query), *inserts)

counter += 10000

self.log_message('Inserted {} raw subjects'.format(counter))

inserts = []

if inserts:
self.executeTransaction(sa.text(insert_query), *inserts)
counter += len(inserts)

self.log_message('Inserted {0} raw subjects\n'.format(counter), style='SUCCESS')

def insert_raw_relatedbills(self, delete=False):
pk_cols = ['related_bill_identifier', 'central_bill_id']

Expand Down Expand Up @@ -2201,58 +2150,6 @@ def update_existing_billdocuments(self):

self.log_message('Found {0} changed bill documents'.format(change_count), style='SUCCESS')


def update_existing_subjects(self):
self.executeTransaction('DROP TABLE IF EXISTS change_subject')
self.executeTransaction('''
CREATE TABLE change_subject (
bill_id VARCHAR,
subject VARCHAR
)
''')

cols = [
'bill_id',
'subject',
]

where_clause, set_values, fields = self.get_update_parts(cols, [])

find_changes = '''
INSERT INTO change_subject
SELECT
raw.bill_id,
raw.subject
FROM raw_subject AS raw
JOIN councilmatic_core_subject AS dat
ON (raw.bill_id = dat.bill_id
AND raw.subject = dat.subject)
WHERE {}
'''.format(where_clause)

update_dat = '''
UPDATE councilmatic_core_subject SET
{set_values}
FROM (
SELECT
{fields}
FROM raw_subject AS raw
JOIN change_subject AS change
ON (raw.bill_id = change.bill_id
AND raw.subject = change.subject)
) AS s
WHERE councilmatic_core_subject.bill_id = s.bill_id
AND councilmatic_core_subject.subject = s.subject
'''.format(set_values=set_values,
fields=fields)

self.executeTransaction(find_changes)
self.executeTransaction(update_dat)

change_count = self.connection.execute('select count(*) from change_subject').first().count

self.log_message('Found {0} changed subjects'.format(change_count), style='SUCCESS')

def update_existing_relatedbills(self):
self.executeTransaction('DROP TABLE IF EXISTS change_relatedbill')
self.executeTransaction('''
Expand Down Expand Up @@ -2920,56 +2817,57 @@ def add_new_billdocuments(self):

self.log_message('Found {0} new bill documents'.format(new_count), style='SUCCESS')

def add_new_subjects(self):
self.executeTransaction('DROP TABLE IF EXISTS new_subject')
self.executeTransaction('''
CREATE TABLE new_subject (
bill_id VARCHAR,
subject VARCHAR
def insert_subjects(self):
delete_statement = '''
DELETE FROM councilmatic_core_subject
WHERE bill_id = '{}'
'''

insert_query = '''
INSERT INTO councilmatic_core_subject (
bill_id,
subject
) VALUES (
:bill_id,
:subject
)
''')
'''

cols = [
'bill_id',
'subject',
]
counter = 0
inserts = []

find_new = '''
INSERT INTO new_subject
SELECT
raw.bill_id,
raw.subject
FROM raw_subject AS raw
LEFT JOIN councilmatic_core_subject AS dat
ON (raw.bill_id = dat.bill_id
AND raw.subject = dat.subject)
WHERE dat.bill_id IS NULL
AND dat.subject IS NULL
'''
for bill_json in os.listdir(self.bills_folder):

self.executeTransaction(find_new)
with open(os.path.join(self.bills_folder, bill_json)) as f:
bill_info = json.loads(f.read())

insert_fields = ', '.join(c for c in cols)
select_fields = ', '.join('raw.{}'.format(c) for c in cols)
bill_id = bill_info['id']

insert_new = '''
INSERT INTO councilmatic_core_subject (
{insert_fields}
)
SELECT {select_fields}
FROM raw_subject AS raw
JOIN new_subject AS new
ON (raw.bill_id = new.bill_id
AND raw.subject = new.subject)
'''.format(insert_fields=insert_fields,
select_fields=select_fields)
self.executeTransaction(delete_statement.format(bill_id))

self.executeTransaction(insert_new)
if 'subject' in bill_info and bill_info['subject']:
for subject in bill_info['subject']:
insert = {
'subject': subject,
'bill_id': bill_id,
}

inserts.append(insert)

if inserts and len(inserts) % 10000 == 0:
self.executeTransaction(sa.text(insert_query), *inserts)

counter += 10000

self.log_message('Inserted {} bill subjects'.format(counter))

new_count = self.connection.execute('select count(*) from new_subject').first().count
inserts = []

self.log_message('Found {0} new subjects'.format(new_count), style='SUCCESS')
if inserts:
self.executeTransaction(sa.text(insert_query), *inserts)
counter += len(inserts)

self.log_message('Added {0} bill subjects'.format(counter), style='SUCCESS')

def add_new_relatedbills(self):
self.executeTransaction('DROP TABLE IF EXISTS new_relatedbill')
Expand Down

0 comments on commit 5387ae8

Please sign in to comment.