Skip to content

Commit

Permalink
fix uploader
Browse files Browse the repository at this point in the history
- don't change cog_id/system/system_version
- stream from disk
- store result in completed queue
  • Loading branch information
robkooper committed May 2, 2024
1 parent ada8e01 commit 8bdb437
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.6.0] - 2024-05-01

### Added
- uploaded results are stored in a completed queue

### Changed
- uploader no longer changes the system_name and version

## [0.5.0] - 2024-04-29

This is a big change, instead of listening to `map.process` events we now listen for updates from
Expand Down
14 changes: 6 additions & 8 deletions uploader/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def process(self, method, properties, body):
self.properties = properties
self.body = body
self.exception = None
self.result = None

def run(self):
try:
Expand All @@ -34,15 +35,9 @@ def run(self):
if os.path.getsize(file) > max_size * 1024 * 1024: # size in bytes
raise ValueError(f"File {file} is larger than {max_size}MB, skipping upload.")
headers = {'Authorization': f'Bearer {cdr_token}', 'Content-Type': 'application/json'}
with open(file, 'rb') as f:
cdr_data = json.load(f)
# TODO this needs to be in pipeline.py
cdr_data['cog_id'] = data['cog_id']
cdr_data['system'] = data['system']
cdr_data['system_version'] = data['version']
response = requests.post(f'{cdr_url}/v1/maps/publish/features', data=json.dumps(cdr_data), headers=headers)
logging.debug(response.text)
response = requests.post(f'{cdr_url}/v1/maps/publish/features', data=open(file, rb), headers=headers)
response.raise_for_status()
result = response.text
except RequestException as e:
logging.exception(f"Request Error {response.text}.")
self.exception = e
Expand All @@ -63,6 +58,7 @@ def main():
# create queues
channel.queue_declare(queue=f"{prefix}upload", durable=True)
channel.queue_declare(queue=f"{prefix}upload.error", durable=True)
channel.queue_declare(queue=f"{prefix}completed", durable=True)

# listen for messages and stop if nothing found after 5 minutes
channel.basic_qos(prefetch_count=1)
Expand All @@ -86,6 +82,8 @@ def main():
channel.basic_publish(exchange='', routing_key=f"{prefix}upload.error", body=json.dumps(data), properties=worker.properties)
else:
logging.info(f"Finished all processing steps for map {data['cog_id']}")
data['result'] = worker.result
channel.basic_publish(exchange='', routing_key=f"{prefix}completed", body=json.dumps(data), properties=worker.properties)
channel.basic_ack(delivery_tag=worker.method.delivery_tag)
worker = None

Expand Down

0 comments on commit 8bdb437

Please sign in to comment.