Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Sarchak/spark #112

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
__copyright__ = 'Copyright © 2017, Instacart'
__credits__ = ['Montana Low', 'Jeremy Stanley', 'Emmanuel Turlay', 'Shrikar Archak']
__license__ = 'MIT'
__version__ = '0.6.14'
__version__ = '0.6.15'
__maintainer__ = 'Montana Low'
__email__ = '[email protected]'
__status__ = 'Development Status :: 4 - Beta'
Expand Down
4 changes: 3 additions & 1 deletion lore/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'graphviz>=0.8.2, <0.8.99'] + H5PY
XGBOOST = ['xgboost>=0.72, <0.80']
SKLEARN = ['scikit-learn>=0.19, <0.19.99']
PYSPARK = ['pyspark>=2.3.1']

ALL = list(set(
DATEUTIL +
Expand All @@ -57,7 +58,8 @@
H5PY +
KERAS +
XGBOOST +
SKLEARN
SKLEARN +
PYSPARK
))

TEST = ALL + [
Expand Down
1 change: 1 addition & 0 deletions lore/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ def set_python_version(python_version):
INSTALLED_PACKAGES = None
REQUIRED_VERSION = None


# -- UI ----------------------------------------------------------------------
COLOR = {
DEVELOPMENT: ansi.GREEN,
Expand Down
15 changes: 15 additions & 0 deletions lore/spark/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Module-level Spark bootstrap for lore.

Importing this module has side effects: it verifies the pyspark dependency,
points SPARK_CONF_DIR at the app's config/spark directory, and creates a
module-wide SparkContext/SparkSession (`spark_context` / `spark`).
"""
import os
from glob import glob
import logging
import lore

# Ensure pyspark is installed before it is imported below.
lore.env.require(lore.dependencies.PYSPARK)
# Must be set BEFORE importing pyspark so Spark picks up the app's config
# directory; that is why these imports sit below this assignment.
os.environ['SPARK_CONF_DIR'] = os.path.join(lore.env.ROOT, 'config', 'spark')
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Spark master URL; defaults to a 4-core local cluster unless overridden
# via the SPARK_MASTER environment variable.
SPARK_MASTER = os.environ.get('SPARK_MASTER', 'local[4]')

# Hard-coded driver memory and Snowflake connector packages.
# NOTE(review): pinned snowflake jar versions — confirm they match the
# snowflake-connector-python version in requirements.txt.
spark_conf = SparkConf().setAppName(lore.env.APP).setMaster(SPARK_MASTER).set('spark.driver.memory', '5G').set('spark.jars.packages', 'net.snowflake:snowflake-jdbc:3.6.7,net.snowflake:spark-snowflake_2.11:2.4.3')

# One context/session per process, created at import time.
spark_context = SparkContext(conf=spark_conf)
spark = SparkSession(spark_context)
85 changes: 85 additions & 0 deletions lore/spark/pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from __future__ import absolute_import
import lore
from lore.env import require

require(
lore.dependencies.NUMPY +
lore.dependencies.PANDAS +
lore.dependencies.PYSPARK
)
from lore.encoders import Uniform, Unique, Token, Enum, Quantile, Norm
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, StandardScaler, QuantileDiscretizer, MinMaxScaler, Tokenizer, CountVectorizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

class Pipelines(object):
    """Builds and applies a pyspark.ml Pipeline from a list of lore encoders.

    Each supported lore encoder is mapped to the corresponding Spark ML
    stage:

    * ``Unique``   -> ``StringIndexer``
    * ``Uniform``  -> ``VectorAssembler`` + ``MinMaxScaler``
    * ``Norm``     -> ``VectorAssembler`` + ``StandardScaler``
    * ``Quantile`` -> ``QuantileDiscretizer``
    * ``Token``    -> ``Tokenizer`` + vocabulary lookup (see below)

    Encoders with no Spark equivalent are skipped. For ``Token`` columns a
    ``CountVectorizer`` is fit only to extract a vocabulary; the actual
    encoding is applied in :meth:`transform` with a udf that maps tokens to
    vocabulary indices.
    """

    def __init__(self, encoders):
        self._encoders = encoders
        self._pipeline = None   # fitted pyspark.ml PipelineModel, set by fit()
        self._str2idx = {}      # column name -> {token: vocabulary index}
        self._indexer_map = {}  # str(stage) -> column name, for Token stages

    def fit(self, data):
        """Fit a Spark ML pipeline over `data`.

        Returns the fitted ``PipelineModel`` (also stored on the instance).
        """
        stages = []
        for encoder in self._encoders:
            input_name = encoder.column
            encoded_name = "{}_enc".format(encoder.column)

            # Normalizer, StandardScaler and MinMaxScaler act on vectors,
            # not scalars, so scalar columns are first assembled into
            # single-element vectors.
            if isinstance(encoder, (Uniform, Norm)):
                vectorized = "{}_vectorized".format(input_name)
                stages.append(VectorAssembler(inputCols=[input_name], outputCol=vectorized))

            # HACK: ideally the token -> index mapping would be a custom
            # transformer/estimator; CountVectorizer is fit here only to
            # collect the vocabulary, and its output column is discarded.
            indexer = None
            if isinstance(encoder, Token):
                tokens_col = "{}_tokens".format(input_name)
                stages.append(Tokenizer(inputCol=input_name, outputCol=tokens_col))
                # BUG FIX: was 'outputCol="ignored".format(input_name)' — a
                # no-op .format on a literal.
                indexer = CountVectorizer(inputCol=tokens_col, outputCol="ignored", minDF=1.0)
                self._indexer_map[str(indexer)] = encoder.column
            elif isinstance(encoder, Unique):
                indexer = StringIndexer(inputCol=input_name, outputCol=encoded_name)
            elif isinstance(encoder, Uniform):
                indexer = MinMaxScaler(inputCol=vectorized, outputCol=encoded_name)
            elif isinstance(encoder, Norm):
                indexer = StandardScaler(withStd=True, withMean=True, inputCol=vectorized, outputCol=encoded_name)
            elif isinstance(encoder, Quantile):
                indexer = QuantileDiscretizer(numBuckets=encoder.quantiles, inputCol=encoder.column, outputCol=encoded_name)

            # BUG FIX: the append was unconditional, so an unhandled encoder
            # type (e.g. Enum) raised NameError on the first iteration or
            # silently re-appended the previous encoder's stage.
            if indexer is not None:
                stages.append(indexer)

        pipeline = Pipeline(stages=stages)
        self._pipeline = pipeline.fit(data)
        # Capture the vocabulary of each fitted CountVectorizer so Token
        # columns can be encoded as index sequences in transform().
        for stage in self._pipeline.stages:
            if str(stage) in self._indexer_map:
                self._str2idx[self._indexer_map[str(stage)]] = {token: i for i, token in enumerate(stage.vocabulary)}
        return self._pipeline

    def transform(self, data):
        """Apply the fitted pipeline, then encode Token columns.

        Token columns become fixed-length (left-padded with -1) sequences of
        vocabulary indices in a ``<column>_enc`` column; unknown tokens map
        to -1.
        """
        # pyspark is imported at module scope; only the extra types needed
        # for the udf's return type are imported here.
        from pyspark.sql.types import ArrayType, IntegerType

        # HACK: as in fit(), a proper custom transformer should own this.
        def translate(mapping):
            def translate_(col, pad=10):
                indices = [mapping.get(token, -1) for token in col]
                return [-1] * (pad - len(indices)) + indices
            # BUG FIX: the udf returns a list of ints; it was declared
            # StringType(), which cannot represent that value.
            return udf(translate_, ArrayType(IntegerType()))

        result = self._pipeline.transform(data)
        for stage in self._pipeline.stages:
            key = str(stage)
            if key in self._indexer_map:
                column = self._indexer_map[key]
                tokens_col = "{}_tokens".format(column)
                result = result.withColumn("{}_enc".format(column), translate(self._str2idx[column])(tokens_col))
        return result
138 changes: 138 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,139 @@
absl-py==0.3.0
amqp==2.3.2
appnope==0.1.0
asn1crypto==0.24.0
aws-xray-sdk==0.95
azure-common==1.1.14
azure-nspkg==2.0.0
azure-storage==0.36.0
backcall==0.1.0
bleach==1.5.0
boto3==1.7.73
boto==2.49.0
botocore==1.10.73
bz2file==0.98
certifi==2018.4.16
cffi==1.11.5
chardet==3.0.4
click==6.7
cookies==2.2.1
cryptography==2.2.2
cycler==0.10.0
decorator==4.3.0
dill==0.2.8.2
docker-pycreds==0.3.0
docker==3.4.1
docutils==0.14
ecdsa==0.13
entrypoints==0.2.3
Flask==0.12.4
future==0.16.0
futures==3.1.1
geoip2==2.9.0
graphviz==0.8.4
h5py==2.8.0
html5lib==0.9999999
hub==0.0.8
idna==2.7
ijson==2.3
iml==0.6.2
inflection==0.3.1
ipykernel==4.8.2
ipython-genutils==0.2.0
ipython==6.5.0
ipywidgets==7.4.0
itsdangerous==0.24
jedi==0.12.1
Jinja2==2.9.6
jmespath==0.9.3
jsondiff==1.1.1
jsonpickle==0.9.6
jsonschema==2.6.0
jupyter-client==5.2.3
jupyter-console==5.2.0
jupyter-core==4.4.0
jupyter==1.0.0
Keras==2.1.6
kiwisolver==1.0.1
kombu==4.2.1
librato-metrics==3.1.0
Markdown==2.6.11
MarkupSafe==1.0
matplotlib==2.2.2
maxminddb==1.4.1
mistune==0.8.3
mock==2.0.0
money==1.3.0
moto==1.3.4
nbconvert==5.3.1
nbformat==4.4.0
notebook==5.6.0
numpy==1.14.5
pandas==0.23.4
pandocfilters==1.4.2
parso==0.3.1
pbr==4.2.0
pexpect==4.6.0
pickleshare==0.7.4
prometheus-client==0.3.1
prompt-toolkit==1.0.15
protobuf==3.6.0
psycopg2==2.7.5
ptyprocess==0.6.0
py4j==0.10.7
pyaml==17.12.1
pyasn1-modules==0.2.2
pyasn1==0.4.4
pycparser==2.18
pycryptodome==3.6.4
pydot==1.2.4
Pygments==2.2.0
PyJWT==1.6.4
pyOpenSSL==17.5.0
pyparsing==2.2.0
pyspark==2.3.1
python-dateutil==2.6.1
python-jose==2.0.2
pytz==2018.5
PyYAML==3.13
pyzmq==17.1.0
qtconsole==4.3.1
redis==2.10.6
requests==2.19.1
responses==0.9.0
rollbar==0.14.2
s3transfer==0.1.13
scikit-learn==0.19.2
scipy==1.1.0
Send2Trash==1.5.0
shap==0.12.1
simplegeneric==0.8.1
six==1.11.0
smart-open==1.5.7
snowflake-connector-python==1.5.8
snowflake-sqlalchemy==1.1.2
sqlalchemy-migrate==0.11.0
sqlalchemy-redshift==0.7.1
SQLAlchemy==1.2.10
sqlparse==0.2.4
tabulate==0.8.2
Tempita==0.5.2
tensorflow-tensorboard==1.5.1
tensorflow==1.5.1
terminado==0.8.1
testpath==0.3.1
tornado==5.1
tqdm==4.24.0
traitlets==4.3.2
u-msgpack-python==2.5.0
urllib3==1.23
vine==1.1.4
wcwidth==0.1.7
websocket-client==0.48.0
Werkzeug==0.14.1
widgetsnbextension==3.4.0
wrapt==1.10.11
xgboost==0.72.1
xmltodict==0.11.0
-e .

2 changes: 1 addition & 1 deletion runtime.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
python-3.6.6
python-3.6.4