diff --git a/lore/__init__.py b/lore/__init__.py
index 20b8192..3dfcb35 100644
--- a/lore/__init__.py
+++ b/lore/__init__.py
@@ -17,7 +17,7 @@
 __copyright__ = 'Copyright © 2017, Instacart'
 __credits__ = ['Montana Low', 'Jeremy Stanley', 'Emmanuel Turlay', 'Shrikar Archak']
 __license__ = 'MIT'
-__version__ = '0.6.14'
+__version__ = '0.6.15'
 __maintainer__ = 'Montana Low'
 __email__ = 'montana@instacart.com'
 __status__ = 'Development Status :: 4 - Beta'
diff --git a/lore/dependencies.py b/lore/dependencies.py
index 5cda62d..a579c9a 100644
--- a/lore/dependencies.py
+++ b/lore/dependencies.py
@@ -34,6 +34,7 @@
     'graphviz>=0.8.2, <0.8.99'] + H5PY
 XGBOOST = ['xgboost>=0.72, <0.80']
 SKLEARN = ['scikit-learn>=0.19, <0.19.99']
+PYSPARK = ['pyspark>=2.3.1']
 
 ALL = list(set(
     DATEUTIL +
@@ -57,7 +58,8 @@
     H5PY +
     KERAS +
     XGBOOST +
-    SKLEARN
+    SKLEARN +
+    PYSPARK
 ))
 
 TEST = ALL + [
diff --git a/lore/env.py b/lore/env.py
index c697d30..81a7e96 100644
--- a/lore/env.py
+++ b/lore/env.py
@@ -515,6 +515,7 @@ def set_python_version(python_version):
 INSTALLED_PACKAGES = None
 REQUIRED_VERSION = None
 
+
 # -- UI ----------------------------------------------------------------------
 COLOR = {
     DEVELOPMENT: ansi.GREEN,
diff --git a/lore/spark/__init__.py b/lore/spark/__init__.py
new file mode 100644
index 0000000..c56cb36
--- /dev/null
+++ b/lore/spark/__init__.py
@@ -0,0 +1,15 @@
+import os
+from glob import glob
+import logging
+import lore
+lore.env.require(lore.dependencies.PYSPARK)
+os.environ['SPARK_CONF_DIR'] = os.path.join(lore.env.ROOT, 'config', 'spark')
+from pyspark import SparkConf, SparkContext
+from pyspark.sql import SparkSession
+
+SPARK_MASTER = os.environ.get('SPARK_MASTER', 'local[4]')
+
+spark_conf = SparkConf().setAppName(lore.env.APP).setMaster(SPARK_MASTER).set('spark.driver.memory', '5G').set('spark.jars.packages', 'net.snowflake:snowflake-jdbc:3.6.7,net.snowflake:spark-snowflake_2.11:2.4.3')
+
+spark_context = SparkContext(conf=spark_conf)
+spark = SparkSession(spark_context)
diff --git a/lore/spark/pipelines.py b/lore/spark/pipelines.py
new file mode 100644
index 0000000..34703cc
--- /dev/null
+++ b/lore/spark/pipelines.py
@@ -0,0 +1,85 @@
+from __future__ import absolute_import
+import lore
+from lore.env import require
+
+require(
+    lore.dependencies.NUMPY +
+    lore.dependencies.PANDAS +
+    lore.dependencies.PYSPARK
+)
+from lore.encoders import Uniform, Unique, Token, Enum, Quantile, Norm
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer, StandardScaler, QuantileDiscretizer, MinMaxScaler, Tokenizer, CountVectorizer
+from pyspark.ml.linalg import Vectors
+from pyspark.ml.feature import VectorAssembler
+from pyspark.sql.functions import udf
+from pyspark.sql.types import StringType
+
+class Pipelines(object):
+    def __init__(self, encoders):
+        self._encoders = encoders
+        self._pipeline = None
+        self._str2idx = {}
+        self._indexer_map = {}
+
+    def fit(self, data):
+        indexers = []
+        for encoder in self._encoders:
+            input_name = encoder.column
+            encoded_name = "{}_enc".format(encoder.column)
+
+            # Normalizer, Standard Scaler, MinMaxScaler act on vector and not scalars
+            if isinstance(encoder, (Uniform, Norm)):
+                vectorized = "{}_vectorized".format(input_name)
+                assembler = VectorAssembler(inputCols=[input_name],outputCol=vectorized)
+                indexers.append(assembler)
+
+            ## For Token type create map
+            ### HACK!!!!!!! Ideally we should write a transformer/estimator for doing this. Still trying to figure
+            ### how to write it in pyspark
+
+            print("XXX Encoder : {}, {}".format(encoder, isinstance(encoder, Token)))
+            if isinstance(encoder, Token):
+                tokenizer = Tokenizer(inputCol=input_name, outputCol="{}_tokens".format(input_name))
+                indexers.append(tokenizer)
+
+            if isinstance(encoder, Token):
+                indexer = CountVectorizer(inputCol="{}_tokens".format(input_name), outputCol="ignored".format(input_name), minDF=1.0)
+                self._indexer_map[str(indexer)] = encoder.column
+            elif isinstance(encoder, Unique):
+                indexer = StringIndexer(inputCol=input_name, outputCol=encoded_name)
+            elif isinstance(encoder, Uniform):
+                indexer = MinMaxScaler(inputCol=vectorized, outputCol=encoded_name)
+            elif isinstance(encoder, Norm):
+                indexer = StandardScaler(withStd=True, withMean=True, inputCol=vectorized, outputCol=encoded_name)
+            elif isinstance(encoder, Quantile):
+                indexer = QuantileDiscretizer(numBuckets=encoder.quantiles, inputCol=encoder.column, outputCol=encoded_name)
+            indexers.append(indexer)
+
+        pipeline = Pipeline(stages=indexers)
+        self._pipeline = pipeline.fit(data)
+        for indexer in self._pipeline.stages:
+            if str(indexer) in self._indexer_map:
+                self._str2idx[self._indexer_map[str(indexer)]] = dict([(x, i) for i,x in enumerate(indexer.vocabulary)])
+        return self._pipeline
+
+    def transform(self, data):
+
+        ## For Token type create map
+        ### HACK!!!!!!! Ideally we should write a transformer/estimator for doing this. Still trying to figure
+        ### how to write it in pyspark
+
+        def translate(mapping):
+            def translate_(col, pad = 10):
+                tmp = [mapping.get(x, -1) for x in col]
+                return [-1]* (pad - len(tmp)) + tmp
+            return udf(translate_, StringType())
+
+        result = self._pipeline.transform(data)
+        for indexer in self._pipeline.stages:
+            idx_key = str(indexer)
+            if idx_key in self._indexer_map:
+                column = self._indexer_map[idx_key]
+                input_name = "{}_tokens".format(column)
+                result = result.withColumn("{}_enc".format(column), translate(self._str2idx[column])(input_name))
+        return result
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index d6e1198..a95e1be 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,139 @@
+absl-py==0.3.0
+amqp==2.3.2
+appnope==0.1.0
+asn1crypto==0.24.0
+aws-xray-sdk==0.95
+azure-common==1.1.14
+azure-nspkg==2.0.0
+azure-storage==0.36.0
+backcall==0.1.0
+bleach==1.5.0
+boto3==1.7.73
+boto==2.49.0
+botocore==1.10.73
+bz2file==0.98
+certifi==2018.4.16
+cffi==1.11.5
+chardet==3.0.4
+click==6.7
+cookies==2.2.1
+cryptography==2.2.2
+cycler==0.10.0
+decorator==4.3.0
+dill==0.2.8.2
+docker-pycreds==0.3.0
+docker==3.4.1
+docutils==0.14
+ecdsa==0.13
+entrypoints==0.2.3
+Flask==0.12.4
+future==0.16.0
+futures==3.1.1
+geoip2==2.9.0
+graphviz==0.8.4
+h5py==2.8.0
+html5lib==0.9999999
+hub==0.0.8
+idna==2.7
+ijson==2.3
+iml==0.6.2
+inflection==0.3.1
+ipykernel==4.8.2
+ipython-genutils==0.2.0
+ipython==6.5.0
+ipywidgets==7.4.0
+itsdangerous==0.24
+jedi==0.12.1
+Jinja2==2.9.6
+jmespath==0.9.3
+jsondiff==1.1.1
+jsonpickle==0.9.6
+jsonschema==2.6.0
+jupyter-client==5.2.3
+jupyter-console==5.2.0
+jupyter-core==4.4.0
+jupyter==1.0.0
+Keras==2.1.6
+kiwisolver==1.0.1
+kombu==4.2.1
+librato-metrics==3.1.0
+Markdown==2.6.11
+MarkupSafe==1.0
+matplotlib==2.2.2
+maxminddb==1.4.1
+mistune==0.8.3
+mock==2.0.0
+money==1.3.0
+moto==1.3.4
+nbconvert==5.3.1
+nbformat==4.4.0
+notebook==5.6.0
+numpy==1.14.5
+pandas==0.23.4
+pandocfilters==1.4.2
+parso==0.3.1
+pbr==4.2.0
+pexpect==4.6.0
+pickleshare==0.7.4
+prometheus-client==0.3.1
+prompt-toolkit==1.0.15
+protobuf==3.6.0
+psycopg2==2.7.5
+ptyprocess==0.6.0
+py4j==0.10.7
+pyaml==17.12.1
+pyasn1-modules==0.2.2
+pyasn1==0.4.4
+pycparser==2.18
+pycryptodome==3.6.4
+pydot==1.2.4
+Pygments==2.2.0
+PyJWT==1.6.4
+pyOpenSSL==17.5.0
+pyparsing==2.2.0
+pyspark==2.3.1
+python-dateutil==2.6.1
+python-jose==2.0.2
+pytz==2018.5
+PyYAML==3.13
+pyzmq==17.1.0
+qtconsole==4.3.1
+redis==2.10.6
+requests==2.19.1
+responses==0.9.0
+rollbar==0.14.2
+s3transfer==0.1.13
+scikit-learn==0.19.2
+scipy==1.1.0
+Send2Trash==1.5.0
+shap==0.12.1
+simplegeneric==0.8.1
+six==1.11.0
+smart-open==1.5.7
+snowflake-connector-python==1.5.8
+snowflake-sqlalchemy==1.1.2
+sqlalchemy-migrate==0.11.0
+sqlalchemy-redshift==0.7.1
+SQLAlchemy==1.2.10
+sqlparse==0.2.4
+tabulate==0.8.2
+Tempita==0.5.2
+tensorflow-tensorboard==1.5.1
+tensorflow==1.5.1
+terminado==0.8.1
+testpath==0.3.1
+tornado==5.1
+tqdm==4.24.0
+traitlets==4.3.2
+u-msgpack-python==2.5.0
+urllib3==1.23
+vine==1.1.4
+wcwidth==0.1.7
+websocket-client==0.48.0
+Werkzeug==0.14.1
+widgetsnbextension==3.4.0
+wrapt==1.10.11
+xgboost==0.72.1
+xmltodict==0.11.0
 -e .
+
diff --git a/runtime.txt b/runtime.txt
index 1935e97..5c45380 100644
--- a/runtime.txt
+++ b/runtime.txt
@@ -1 +1 @@
-python-3.6.6
+python-3.6.4