Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduces a job that runs the AMPF sync on a per storage basis #15979

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions app/workers/debounceable_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# frozen_string_literal: true

#-- copyright
# OpenProject is an open source project management software.
# Copyright (C) 2012-2024 the OpenProject GmbH
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 3.
#
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows:
# Copyright (C) 2006-2013 Jean-Philippe Lang
# Copyright (C) 2010-2013 the ChiliProject Team
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# See COPYRIGHT and LICENSE files for more details.
#++

module DebounceableJob
# This module is generalizes the debounce logic that was originally used on {Storages::ManageStorageIntegrationsJob}
# Basically it ensures that a thread only queues one job per interval.

# it depends on the class method `key` being implemented. The method will receive all the arguments
# used to invoke the job to construct the RequestStore key.
SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds

def debounce(*, **)
store_key = key(*, **)
timestamp = RequestStore.store[store_key]

return false if timestamp.present? && (timestamp + SINGLE_THREAD_DEBOUNCE_TIME) > Time.current

result = set(wait: 5.seconds).perform_later(*, **)
RequestStore.store[store_key] = Time.current
result
end
end
2 changes: 2 additions & 0 deletions modules/storages/app/models/storages/storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class Storage < ApplicationRecord

scope :automatic_management_enabled, -> { where("provider_fields->>'automatically_managed' = 'true'") }

scope :in_project, ->(project_id) { joins(project_storages: :project).where(project_storages: { project_id: }) }

enum health_status: {
pending: "pending",
healthy: "healthy",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def after_perform(service_call)
add_historical_data(service_call) if project_folder_mode != :inactive
OpenProject::Notifications.send(
OpenProject::Events::PROJECT_STORAGE_CREATED,
project_folder_mode:
project_folder_mode:,
storage: project_storage.storage
)

service_call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def persist(service_result)
delete_associated_file_links
OpenProject::Notifications.send(
OpenProject::Events::PROJECT_STORAGE_DESTROYED,
project_folder_mode: deletion_result.result.project_folder_mode.to_sym
project_folder_mode: deletion_result.result.project_folder_mode.to_sym,
storage: deletion_result.result.storage
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def after_perform(service_call)
add_historical_data(service_call) if project_folder_mode != :inactive
OpenProject::Notifications.send(
OpenProject::Events::PROJECT_STORAGE_UPDATED,
project_folder_mode:
project_folder_mode:,
storage: project_storage.storage
)

service_call
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

#-- copyright
# OpenProject is an open source project management software.
# Copyright (C) 2012-2024 the OpenProject GmbH
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 3.
#
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows:
# Copyright (C) 2006-2013 Jean-Philippe Lang
# Copyright (C) 2010-2013 the ChiliProject Team
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# See COPYRIGHT and LICENSE files for more details.
#++

module Storages
class AutomaticallyManagedStorageSyncJob < ApplicationJob
include GoodJob::ActiveJobExtensions::Concurrency
extend ::DebounceableJob

queue_with_priority :above_normal

good_job_control_concurrency_with(
total_limit: 2,
enqueue_limit: 1,
perform_limit: 1,
key: -> { "StorageSyncJob-#{arguments.last.short_provider_type}-#{arguments.last.id}" }
)

retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError, wait: 5, attempts: 10

retry_on Errors::IntegrationJobError, attempts: 5 do |job, error|
if job.executions >= 5
OpenProject::Notifications.send(
OpenProject::Events::STORAGE_TURNED_UNHEALTHY, storage: job.arguments.last, reason: error.message
)
end
end

def self.key(storage) = "sync-#{storage.short_provider_type}-#{storage.id}"

def perform(storage)
return unless storage.configured? && storage.automatically_managed?

sync_result = case storage.short_provider_type
when "nextcloud"
NextcloudGroupFolderPropertiesSyncService.call(storage)
when "one_drive"
OneDriveManagedFolderSyncService.call(storage)
else
raise "Unknown Storage Type"
end

sync_result.on_failure { raise Errors::IntegrationJobError, sync_result.errors.to_s }
sync_result.on_success { OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_HEALTHY, storage:) }
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
module Storages
class ManageStorageIntegrationsJob < ApplicationJob
include GoodJob::ActiveJobExtensions::Concurrency

retry_on ::Storages::Errors::IntegrationJobError, attempts: 5
extend ::DebounceableJob

good_job_control_concurrency_with(
total_limit: 2,
Expand All @@ -41,35 +40,15 @@ class ManageStorageIntegrationsJob < ApplicationJob
)

retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError,
wait: 5.minutes,
attempts: 3
wait: 5,
attempts: 20

SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds.freeze
KEY = :manage_nextcloud_integration_job_debounce_happened_at
CRON_JOB_KEY = :"Storages::ManageStorageIntegrationsJob"

queue_with_priority :above_normal

class << self
def debounce
if debounce_happened_in_current_thread_recently?
false
else
# TODO:
# Why there is 5 seconds delay?
# it is like that because for 1 thread and if there is no delay more than
# SINGLE_THREAD_DEBOUNCE_TIME(4.seconds)
# then some events can be lost
#
# Possibly "true" solutions are:
# 1. have after_request middleware to schedule one job after a request cycle
# 2. use concurrent ruby to have 'true' debounce.
result = set(wait: 5.seconds).perform_later
RequestStore.store[KEY] = Time.current
result
end
end

def disable_cron_job_if_needed
if ::Storages::ProjectStorage.active_automatically_managed.exists?
GoodJob::Setting.cron_key_enable(CRON_JOB_KEY) unless GoodJob::Setting.cron_key_enabled?(CRON_JOB_KEY)
Expand All @@ -78,53 +57,12 @@ def disable_cron_job_if_needed
end
end

private

def debounce_happened_in_current_thread_recently?
timestamp = RequestStore.store[KEY]
timestamp.present? && (timestamp + SINGLE_THREAD_DEBOUNCE_TIME) > Time.current
end
def key = KEY
end

def perform
failed_sync = []
storage_scope.find_each do |storage|
next unless storage.configured?

result = sync_service_result_for(storage)
result.on_success { OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_HEALTHY, storage:) }
result.on_failure { |failed| failed_sync << { storage:, errors: failed.errors } }
end

return emit_error_events(failed_sync) if executions >= 5

raise Errors::IntegrationJobError, failed_sync.first[:errors].to_s if failed_sync.any?
end

private

def emit_error_events(failed_sync)
failed_sync.each do |hash|
OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_UNHEALTHY,
storage: hash[:storage], reason: hash[:errors].to_s)
end
end

def storage_scope
::Storages::Storage.automatic_management_enabled.includes(:oauth_client)
end

def sync_service_result_for(storage)
# For the sake of making this easier to expand, we should register these and
# use the registry to figure out which one to use

case storage.short_provider_type
when "nextcloud"
NextcloudGroupFolderPropertiesSyncService.call(storage)
when "one_drive"
OneDriveManagedFolderSyncService.call(storage)
else
raise "Unknown Storage Type"
Storage.automatic_management_enabled.find_each do |storage|
AutomaticallyManagedStorageSyncJob.perform_later(storage)
end
end
end
Expand Down
29 changes: 19 additions & 10 deletions modules/storages/lib/open_project/storages/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,23 @@ def self.permissions
[
OpenProject::Events::MEMBER_CREATED,
OpenProject::Events::MEMBER_UPDATED,
OpenProject::Events::MEMBER_DESTROYED,
OpenProject::Events::PROJECT_UPDATED,
OpenProject::Events::PROJECT_RENAMED,
OpenProject::Events::PROJECT_ARCHIVED,
OpenProject::Events::PROJECT_UNARCHIVED
OpenProject::Events::MEMBER_DESTROYED
].each do |event|
OpenProject::Notifications.subscribe(event) do |_payload|
::Storages::ManageStorageIntegrationsJob.debounce
OpenProject::Notifications.subscribe(event) do |payload|
::Storages::Storage.in_project(payload[:member].project_id).find_each do |storage|
::Storages::AutomaticallyManagedStorageSyncJob.debounce(storage)
end
end
end

[OpenProject::Events::PROJECT_UPDATED,
OpenProject::Events::PROJECT_RENAMED,
OpenProject::Events::PROJECT_ARCHIVED,
OpenProject::Events::PROJECT_UNARCHIVED].each do |event|
OpenProject::Notifications.subscribe(event) do |payload|
::Storages::Storage.in_project(payload[:project].id).find_each do |storage|
::Storages::AutomaticallyManagedStorageSyncJob.debounce(storage)
end
end
end

Expand Down Expand Up @@ -97,7 +106,7 @@ def self.permissions
].each do |event|
OpenProject::Notifications.subscribe(event) do |payload|
if payload[:project_folder_mode] == :automatic
::Storages::ManageStorageIntegrationsJob.debounce
::Storages::AutomaticallyManagedStorageSyncJob.debounce(payload[:storage])
::Storages::ManageStorageIntegrationsJob.disable_cron_job_if_needed
end
end
Expand All @@ -106,13 +115,13 @@ def self.permissions
OpenProject::Notifications.subscribe(
::OpenProject::Events::STORAGE_TURNED_UNHEALTHY
) do |payload|
Storages::HealthService.new(storage: payload[:storage]).unhealthy(reason: payload[:reason])
::Storages::HealthService.new(storage: payload[:storage]).unhealthy(reason: payload[:reason])
end

OpenProject::Notifications.subscribe(
::OpenProject::Events::STORAGE_TURNED_HEALTHY
) do |payload|
Storages::HealthService.new(storage: payload[:storage]).healthy
::Storages::HealthService.new(storage: payload[:storage]).healthy
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
subject

expect(OpenProject::Notifications).to(
have_received(:send).with(event, project_folder_mode: mode)
have_received(:send).with(event, project_folder_mode: mode, storage: model_instance.storage)
)
end
end
Expand Down
Loading
Loading