Skip to content

Commit

Permalink
WiP - Jobtastic is broken with Celery 3.1; see PolicyStat/jobtastic#28
Browse files Browse the repository at this point in the history
  • Loading branch information
wengole committed Mar 4, 2015
1 parent c99e980 commit 2b2c980
Showing 1 changed file with 79 additions and 3 deletions.
82 changes: 79 additions & 3 deletions src/snapshots/tasks.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
from __future__ import absolute_import
from datetime import datetime
from django.utils.timezone import get_default_timezone_name
import logging
from time import sleep
import os

from celery import task
from celery import task, group
from django.utils.timezone import get_default_timezone_name
from jobtastic import JobtasticTask
import magic
import pytz

from .models import File
from .models import File, Filesystem


logger = logging.getLogger(__name__)


@task
def create_file_object(full_path, snapshot=None, directory=False):
logger.info('Adding %s: %s',
('directory' if directory else 'file'), full_path)
statinfo = os.stat(full_path)
mtime = datetime.fromtimestamp(statinfo.st_mtime)
mtime = pytz.timezone(get_default_timezone_name()).localize(mtime)
Expand All @@ -32,3 +40,71 @@ def create_file_object(full_path, snapshot=None, directory=False):
modified=mtime,
size=statinfo.st_size,
)


class ReindexFilesystem(JobtasticTask):
    """
    Task to walk a given filesystem (which may be a snapshot) and index all
    files and directories within it.

    For every directory yielded by ``Filesystem.walk_fs()`` two Celery groups
    of ``create_file_object`` subtasks are queued: one for the
    sub-directories and one for the files. Once everything is queued the
    task polls the queued groups until all of them report ready.
    """
    significant_kwargs = [
        ('fs_name', str)
    ]
    cache_duration = 0
    # Shouldn't take longer than about 20 minutes
    herd_avoidance_timeout = 1200
    # Class-level defaults only. Celery keeps a single task instance alive
    # per worker process, so ``calculate_result`` rebinds both attributes on
    # the instance at the start of every run instead of mutating these.
    total_files = 0
    groups = []

    @property
    def jobs(self):
        """
        The individual jobs in all groups currently queued
        """
        # ``result`` rather than ``group`` so the imported ``celery.group``
        # is not shadowed.
        return [job for result in self.groups for job in result.children]

    @property
    def done_files(self):
        """
        The number of files that have been indexed so far
        """
        return sum(1 for job in self.jobs if job.ready())

    @property
    def work_to_do(self):
        """
        True while at least one queued group has not finished yet
        """
        return any(not result.ready() for result in self.groups)

    def _queue_group(self, dirname, names, directory=False):
        """
        Queue one ``create_file_object`` subtask per entry of ``names``
        (all located inside ``dirname``) as a single group and remember the
        group result in ``self.groups``. Nothing is queued for an empty
        ``names``.
        """
        if not names:
            return
        self.total_files += len(names)
        job = group([create_file_object.s(
            full_path=u'%s/%s' % (dirname, name),
            directory=directory
        ) for name in names])
        self.groups.append(job.apply_async())

    def calculate_result(self, fs_name):
        """
        Queue indexing jobs for every directory and file of the filesystem
        named ``fs_name`` and block until all of them are ready.

        Logs an error and returns early if no such filesystem exists.
        Always returns ``None``.
        """
        # Start from a clean slate: without this, results and counts from a
        # previous run on the same worker would leak into this one.
        self.total_files = 0
        self.groups = []

        try:
            fs = Filesystem.objects.get(
                name=fs_name
            )
        except Filesystem.DoesNotExist:
            logger.error('Filesystem "%s" does not exist', fs_name)
            return

        for dirname, subdirs, files in fs.walk_fs():
            logger.info('Adding subdirs for %s', dirname)
            self._queue_group(dirname, subdirs, directory=True)

            logger.info('Adding files for %s', dirname)
            self._queue_group(dirname, files)
            self.update_progress(self.done_files, self.total_files, 10)
        logger.info('All files and directories queued')

        while self.work_to_do:
            # Count once per iteration; every ``ready()`` call may hit the
            # result backend.
            done = self.done_files
            logger.info('Still waiting on %d jobs',
                        self.total_files - done)
            # Keep reporting progress while waiting, not only while queueing.
            self.update_progress(done, self.total_files, 10)
            sleep(5)

0 comments on commit 2b2c980

Please sign in to comment.