Implement CDC processing and arbitrary time granularity support (closes #110 and #111) #112

Status: Open. Wants to merge 19 commits into base: master.
233 changes: 233 additions & 0 deletions bin/cdc-tsupdate
@@ -0,0 +1,233 @@
#!/usr/bin/env python3

# Copyright © Los Alamos National Security, LLC, and others.

'''Parse and process CDC web log files in the same manner as the Wikimedia
access logs. The CDC logs are provided daily starting 2013-01-01.'''

import csv
from glob import iglob
from itertools import chain
import datetime
import os
import hashlib

import quacpath
import timeseries
import u
import testable
import time_
import db

# c is config (set at the bottom: u.configure())
c = u.c

# l is the logging object (set at the bottom: u.logging_init())
l = u.l

ap = u.ArgumentParser(description=__doc__)
gr = ap.default_group
gr.add_argument('outfile',
                metavar='OUTFILE',
                help='time series dataset to create or update')
gr.add_argument('uncompressed_folder',
                help='location of the folder containing the uncompressed CSV files')
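
# Example invocation (the paths here are hypothetical):
#
#   $ cdc-tsupdate /path/to/cdc_timeseries /path/to/uncompressed_csvs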

def get_sha256(path):
    '''Get the SHA-256 hash of a file. This function can handle files of
    arbitrary size. Idea comes from http://stackoverflow.com/a/3431838/1269634.'''
    sha256 = hashlib.sha256()
    with open(path, 'rb') as f:
        # reading 1 MiB at a time should provide sufficient performance for our
        # dataset of small files
        for chunk in iter(lambda: f.read(1048576), b''):
            sha256.update(chunk)
    return sha256.hexdigest()

def main():
    '''Read all new time series and write them to the dataset.'''
    previously_processed_files = None
    try:
        # if the database of previously processed files doesn't exist, create it
        previously_processed_files = db.SQLite(os.path.join(args.outfile,
                                                            'processed_files.db'),
                                               True)
        if not previously_processed_files.exists('sqlite_master',
                                                 "type='table' AND "
                                                 "name='processed_files'"):
            previously_processed_files.sql('''CREATE TABLE processed_files (
                                                 path TEXT NOT NULL PRIMARY KEY,
                                                 sha256 TEXT NOT NULL
                                              );''')

        time_series, processed_files = read_time_series(previously_processed_files)
        write_time_series(time_series, processed_files, previously_processed_files)
    finally:
        # guard against db.SQLite() itself raising before the handle exists
        if previously_processed_files is not None:
            previously_processed_files.close()

def read_time_series(previously_processed_files):
    '''Read all unprocessed files and build a time series for each region's pages.
    The result is a dict mapping date to region to page/count time series.'''
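    # Illustrative shape of the result (the keys and counts below are made up):
    #
    #   { datetime(2013, 1, 4): { 'Region 5': { 'Flu Symptoms': 164, ... },
    #                             ... },
    #     ... }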
    # the CSV files are inconsistently named
    csv_paths = [
        iglob(os.path.join(args.uncompressed_folder, 'Flu Pages by Region*.csv')),
        iglob(os.path.join(args.uncompressed_folder, 'Pages by Region*.csv')),
        iglob(os.path.join(args.uncompressed_folder,
                           'Pages Data Extract for CDC Internet*.csv')),
    ]

    processed_files = dict()  # map file path to hash
    time_series = dict()      # map date to region to page/count time series
    for path in chain.from_iterable(csv_paths):
        sha256 = get_sha256(path)
        if previously_processed_files.exists('processed_files',
                                             "path='{}'".format(path)):
            previous_sha256 = previously_processed_files.get_one(
                "SELECT sha256 FROM processed_files WHERE path='{}';"
                .format(path))[0]

            if sha256 == previous_sha256:
                l.info('skipping {}'.format(path))
                continue
            else:
                # Note that if the hash changes, we're going to output this as a
                # warning and will process the file as normal, potentially
                # overwriting existing data. The assumption here is that a hash
                # change is intentional (e.g., a data correction).
                l.warning('hash changed for {}'.format(path))

        l.info('processing {}'.format(path))

        with open(path) as csvfile:
            # skip header
            for line in csvfile:
                # the first line always starts with the byte order mark (BOM)
                # this is "\xef\xbb\xbf" or character number 65279
                if not (line.startswith('#') or ord(line[0]) == 65279):
                    break

            reader = csv.DictReader(csvfile)
            for row in reader:
                date = time_.utcify(datetime.datetime.strptime(row['Date'],
                                                               '%b %d, %Y'))
                page = row['Pages'].replace('\n', '').strip()
                region = row['Regions']

                # there's no consistent name for the "count" row...ugh
                if 'All CDC Visits' in row:
                    count = int(row['All CDC Visits'])
                elif 'Visits' in row:
                    count = int(row['Visits'])
                elif 'Page Views' in row:
                    count = int(row['Page Views'])
                elif 'Page Views (Report-Specific)' in row:
                    count = int(row['Page Views (Report-Specific)'])
                else:
                    raise ValueError('Count key missing from '
                                     'row {} in file [{}].'.format(row, path))

                #l.info('{} --- {} --- {} --- {}'.format(date, page, region, count))

                if date not in time_series:
                    time_series[date] = dict()
                if region not in time_series[date]:
                    time_series[date][region] = dict()

                # Some of the files contain overlapping data. That is, for some
                # reason, for a specific date/region/page context, there are
                # multiple counts. Anecdotally, the first count often looks
                # "real" (e.g., 164), while the second count is generally
                # relatively small (e.g., 1 or 2). Based on this, we accept the
                # first count we encounter and throw away any follow-up counts.
                if page in time_series[date][region] and \
                   time_series[date][region][page] != count:
                    #l.warning('differing count: {} --- {} --- {} --- {} --- {}'
                    #          .format(date,
                    #                  page,
                    #                  region,
                    #                  count,
                    #                  time_series[date][region][page]))
                    continue

                time_series[date][region][page] = count

        processed_files[path] = sha256

    if not time_series:
        l.info('no new data files')
        return (None, None)

    # ensure we aren't missing any dates
    sorted_dates = sorted(time_series.keys())
    current_date = sorted_dates[0]
    last_date = sorted_dates[-1]
    while current_date < last_date:
        if current_date not in time_series.keys():
            l.warning('missing date: {}'.format(current_date))
        current_date += datetime.timedelta(days=1)

    return (time_series, processed_files)

def write_time_series(time_series, processed_files, previously_processed_files):
    '''Write out the time series to the databases. Right now, we're using a
    hashmod of 4. This results in ~100k rows per table for a full year's worth
    of data, which should provide excellent performance.'''
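    # Dataset layout (partly an assumption, inferred from the arguments below
    # rather than from the timeseries module itself): 'D' selects a daily
    # interval and 4 is the hashmod, i.e. series are presumably spread across
    # 4 tables within each yearly DB.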
    if not time_series:
        l.info('no time series data to write')
        return

    ds = timeseries.Dataset(args.outfile, 'D', 4, writeable=True)
    for date, regions in sorted(time_series.items()):
        num_days = time_.days_in_year(date)
        day_offset = time_.day_year_offset(date)

        l.info('{} - day {} of {} days in the year'.format(date,
                                                           (day_offset + 1),
                                                           num_days))

        # one DB per year
        year = datetime.datetime(date.year, 1, 1)
        date_ds = ds.open_timestamp(year, num_days)
        date_ds.begin()

        for region, pages in sorted(regions.items()):
            pages = regions[region]

            for page, count in sorted(pages.items()):
                #l.info('{} - {} - {} - {}'.format(date, region, page, count))

                # save the page fragment
                page_f = date_ds.fetch_or_create('{}+{}'.format(region, page))
                page_f.data[day_offset] = count
                page_f.save()

                # update the region's fragment
                region_f = date_ds.fetch_or_create(region)
                region_f.data[day_offset] += count
                region_f.save()

        date_ds.commit()
        #ds.dump()

    # update the DB of processed file names and their hashes
    for path, sha256 in sorted(processed_files.items()):
        if previously_processed_files.exists('processed_files',
                                             "path='{}'".format(path)):
            previously_processed_files.sql("UPDATE processed_files "
                                           "SET sha256='{}' "
                                           "WHERE path='{}';".format(sha256, path))
        else:
            previously_processed_files.sql("INSERT INTO processed_files "
                                           "VALUES ('{}', '{}');".format(path,
                                                                         sha256))

try:
    args = u.parse_args(ap)
    u.configure(args.config)
    u.logging_init('cdclogs')
    if __name__ == '__main__':
        main()
except testable.Unittests_Only_Exception:
    testable.register()
3 changes: 0 additions & 3 deletions bin/tssearch
@@ -7,9 +7,6 @@ Search a time series dataset for one or more specific named series and return
the result as TSV on stdout. Print a count of series found and not found.'''

import sys
-import urllib.parse
-
-import pandas as pd

import quacpath
import db
67 changes: 67 additions & 0 deletions bin/wp-migrate
@@ -0,0 +1,67 @@
#!/usr/bin/env python3

# Copyright © Los Alamos National Security, LLC, and others.

'''Migrate the schema v1 Wikipedia data to v2.'''

import os
from glob import iglob

import quacpath
import u
import testable
import time_
import db

# c is config (set at the bottom: u.configure())
c = u.c

# l is the logging object (set at the bottom: u.logging_init())
l = u.l

ap = u.ArgumentParser(description=__doc__)
gr = ap.default_group
gr.add_argument('outfile',
                metavar='OUTFILE',
                help='time series dataset to create or update')

def main():
    # create the metadata DB
    metadata_db = db.SQLite(os.path.join(args.outfile, 'metadata.db'), True)
    metadata = {'hashmod': 64,
                'interval': 'H',
                'schema_version': 2}
    metadata_db.sql("""PRAGMA encoding='UTF-8';
                       PRAGMA page_size = 65536; """)
    metadata_db.begin()
    metadata_db.sql("""CREATE TABLE metadata (
                         key TEXT NOT NULL PRIMARY KEY,
                         value TEXT NOT NULL )""")
    metadata_db.sql_many("INSERT INTO metadata VALUES (?, ?)",
                         metadata.items())
    metadata_db.commit()
    metadata_db.close()

    # for each data DB, drop its per-file "metadata" table and rename the file
    for f in iglob(os.path.join(args.outfile, '*.db')):
        filename = os.path.splitext(os.path.basename(f))[0]
        if filename != 'metadata':
            data_db = db.SQLite(f, True)
            data_db.sql('DROP TABLE metadata;')
            data_db.close()

            date = time_.iso8601_parse(filename)
            length = time_.hours_in_month(date)
            new_filename = os.path.join(args.outfile,
                                        '{}_{}.db'.format(date.replace(tzinfo=None).isoformat(),
                                                          length))
            os.rename(f, new_filename)
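            # For example (illustrative), a January 2013 file would be renamed
            # to 2013-01-01T00:00:00_744.db, since January has 31 * 24 = 744
            # hourly slots.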

try:
    args = u.parse_args(ap)
    u.configure(args.config)
    u.logging_init('wp-migrate')
    if __name__ == '__main__':
        main()
except testable.Unittests_Only_Exception:
    testable.register()
19 changes: 10 additions & 9 deletions bin/wp-tsupdate
@@ -145,9 +145,10 @@ def main():
args.keep_threshold = int(c['wkpd']['keep_threshold'])
os.environ['SQLITE_TMPDIR'] = args.outfile
(month, pv_files) = pv_files_validated(args.pv_files)
-ds = timeseries.Dataset(args.outfile, int(c['wkpd']['hashmod']),
+ds = timeseries.Dataset(args.outfile, 'H', int(c['wkpd']['hashmod']),
writeable=True)
-fg = ds.open_month(month)
+num_hours = time_.hours_in_month(month)
+fg = ds.open_timestamp(month, num_hours)
outfile_mtime = fg.mtime
l.info('opened %s/%s length %d hours' % (args.outfile, fg.tag, fg.length))
args.file_empty_p = fg.empty_p()
@@ -177,7 +178,7 @@ def main():
def file_read(file_, project_re):
try:
ts = wikimedia.timestamp_parse(file_)
-hour_offset = time_.hour_offset(ts)
+hour_month_offset = time_.hour_month_offset(ts)
fp = u.zcat(file_, r"zcat '%%s' | egrep '^%s [-A-Za-z0-9_~!*();@,./%%%%]+ [0-9]+ [0-9]+$'" % project_re)
badline_ct = 0
prev_line = b''
@@ -188,7 +189,7 @@ def file_read(file_, project_re):
(proj, url, count, _) = line.split(b' ')
prev_line = line
yield (proj.decode('ascii'), url.decode('ascii'),
-hour_offset, int(count))
+hour_month_offset, int(count))
except ValueError as x:
# Ignore lines that don't parse. Some files have thousands of
# these (pagecounts-20130201-010000.gz), and many files have at
@@ -238,13 +239,13 @@ def files_process(fg, files):
proj_last = proj
proj_totals = fetch_or_create(proj, np.float64, fill=np.nan)
url_v = fetch_or_create('%s+%s' % (proj, url), np.float32)
-for (_, _, hour_offset, count) in gr:
+for (_, _, hour_month_offset, count) in gr:
line_ct += 1
-url_v.data[hour_offset] = count
-if (np.isnan(proj_totals.data[hour_offset])):
-proj_totals.data[hour_offset] = count
+url_v.data[hour_month_offset] = count
+if (np.isnan(proj_totals.data[hour_month_offset])):
+proj_totals.data[hour_month_offset] = count
else:
-proj_totals.data[hour_offset] += count
+proj_totals.data[hour_month_offset] += count
if (url_v.save(keep_threshold)):
url_write_ct += 1
stats_printed = False # avoid multiple stats after non-write iterations
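Taken together, the wp-tsupdate changes above replace the month-specific open_month() call with the interval-aware Dataset(outfile, interval, hashmod) constructor plus open_timestamp(timestamp, length), the same pattern cdc-tsupdate uses with a daily interval. A minimal sketch of that pattern follows; the signatures are inferred only from the calls visible in this diff, so treat the details (argument order, the illustrative series name and count) as assumptions rather than the timeseries module's documented API.

# Sketch only: signatures inferred from the calls shown in this diff.
import datetime

import time_        # QUAC time helpers (hours_in_month, hour_month_offset, ...)
import timeseries   # QUAC time series dataset module

month = datetime.datetime(2013, 2, 1)

# 'H' selects an hourly interval; 64 is the hashmod.
ds = timeseries.Dataset('out_dataset', 'H', 64, writeable=True)

# One fragment group per period; its length is the number of hourly slots.
fg = ds.open_timestamp(month, time_.hours_in_month(month))
fg.begin()

frag = fg.fetch_or_create('en+Influenza')        # illustrative series name
frag.data[time_.hour_month_offset(month)] = 42   # illustrative count
frag.save()

fg.commit()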
2 changes: 1 addition & 1 deletion doc-src/credits.rst
@@ -11,4 +11,4 @@

* Reid Priedhorsky, Los Alamos National Laboratory
* Aron Culotta, Illinois Institute of Technology

* Geoffrey Fairchild, Los Alamos National Laboratory