integrate changes from open pull request ckan#81
pthiemt committed Jun 22, 2021
1 parent 035728d commit cadaeea
Showing 17 changed files with 453 additions and 198 deletions.
19 changes: 12 additions & 7 deletions README.rst
@@ -80,9 +80,7 @@ To install ckanext-archiver:
config file (by default the config file is located at
``/etc/ckan/default/production.ini``).

5. Install a Celery queue backend - see later section.

6. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu::
5. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu::

sudo service apache2 reload

@@ -146,7 +144,7 @@ This is only necessary if you update ckanext-archiver and already have the datab
Installing a Celery queue backend
---------------------------------

Archiver uses Celery to manage its 'queues'. You need to install a queue back-end, such as Redis or RabbitMQ.
Archiver uses CKAN jobs to manage its 'queues'. You need to install a queue back-end, such as Redis or RabbitMQ.

Redis backend
-------------
@@ -236,7 +234,7 @@ Config settings

* ``ckan.site_url`` = URL to your CKAN instance

This is the URL that the archive process (in Celery) will use to access the CKAN API to update it about the cached URLs. If your internal network names your CKAN server differently, then specify this internal name in config option: ``ckan.site_url_internally``
This is the URL that the archive process (running as a background job) will use to access the CKAN API to update it about the cached URLs. If your internal network names your CKAN server differently, then specify this internal name in config option: ``ckan.site_url_internally``


3. Additional Archiver settings
@@ -248,6 +246,7 @@ Config settings
* ``ckanext-archiver.max_content_length`` = the maximum size (in bytes) of files to archive (default ``50000000`` =50MB)
* ``ckanext-archiver.user_agent_string`` = identifies the archiver to servers it archives from
* ``ckanext-archiver.verify_https`` = true/false whether you want to verify https connections and therefore fail if it is specified in the URL but does not verify.
* ``ckan.download_proxy`` = URL of an HTTP/S proxy server that will be used to download resources.
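
As a rough illustration of how a setting like ``ckan.download_proxy`` might be consumed, the sketch below builds the ``proxies`` mapping that the ``requests`` library accepts. The helper name ``build_proxies`` and the plain ``dict`` standing in for the CKAN config are assumptions made for this example; the extension's actual lookup code is not shown in this diff.

```python
def build_proxies(config):
    """Return a requests-style proxies dict, or None when no proxy is set.

    `config` is a plain dict standing in for the CKAN config object
    (hypothetical helper, not part of ckanext-archiver).
    """
    proxy_url = (config.get('ckan.download_proxy') or '').strip()
    if not proxy_url:
        return None
    # requests expects one entry per URL scheme that should be proxied.
    return {'http': proxy_url, 'https': proxy_url}


# Example value only; substitute the proxy URL from your own config.
proxies = build_proxies({'ckan.download_proxy': 'http://proxy.example.org:3128'})
print(proxies)
```

The resulting mapping could then be handed to a download call such as ``requests.get(url, proxies=proxies)``; passing ``None`` leaves the default behaviour unchanged.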

4. Nightly report generation

@@ -286,7 +285,7 @@ applies whatever the format.
Using Archiver
--------------

First, make sure that Celery is running for each queue. For test/local use, you can run::
Prior to CKAN 2.7, make sure that Celery is running for each queue. For test/local use, you can run::

paster --plugin=ckanext-archiver celeryd2 run all -c development.ini

@@ -298,12 +297,18 @@ However in production you'd run the priority and bulk queues separately, or else
For production use, we recommend setting up Celery to run with supervisord. `apt-get install supervisor` and use `bin/celery-supervisor.conf` as a configuration template.

If you are running CKAN 2.7 or higher, configure job workers instead http://docs.ckan.org/en/2.8/maintaining/background-tasks.html#using-supervisor
For production use, we recommend setting up job workers to run with supervisord. `apt-get install supervisor` and use `bin/supervisor-ckan-archiver.conf` as a configuration template. That template starts these two commands::

paster --plugin=ckan jobs worker priority -c production.ini
paster --plugin=ckan jobs worker bulk -c production.ini

An archival can be triggered by adding a dataset with a resource or updating a resource URL. Alternatively you can run::

paster --plugin=ckanext-archiver archiver update [dataset] --queue=priority -c <path to CKAN config>

Here ``dataset`` is a CKAN dataset name or ID, or you can omit it to archive all datasets.
Here ``dataset`` is a CKAN dataset name or ID, or you can omit it to archive all datasets, for example::

paster --plugin=ckanext-archiver archiver update -c <path to CKAN config>

For a full list of manual commands run::

81 changes: 81 additions & 0 deletions bin/supervisor-ckan-archiver.conf
@@ -0,0 +1,81 @@
[unix_http_server]
file=/var/tmp/supervisor.sock

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

; ======================================================================
; Supervisor configuration for CKAN Archiver background bulk jobs worker
; ======================================================================

; 1. Copy this file to /etc/supervisor/conf.d
; 2. Make sure the paths below match your setup


[program:ckan-worker-bulk]

; Use the full paths to the virtualenv and your configuration file here.
command=/usr/lib/ckan/default/bin/paster --plugin=ckan jobs worker bulk --config=/etc/ckan/default/production.ini


; User the jobs bulk worker runs as.
user=ckan


; Start just a single jobs bulk worker.
numprocs=1
process_name=%(program_name)s-%(process_num)02d


; Log files.
stdout_logfile=/var/log/ckan/ckan-worker-bulk.log
stderr_logfile=/var/log/ckan/ckan-worker-bulk.log


; Make sure that the jobs bulk worker is started on system start and automatically
; restarted if it crashes unexpectedly.
autostart=true
autorestart=true


; Number of seconds the process has to run before it is considered to have
; started successfully.
startsecs=10

; ==========================================================================
; Supervisor configuration for CKAN Archiver background priority jobs worker
; ==========================================================================

; 1. Copy this file to /etc/supervisor/conf.d
; 2. Make sure the paths below match your setup


[program:ckan-worker-priority]

; Use the full paths to the virtualenv and your configuration file here.
command=/usr/lib/ckan/default/bin/paster --plugin=ckan jobs worker priority --config=/etc/ckan/default/production.ini


; User the jobs priority worker runs as.
user=ckan


; Start just a single jobs priority worker.
numprocs=1
process_name=%(program_name)s-%(process_num)02d


; Log files.
stdout_logfile=/var/log/ckan/ckan-worker-priority.log
stderr_logfile=/var/log/ckan/ckan-worker-priority.log


; Make sure that the jobs priority worker is started on system start and automatically
; restarted if it crashes unexpectedly.
autostart=true
autorestart=true


; Number of seconds the process has to run before it is considered to have
; started successfully.
startsecs=10
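
Since the template asks you to make sure the paths match your setup, a small check script can list the commands each ``[program:...]`` section would run. The following is only a sketch using Python 3's standard ``configparser``; the function name ``program_commands`` and the inline ``SAMPLE`` text are assumptions for illustration, and interpolation is disabled via ``RawConfigParser`` so that supervisor's own ``%(program_name)s`` expansions are left untouched.

```python
import configparser

# Trimmed copy of one section from the template, used only for this example.
SAMPLE = """
[program:ckan-worker-bulk]
command=/usr/lib/ckan/default/bin/paster --plugin=ckan jobs worker bulk --config=/etc/ckan/default/production.ini
user=ckan
process_name=%(program_name)s-%(process_num)02d
"""


def program_commands(text):
    """Map each supervisor program name to its configured command."""
    # RawConfigParser performs no %-interpolation, so supervisor's
    # %(program_name)s placeholders do not raise errors here.
    parser = configparser.RawConfigParser()
    parser.read_string(text)
    commands = {}
    for section in parser.sections():
        if section.startswith('program:'):
            name = section.split(':', 1)[1]
            commands[name] = parser.get(section, 'command')
    return commands


for name, command in program_commands(SAMPLE).items():
    print('%s -> %s' % (name, command))
```

To check a deployed file, the same function could be given the text of the copy under ``/etc/supervisor/conf.d``.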
2 changes: 1 addition & 1 deletion ckanext/archiver/bin/common.py
@@ -50,5 +50,5 @@ def get_resources(state='active', publisher_ref=None, resource_id=None,
resources = resources.filter(model.Resource.id == resource_id)
criteria.append('Resource:%s' % resource_id)
resources = resources.all()
print '%i resources (%s)' % (len(resources), ' '.join(criteria))
print('%i resources (%s)' % (len(resources), ' '.join(criteria)))
return resources
8 changes: 4 additions & 4 deletions ckanext/archiver/bin/migrate_task_status.py
@@ -111,10 +111,10 @@ def migrate(options):
model.Session.add(archival)
add_stat('Added to archival table', res, stats)

print 'Summary\n', stats.report()
print('Summary\n', stats.report())
if options.write:
model.repo.commit_and_remove()
print 'Written'
print('Written')


def add_stat(outcome, res, stats, extra_info=None):
@@ -153,10 +153,10 @@ def date_str_to_datetime_or_none(date_str):
if len(args) != 1:
parser.error('Wrong number of arguments (%i)' % len(args))
config_ini = args[0]
print 'Loading CKAN config...'
print('Loading CKAN config...')
common.load_config(config_ini)
common.register_translator()
print 'Done'
print('Done')
# Setup logging to print debug out for theme stuff only
rootLogger = logging.getLogger()
rootLogger.setLevel(logging.WARNING)
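
One caveat with the ``print('Summary\n', stats.report())`` form in this file: under Python 2 without ``from __future__ import print_function``, the parenthesised pair is printed as a tuple rather than as two values. Whether the module carries that import is not visible in this diff, so the following is only a sketch of a form that behaves the same on both interpreters; ``format_summary`` is a hypothetical helper, not part of the commit.

```python
from __future__ import print_function


def format_summary(report):
    """Join the heading and the report into one string before printing."""
    return 'Summary\n' + report


# A single string argument prints identically on Python 2 and Python 3.
print(format_summary('Added to archival table: 3'))
```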
12 changes: 6 additions & 6 deletions ckanext/archiver/bin/running_stats.py
@@ -14,7 +14,7 @@
package_stats.increment('deleted')
else:
package_stats.increment('not deleted')
print package_stats.report()
print(package_stats.report())
> deleted: 30
> not deleted: 70
@@ -26,7 +26,7 @@
package_stats.add('deleted', package.name)
else:
package_stats.add('not deleted', package.name)
print package_stats.report()
print(package_stats.report())
> deleted: 30 pollution-uk, flood-regions, river-quality, ...
> not deleted: 70 spending-bristol, ...
@@ -42,7 +42,7 @@ class StatsCount(dict):
report_value_limit = 150

def __init__(self, *args, **kwargs):
self._start_time = datetime.datetime.now()
self._start_time = datetime.datetime.utcnow()
super(StatsCount, self).__init__(*args, **kwargs)

def _init_category(self, category):
@@ -80,7 +80,7 @@ def report(self, indent=1, order_by_title=False, show_time_taken=True):
lines = [indent_str + 'None']

if show_time_taken:
time_taken = datetime.datetime.now() - self._start_time
time_taken = datetime.datetime.utcnow() - self._start_time
lines.append(indent_str + 'Time taken (h:m:s): %s' % time_taken)
return '\n'.join(lines)
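
Measuring elapsed time from UTC rather than local time means a daylight-saving shift during a long run cannot skew the result, which is presumably the motivation for the ``now()`` to ``utcnow()`` change. A minimal sketch of the same idea follows, written with Python 3 timezone-aware datetimes; the ``Timer`` class is a hypothetical stand-in and not the real ``StatsCount``.

```python
import datetime


class Timer(object):
    """Hypothetical stand-in for the timing part of StatsCount."""

    def __init__(self):
        # Timezone-aware UTC timestamp, unaffected by local clock shifts.
        self._start_time = datetime.datetime.now(datetime.timezone.utc)

    def time_taken(self):
        # Subtracting two aware UTC datetimes yields a timedelta.
        return datetime.datetime.now(datetime.timezone.utc) - self._start_time


timer = Timer()
print('Time taken (h:m:s): %s' % timer.time_taken())
```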

@@ -110,6 +110,6 @@ def report_value(self, category):
package_stats.add('Success', 'good3')
package_stats.add('Success', 'good4')
package_stats.add('Failure', 'bad1')
print package_stats.report()
print(package_stats.report())

print StatsList().report()
print(StatsList().report())
14 changes: 7 additions & 7 deletions ckanext/archiver/command_celery.py
@@ -46,7 +46,7 @@ def command(self):
Parse command line arguments and call appropriate method.
"""
if not self.args or self.args[0] in ['--help', '-h', 'help']:
print self.usage
print(self.usage)
sys.exit(1)

cmd = self.args[0]
@@ -71,7 +71,7 @@ def command(self):
concurrency=int(self.options.concurrency),
hostname=self.options.hostname)
else:
print 'Command %s not recognized' % cmd
print('Command %s not recognized' % cmd)
sys.exit(1)

def run_(self, loglevel='INFO', queue=None, concurrency=None,
@@ -83,7 +83,7 @@ def run_(self, loglevel='INFO', queue=None, concurrency=None,
elif os.path.isfile(default_ini):
os.environ['CKAN_CONFIG'] = default_ini
else:
print 'No .ini specified and none was found in current directory'
print('No .ini specified and none was found in current directory')
sys.exit(1)

# from ckan.lib.celery_app import celery
@@ -97,7 +97,7 @@ def run_(self, loglevel='INFO', queue=None, concurrency=None,
celery_args.append('--loglevel=%s' % loglevel)

argv = ['celeryd'] + celery_args
print 'Running: %s' % ' '.join(argv)
print('Running: %s' % ' '.join(argv))
celery_app = self._celery_app()
celery_app.worker_main(argv=argv)

Expand All @@ -118,9 +118,9 @@ def _celery_app(self):
celery_config['CELERY_IMPORTS'].extend(
entry_point.load()()
)
except VersionConflict, e:
except VersionConflict as e:
error = 'ERROR in entry point load: %s %s' % (entry_point, e)
print error
print(error)
pass

LIST_PARAMS = 'CELERY_IMPORTS ADMINS ROUTES'.split()
@@ -130,7 +130,7 @@ def _celery_app(self):
if key in LIST_PARAMS else value
except ConfigParser.NoSectionError:
error = 'Could not find celery config in your ckan ini file (a section headed "[app:celery]").'
print error
print(error)
sys.exit(1)

celery_app = Celery()