Skip to content

Commit

Permalink
Introduces a job that runs on a per storage basis
Browse files Browse the repository at this point in the history
Cleans up the big job
Update Event handling
Extracts the debounce logic to a module, so it can be reused by other jobs
  • Loading branch information
mereghost committed Jun 28, 2024
1 parent 4f78c9d commit 9808402
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 187 deletions.
49 changes: 49 additions & 0 deletions app/workers/debounceable_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# frozen_string_literal: true

#-- copyright
# OpenProject is an open source project management software.
# Copyright (C) 2012-2024 the OpenProject GmbH
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 3.
#
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows:
# Copyright (C) 2006-2013 Jean-Philippe Lang
# Copyright (C) 2010-2013 the ChiliProject Team
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# See COPYRIGHT and LICENSE files for more details.
#++

module DebounceableJob
# This module is generalizes the debounce logic that was originally used on {Storages::ManageStorageIntegrationsJob}
# Basically it ensures that a thread only queues one job per interval.

# it depends on the class method `key` being implemented. The method will receive all the arguments
# used to invoke the job to construct the RequestStore key.
SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds

def debounce(*, **)
store_key = key(*, **)
timestamp = RequestStore.store[store_key]

return false if timestamp.present? && (timestamp + SINGLE_THREAD_DEBOUNCE_TIME) > Time.current

result = set(wait: 5.seconds).perform_later(*, **)
RequestStore.store[store_key] = Time.current
result
end
end
2 changes: 2 additions & 0 deletions modules/storages/app/models/storages/storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class Storage < ApplicationRecord

scope :automatic_management_enabled, -> { where("provider_fields->>'automatically_managed' = 'true'") }

scope :in_project, ->(project_id) { joins(project_storages: :project).where(project_storages: { project_id: }) }

enum health_status: {
pending: "pending",
healthy: "healthy",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def after_perform(service_call)
add_historical_data(service_call) if project_folder_mode != :inactive
OpenProject::Notifications.send(
OpenProject::Events::PROJECT_STORAGE_CREATED,
project_folder_mode:
project_folder_mode:,
storage: project_storage.storage
)

service_call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def persist(service_result)
delete_associated_file_links
OpenProject::Notifications.send(
OpenProject::Events::PROJECT_STORAGE_DESTROYED,
project_folder_mode: deletion_result.result.project_folder_mode.to_sym
project_folder_mode: deletion_result.result.project_folder_mode.to_sym,
storage: deletion_result.result.storage
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def after_perform(service_call)
add_historical_data(service_call) if project_folder_mode != :inactive
OpenProject::Notifications.send(
OpenProject::Events::PROJECT_STORAGE_UPDATED,
project_folder_mode:
project_folder_mode:,
storage: project_storage.storage
)

service_call
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

#-- copyright
# OpenProject is an open source project management software.
# Copyright (C) 2012-2024 the OpenProject GmbH
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 3.
#
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows:
# Copyright (C) 2006-2013 Jean-Philippe Lang
# Copyright (C) 2010-2013 the ChiliProject Team
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# See COPYRIGHT and LICENSE files for more details.
#++

module Storages
class AutomaticallyManagedStorageSyncJob < ApplicationJob
include GoodJob::ActiveJobExtensions::Concurrency
extend ::DebounceableJob

queue_with_priority :above_normal

good_job_control_concurrency_with(
total_limit: 2,
enqueue_limit: 1,
perform_limit: 1,
key: -> { "StorageSyncJob-#{arguments.last.short_provider_type}-#{arguments.last.id}" }
)

retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError, wait: 5, attempts: 10

retry_on Errors::IntegrationJobError, attempts: 5 do |job, error|
if job.executions >= 5
OpenProject::Notifications.send(
OpenProject::Events::STORAGE_TURNED_UNHEALTHY, storage: job.arguments.last, reason: error.message
)
end
end

def self.key(storage) = "sync-#{storage.short_provider_type}-#{storage.id}"

def perform(storage)
return unless storage.configured? && storage.automatically_managed?

sync_result = case storage.short_provider_type
when "nextcloud"
NextcloudGroupFolderPropertiesSyncService.call(storage)
when "one_drive"
OneDriveManagedFolderSyncService.call(storage)
else
raise "Unknown Storage Type"
end

sync_result.on_failure { raise Errors::IntegrationJobError, sync_result.errors.to_s }
sync_result.on_success { OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_HEALTHY, storage:) }
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
module Storages
class ManageStorageIntegrationsJob < ApplicationJob
include GoodJob::ActiveJobExtensions::Concurrency

retry_on ::Storages::Errors::IntegrationJobError, attempts: 5
extend ::DebounceableJob

good_job_control_concurrency_with(
total_limit: 2,
Expand All @@ -41,35 +40,15 @@ class ManageStorageIntegrationsJob < ApplicationJob
)

retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError,
wait: 5.minutes,
attempts: 3
wait: 5,
attempts: 20

SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds.freeze
KEY = :manage_nextcloud_integration_job_debounce_happened_at
CRON_JOB_KEY = :"Storages::ManageStorageIntegrationsJob"

queue_with_priority :above_normal

class << self
def debounce
if debounce_happened_in_current_thread_recently?
false
else
# TODO:
# Why there is 5 seconds delay?
# it is like that because for 1 thread and if there is no delay more than
# SINGLE_THREAD_DEBOUNCE_TIME(4.seconds)
# then some events can be lost
#
# Possibly "true" solutions are:
# 1. have after_request middleware to schedule one job after a request cycle
# 2. use concurrent ruby to have 'true' debounce.
result = set(wait: 5.seconds).perform_later
RequestStore.store[KEY] = Time.current
result
end
end

def disable_cron_job_if_needed
if ::Storages::ProjectStorage.active_automatically_managed.exists?
GoodJob::Setting.cron_key_enable(CRON_JOB_KEY) unless GoodJob::Setting.cron_key_enabled?(CRON_JOB_KEY)
Expand All @@ -78,53 +57,12 @@ def disable_cron_job_if_needed
end
end

private

def debounce_happened_in_current_thread_recently?
timestamp = RequestStore.store[KEY]
timestamp.present? && (timestamp + SINGLE_THREAD_DEBOUNCE_TIME) > Time.current
end
def key = KEY
end

def perform
failed_sync = []
storage_scope.find_each do |storage|
next unless storage.configured?

result = sync_service_result_for(storage)
result.on_success { OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_HEALTHY, storage:) }
result.on_failure { |failed| failed_sync << { storage:, errors: failed.errors } }
end

return emit_error_events(failed_sync) if executions >= 5

raise Errors::IntegrationJobError, failed_sync.first[:errors].to_s if failed_sync.any?
end

private

def emit_error_events(failed_sync)
failed_sync.each do |hash|
OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_UNHEALTHY,
storage: hash[:storage], reason: hash[:errors].to_s)
end
end

def storage_scope
::Storages::Storage.automatic_management_enabled.includes(:oauth_client)
end

def sync_service_result_for(storage)
# For the sake of making this easier to expand, we should register these and
# use the registry to figure out which one to use

case storage.short_provider_type
when "nextcloud"
NextcloudGroupFolderPropertiesSyncService.call(storage)
when "one_drive"
OneDriveManagedFolderSyncService.call(storage)
else
raise "Unknown Storage Type"
Storage.automatic_management_enabled.find_each do |storage|
AutomaticallyManagedStorageSyncJob.perform_later(storage)
end
end
end
Expand Down
29 changes: 19 additions & 10 deletions modules/storages/lib/open_project/storages/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,23 @@ def self.permissions
[
OpenProject::Events::MEMBER_CREATED,
OpenProject::Events::MEMBER_UPDATED,
OpenProject::Events::MEMBER_DESTROYED,
OpenProject::Events::PROJECT_UPDATED,
OpenProject::Events::PROJECT_RENAMED,
OpenProject::Events::PROJECT_ARCHIVED,
OpenProject::Events::PROJECT_UNARCHIVED
OpenProject::Events::MEMBER_DESTROYED
].each do |event|
OpenProject::Notifications.subscribe(event) do |_payload|
::Storages::ManageStorageIntegrationsJob.debounce
OpenProject::Notifications.subscribe(event) do |payload|
::Storages::Storage.in_project(payload[:member].project_id).find_each do |storage|
::Storages::AutomaticallyManagedStorageSyncJob.debounce(storage)
end
end
end

[OpenProject::Events::PROJECT_UPDATED,
OpenProject::Events::PROJECT_RENAMED,
OpenProject::Events::PROJECT_ARCHIVED,
OpenProject::Events::PROJECT_UNARCHIVED].each do |event|
OpenProject::Notifications.subscribe(event) do |payload|
::Storages::Storage.in_project(payload[:project].id).find_each do |storage|
::Storages::AutomaticallyManagedStorageSyncJob.debounce(storage)
end
end
end

Expand Down Expand Up @@ -97,7 +106,7 @@ def self.permissions
].each do |event|
OpenProject::Notifications.subscribe(event) do |payload|
if payload[:project_folder_mode] == :automatic
::Storages::ManageStorageIntegrationsJob.debounce
::Storages::AutomaticallyManagedStorageSyncJob.debounce(payload[:storage])
::Storages::ManageStorageIntegrationsJob.disable_cron_job_if_needed
end
end
Expand All @@ -106,13 +115,13 @@ def self.permissions
OpenProject::Notifications.subscribe(
::OpenProject::Events::STORAGE_TURNED_UNHEALTHY
) do |payload|
Storages::HealthService.new(storage: payload[:storage]).unhealthy(reason: payload[:reason])
::Storages::HealthService.new(storage: payload[:storage]).unhealthy(reason: payload[:reason])
end

OpenProject::Notifications.subscribe(
::OpenProject::Events::STORAGE_TURNED_HEALTHY
) do |payload|
Storages::HealthService.new(storage: payload[:storage]).healthy
::Storages::HealthService.new(storage: payload[:storage]).healthy
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
subject

expect(OpenProject::Notifications).to(
have_received(:send).with(event, project_folder_mode: mode)
have_received(:send).with(event, project_folder_mode: mode, storage: model_instance.storage)
)
end
end
Expand Down
Loading

0 comments on commit 9808402

Please sign in to comment.