diff --git a/.ci/310.yml b/.ci/310.yml
index f894ed7..e666b7a 100644
--- a/.ci/310.yml
+++ b/.ci/310.yml
@@ -3,6 +3,9 @@ channels:
   - conda-forge
 dependencies:
   - python=3.10
+  - dask
+  - dask-geopandas
+  - distributed
   - jupyterlab
   - numpy
   - geopandas
diff --git a/.ci/311.yml b/.ci/311.yml
index a09fcd2..a533e59 100644
--- a/.ci/311.yml
+++ b/.ci/311.yml
@@ -4,6 +4,9 @@ channels:
 dependencies:
   - python=3.11
   - jupyterlab
+  - dask
+  - dask-geopandas
+  - distributed
   - numpy
   - geopandas
   - pandas
diff --git a/.ci/39.yml b/.ci/39.yml
index 3eceed8..d029839 100644
--- a/.ci/39.yml
+++ b/.ci/39.yml
@@ -3,6 +3,9 @@ channels:
   - conda-forge
 dependencies:
   - python=3.9
+  - dask
+  - dask-geopandas
+  - distributed
   - numpy
   - geopandas
   - pandas
@@ -29,4 +32,4 @@ dependencies:
   - numpydoc
   - nbsphinx
   - joblib
-  - astropy
\ No newline at end of file
+  - astropy
diff --git a/environment.yml b/environment.yml
index 3037858..7d4c6e2 100644
--- a/environment.yml
+++ b/environment.yml
@@ -2,6 +2,9 @@ name: tobler
 channels:
   - conda-forge
 dependencies:
+  - dask-geopandas
+  - dask
+  - distributed
   - jupyterlab
   - numpy
   - geopandas >=0.13
diff --git a/notebooks/04_area_interpolate_dask.ipynb b/notebooks/04_area_interpolate_dask.ipynb
new file mode 100644
index 0000000..128b57b
--- /dev/null
+++ b/notebooks/04_area_interpolate_dask.ipynb
@@ -0,0 +1,732 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "e3f2586a-5b6a-4d46-b6e8-1991ae3bec6f",
+   "metadata": {},
+   "source": [
+    "# (Distributed) areal interpolation"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "00f875bd-2714-4551-b10c-1ef3f514478d",
+   "metadata": {},
+   "source": [
+    "In this notebook, we compare the single-core version in `tobler.area_weighted.area_interpolate` with the distributed version in `tobler.area_weighted.area_interpolate_dask`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "4084f715-3989-4424-943a-2a4066a8bcf2",
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "os.environ['USE_PYGEOS'] = '1'\n",
+    "\n",
+    "import pandas\n",
+    "import geopandas\n",
+    "import dask_geopandas\n",
+    "import tobler\n",
+    "from libpysal.examples import load_example\n",
+    "import numpy as np\n",
+    "\n",
+    "from dask.distributed import Client, LocalCluster"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d16a2e15-866b-407d-b65d-54a675aefbd7",
+   "metadata": {},
+   "source": [
+    "## Setup"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "080369e7-f3d4-41c6-a629-12ed458eb743",
+   "metadata": {},
+   "source": [
+    "Load example data from `pysal`:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "id": "cb395dc5-67f2-462e-a1cf-919c8e6d0ae8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "c1 = load_example('Charleston1')\n",
+    "c2 = load_example('Charleston2')\n",
+    "\n",
+    "crs = 6569  # https://epsg.io/6569\n",
+    "\n",
+    "tracts = geopandas.read_file(c1.get_path('sc_final_census2.shp')).to_crs(crs)\n",
+    "zip_codes = geopandas.read_file(c2.get_path('CharlestonMSA2.shp')).to_crs(crs)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1d11c1d7-6435-40cb-a4d4-851f63eccf01",
+   "metadata": {},
+   "source": [
+    "We make up a categorical variable with four classes distributed randomly across the dataset:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "id": "3543702f-5e8a-4336-a14d-19a4eeb77b1b",
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "rng = np.random.default_rng(seed=42)\n",
+    "\n",
+    "tracts['rando'] = pandas.Series(\n",
+    "    rng.integers(0, 4, len(tracts)), dtype='category'\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d2546bb7-abcb-4cad-8db8-c569ea9289ae",
+   "metadata": {},
+   "source": [
+    "We will set up a local Dask cluster so you can follow the computations on the dashboard (`http://localhost:8787` by default):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "id": "d65ac8ec-51e2-4d2d-abb2-96a7519ed749",
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "client = Client(LocalCluster(n_workers=8))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "88c32c7d-0ca8-4945-a1f8-edfbc8917880",
+   "metadata": {},
+   "source": [
+    "Finally, for Dask, we need to provide `dask_geopandas.GeoDataFrame` objects with spatial partitions and categorical variables properly set up:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "id": "a31a1a91-4071-40e2-a21f-7e035d734976",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "dtracts = (\n",
+    "    dask_geopandas.from_geopandas(tracts[['geometry', 'rando']], npartitions=16)\n",
+    "    .spatial_shuffle(by='hilbert', shuffle=\"tasks\")\n",
+    ")\n",
+    "\n",
+    "dzips = (\n",
+    "    dask_geopandas.from_geopandas(zip_codes[['ZIP', 'geometry']], npartitions=16)\n",
+    "    .spatial_shuffle(by='hilbert', shuffle=\"tasks\")\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "54f986ec-ea46-479e-aed8-5edeeaf16fda",
+   "metadata": {},
+   "source": [
+    "---\n",
+    "\n",
+    "**IMPORTANT** - At this point, only *categorical* variables are implemented, so those are what we will test.\n",
+    "\n",
+    "---"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "b783aabc-8221-40f6-a0d5-bf21dd75e2a6",
+   "metadata": {
+    "tags": []
+   },
+   "source": [
+    "## Correctness"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "92dafb11-ec94-43c2-baec-2a5e2a0b380d",
+   "metadata": {},
+   "source": [
+ "- Single core" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "4d4cde6d-73c1-4197-86ed-131724e21296", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cat_sc = tobler.area_weighted.area_interpolate(\n", + " tracts, zip_codes, categorical_variables=['rando']\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "2982d8dc-c1e9-4927-8643-9900b1b09890", + "metadata": {}, + "source": [ + "- Dask" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "d8c7896f-9004-4a07-b3ba-75301f8120e5", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cat_dk = tobler.area_weighted.area_interpolate_dask(\n", + " dtracts, dzips, 'ZIP', categorical_variables=['rando']\n", + ").compute()" + ] + }, + { + "cell_type": "markdown", + "id": "5e19b8dd-505f-4dc1-ba85-9fd825e59b43", + "metadata": {}, + "source": [ + "And we can compare both results are the same:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "81de5e35-f3b6-4567-86b1-36d98583dca0", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "rando_0 4.188295e-08\n", + "rando_1 5.328575e-08\n", + "rando_2 5.396667e-08\n", + "rando_3 2.935173e-08\n", + "dtype: float64" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "a = (\n", + " cat_dk\n", + " .set_index('ZIP')\n", + " .reindex(zip_codes['ZIP'].values)\n", + " .drop(columns='geometry')\n", + ")\n", + "\n", + "b = (\n", + " cat_sc\n", + " .drop(columns='geometry')\n", + " [['rando_0', 'rando_1', 'rando_2', 'rando_3']]\n", + ")\n", + "b.index = a.index\n", + "\n", + "(a - b).max()" + ] + }, + { + "cell_type": "markdown", + "id": "e2e04df1-3331-449c-b74c-e910239c3067", + "metadata": {}, + "source": [ + "The differences in the estimates for the proportions of each area start at the 8th decimal, and thus likely rounding errors derived from the different approaches used to compute the interpolation (the single core does it in one-shot, while Dask computes parts and brings them together later with a sum)." + ] + }, + { + "cell_type": "markdown", + "id": "1debbdf4-892f-4fda-834a-0403595794ef", + "metadata": { + "tags": [] + }, + "source": [ + "## Performance\n", + "\n", + "---\n", + "\n", + "**NOTE** - Timings below do _not_ include computation time required for spatial shuffling and partitioning (which can be substantial with large datasets), or converting from `geopandas`. These are \"sunk costs\" that'll only make this approach preferable with large datasets, although they can be computed once and the result stored in disk efficiently (e.g., as Parquet files). Having said that, when \"larger\" is large enough is not very large in modern terms: from a handful of thousand observations the gains will be substantial if several cores/workers are available.\n", + "\n", + "---" + ] + }, + { + "cell_type": "markdown", + "id": "e5242c13-c4cd-46e2-9131-ec1734bcc142", + "metadata": {}, + "source": [ + "We can now time the example above:\n" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "902e494b-65ba-4fa2-99e6-eb3a513769f8", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "85 ms ± 1.51 ms per loop (mean ± std. dev. 
of 7 runs, 10 loops each)\n" + ] + } + ], + "source": [ + "%%timeit\n", + "cat_sc = tobler.area_weighted.area_interpolate(\n", + " tracts, zip_codes, categorical_variables=['rando']\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "5cfc44d9-f79a-4b8e-9caa-975ea64d5f0e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "1.41 s ± 51.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" + ] + } + ], + "source": [ + "%%timeit\n", + "cat_dk = tobler.area_weighted.area_interpolate_dask(\n", + " dtracts, dzips, 'ZIP', categorical_variables=['rando']\n", + ").compute()" + ] + }, + { + "cell_type": "markdown", + "id": "a124ee86-c527-4386-be8d-2dc833270fd9", + "metadata": {}, + "source": [ + "This is notably slower (about 5x!). For such a small dataset, the overhead in distributing computations and collecting them overcomes any gains in parallelism.\n", + "\n", + "Now we can artificially increase the size of the datasets by concatenating them several times and re-computing (this time we only time one execution):" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "5f56d579-0022-45c2-845c-f351bf96ed01", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "40x increase | N. tracts: 4680 | N. ZIPs: 1680\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/opt/conda/envs/tobler/lib/python3.11/site-packages/distributed/client.py:3161: UserWarning: Sending large graph of size 30.18 MiB.\n", + "This may cause some slowdown.\n", + "Consider scattering data ahead of time and using futures.\n", + " warnings.warn(\n", + "/opt/conda/envs/tobler/lib/python3.11/site-packages/distributed/client.py:3161: UserWarning: Sending large graph of size 30.18 MiB.\n", + "This may cause some slowdown.\n", + "Consider scattering data ahead of time and using futures.\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "sizeup = 40\n", + "tracts_lrg = pandas.concat([tracts] * sizeup)\n", + "zips_lrg = pandas.concat([zip_codes] * sizeup)\n", + "print(\n", + " f'{sizeup}x increase | N. tracts: {len(tracts_lrg)} | N. 
ZIPs: {len(zips_lrg)}'\n", + ")\n", + "\n", + "dtracts_lrg = (\n", + " dask_geopandas.from_geopandas(tracts_lrg[['geometry', 'rando']], chunksize=500)\n", + " .spatial_shuffle(by='hilbert', shuffle=\"tasks\")\n", + ")\n", + "\n", + "dzips_lrg = (\n", + " dask_geopandas.from_geopandas(zips_lrg[['ZIP', 'geometry']], chunksize=500)\n", + " .spatial_shuffle(by='hilbert', shuffle=\"tasks\")\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "e5187109-ba95-4b5f-b373-2ec4745d0289", + "metadata": {}, + "source": [ + "And re-compute the timings:" + ] + }, + { + "cell_type": "markdown", + "id": "c0da372a-f791-47fb-ade0-317a1cf6ff9c", + "metadata": { + "jp-MarkdownHeadingCollapsed": true, + "tags": [] + }, + "source": [ + "---\n", + "\n", + "### 10x" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "620cf9ab-7b9e-4458-809c-c7a73d13f26c", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing for a sizeup of 10x\n", + "CPU times: user 7.21 s, sys: 11.3 ms, total: 7.23 s\n", + "Wall time: 6.95 s\n" + ] + } + ], + "source": [ + "%%time\n", + "print(f'Computing for a sizeup of {sizeup}x')\n", + "cat_sc = tobler.area_weighted.area_interpolate(\n", + " tracts_lrg, zips_lrg, categorical_variables=['rando']\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "c615b27a-e004-429b-a0c5-e4b237516f9f", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing for a sizeup of 10x\n", + "CPU times: user 548 ms, sys: 18 ms, total: 566 ms\n", + "Wall time: 3.56 s\n" + ] + } + ], + "source": [ + "%%time\n", + "print(f'Computing for a sizeup of {sizeup}x')\n", + "cat_dk = tobler.area_weighted.area_interpolate_dask(\n", + " dtracts_lrg, dzips_lrg, 'ZIP', categorical_variables=['rando']\n", + ").compute()" + ] + }, + { + "cell_type": "markdown", + "id": "cc13af25-e97e-4b34-bb1f-bb946c15748e", + "metadata": { + "jp-MarkdownHeadingCollapsed": true, + "tags": [] + }, + "source": [ + "---\n", + "\n", + "### 20x" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "8dbb40d4-4b3b-446d-9d1b-99462a122d6e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing for a sizeup of 20x\n", + "CPU times: user 28.6 s, sys: 26.1 ms, total: 28.7 s\n", + "Wall time: 27.6 s\n" + ] + } + ], + "source": [ + "%%time\n", + "print(f'Computing for a sizeup of {sizeup}x')\n", + "cat_sc = tobler.area_weighted.area_interpolate(\n", + " tracts_lrg, zips_lrg, categorical_variables=['rando']\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "f2ca1394-5f8d-428f-a61c-87beb8778322", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing for a sizeup of 20x\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/opt/conda/envs/tobler/lib/python3.11/site-packages/distributed/client.py:3161: UserWarning: Sending large graph of size 16.77 MiB.\n", + "This may cause some slowdown.\n", + "Consider scattering data ahead of time and using futures.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 1.32 s, sys: 65.3 ms, total: 1.38 s\n", + "Wall time: 9.86 s\n" + ] + } + ], + "source": [ + "%%time\n", + "print(f'Computing for a sizeup of {sizeup}x')\n", + "cat_dk = 
tobler.area_weighted.area_interpolate_dask(\n", + " dtracts_lrg, dzips_lrg, 'ZIP', categorical_variables=['rando']\n", + ").compute()" + ] + }, + { + "cell_type": "markdown", + "id": "335b34b4-9fea-48a6-b38b-8b1a5d755ca1", + "metadata": { + "jp-MarkdownHeadingCollapsed": true, + "tags": [] + }, + "source": [ + "---\n", + "\n", + "### 30x" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "1598ce3f-d21e-4a60-9619-ee5b1eb4932f", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing for a sizeup of 30x\n", + "CPU times: user 1min 4s, sys: 176 ms, total: 1min 4s\n", + "Wall time: 1min 1s\n" + ] + } + ], + "source": [ + "%%time\n", + "print(f'Computing for a sizeup of {sizeup}x')\n", + "cat_sc = tobler.area_weighted.area_interpolate(\n", + " tracts_lrg, zips_lrg, categorical_variables=['rando']\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "224ffbca-7690-4b20-bad2-efbf042623a9", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing for a sizeup of 30x\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/opt/conda/envs/tobler/lib/python3.11/site-packages/distributed/client.py:3161: UserWarning: Sending large graph of size 25.14 MiB.\n", + "This may cause some slowdown.\n", + "Consider scattering data ahead of time and using futures.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 1.91 s, sys: 58.8 ms, total: 1.97 s\n", + "Wall time: 14.6 s\n" + ] + } + ], + "source": [ + "%%time\n", + "print(f'Computing for a sizeup of {sizeup}x')\n", + "cat_dk = tobler.area_weighted.area_interpolate_dask(\n", + " dtracts_lrg, dzips_lrg, 'ZIP', categorical_variables=['rando']\n", + ").compute()" + ] + }, + { + "cell_type": "markdown", + "id": "b004834f-c5ce-4f92-be9a-364a07c7996b", + "metadata": { + "tags": [] + }, + "source": [ + "---\n", + "\n", + "### 40x" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "b6b9d06a-9034-4c39-b3a9-92fc6408d5c6", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing for a sizeup of 40x\n", + "CPU times: user 2min 2s, sys: 1.71 s, total: 2min 3s\n", + "Wall time: 1min 53s\n" + ] + } + ], + "source": [ + "%%time\n", + "print(f'Computing for a sizeup of {sizeup}x')\n", + "cat_sc = tobler.area_weighted.area_interpolate(\n", + " tracts_lrg, zips_lrg, categorical_variables=['rando']\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "8a68e5fe-ee41-48cc-9222-6554a7651c28", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing for a sizeup of 40x\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/opt/conda/envs/tobler/lib/python3.11/site-packages/distributed/client.py:3161: UserWarning: Sending large graph of size 33.52 MiB.\n", + "This may cause some slowdown.\n", + "Consider scattering data ahead of time and using futures.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 6.99 s, sys: 512 ms, total: 7.5 s\n", + "Wall time: 30.5 s\n" + ] + } + ], + "source": [ + "%%time\n", + "print(f'Computing for a sizeup of {sizeup}x')\n", + "cat_dk = tobler.area_weighted.area_interpolate_dask(\n", + " dtracts_lrg, dzips_lrg, 'ZIP', 
categorical_variables=['rando']\n", + ").compute()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "tobler", + "language": "python", + "name": "tobler" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tobler/area_weighted/__init__.py b/tobler/area_weighted/__init__.py index b9f4707..5c9e7e4 100644 --- a/tobler/area_weighted/__init__.py +++ b/tobler/area_weighted/__init__.py @@ -1,3 +1,4 @@ from .area_interpolate import _area_interpolate_binning as area_interpolate from .area_interpolate import _area_tables_binning from .area_join import area_join +from .area_interpolate_dask import area_interpolate_dask diff --git a/tobler/area_weighted/area_interpolate.py b/tobler/area_weighted/area_interpolate.py index 40bafe9..155d533 100644 --- a/tobler/area_weighted/area_interpolate.py +++ b/tobler/area_weighted/area_interpolate.py @@ -212,6 +212,7 @@ def _area_interpolate_binning( spatial_index="auto", n_jobs=1, categorical_variables=None, + categorical_frequency=True ): """ Area interpolation for extensive, intensive and categorical variables. @@ -249,6 +250,11 @@ def _area_interpolate_binning( available. If `table` is passed, this is ignored. categorical_variables : list [Optional. Default=None] Columns in dataframes for categorical variables + categorical_frequency : Boolean + [Optional. Default=True] If True, `estimates` returns the frequency of each + value in a categorical variable in every polygon of `target_df` (proportion of + area). If False, `estimates` contains the area in every polygon of `target_df` + that is occupied by each value of the categorical Returns ------- @@ -357,7 +363,8 @@ def _area_interpolate_binning( )[0] categorical = pd.DataFrame(categorical) - categorical = categorical.div(target_df.area.values, axis="rows") + if categorical_frequency is True: + categorical = categorical.div(target_df.area.values, axis="rows") if extensive_variables: dfs.append(extensive) diff --git a/tobler/area_weighted/area_interpolate_dask.py b/tobler/area_weighted/area_interpolate_dask.py new file mode 100755 index 0000000..7f3b26e --- /dev/null +++ b/tobler/area_weighted/area_interpolate_dask.py @@ -0,0 +1,268 @@ +''' +Area Weighted Interpolation, out-of-core and parallel through Dask +''' + +import pandas +import geopandas +import numpy as np +from .area_interpolate import _area_interpolate_binning as area_interpolate +try: + import dask_geopandas + from dask.base import tokenize + from dask.highlevelgraph import HighLevelGraph +except ImportError: + raise ImportError( + "Area interpolation with Dask requires `dask` and " + "`dask_geopandas` installed to run. Please install them " + "before importing this functionality." + ) + +def area_interpolate_dask( + source_dgdf, + target_dgdf, + id_col, + extensive_variables=None, + intensive_variables=None, + categorical_variables=None, + categorical_frequency=True +): + ''' + Out-of-core and parallel area interpolation for categorical variables. + + Parameters + ---------- + source_dgdf : dask_geopandas.GeoDataFrame + Dask-geopandas GeoDataFrame + IMPORTANT: the table needs to be spatially shuffled and with spatial partitions. + This is required so only overlapping partitions are checked for interpolation. 
See + more on spatial shuffling at: https://dask-geopandas.readthedocs.io/en/stable/guide/spatial-partitioning.html + target_dgdf : dask_geopandas.GeoDataFrame + Dask-geopandas GeoDataFrame + IMPORTANT: the table needs to be spatially shuffled and with spatial partitions. + This is required so only overlapping partitions are checked for interpolation. See + more on spatial shuffling at: https://dask-geopandas.readthedocs.io/en/stable/guide/spatial-partitioning.html + id_col : str + Name of the column in `target_dgdf` with unique IDs to be used in output table + extensive_variables : list + [Optional. Default=None] Columns in `source_dgdf` for extensive variables. + IMPORTANT: currently NOT implemented. + intensive_variables : list + [Optional. Default=None] Columns in `source_dgdf` for intensive variables + IMPORTANT: currently NOT implemented. + categorical_variables : list + [Optional. Default=None] Columns in `source_dgdf` for categorical variables + IMPORTANT: categorical variables must be of type `'category[known]'`. This is so + all categories are known ahead of time and Dask can run lazily. + categorical_frequency : Boolean + [Optional. Default=True] If True, `estimates` returns the frequency of each + value in a categorical variable in every polygon of `target_df` (proportion of + area). If False, `estimates` contains the area in every polygon of `target_df` + that is occupied by each value of the categorical + + + Returns + ------- + estimates : dask_geopandas.GeoDataFrame + new dask-geopandas geodaraframe with interpolated variables and `id_col` as + columns and target_df geometry as output geometry + + ''' + if intensive_variables is not None: + raise NotImplementedError(( + "Dask-based interpolation of intensive variables is " + "not implemented yet. Please remove intensive variables to " + "be able to run the rest." + )) + if extensive_variables is not None: + raise NotImplementedError(( + "Dask-based interpolation of extensive variables is " + "not implemented yet. Please remove intensive variables to " + "be able to run the rest." 
+        ))
+    # Categoricals must be Dask's known categoricals
+    if categorical_variables is not None:
+        category_vars = []
+        for cat_var in categorical_variables:
+            var_names = [f'{cat_var}_{c}' for c in source_dgdf[cat_var].cat.categories]
+            category_vars.extend(var_names)
+    else:
+        category_vars = None
+    # Build tasks by joining pairs of chunks from left/right
+    dsk = {}
+    new_spatial_partitions = []
+    parts = geopandas.sjoin(
+        source_dgdf.spatial_partitions.to_frame('geometry'),
+        target_dgdf.spatial_partitions.to_frame('geometry'),
+        how='inner',
+        predicate='intersects'
+    )
+    parts_left = np.asarray(parts.index)
+    parts_right = np.asarray(parts['index_right'].values)
+    name = 'area_interpolate-' + tokenize(
+        target_dgdf, source_dgdf
+    )
+    for i, (l, r) in enumerate(zip(parts_left, parts_right)):
+        dsk[(name, i)] = (
+            id_area_interpolate,
+            (source_dgdf._name, l),
+            (target_dgdf._name, r),
+            id_col,
+            extensive_variables,
+            intensive_variables,
+            None,
+            True,
+            'auto',
+            1,
+            categorical_variables,
+            category_vars,
+        )
+        lr = source_dgdf.spatial_partitions.iloc[l]
+        rr = target_dgdf.spatial_partitions.iloc[r]
+        extent = lr.intersection(rr)
+        new_spatial_partitions.append(extent)
+    # Create geometries for new spatial partitions
+    new_spatial_partitions = geopandas.GeoSeries(
+        data=new_spatial_partitions, crs=source_dgdf.crs
+    )
+    # Build Dask graph
+    graph = HighLevelGraph.from_collections(
+        name, dsk, dependencies=[source_dgdf, target_dgdf]
+    )
+    # Get metadata for the outcome table
+    meta = id_area_interpolate(
+        source_dgdf._meta,
+        target_dgdf._meta,
+        id_col,
+        extensive_variables=extensive_variables,
+        intensive_variables=intensive_variables,
+        table=None,
+        allocate_total=True,
+        spatial_index='auto',
+        n_jobs=1,
+        categorical_variables=categorical_variables,
+        category_vars=category_vars,
+    )
+    # Build output table
+    transferred = dask_geopandas.GeoDataFrame(
+        graph,
+        name,
+        meta,
+        [None] * (len(dsk) + 1),
+        new_spatial_partitions
+    )
+    # Merge chunks
+    out = target_dgdf[[id_col, 'geometry']]
+    ## Extensive --> Not implemented (DAB: the below does not match single-core)
+    '''
+    if extensive_variables is not None:
+        out_extensive = (
+            transferred
+            .groupby(id_col)
+            [extensive_variables]
+            .agg({v: 'sum' for v in extensive_variables})
+        )
+        out = out.join(out_extensive, on=id_col)
+    '''
+    ## Intensive --> Weight by area of the chunk (Not implemented)
+    ## Categorical --> Add up proportions
+    if categorical_variables is not None:
+        out_categorical = (
+            transferred
+            [category_vars]
+            .astype(float)
+            .groupby(transferred[id_col])
+            .agg({v: 'sum' for v in category_vars})
+        )
+        out = out.join(out_categorical, on=id_col)
+        if categorical_frequency is True:
+            cols = out_categorical.columns.tolist()
+            out[cols] = out[cols].div(
+                out.area, axis='index'
+            )
+    return out
+
+def id_area_interpolate(
+    source_df,
+    target_df,
+    id_col,
+    extensive_variables=None,
+    intensive_variables=None,
+    table=None,
+    allocate_total=True,
+    spatial_index='auto',
+    n_jobs=1,
+    categorical_variables=None,
+    category_vars=None
+):
+    '''
+    Light wrapper around single-core area interpolation to be run on distributed workers
+
+    Parameters
+    ----------
+    source_df : geopandas.GeoDataFrame
+    target_df : geopandas.GeoDataFrame
+    id_col : str
+        Name of the column in `target_df` with unique IDs to be used in output table
+    extensive_variables : list
+        [Optional. Default=None] Columns in dataframes for extensive variables
+    intensive_variables : list
+        [Optional. Default=None] Columns in dataframes for intensive variables
+    table : scipy.sparse.csr_matrix
+        [Optional. Default=None] Area allocation source-target correspondence
+        table. If not provided, it will be built from `source_df` and
+        `target_df` using `tobler.area_interpolate._area_tables_binning`
+    allocate_total : boolean
+        [Optional. Default=True] True if total value of source area should be
+        allocated. False if denominator is area of i. Note that the two cases
+        would be identical when the area of the source polygon is exhausted by
+        intersections. See Notes for more details.
+    spatial_index : str
+        [Optional. Default="auto"] Spatial index to use to build the
+        allocation of area from source to target tables. It currently supports
+        the following values:
+        - "source": build the spatial index on `source_df`
+        - "target": build the spatial index on `target_df`
+        - "auto": attempts to guess the most efficient alternative.
+          Currently, this option uses the largest table to build the
+          index, and performs a `bulk_query` on the shorter table.
+        This argument is ignored if n_jobs>1 (or n_jobs=-1).
+    n_jobs : int
+        [Optional. Default=1] Number of processes to run in parallel to
+        generate the area allocation. If -1, this is set to the number of CPUs
+        available. If `table` is passed, this is ignored.
+    categorical_variables : list
+        [Optional. Default=None] Columns in dataframes for categorical variables
+    category_vars : list
+        [Optional. Default=None] Full list of category names in the format
+        `f'{var_name}_{cat_name}'`
+
+    Returns
+    -------
+    estimates : geopandas.GeoDataFrame
+        new geodataframe with interpolated variables as columns and target_df geometry
+        as output geometry
+
+    '''
+    estimates = area_interpolate(
+        source_df,
+        target_df,
+        extensive_variables=extensive_variables,
+        intensive_variables=intensive_variables,
+        table=table,
+        allocate_total=allocate_total,
+        spatial_index=spatial_index,
+        n_jobs=n_jobs,
+        categorical_variables=categorical_variables,
+        categorical_frequency=False
+    )
+    estimates[id_col] = target_df[id_col].values
+
+    if categorical_variables is not None:
+        category_vars_to_add = []
+        for category_var in category_vars:
+            if category_var not in estimates.columns:
+                category_vars_to_add.append(category_var)
+        estimates = estimates.join(
+            pandas.DataFrame(index=estimates.index, columns=category_vars_to_add)
+        )
+    return estimates
diff --git a/tobler/tests/test_area_interpolators.py b/tobler/tests/test_area_interpolators.py
index 27cd829..bc8791b 100644
--- a/tobler/tests/test_area_interpolators.py
+++ b/tobler/tests/test_area_interpolators.py
@@ -1,9 +1,11 @@
 """test interpolation functions."""
 import geopandas
+import dask_geopandas
 from libpysal.examples import load_example
 from numpy.testing import assert_almost_equal
 from tobler.area_weighted import area_interpolate
+from tobler.area_weighted import area_interpolate_dask
 from tobler.area_weighted.area_interpolate import _area_tables_binning
 from geopandas.testing import assert_geodataframe_equal
 import pytest
@@ -79,6 +81,30 @@ def test_area_interpolate_categorical():
     assert_almost_equal(area.animal_capybara.sum(), 20, decimal=0)


+def test_area_interpolate_categorical_dask():
+    sac1, sac2 = datasets()
+    sac1['animal'] = sac1['animal'].astype('category')
+    dsac1 = (
+        dask_geopandas.from_geopandas(sac1, npartitions=2)
+        .spatial_shuffle(by='hilbert', shuffle='tasks')
+    )
+    dsac2 = (
+        dask_geopandas.from_geopandas(sac2, npartitions=2)
+        .spatial_shuffle(by='hilbert', shuffle='tasks')
+    )
+    area = area_interpolate_dask(
+        source_dgdf=dsac1,
+        target_dgdf=dsac2,
+        id_col='ZIP',
+        categorical_variables=["animal"],
+    ).compute()
+    assert_almost_equal(area.animal_cat.sum(), 32, decimal=0)
+    assert_almost_equal(area.animal_dog.sum(), 19, decimal=0)
+    assert_almost_equal(area.animal_donkey.sum(), 22, decimal=0)
+    assert_almost_equal(area.animal_wombat.sum(), 23, decimal=0)
+    assert_almost_equal(area.animal_capybara.sum(), 20, decimal=0)
+
+
 def test_area_interpolate_custom_index():
     sac1, sac2 = datasets()
     sac1.index = sac1.index * 2
@@ -193,4 +219,4 @@ def test_passed_table():
         table=dok,
     )
     assert_almost_equal(area.TOT_POP.sum(), 1796856, decimal=0)
-    assert_almost_equal(area.pct_poverty.sum(), 2140, decimal=0)
\ No newline at end of file
+    assert_almost_equal(area.pct_poverty.sum(), 2140, decimal=0)
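
For reviewers who want to exercise the new API without opening the notebook, below is a minimal end-to-end sketch assembled from the notebook cells in this patch (same example data and parameters). It assumes the patched `tobler` plus `dask`, `distributed`, and `dask-geopandas` are installed:

```python
import geopandas
import numpy as np
import pandas
import dask_geopandas
import tobler
from dask.distributed import Client, LocalCluster
from libpysal.examples import load_example

# Local cluster; progress is visible on the dashboard (http://localhost:8787)
client = Client(LocalCluster(n_workers=8))

# Source (tracts) and target (ZIP codes) layers, in a shared projected CRS
crs = 6569
tracts = geopandas.read_file(
    load_example('Charleston1').get_path('sc_final_census2.shp')
).to_crs(crs)
zip_codes = geopandas.read_file(
    load_example('Charleston2').get_path('CharlestonMSA2.shp')
).to_crs(crs)

# The categorical column must be a *known* categorical so Dask can plan lazily
rng = np.random.default_rng(seed=42)
tracts['rando'] = pandas.Series(rng.integers(0, 4, len(tracts)), dtype='category')

# Spatially shuffled dask-geopandas tables: required so only overlapping
# partitions are paired up for interpolation
dtracts = (
    dask_geopandas.from_geopandas(tracts[['geometry', 'rando']], npartitions=16)
    .spatial_shuffle(by='hilbert', shuffle='tasks')
)
dzips = (
    dask_geopandas.from_geopandas(zip_codes[['ZIP', 'geometry']], npartitions=16)
    .spatial_shuffle(by='hilbert', shuffle='tasks')
)

# Lazy until .compute(); returns one row per ZIP with rando_* proportions
estimates = tobler.area_weighted.area_interpolate_dask(
    dtracts, dzips, 'ZIP', categorical_variables=['rando']
).compute()
```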
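The new `categorical_frequency` flag is what lets the Dask path match the single-core output: each chunk computes raw areas (`id_area_interpolate` calls the single-core function with `categorical_frequency=False`), the per-`id_col` sums are aggregated across chunks, and only then divided by polygon area. A small sketch of that equivalence on the single-core function, continuing from the variables in the sketch above (column names follow the `rando` example):

```python
# Proportions (default) vs. raw areas per target polygon
props = tobler.area_weighted.area_interpolate(
    tracts, zip_codes, categorical_variables=['rando']
)
areas = tobler.area_weighted.area_interpolate(
    tracts, zip_codes, categorical_variables=['rando'], categorical_frequency=False
)
cols = ['rando_0', 'rando_1', 'rando_2', 'rando_3']
# Dividing each area by its target polygon's area recovers the proportions
assert np.allclose(areas[cols].div(areas.area, axis='index'), props[cols])
```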