Skip to content

Commit

Permalink
Introduces a job that runs on a per storage basis
Browse files Browse the repository at this point in the history
Cleans up the big job
Update Event handling
  • Loading branch information
mereghost committed Jun 26, 2024
1 parent c65d77a commit c2d68d2
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def after_perform(service_call)
add_historical_data(service_call) if project_folder_mode != :inactive
OpenProject::Notifications.send(
OpenProject::Events::PROJECT_STORAGE_CREATED,
project_folder_mode:
project_folder_mode:,
storage: project_storage.storage
)

service_call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def persist(service_result)
delete_associated_file_links
OpenProject::Notifications.send(
OpenProject::Events::PROJECT_STORAGE_DESTROYED,
project_folder_mode: deletion_result.result.project_folder_mode.to_sym
project_folder_mode: deletion_result.result.project_folder_mode.to_sym,
storage: deletion_result.result.storage
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def after_perform(service_call)
add_historical_data(service_call) if project_folder_mode != :inactive
OpenProject::Notifications.send(
OpenProject::Events::PROJECT_STORAGE_UPDATED,
project_folder_mode:
project_folder_mode:,
storage: project_storage.storage
)

service_call
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#-- copyright
#++

module Storages
class AutomaticallyManagedStorageSyncJob < ApplicationJob
include GoodJob::ActiveJobExtensions::Concurrency

SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds
class << self
def debounce(storage)
key = "sync-#{storage.short_provider_type}-#{storage.id}"
timestamp = RequestStore.store[key]

return false if timestamp.present? && (timestamp + SINGLE_THREAD_DEBOUNCE_TIME) > Time.current

result = set(wait: 5.seconds).perform_later(storage)
RequestStore.store[key] = Time.current
result
end
end

good_job_control_concurrency_with(
total_limit: 2,
enqueue_limit: 1,
perform_limit: 1,
key: -> { "StorageSyncJob-#{arguments.last.short_provider_type}-#{arguments.last.id}" }
)

retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError, wait: 5, attempts: 10

retry_on Errors::IntegrationJobError, attempts: 5 do |job, error|
if job.executions >= 5
OpenProject::Notifications.send(
OpenProject::Events::STORAGE_TURNED_UNHEALTHY, storage: job.arguments.last, reason: error.message
)
end
end

def perform(storage)
return unless storage.configured? && storage.automatically_managed?

sync_result = case storage.short_provider_type
when "nextcloud"
NextcloudGroupFolderPropertiesSyncService.call(storage)
when "one_drive"
OneDriveManagedFolderSyncService.call(storage)
else
raise "Unknown Storage Type"
end

sync_result.on_failure { raise Errors::IntegrationJobError, sync_result.errors.to_s }
sync_result.on_success { OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_HEALTHY, storage:) }
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,15 @@ module Storages
class ManageStorageIntegrationsJob < ApplicationJob
include GoodJob::ActiveJobExtensions::Concurrency

retry_on ::Storages::Errors::IntegrationJobError, attempts: 5

good_job_control_concurrency_with(
total_limit: 2,
enqueue_limit: 1,
perform_limit: 1
)

retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError,
wait: 5.minutes,
attempts: 3
wait: 5,
attempts: 20

SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds.freeze
KEY = :manage_nextcloud_integration_job_debounce_happened_at
Expand Down Expand Up @@ -87,44 +85,8 @@ def debounce_happened_in_current_thread_recently?
end

def perform
failed_sync = []
storage_scope.find_each do |storage|
next unless storage.configured?

result = sync_service_result_for(storage)
result.on_success { OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_HEALTHY, storage:) }
result.on_failure { |failed| failed_sync << { storage:, errors: failed.errors } }
end

return emit_error_events(failed_sync) if executions >= 5

raise Errors::IntegrationJobError, failed_sync.first[:errors].to_s if failed_sync.any?
end

private

def emit_error_events(failed_sync)
failed_sync.each do |hash|
OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_UNHEALTHY,
storage: hash[:storage], reason: hash[:errors].to_s)
end
end

def storage_scope
::Storages::Storage.automatic_management_enabled.includes(:oauth_client)
end

def sync_service_result_for(storage)
# For the sake of making this easier to expand, we should register these and
# use the registry to figure out which one to use

case storage.short_provider_type
when "nextcloud"
NextcloudGroupFolderPropertiesSyncService.call(storage)
when "one_drive"
OneDriveManagedFolderSyncService.call(storage)
else
raise "Unknown Storage Type"
Storage.automatic_management_enabled.find_each do |storage|
AutomaticallyManagedStorageSyncJob.perform_later(storage)
end
end
end
Expand Down
31 changes: 21 additions & 10 deletions modules/storages/lib/open_project/storages/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,25 @@ def self.permissions
[
OpenProject::Events::MEMBER_CREATED,
OpenProject::Events::MEMBER_UPDATED,
OpenProject::Events::MEMBER_DESTROYED,
OpenProject::Events::PROJECT_UPDATED,
OpenProject::Events::PROJECT_RENAMED,
OpenProject::Events::PROJECT_ARCHIVED,
OpenProject::Events::PROJECT_UNARCHIVED
OpenProject::Events::MEMBER_DESTROYED
].each do |event|
OpenProject::Notifications.subscribe(event) do |_payload|
::Storages::ManageStorageIntegrationsJob.debounce
OpenProject::Notifications.subscribe(event) do |payload|
::Storages::Storage.joins(project_storages: :project)
.where(project_storages: { project_id: payload[:member].project_id }).find_each do |storage|
::Storages::AutomaticallyManagedStorageSyncJob.debounce(storage)
end
end
end

[OpenProject::Events::PROJECT_UPDATED,
OpenProject::Events::PROJECT_RENAMED,
OpenProject::Events::PROJECT_ARCHIVED,
OpenProject::Events::PROJECT_UNARCHIVED].each do |event|
OpenProject::Notifications.subscribe(event) do |payload|
::Storages::Storage.joins(project_storages: :project)
.where(project_storages: { project: payload[:project] }).find_each do |storage|
::Storages::AutomaticallyManagedStorageSyncJob.debounce(storage)
end
end
end

Expand Down Expand Up @@ -97,7 +108,7 @@ def self.permissions
].each do |event|
OpenProject::Notifications.subscribe(event) do |payload|
if payload[:project_folder_mode] == :automatic
::Storages::ManageStorageIntegrationsJob.debounce
::Storages::AutomaticallyManagedStorageSyncJob.debounce(payload[:storage])
::Storages::ManageStorageIntegrationsJob.disable_cron_job_if_needed
end
end
Expand All @@ -106,13 +117,13 @@ def self.permissions
OpenProject::Notifications.subscribe(
::OpenProject::Events::STORAGE_TURNED_UNHEALTHY
) do |payload|
Storages::HealthService.new(storage: payload[:storage]).unhealthy(reason: payload[:reason])
::Storages::HealthService.new(storage: payload[:storage]).unhealthy(reason: payload[:reason])
end

OpenProject::Notifications.subscribe(
::OpenProject::Events::STORAGE_TURNED_HEALTHY
) do |payload|
Storages::HealthService.new(storage: payload[:storage]).healthy
::Storages::HealthService.new(storage: payload[:storage]).healthy
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
subject

expect(OpenProject::Notifications).to(
have_received(:send).with(event, project_folder_mode: mode)
have_received(:send).with(event, project_folder_mode: mode, storage: model_instance.storage)
)
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# frozen_string_literal: true

#-- copyright
# OpenProject is an open source project management software.
# Copyright (C) 2012-2024 the OpenProject GmbH
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 3.
#
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows:
# Copyright (C) 2006-2013 Jean-Philippe Lang
# Copyright (C) 2010-2013 the ChiliProject Team
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# See COPYRIGHT and LICENSE files for more details.
#++

require "spec_helper"
require_module_spec_helper

RSpec.describe Storages::AutomaticallyManagedStorageSyncJob, type: :job do
let(:managed_nextcloud) { create(:nextcloud_storage_configured, :as_automatically_managed) }

describe ".debounce" do
context "when has been debounced by other thread" do
it "does not change the number of enqueued jobs" do
expect(performed_jobs.count).to eq(0)
expect(described_class.debounce(managed_nextcloud).successfully_enqueued?).to be(true)
expect(described_class.debounce(managed_nextcloud)).to be(false)
expect(enqueued_jobs.count).to eq(1)

expect { described_class.debounce(managed_nextcloud) }.not_to change(enqueued_jobs, :count)
end
end

context "when has not been debounced by other thread" do
before { RequestStore.delete("sync-nextcloud-#{managed_nextcloud.id}") }

it "schedules a job" do
expect { described_class.debounce(managed_nextcloud) }.to change(enqueued_jobs, :count).from(0).to(1)
end
end
end

describe ".perform" do
subject(:job_instance) { described_class.new }

it "only runs for automatically managed storages" do
unmanaged_nextcloud = create(:nextcloud_storage_configured, :as_not_automatically_managed)

allow(Storages::NextcloudGroupFolderPropertiesSyncService)
.to receive(:call).with(managed_nextcloud).and_return(ServiceResult.success)

job_instance.perform(managed_nextcloud)
job_instance.perform(unmanaged_nextcloud)

expect(Storages::NextcloudGroupFolderPropertiesSyncService).to have_received(:call).with(managed_nextcloud)
expect(Storages::NextcloudGroupFolderPropertiesSyncService).not_to have_received(:call).with(unmanaged_nextcloud)
end

it "marks storage as healthy if sync was successful" do
allow(Storages::NextcloudGroupFolderPropertiesSyncService)
.to receive(:call).with(managed_nextcloud).and_return(ServiceResult.success)

Timecop.freeze("2023-03-14T15:17:00Z") do
expect do
job_instance.perform(managed_nextcloud)
managed_nextcloud.reload
end.to(
change(managed_nextcloud, :health_changed_at).to(Time.now.utc)
.and(change(managed_nextcloud, :health_status).from("pending").to("healthy"))
)
end
end

it "marks storage as unhealthy if sync was unsuccessful" do
job = class_double(Storages::HealthStatusMailerJob)
allow(Storages::HealthStatusMailerJob).to receive(:set).and_return(job)
allow(job).to receive(:perform_later)

allow(Storages::NextcloudGroupFolderPropertiesSyncService)
.to receive(:call)
.with(managed_nextcloud)
.and_return(ServiceResult.failure(errors: Storages::StorageError.new(code: :not_found)))

Timecop.freeze("2023-03-14T15:17:00Z") do
expect do
perform_enqueued_jobs { described_class.perform_later(managed_nextcloud) }
managed_nextcloud.reload
end.to(
change(managed_nextcloud, :health_changed_at).to(Time.now.utc)
.and(change(managed_nextcloud, :health_status).from("pending").to("unhealthy"))
.and(change(managed_nextcloud, :health_reason).from(nil).to("not_found"))
)
end
end

context "when Storages::Errors::IntegrationJobError is raised" do
before do
allow(Storages::NextcloudGroupFolderPropertiesSyncService)
.to receive(:call).with(managed_nextcloud)
.and_return(ServiceResult.failure(errors: Storages::StorageError.new(code: :custom_error)))

allow(OpenProject::Notifications).to receive(:send)
end

it "retries the job" do
perform_enqueued_jobs { described_class.perform_later(managed_nextcloud) }
performed_jobs = described_class.queue_adapter.performed_jobs

expect(performed_jobs.last.dig("exception_executions", "[Storages::Errors::IntegrationJobError]")).to eq(5)
end

it "sends a notification after the maximum number of attempts" do
perform_enqueued_jobs { described_class.perform_later(managed_nextcloud) }

expect(OpenProject::Notifications).to have_received(:send).with(
OpenProject::Events::STORAGE_TURNED_UNHEALTHY,
storage: managed_nextcloud,
reason: "custom_error"
)
end
end
end
end
Loading

0 comments on commit c2d68d2

Please sign in to comment.