diff --git a/modules/storages/app/workers/storages/automatically_managed_storage_sync_job.rb b/modules/storages/app/workers/storages/automatically_managed_storage_sync_job.rb new file mode 100644 index 000000000000..baede8396cf3 --- /dev/null +++ b/modules/storages/app/workers/storages/automatically_managed_storage_sync_job.rb @@ -0,0 +1,54 @@ +#-- copyright +#++ + +module Storages + class AutomaticallyManagedStorageSyncJob < ApplicationJob + include GoodJob::ActiveJobExtensions::Concurrency + + SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds + class << self + def debounce(storage) + key = "sync-#{storage.short_provider_type}-#{storage.id}" + timestamp = RequestStore.store[key] + return false if timestamp.present? && (timestamp + SINGLE_THREAD_DEBOUNCE_TIME) > Time.current + + result = set(wait: 5.seconds).perform_later(storage) + RequestStore.store[key] = Time.current + result + end + end + + good_job_control_concurrency_with( + total_limit: 2, + enqueue_limit: 1, + perform_limit: 1, + key: -> { "StorageSyncJob-#{arguments.last.short_provider_type}-#{arguments.last.id}" } + ) + + retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError, wait: 5, attempts: 10 + + retry_on Errors::IntegrationJobError, attempts: 5 do |job, error| + if job.executions >= 5 + OpenProject::Notifications.send( + OpenProject::Events::STORAGE_TURNED_UNHEALTHY, storage: job.arguments.last, reason: error.message + ) + end + end + + def perform(storage) + return unless storage.configured? && storage.automatically_managed? + + sync_result = case storage.short_provider_type + when "nextcloud" + NextcloudGroupFolderPropertiesSyncService.call(storage) + when "one_drive" + OneDriveManagedFolderSyncService.call(storage) + else + raise "Unknown Storage Type" + end + + sync_result.on_failure { raise Errors::IntegrationJobError, sync_result.errors.to_s } + sync_result.on_success { OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_HEALTHY, storage:) } + end + end +end diff --git a/modules/storages/app/workers/storages/manage_storage_integrations_job.rb b/modules/storages/app/workers/storages/manage_storage_integrations_job.rb index 93c3d2a066db..ebf9b5b54f21 100644 --- a/modules/storages/app/workers/storages/manage_storage_integrations_job.rb +++ b/modules/storages/app/workers/storages/manage_storage_integrations_job.rb @@ -41,8 +41,8 @@ class ManageStorageIntegrationsJob < ApplicationJob ) retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError, - wait: 5.minutes, - attempts: 3 + wait: 5, + attempts: 20 SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds.freeze KEY = :manage_nextcloud_integration_job_debounce_happened_at diff --git a/modules/storages/spec/workers/storages/automatically_managed_storage_sync_job_spec.rb b/modules/storages/spec/workers/storages/automatically_managed_storage_sync_job_spec.rb new file mode 100644 index 000000000000..32979cc56458 --- /dev/null +++ b/modules/storages/spec/workers/storages/automatically_managed_storage_sync_job_spec.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +#-- copyright +# OpenProject is an open source project management software. +# Copyright (C) 2012-2024 the OpenProject GmbH +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License version 3. +# +# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows: +# Copyright (C) 2006-2013 Jean-Philippe Lang +# Copyright (C) 2010-2013 the ChiliProject Team +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# See COPYRIGHT and LICENSE files for more details. +#++ + +require "spec_helper" +require_module_spec_helper + +RSpec.describe Storages::AutomaticallyManagedStorageSyncJob, type: :job do + let(:managed_nextcloud) { create(:nextcloud_storage_configured, :as_automatically_managed) } + + describe ".debounce", with_good_job: [described_class] do + context "when has been debounced by other thread" do + it "does not change the number of enqueued jobs" do + expect(GoodJob::Job.count).to eq(0) + expect(described_class.perform_later(managed_nextcloud).successfully_enqueued?).to be(true) + expect(described_class.perform_later(managed_nextcloud)).to be(false) + expect(GoodJob::Job.count).to eq(1) + + expect { described_class.debounce(managed_nextcloud) }.not_to change(GoodJob::Job, :count) + end + end + + context "when has not been debounced by other thread" do + before { RequestStore.delete("sync-nextcloud-#{managed_nextcloud.id}") } + + it "schedules a job" do + expect { described_class.debounce(managed_nextcloud) }.to change(GoodJob::Job, :count).from(0).to(1) + end + + # it "tries to schedule once when called 1000 times in a short period of time" do + # expect_any_instance_of(ActiveJob::ConfiguredJob) + # .to receive(:perform_later).once.and_call_original + # + # expect do + # 1000.times { described_class.debounce(managed_nextcloud) } + # end.to change(GoodJob::Job, :count).from(0).to(1) + # end + end + end + + describe ".perform" do + subject(:job_instance) { described_class.new } + + it "only runs for automatically managed storages" do + unmanaged_nextcloud = create(:nextcloud_storage_configured, :as_not_automatically_managed) + + allow(Storages::NextcloudGroupFolderPropertiesSyncService) + .to receive(:call).with(managed_nextcloud).and_return(ServiceResult.success) + + job_instance.perform(managed_nextcloud) + job_instance.perform(unmanaged_nextcloud) + + expect(Storages::NextcloudGroupFolderPropertiesSyncService).to have_received(:call).with(managed_nextcloud) + expect(Storages::NextcloudGroupFolderPropertiesSyncService).not_to have_received(:call).with(unmanaged_nextcloud) + end + + it "marks storage as healthy if sync was successful" do + allow(Storages::NextcloudGroupFolderPropertiesSyncService) + .to receive(:call).with(managed_nextcloud).and_return(ServiceResult.success) + + Timecop.freeze("2023-03-14T15:17:00Z") do + expect do + job_instance.perform(managed_nextcloud) + managed_nextcloud.reload + end.to( + change(managed_nextcloud, :health_changed_at).to(Time.now.utc) + .and(change(managed_nextcloud, :health_status).from("pending").to("healthy")) + ) + end + end + + it "marks storage as unhealthy if sync was unsuccessful" do + job = class_double(Storages::HealthStatusMailerJob) + allow(Storages::HealthStatusMailerJob).to receive(:set).and_return(job) + allow(job).to receive(:perform_later) + + allow(Storages::NextcloudGroupFolderPropertiesSyncService) + .to receive(:call) + .with(managed_nextcloud) + .and_return(ServiceResult.failure(errors: Storages::StorageError.new(code: :not_found))) + + Timecop.freeze("2023-03-14T15:17:00Z") do + expect do + perform_enqueued_jobs { described_class.perform_later(managed_nextcloud) } + managed_nextcloud.reload + end.to( + change(managed_nextcloud, :health_changed_at).to(Time.now.utc) + .and(change(managed_nextcloud, :health_status).from("pending").to("unhealthy")) + .and(change(managed_nextcloud, :health_reason).from(nil).to("not_found")) + ) + end + end + + context "when Storages::Errors::IntegrationJobError is raised" do + before do + allow(Storages::NextcloudGroupFolderPropertiesSyncService) + .to receive(:call).with(managed_nextcloud) + .and_return(ServiceResult.failure(errors: Storages::StorageError.new(code: :custom_error))) + + allow(OpenProject::Notifications).to receive(:send) + end + + it "retries the job" do + perform_enqueued_jobs { described_class.perform_later(managed_nextcloud) } + performed_jobs = described_class.queue_adapter.performed_jobs + + expect(performed_jobs.last.dig("exception_executions", "[Storages::Errors::IntegrationJobError]")).to eq(5) + end + + it "sends a notification after the maximum number of attempts" do + perform_enqueued_jobs { described_class.perform_later(managed_nextcloud) } + + expect(OpenProject::Notifications).to have_received(:send).with( + OpenProject::Events::STORAGE_TURNED_UNHEALTHY, + storage: managed_nextcloud, + reason: "custom_error" + ) + end + end + end +end diff --git a/modules/storages/spec/workers/storages/manage_storage_integrations_job_spec.rb b/modules/storages/spec/workers/storages/manage_storage_integrations_job_spec.rb index 620e9d87dcf9..042de511c129 100644 --- a/modules/storages/spec/workers/storages/manage_storage_integrations_job_spec.rb +++ b/modules/storages/spec/workers/storages/manage_storage_integrations_job_spec.rb @@ -31,7 +31,7 @@ require "spec_helper" require_module_spec_helper -RSpec.describe Storages::ManageStorageIntegrationsJob, :webmock, type: :job do +RSpec.describe Storages::ManageStorageIntegrationsJob, type: :job do describe ".debounce" do context "when has been debounced by other thread" do before { ActiveJob::Base.disable_test_adapter } @@ -165,11 +165,11 @@ allow(Storages::NextcloudGroupFolderPropertiesSyncService) .to receive(:call).with(storage1) .and_return(ServiceResult.failure(errors: Storages::StorageError.new(code: :custom_error))) - end - it "retries the job" do allow(OpenProject::Notifications).to receive(:send) + end + it "retries the job" do perform_enqueued_jobs { described_class.perform_later } expect(described_class @@ -178,8 +178,6 @@ end it "sends a notification after the maximum number of attempts" do - allow(OpenProject::Notifications).to receive(:send) - perform_enqueued_jobs { described_class.perform_later } expect(OpenProject::Notifications).to have_received(:send).with(