Skip to content

Commit

Permalink
Merge pull request #16 from csiro-easi/debug_dask
Browse files Browse the repository at this point in the history
Debug dask
  • Loading branch information
mpaget authored Feb 21, 2023
2 parents 0ad804d + 826ceda commit 50dd305
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 41 deletions.
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ RUN chmod 777 /code/pywps.cfg \
&& chown wps:users /code/logs \
&& chown wps:users /code

RUN chmod -R 777 /code/outputs \
&& chown wps:users /code/outputs

ENTRYPOINT ["wps-entrypoint.sh"]

USER wps
Expand Down
79 changes: 44 additions & 35 deletions datacube_wps/processes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ def _render_outputs(
name="Timeseries",
header=True,
):
html_url = upload_chart_html_to_S3(chart, str(uuid))
img_url = upload_chart_svg_to_S3(chart, str(uuid))
# html_url = upload_chart_html_to_S3(chart, str(uuid))
# img_url = upload_chart_svg_to_S3(chart, str(uuid))

try:
csv_df = df.drop(columns=["latitude", "longitude"])
Expand All @@ -300,9 +300,13 @@ def _render_outputs(

output_json = json.dumps(output_dict, cls=DatetimeEncoder)

# outputs = {
# "image": {"data": img_url},
# "url": {"data": html_url},
# "timeseries": {"data": output_json},
# }

outputs = {
"image": {"data": img_url},
"url": {"data": html_url},
"timeseries": {"data": output_json},
}

Expand Down Expand Up @@ -344,6 +348,11 @@ def __init__(self, about, input, style):
self.input = input
self.style = style

self.dask_client = None
# self.dask_client = dask_client = Client(
# n_workers=num_workers(), processes=True, threads_per_worker=1
# )

def input_formats(self):
return [
ComplexInput(
Expand Down Expand Up @@ -376,24 +385,18 @@ def request_handler(self, request, response):
return response

@log_call
def query_handler(self, time, feature, dask_client=None, parameters=None):
def query_handler(self, time, feature, parameters=None):
if parameters is None:
parameters = {}

if dask_client is None:
dask_client = Client(
n_workers=1, processes=False, threads_per_worker=num_workers()
)

with dask_client:
configure_s3_access(
aws_unsigned=True,
region_name=os.getenv("AWS_DEFAULT_REGION", "auto"),
client=dask_client,
)
configure_s3_access(
aws_unsigned=True,
region_name=os.getenv("AWS_DEFAULT_REGION", "auto"),
client=self.dask_client,
)

with datacube.Datacube() as dc:
data = self.input_data(dc, time, feature)
with datacube.Datacube() as dc:
data = self.input_data(dc, time, feature)

df = self.process_data(data, {"time": time, "feature": feature, **parameters})
chart = self.render_chart(df)
Expand Down Expand Up @@ -421,8 +424,11 @@ def input_data(self, dc, time, feature):
lonlat = feature.coords[0]
measurements = self.input.output_measurements(bag.product_definitions)

data = self.input.fetch(box, dask_chunks={"time": 1})
data = data.compute()
if self.dask_client:
data = self.input.fetch(box, dask_chunks={"time": 1})
data = data.compute()
else:
data = self.input.fetch(box)

coords = {
"longitude": np.array([lonlat[0]]),
Expand Down Expand Up @@ -490,6 +496,11 @@ def __init__(self, about, input, style):
self.style = style
self.mask_all_touched = False

self.dask_client = None
# self.dask_client = dask_client = Client(
# n_workers=num_workers(), processes=True, threads_per_worker=1
# )

def input_formats(self):
return [
ComplexInput(
Expand Down Expand Up @@ -523,24 +534,18 @@ def request_handler(self, request, response):
return response

@log_call
def query_handler(self, time, feature, dask_client=None, parameters=None):
def query_handler(self, time, feature, parameters=None):
if parameters is None:
parameters = {}

if dask_client is None:
dask_client = Client(
n_workers=num_workers(), processes=True, threads_per_worker=1
)

with dask_client:
configure_s3_access(
aws_unsigned=True,
region_name=os.getenv("AWS_DEFAULT_REGION", "auto"),
client=dask_client,
)
configure_s3_access(
aws_unsigned=True,
region_name=os.getenv("AWS_DEFAULT_REGION", "auto"),
client=self.dask_client,
)

with datacube.Datacube() as dc:
data = self.input_data(dc, time, feature)
with datacube.Datacube() as dc:
data = self.input_data(dc, time, feature)

df = self.process_data(data, {"time": time, "feature": feature, **parameters})
chart = self.render_chart(df)
Expand Down Expand Up @@ -590,7 +595,11 @@ def input_data(self, dc, time, feature):
_guard_rail(self.input, box)

# TODO customize the number of processes
data = self.input.fetch(box, dask_chunks={"time": 1})
if self.dask_client:
data = self.input.fetch(box, dask_chunks={"time": 1})
else:
data = self.input.fetch(box)

mask = geometry_mask(
feature, data.geobox, all_touched=self.mask_all_touched, invert=True
)
Expand Down
11 changes: 6 additions & 5 deletions datacube_wps/processes/fcdrill.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@ def process_data(self, data, parameters):
'Unobservable': (not_pixels / total_valid)['bs'] * 100
})

print('dask compute')
dask_time = default_timer()
new_ds = new_ds.compute()
print('dask took', default_timer() - dask_time, 'seconds')
print(new_ds)
if self.dask_client:
print('dask compute')
dask_time = default_timer()
new_ds = new_ds.compute()
print('dask took', default_timer() - dask_time, 'seconds')
print(new_ds)

df = new_ds.to_dataframe()
df = df.drop('spatial_ref', axis=1)
Expand Down
3 changes: 2 additions & 1 deletion datacube_wps/processes/mangrovedrill.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ class MangroveDrill(PolygonDrill):

@log_call
def process_data(self, data, parameters):
data = data.compute()
if self.dask_client:
data = data.compute()

# TODO raise ProcessError('query returned no data') when appropriate
woodland = data.where(data == 1).count(['x', 'y'])
Expand Down
29 changes: 29 additions & 0 deletions wpsrequest.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Sample WPS 1.0.0 Execute request for the "Mangrove Cover Drill" process.
Inputs:
* geometry: a GeoJSON polygon feature tagged with crs EPSG:4326
* start / end: timestamps bounding the query (2019-01-01 to 2019-03-01)
The response form asks for the "timeseries" output as raw data.
-->
<wps:Execute version="1.0.0" service="WPS" xml:lang="en-US" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.opengis.net/wps/1.0.0" xmlns:wfs="http://www.opengis.net/wfs" xmlns:wps="http://www.opengis.net/wps/1.0.0" xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:gml="http://www.opengis.net/gml" xmlns:ogc="http://www.opengis.net/ogc" xmlns:wcs="http://www.opengis.net/wcs/1.1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xsi:schemaLocation="http://www.opengis.net/wps/1.0.0 ../wpsDescribeProcess_response.xsd">
<!-- Identifier of the process to execute. -->
<ows:Identifier>Mangrove Cover Drill</ows:Identifier>
<wps:DataInputs>
<!-- Area of interest: a single closed polygon ring (first and last coordinate pairs are identical). -->
<wps:Input>
<ows:Identifier>geometry</ows:Identifier>
<wps:Data>
<wps:ComplexData mimeType="application/vnd.geo+json"><![CDATA[{"features": [{"geometry": {"type": "Polygon", "coordinates": [[[153.06, -27.19], [153.16, -27.19], [153.16, -27.29], [153.06, -27.29], [153.06, -27.19]]]}, "crs": "EPSG:4326"}]}]]></wps:ComplexData>
</wps:Data>
</wps:Input>
<!-- Start of the time range, passed as a JSON document under properties.timestamp.date-time. -->
<wps:Input>
<ows:Identifier>start</ows:Identifier>
<wps:Data>
<wps:ComplexData mimeType="application/vnd.geo+json"><![CDATA[{"properties": {"timestamp": {"date-time": "2019-01-01"}}}]]></wps:ComplexData>
</wps:Data>
</wps:Input>
<!-- End of the time range, same JSON layout as "start". -->
<wps:Input>
<ows:Identifier>end</ows:Identifier>
<wps:Data>
<wps:ComplexData mimeType="application/vnd.geo+json"><![CDATA[{"properties": {"timestamp": {"date-time": "2019-03-01"}}}]]></wps:ComplexData>
</wps:Data>
</wps:Input>
</wps:DataInputs>
<!-- Request only the "timeseries" output, returned as raw data with the TerriaJS catalog-member JSON mime type. -->
<wps:ResponseForm>
<wps:RawDataOutput mimeType="application/vnd.terriajs.catalog-member+json">
<ows:Identifier>timeseries</ows:Identifier>
</wps:RawDataOutput>
</wps:ResponseForm>
</wps:Execute>

0 comments on commit 50dd305

Please sign in to comment.