diff --git a/councilmatic_core/management/commands/import_data.py b/councilmatic_core/management/commands/import_data.py index abdc4f09..fe40b796 100644 --- a/councilmatic_core/management/commands/import_data.py +++ b/councilmatic_core/management/commands/import_data.py @@ -284,21 +284,20 @@ def bills_etl(self, self.insert_raw_action_related_entity(delete=delete) self.insert_raw_sponsorships(delete=delete) self.insert_raw_billdocuments(delete=delete) - self.insert_raw_subjects(delete=delete) self.insert_raw_relatedbills(delete=delete) self.update_existing_action_related_entity() self.update_existing_sponsorships() self.update_existing_billdocuments() - self.update_existing_subjects() self.update_existing_relatedbills() self.add_new_action_related_entity() self.add_new_sponsorships() self.add_new_billdocuments() - self.add_new_subjects() self.add_new_relatedbills() + self.insert_subjects() + self.log_message('Bills Complete!', fancy=True, style='SUCCESS', center=True) def events_etl(self, @@ -1374,56 +1373,6 @@ def insert_raw_billdocuments(self, delete=False): self.log_message('Inserted {0} raw bill attachments and versions\n'.format(counter), style='SUCCESS') - def insert_raw_subjects(self, delete=False): - pk_cols = ['bill_id', 'subject'] - - self.setup_raw('subject', - delete=delete, - updated_at=False, - pk_cols=pk_cols) - - inserts = [] - - insert_query = ''' - INSERT INTO raw_subject ( - subject, - bill_id - ) VALUES ( - :subject, - :bill_id - ) - ''' - - counter = 0 - for bill_json in os.listdir(self.bills_folder): - - with open(os.path.join(self.bills_folder, bill_json)) as f: - bill_info = json.loads(f.read()) - - if 'subject' in bill_info and bill_info['subject']: - for subject in bill_info['subject']: - insert = { - 'subject': subject, - 'bill_id': bill_info['id'], - } - - inserts.append(insert) - - if inserts and len(inserts) % 10000 == 0: - self.executeTransaction(sa.text(insert_query), *inserts) - - counter += 10000 - - self.log_message('Inserted {} raw subjects'.format(counter)) - - inserts = [] - - if inserts: - self.executeTransaction(sa.text(insert_query), *inserts) - counter += len(inserts) - - self.log_message('Inserted {0} raw subjects\n'.format(counter), style='SUCCESS') - def insert_raw_relatedbills(self, delete=False): pk_cols = ['related_bill_identifier', 'central_bill_id'] @@ -2201,58 +2150,6 @@ def update_existing_billdocuments(self): self.log_message('Found {0} changed bill documents'.format(change_count), style='SUCCESS') - - def update_existing_subjects(self): - self.executeTransaction('DROP TABLE IF EXISTS change_subject') - self.executeTransaction(''' - CREATE TABLE change_subject ( - bill_id VARCHAR, - subject VARCHAR - ) - ''') - - cols = [ - 'bill_id', - 'subject', - ] - - where_clause, set_values, fields = self.get_update_parts(cols, []) - - find_changes = ''' - INSERT INTO change_subject - SELECT - raw.bill_id, - raw.subject - FROM raw_subject AS raw - JOIN councilmatic_core_subject AS dat - ON (raw.bill_id = dat.bill_id - AND raw.subject = dat.subject) - WHERE {} - '''.format(where_clause) - - update_dat = ''' - UPDATE councilmatic_core_subject SET - {set_values} - FROM ( - SELECT - {fields} - FROM raw_subject AS raw - JOIN change_subject AS change - ON (raw.bill_id = change.bill_id - AND raw.subject = change.subject) - ) AS s - WHERE councilmatic_core_subject.bill_id = s.bill_id - AND councilmatic_core_subject.subject = s.subject - '''.format(set_values=set_values, - fields=fields) - - self.executeTransaction(find_changes) - self.executeTransaction(update_dat) - - change_count = self.connection.execute('select count(*) from change_subject').first().count - - self.log_message('Found {0} changed subjects'.format(change_count), style='SUCCESS') - def update_existing_relatedbills(self): self.executeTransaction('DROP TABLE IF EXISTS change_relatedbill') self.executeTransaction(''' @@ -2920,56 +2817,57 @@ def add_new_billdocuments(self): self.log_message('Found {0} new bill documents'.format(new_count), style='SUCCESS') - def add_new_subjects(self): - self.executeTransaction('DROP TABLE IF EXISTS new_subject') - self.executeTransaction(''' - CREATE TABLE new_subject ( - bill_id VARCHAR, - subject VARCHAR + def insert_subjects(self): + delete_statement = ''' + DELETE FROM councilmatic_core_subject + WHERE bill_id = '{}' + ''' + + insert_query = ''' + INSERT INTO councilmatic_core_subject ( + bill_id, + subject + ) VALUES ( + :bill_id, + :subject ) - ''') + ''' - cols = [ - 'bill_id', - 'subject', - ] + counter = 0 + inserts = [] - find_new = ''' - INSERT INTO new_subject - SELECT - raw.bill_id, - raw.subject - FROM raw_subject AS raw - LEFT JOIN councilmatic_core_subject AS dat - ON (raw.bill_id = dat.bill_id - AND raw.subject = dat.subject) - WHERE dat.bill_id IS NULL - AND dat.subject IS NULL - ''' + for bill_json in os.listdir(self.bills_folder): - self.executeTransaction(find_new) + with open(os.path.join(self.bills_folder, bill_json)) as f: + bill_info = json.loads(f.read()) - insert_fields = ', '.join(c for c in cols) - select_fields = ', '.join('raw.{}'.format(c) for c in cols) + bill_id = bill_info['id'] - insert_new = ''' - INSERT INTO councilmatic_core_subject ( - {insert_fields} - ) - SELECT {select_fields} - FROM raw_subject AS raw - JOIN new_subject AS new - ON (raw.bill_id = new.bill_id - AND raw.subject = new.subject) - '''.format(insert_fields=insert_fields, - select_fields=select_fields) + self.executeTransaction(delete_statement.format(bill_id)) - self.executeTransaction(insert_new) + if 'subject' in bill_info and bill_info['subject']: + for subject in bill_info['subject']: + insert = { + 'subject': subject, + 'bill_id': bill_id, + } + + inserts.append(insert) + + if inserts and len(inserts) % 10000 == 0: + self.executeTransaction(sa.text(insert_query), *inserts) + + counter += 10000 + + self.log_message('Inserted {} bill subjects'.format(counter)) - new_count = self.connection.execute('select count(*) from new_subject').first().count + inserts = [] - self.log_message('Found {0} new subjects'.format(new_count), style='SUCCESS') + if inserts: + self.executeTransaction(sa.text(insert_query), *inserts) + counter += len(inserts) + self.log_message('Added {0} bill subjects'.format(counter), style='SUCCESS') def add_new_relatedbills(self): self.executeTransaction('DROP TABLE IF EXISTS new_relatedbill')