Know how: site_hourly_aggregator is configured and works

Dan Mushkevych edited this page Apr 9, 2015 · 2 revisions

Synergy Scheduler comes with an illustration suite. This page describes in detail the Site Hourly Aggregator and how it works. The following components play their parts:

  • Synergy Supervisor: daemon that starts and monitors Synergy processes; kills/restarts them if needed
  • Event Generator: publishes messages to RabbitMQ, simulating user activity on the website
  • Single Session Worker: listens to RabbitMQ and forms a Session object out of a user's separate actions
  • Site Hourly Aggregator: parses session objects for a particular hourly timeperiod and aggregates them into <site hourly> statistics
  • GarbageCollector: reviews whether any unit_of_work has failed and needs to be re-triggered
  • Synergy Scheduler: responsible for triggering data processing and updating job state

Synergy Scheduler starts by building each tree in the timetable. This information is taken from context.py. For instance, the following block instructs the Scheduler to construct a tree enclosing the site yearly, monthly, daily and hourly processes:

TREE_SITE_VERTICAL: _timetable_context_entry(
    tree_name=TREE_SITE_VERTICAL,
    enclosed_processes=[PROCESS_SITE_YEARLY, PROCESS_SITE_MONTHLY, PROCESS_SITE_DAILY, PROCESS_SITE_HOURLY],
    dependent_on=[],
    mx_name=TOKEN_SITE,
    mx_page=MX_PAGE_TRAFFIC),
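The four enclosed processes form a parent-child chain in the tree: every hourly timeperiod rolls up into a daily one, every daily into a monthly one, and every monthly into a yearly one. A minimal sketch of that roll-up; the zero-padded "YYYYMMDDHH" timeperiod format used here is an assumption made for illustration:

```python
# Sketch of how a timeperiod rolls up through the tree levels.
# Assumed format: fixed-width "YYYYMMDDHH", with unused positions
# zero-padded (e.g. a daily timeperiod carries hour "00").

def hourly_to_daily(timeperiod):
    """Roll an hourly timeperiod up to its enclosing daily timeperiod."""
    return timeperiod[:8] + '00'

def daily_to_monthly(timeperiod):
    """Roll a daily timeperiod up to its enclosing monthly timeperiod."""
    return timeperiod[:6] + '0000'

def monthly_to_yearly(timeperiod):
    """Roll a monthly timeperiod up to its enclosing yearly timeperiod."""
    return timeperiod[:4] + '000000'

hourly = '2015040912'                # 2015-04-09, hour 12
daily = hourly_to_daily(hourly)      # '2015040900'
monthly = daily_to_monthly(daily)    # '2015040000'
yearly = monthly_to_yearly(monthly)  # '2015000000'
```

This is why a single tree can enclose all four processes: the state of any hourly node can be propagated upward along this chain.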

Each of the referenced processes must be declared in context.py, as in the following snippet:

PROCESS_SITE_HOURLY: managed_context_entry(
    process_name=PROCESS_SITE_HOURLY,
    classname='workers.site_hourly_aggregator.SiteHourlyAggregator.start',
    token=TOKEN_SITE,
    time_qualifier=QUALIFIER_HOURLY,
    source=COLLECTION_SINGLE_SESSION,
    sink=COLLECTION_SITE_HOURLY,
    state_machine_name=STATE_MACHINE_CONTINUOUS,
    blocking_type=BLOCKING_NORMAL,
    trigger_frequency='every 600',
    present_on_boxes=['dev.*'])

The above block starts a Scheduler thread that wakes up at the trigger_frequency interval. In the presented case, SiteHourlyAggregator is triggered every 600 seconds (10 minutes) to sum up all information for a given hourly timeperiod. The Supervisor will manage SiteHourlyAggregator on any box whose BOX_ID starts with "dev".
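The trigger_frequency mechanism can be modeled as a simple repeating timer. A minimal sketch, assuming a hypothetical parse_trigger_frequency helper and a RepeatingTrigger class (neither is part of the real code base):

```python
import threading

def parse_trigger_frequency(value):
    """Parse a trigger string such as 'every 600' into a seconds interval."""
    prefix, seconds = value.split()
    if prefix != 'every':
        raise ValueError('unsupported trigger format: %r' % value)
    return int(seconds)

class RepeatingTrigger:
    """Wakes up at a fixed interval and invokes the given callback,
    mimicking the Scheduler thread associated with a managed process."""

    def __init__(self, trigger_frequency, callback):
        self.interval = parse_trigger_frequency(trigger_frequency)
        self.callback = callback
        self.timer = None

    def _run(self):
        self.callback()
        self.start()  # re-arm the timer for the next wake-up

    def start(self):
        self.timer = threading.Timer(self.interval, self._run)
        self.timer.daemon = True
        self.timer.start()

    def cancel(self):
        if self.timer:
            self.timer.cancel()
```

With trigger_frequency='every 600', the callback would fire every 600 seconds until the trigger is canceled.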

As soon as the Scheduler thread associated with SiteHourlyAggregator wakes up, it identifies the most current timeperiod for this process (i.e. the most recent one that has not yet been finalized), computes its LOWER and UPPER boundaries (in terms of objects in the source table), and does one of the following:

  • Fires a new unit_of_work for this timeperiod
    This happens if the given timeperiod has no associated unit_of_work, or the associated ones are in STATE_IN_PROGRESS, STATE_PROCESSED or STATE_CANCELED.
  • Updates an existing unit_of_work, if its state is STATE_REQUESTED or STATE_INVALID, and publishes only its ObjectId to MQ
  • Publishes no new unit_of_work
    This happens if there are no unresolved dependencies and the latest unit_of_work is resolved. The timeperiod is marked as either STATE_PROCESSED or STATE_SKIPPED.
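The three branches above can be expressed as a small decision function. A minimal sketch with hypothetical state constants mirroring the ones named above; the branch ordering is an assumption, since the original text does not state which check runs first:

```python
# Hypothetical sketch of the Scheduler's per-timeperiod decision.
STATE_IN_PROGRESS = 'state_in_progress'
STATE_PROCESSED = 'state_processed'
STATE_CANCELED = 'state_canceled'
STATE_REQUESTED = 'state_requested'
STATE_INVALID = 'state_invalid'

def scheduler_action(existing_uow_state, dependencies_resolved):
    """Decide what to do for the current timeperiod.

    existing_uow_state    -- state of the unit_of_work already associated
                             with this timeperiod, or None if there is none
    dependencies_resolved -- True if the latest unit_of_work is resolved
                             and the timeperiod has no unresolved dependencies
    """
    if existing_uow_state in (STATE_REQUESTED, STATE_INVALID):
        # reuse the record; publish only its ObjectId to MQ
        return 'update_and_publish'
    if dependencies_resolved:
        # nothing left to do: mark the timeperiod
        # STATE_PROCESSED or STATE_SKIPPED
        return 'finalize_timeperiod'
    # no uow yet, or it is IN_PROGRESS/PROCESSED/CANCELED: fire a new one
    return 'fire_new_uow'
```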

A special case arises when the process is still busy processing a previous unit_of_work and the Scheduler sends in a new one. In this case the process will:

  • Keep processing its current unit_of_work
  • All units of work issued by the Synergy Scheduler in the meantime are subject to the GarbageCollector life-support time window
    I.e. they can be canceled by the Scheduler if they are not processed within 48 hours, thus cutting excessive computational overhead in case of improper Scheduler configuration or underestimated hardware provisioning.
  • Under normal circumstances the Synergy Scheduler will issue one new unit_of_work and keep updating it until either the GarbageCollector marks it as STATE_SKIPPED or it is finally processed.
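The life-support window can be sketched as a simple age check over outstanding units of work. A minimal sketch, assuming the 48-hour cutoff mentioned above and a hypothetical created_at timestamp on each record:

```python
from datetime import datetime, timedelta

# 48-hour life-support window, per the description above
LIFE_SUPPORT_WINDOW = timedelta(hours=48)

def units_to_cancel(units_of_work, now=None):
    """Return the units of work that outlived the life-support window
    and should be moved to a canceled state by the GarbageCollector."""
    now = now or datetime.utcnow()
    return [uow for uow in units_of_work
            if now - uow['created_at'] > LIFE_SUPPORT_WINDOW]

# hypothetical records: one fresh, one stale
now = datetime(2015, 4, 9, 12, 0)
units = [
    {'id': 'fresh', 'created_at': now - timedelta(hours=1)},
    {'id': 'stale', 'created_at': now - timedelta(hours=72)},
]
stale = units_to_cancel(units, now=now)  # only the 72-hour-old record
```

Only the record older than 48 hours is selected; the freshly issued one stays alive and keeps being updated by the Scheduler.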