move execute_insert to top module nesting so pickle does not err #87
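Context for the change: Python's pickle serializes a function by reference to its module-qualified name, so only functions defined at a module's top level can be pickled, and multiprocessing must pickle every callable it ships to a worker process. A minimal sketch of the failure mode (illustrative; not code from this diff):

import pickle

def make_worker():
    def worker(x):  # nested: no importable qualified name
        return x + 1
    return worker

def worker(x):  # module-level: picklable by name
    return x + 1

pickle.dumps(worker)  # fine
try:
    pickle.dumps(make_worker())
except (pickle.PicklingError, AttributeError) as err:
    print("nested function cannot be pickled:", err)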

Open · wants to merge 73 commits into base: ka/parallel

Commits (73)
db2f8ae
initial operator overloading
potash Nov 17, 2016
3f35a15
syntax, when
potash Nov 17, 2016
b6cbaaa
real test
potash Nov 17, 2016
b25ab25
truediv
potash Dec 12, 2016
a5463a4
consistency fix: make sure quantites is a tuple-valued dict
potash Dec 24, 2016
837329d
Fix whitespace issues
mbauman Dec 28, 2016
f528951
Update coverage from 4.2 to 4.3.1 (#46)
pyup-bot Dec 28, 2016
aef832f
Update pytest from 3.0.4 to 3.0.5 (#44)
pyup-bot Dec 28, 2016
4584d39
Update csvkit from 0.9.1 to 1.0.0 (#43)
pyup-bot Dec 28, 2016
3a61e1c
Update csvkit from 1.0.0 to 1.0.1 (#48)
pyup-bot Dec 29, 2016
e21034a
Ep/distinct (#41)
potash Jan 3, 2017
eac71bb
Add a categorical helper function
mbauman Dec 23, 2016
51d8d2f
flake8
mbauman Dec 23, 2016
51f1134
Rename categorical to multicompare; create simple MultiCompare subclass
mbauman Dec 29, 2016
e3f52a5
support lists of tuples in quantities
mbauman Dec 29, 2016
45029e0
flake8
mbauman Dec 29, 2016
75da2fa
Rename MultiCompare to Compare; introduce Categorical
mbauman Jan 3, 2017
f7b0342
Change default include_null behavior to False
mbauman Jan 4, 2017
b812dfa
Allow passing include_null shortname as a truthy value
mbauman Jan 4, 2017
ded6fb6
Merge pull request #38 from dssg/mb/helpers
potash Jan 4, 2017
7ef0e61
Add non-dev requirements.txt (#32)
mbauman Jan 4, 2017
3389d0a
Ep/refactor spacetime (#49)
potash Jan 5, 2017
ae5c8dd
Ep/join table (#50)
potash Jan 9, 2017
4cc018e
Update to v0.2.0
thcrock Feb 26, 2017
5a6033a
get statements before exec
potash Feb 27, 2017
0ab0cb1
merge master
potash Feb 27, 2017
c843215
arithmetic
potash Feb 27, 2017
e210332
operator_str
potash Feb 27, 2017
5f9e2ac
docstring
potash Feb 27, 2017
f6ae852
make interval to accessible to spacetime aggregates
potash Feb 27, 2017
192f133
Be less lazy for Python 2.7
mbauman Feb 27, 2017
8890de1
Support format_kwargs when getting columns of AggregateExpressions
mbauman Feb 27, 2017
e29931a
Update coverage from 4.3.1 to 4.3.4 (#54)
pyup-bot Feb 28, 2017
a538a13
Update sqlalchemy to 1.1.5 (#55)
pyup-bot Feb 28, 2017
ae9458a
Update cryptography from 1.7.1 to 1.7.2 (#58)
pyup-bot Feb 28, 2017
6142a0f
Update tox from 2.5.0 to 2.6.0 (#59)
pyup-bot Feb 28, 2017
6c14f11
Update flake8 from 3.2.1 to 3.3.0 (#60)
pyup-bot Feb 28, 2017
2565473
Update sphinx from 1.5.1 to 1.5.3 (#62)
pyup-bot Feb 28, 2017
8a9325d
Add trivial integration test
mbauman Feb 28, 2017
fedd7d1
Merge branch 'master' into ep/aggregate_arithmetic
mbauman Feb 28, 2017
70c90c1
Update sqlalchemy to 1.1.6 (#64)
pyup-bot Mar 1, 2017
260696d
Update psycopg2 from 2.6.2 to 2.7 (#66)
pyup-bot Mar 1, 2017
08542f6
Update pytest from 3.0.5 to 3.0.6 (#65)
pyup-bot Mar 1, 2017
80b8b63
Go back to using the count function
mbauman Mar 1, 2017
23361d8
Update psycopg2 from 2.7 to 2.7.1 (#71)
pyup-bot Mar 14, 2017
4ae3990
Merge pull request #63 from dssg/ep/aggregate_arithmetic
mbauman Mar 14, 2017
2c60460
Update cryptography from 1.7.2 to 1.8.1 (#70)
pyup-bot Mar 14, 2017
48def77
Update pytest from 3.0.6 to 3.0.7 (#73)
pyup-bot Mar 14, 2017
b2584fe
Bundle pyupdates, please
mbauman Mar 28, 2017
e18d8af
Update sqlalchemy to 1.1.7 (#76)
pyup-bot Mar 28, 2017
fd6eb13
Scheduled biweekly dependency update for week 14 (#77)
pyup-bot Apr 3, 2017
aee37a8
use filter instead of case when
potash Apr 3, 2017
c006e22
unused format kwarg
potash Apr 3, 2017
a59ac30
add mode to tests
potash Apr 3, 2017
d841725
include order in aggregate name and test it
potash Apr 3, 2017
67f4db2
Whitespace
mbauman Apr 4, 2017
939bcde
Merge pull request #79 from dssg/ep/order_name_bug
mbauman Apr 4, 2017
c59866e
Merge pull request #78 from dssg/ep/filter
mbauman Apr 12, 2017
cab2476
Add validation and support for restricting the "beginning of time" (#74)
mbauman Apr 12, 2017
82bb17f
Update sqlalchemy from 1.1.8 to 1.1.9
pyup-bot Apr 17, 2017
f92e98b
Update sqlalchemy from 1.1.8 to 1.1.9
pyup-bot Apr 17, 2017
bc3577e
Update tox from 2.6.0 to 2.7.0
pyup-bot Apr 17, 2017
4acdeaa
Merge pull request #80 from dssg/pyup-scheduled-update-04-17-2017
mbauman Apr 17, 2017
3f9429f
Allow overriding of choice quoting [Resolves #81]
thcrock Apr 21, 2017
a1d22ce
Untangle quoting logic, rename quoting argument
thcrock Apr 21, 2017
36a93e2
Merge pull request #83 from dssg/choice_quoting
mbauman Apr 21, 2017
9682010
Don't modify dict during iteration when shortening keys
mbauman Apr 21, 2017
2d97d5a
Merge pull request #84 from dssg/mb/82
mbauman Apr 26, 2017
c5d0791
move execute_insert to top module nesting so pickle does not err
May 15, 2017
cf05dbd
remove that pdb
May 15, 2017
470ae97
move execute_insert to sql
May 15, 2017
347da7e
merge master, solve conflict
May 16, 2017
661d32c
re-add SpacetimeSubQueryAggregation
May 16, 2017
1 change: 1 addition & 0 deletions .pyup.yml
@@ -0,0 +1 @@
schedule: "every two weeks"
531 changes: 235 additions & 296 deletions collate/collate.py

Large diffs are not rendered by default.

313 changes: 313 additions & 0 deletions collate/spacetime.py
@@ -0,0 +1,313 @@
# -*- coding: utf-8 -*-
from itertools import chain
import sqlalchemy.sql.expression as ex

from .sql import make_sql_clause
from .collate import Aggregation


class SpacetimeAggregation(Aggregation):
def __init__(self, aggregates, groups, intervals, from_obj, dates,
prefix=None, suffix=None, schema=None, date_column=None,
output_date_column=None, input_min_date=None):
"""
Args:
intervals: the intervals to aggregate over. Either a list of
datetime intervals, e.g. ["1 month", "1 year"], or
a dictionary of group : intervals pairs where
group is a group in groups and intervals is a collection
of datetime intervals, e.g. {"address_id": ["1 month", "1 year"]}
dates: list of PostgreSQL date strings,
e.g. ["2012-01-01", "2013-01-01"]
date_column: name of date column in from_obj, defaults to "date"
output_date_column: name of date column in aggregated output, defaults to "date"
input_min_date: minimum date for which rows shall be included, defaults
to no absolute time restrictions on the minimum date of included rows

For all other arguments see collate.Aggregation
"""
Aggregation.__init__(self,
aggregates=aggregates,
from_obj=from_obj,
groups=groups,
prefix=prefix,
suffix=suffix,
schema=schema)

if isinstance(intervals, dict):
self.intervals = intervals
else:
self.intervals = {g: intervals for g in self.groups}
self.dates = dates
self.date_column = date_column if date_column else "date"
self.output_date_column = output_date_column if output_date_column else "date"
self.input_min_date = input_min_date

def _get_aggregates_sql(self, interval, date, group):
"""
Helper for getting aggregates sql
Args:
interval: SQL time interval string, or "all"
date: SQL date string
group: group clause, for naming columns
Returns: collection of aggregate column SQL strings
"""
if interval != 'all':
when = "{date_column} >= '{date}'::date - interval '{interval}'".format(
interval=interval, date=date, date_column=self.date_column)
else:
when = None

prefix = "{prefix}_{group}_{interval}_".format(
prefix=self.prefix, interval=interval,
group=group)

return chain(*[a.get_columns(when, prefix, format_kwargs={"collate_date": date,
"collate_interval": interval})
for a in self.aggregates])

def get_selects(self):
"""
Constructs select queries for this aggregation

Returns: a dictionary of group : queries pairs where
the keys are the same as in groups and each value is
a list of Select queries, one for each date in dates
"""
queries = {}

for group, groupby in self.groups.items():
intervals = self.intervals[group]
queries[group] = []
for date in self.dates:
columns = [groupby,
ex.literal_column("'%s'::date"
% date).label(self.output_date_column)]
columns += list(chain(*[self._get_aggregates_sql(
i, date, group) for i in intervals]))

gb_clause = make_sql_clause(groupby, ex.literal_column)
query = ex.select(columns=columns, from_obj=self.from_obj)\
.group_by(gb_clause)
query = query.where(self.where(date, intervals))

queries[group].append(query)

return queries

def where(self, date, intervals):
"""
Generates a WHERE clause
Args:
date: the end date
intervals: list of datetime interval strings, or "all"

Returns: a clause restricting from_obj rows to those before the end date
and within the greatest of the given intervals
"""
# upper bound
w = "{date_column} < '{date}'".format(
date_column=self.date_column, date=date)

# lower bound (if possible)
if 'all' not in intervals:
greatest = "greatest(%s)" % str.join(
",", ["interval '%s'" % i for i in intervals])
min_date = "'{date}'::date - {greatest}".format(date=date, greatest=greatest)
w += "AND {date_column} >= {min_date}".format(
date_column=self.date_column, min_date=min_date)
if self.input_min_date is not None:
w += "AND {date_column} >= '{bot}'::date".format(
date_column=self.date_column, bot=self.input_min_date)
return ex.text(w)

def get_indexes(self):
"""
Generate create index queries for this aggregation

Returns: a dictionary of group : index pairs where
the keys are the same as in groups and each value is
a raw CREATE INDEX query for the corresponding table
"""
return {group: "CREATE INDEX ON %s (%s, %s);" %
(self.get_table_name(group), groupby, self.output_date_column)
for group, groupby in self.groups.items()}

def get_join_table(self):
"""
Generates a join table, consisting of an entry for each combination of
groups and dates in the from_obj
"""
groups = list(self.groups.values())
intervals = list(set(chain(*self.intervals.values())))

queries = []
for date in self.dates:
columns = groups + [ex.literal_column("'%s'::date" % date).label(
self.output_date_column)]
queries.append(ex.select(columns, from_obj=self.from_obj)
.where(self.where(date, intervals))
.group_by(*groups))

return str.join("\nUNION ALL\n", map(str, queries))

def get_create(self, join_table=None):
"""
Generate a single aggregation table creation query by joining
together the results of get_creates()
Returns: a CREATE TABLE AS query
"""
if not join_table:
join_table = '(%s) t1' % self.get_join_table()
query = "SELECT * FROM %s\n" % join_table
for group, groupby in self.groups.items():
query += " LEFT JOIN %s USING (%s, %s)" % (
self.get_table_name(group), groupby, self.output_date_column)

return "CREATE TABLE %s AS (%s);" % (self.get_table_name(), query)

def validate(self, conn):
"""
SpacetimeAggregations ensure that no interval, counted back from any
aggregation date, extends before the absolute minimum time (input_min_date).
"""
if self.input_min_date is not None:
all_intervals = set(chain(*self.intervals.values()))
for date in self.dates:
for interval in all_intervals:
if interval == "all":
continue
# This could be done more efficiently all at once, but doing
# it this way allows for nicer error messages.
r = conn.execute("select ('%s'::date - '%s'::interval) < '%s'::date" %
(date, interval, self.input_min_date))
if r.fetchone()[0]:
raise ValueError(
"date '%s' - '%s' is before input_min_date ('%s')" %
(date, interval, self.input_min_date))


class SpacetimeSubQueryAggregation(SpacetimeAggregation):
def __init__(self, aggregates, groups, intervals, from_obj, dates,
prefix=None, suffix=None, schema=None, date_column=None, output_date_column=None,
sub_query=None, join_table=None):
"""
Args:
aggregates: collection of Aggregate objects
from_obj: defines the name of the sub query
groups: a list of expressions to group by in the aggregation, or a
dictionary of group : expr pairs where group is the alias (used in column names)
intervals: the intervals to aggregate over. Either a list of
datetime intervals, e.g. ["1 month", "1 year"], or
a dictionary of group : intervals pairs where
group is a group in groups and intervals is a collection
of datetime intervals, e.g. {"address_id": ["1 month", "1 year"]}
dates: list of PostgreSQL date strings,
e.g. ["2012-01-01", "2013-01-01"]
prefix: prefix for column names, defaults to from_obj
suffix: suffix for aggregation table, defaults to "aggregation"
date_column: name of date column in from_obj, defaults to "date"
output_date_column: name of date column in aggregated output, defaults to "date"
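sub_query: the SQLAlchemy Select this aggregation runs over; date
filters are applied inside the sub query so they can use its indexes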
join_table: specify a join table, i.e. a table containing unique sets of all possible
valid groups to left join the aggregations onto.
Defaults to None, in which case this table is created by querying the from_obj.

The group arguments are passed directly to the
SQLAlchemy Select object, so they can be anything supported there.
For details see:
http://docs.sqlalchemy.org/en/latest/core/selectable.html
"""
Aggregation.__init__(self,
aggregates=aggregates,
from_obj=from_obj,
groups=groups,
prefix=prefix,
suffix=suffix,
schema=schema)

if isinstance(intervals, dict):
self.intervals = intervals
else:
self.intervals = {g: intervals for g in self.groups}
self.dates = dates
self.date_column = date_column if date_column else "date"
self.output_date_column = output_date_column if output_date_column else "date"
self.sub_query = sub_query
self.join_table = join_table

def get_selects(self):
"""
Constructs select queries for this aggregation using a sub query

Returns: a dictionary of group : queries pairs where
the keys are the same as in groups and each value is
a list of Select queries, one for each date in dates
"""
queries = {}

for group, groupby in self.groups.items():
intervals = self.intervals[group]
queries[group] = []
for date in self.dates:
# sub query

# upper bound on date_column by date
where = ex.text("{date_column} < '{date}'".format(
date_column=self.date_column, date=date))

# the where clause is applied to the sub_query, since that query can make use of indexes
sub_query = self.sub_query.where(where)

if 'all' not in intervals:
greatest = "greatest(%s)" % str.join(
",", ["interval '%s'" % i for i in intervals])
sub_query = sub_query.where(ex.text(
"{date_column} >= '{date}'::date - {greatest}".format(
date_column=self.date_column, date=date,
greatest=greatest)))

# name the sub query
sub_query = sub_query.alias(str(self.from_obj))

# main query
columns = [groupby,
ex.literal_column("'%s'::date"
% date).label(self.output_date_column)]
columns += list(chain(*(self._get_aggregates_sql(
i, date, group) for i in intervals)))

gb_clause = make_sql_clause(groupby, ex.literal_column)

# note: there is no where clause as the filtering is applied at the sub query level
query = ex.select(columns=columns, from_obj=sub_query) \
.group_by(gb_clause)

queries[group].append(query)

return queries

def get_join_table(self):
"""
Generate a query for a join table
"""
if self.join_table is not None:
return '(%s) t1' % ex.select(columns=self.groups.values(), from_obj=self.join_table) \
.group_by(*self.groups.values())
else:
return '(%s) t1' % ex.select(columns=self.groups.values(), from_obj=self.from_obj) \
.group_by(*self.groups.values())

def get_create(self):
"""
Generate a single aggregation table creation query by joining
together the results of get_creates()
Returns: a CREATE TABLE AS query
"""
query = ("SELECT * FROM %s\n"
"CROSS JOIN (select unnest('{%s}'::date[]) as %s) t2\n") % (
self.get_join_table(), str.join(',', self.dates), self.output_date_column)
for group, groupby in self.groups.items():
query += "LEFT JOIN %s USING (%s, %s)" % (
self.get_table_name(group), groupby, self.output_date_column)

return "CREATE TABLE %s AS (%s);" % (self.get_table_name(), query)
19 changes: 19 additions & 0 deletions collate/sql.py
@@ -4,13 +4,32 @@
#from sqlalchemy.sql import compiler
#from psycopg2.extensions import adapt as sqlescape


def make_sql_clause(s, constructor):
if not isinstance(s, ex.ClauseElement):
return constructor(s)
else:
return s


def execute_insert(get_engine, insert):
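"""Connect via get_engine() and execute the given insert in a transaction.

Defined at module top level so it can be pickled and shipped to
multiprocessing workers.
"""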
try:
engine = get_engine()
except Exception:
print('Could not connect to the database within spawned process')
raise

print("Starting parallel process")

# transaction
with engine.begin() as conn:
conn.execute(insert)

engine.dispose()

return True


class CreateTableAs(ex.Executable, ex.ClauseElement):

def __init__(self, name, query):
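With execute_insert at module scope, it can now be handed to a process pool. A hypothetical sketch of the intended call pattern (the pool wiring is not part of this diff; the engine URL and INSERT statements are placeholders):

from functools import partial
from multiprocessing import Pool

from sqlalchemy import create_engine

from collate.sql import execute_insert

def get_engine():
    # a fresh engine per worker process; the URL is a placeholder
    return create_engine('postgresql:///collate_test')

if __name__ == '__main__':
    inserts = [  # placeholder statements
        "INSERT INTO results VALUES (1)",
        "INSERT INTO results VALUES (2)",
    ]
    with Pool(2) as pool:
        # partial(...) pickles cleanly because both functions are module-level
        pool.map(partial(execute_insert, get_engine), inserts)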
1 change: 1 addition & 0 deletions requirements.txt
@@ -0,0 +1 @@
SQLAlchemy==1.1.9
19 changes: 10 additions & 9 deletions requirements_dev.txt
@@ -2,15 +2,16 @@ pip==9.0.1
bumpversion==0.5.3
wheel==0.29.0
watchdog==0.8.3
-flake8==3.2.1
-tox==2.5.0
-coverage==4.2
-Sphinx==1.5.1
-cryptography==1.7.1
+flake8==3.3.0
+tox==2.7.0
+coverage==4.3.4
+Sphinx==1.5.4
+cryptography==1.8.1
PyYAML==3.12
-pytest==3.0.4
-SQLAlchemy==1.1.4
-psycopg2==2.6.2
-csvkit==0.9.1
+pytest==3.0.7
+SQLAlchemy==1.1.9
+psycopg2==2.7.1
+csvkit==1.0.1
codecov==2.0.5
pytest-cov==2.4.0
testing.postgresql==1.3.0
2 changes: 1 addition & 1 deletion setup.py
@@ -19,7 +19,7 @@

setup(
name='collate',
-version='0.1.0',
+version='0.2.0',
description="Aggregated feature generation made easy.",
long_description=readme + '\n\n' + history,
author="DSaPP Researchers",