Skip to content

Commit

Permalink
Introduces a job that runs on a per storage basis
Browse files Browse the repository at this point in the history
  • Loading branch information
mereghost committed Jun 25, 2024
1 parent 7685566 commit 4eb1816
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#-- copyright
#++

module Storages
class AutomaticallyManagedStorageSyncJob < ApplicationJob
include GoodJob::ActiveJobExtensions::Concurrency

SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds
class << self
def debounce(storage)
key = "sync-#{storage.short_provider_type}-#{storage.id}"
timestamp = RequestStore.store[key]
return false if timestamp.present? && (timestamp + SINGLE_THREAD_DEBOUNCE_TIME) > Time.current

result = set(wait: 5.seconds).perform_later(storage)
RequestStore.store[key] = Time.current
result
end
end

good_job_control_concurrency_with(
total_limit: 2,
enqueue_limit: 1,
perform_limit: 1,
key: -> { "StorageSyncJob-#{arguments.last.short_provider_type}-#{arguments.last.id}" }
)

retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError, wait: 5, attempts: 10

retry_on Errors::IntegrationJobError, attempts: 5 do |job, error|
if job.executions >= 5
OpenProject::Notifications.send(
OpenProject::Events::STORAGE_TURNED_UNHEALTHY, storage: job.arguments.last, reason: error.message
)
end
end

def perform(storage)
return unless storage.configured? && storage.automatically_managed?

sync_result = case storage.short_provider_type
when "nextcloud"
NextcloudGroupFolderPropertiesSyncService.call(storage)
when "one_drive"
OneDriveManagedFolderSyncService.call(storage)
else
raise "Unknown Storage Type"
end

sync_result.on_failure { raise Errors::IntegrationJobError, sync_result.errors.to_s }
sync_result.on_success { OpenProject::Notifications.send(OpenProject::Events::STORAGE_TURNED_HEALTHY, storage:) }
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class ManageStorageIntegrationsJob < ApplicationJob
)

retry_on GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError,
wait: 5.minutes,
attempts: 3
wait: 5,
attempts: 20

SINGLE_THREAD_DEBOUNCE_TIME = 4.seconds.freeze
KEY = :manage_nextcloud_integration_job_debounce_happened_at
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# frozen_string_literal: true

#-- copyright
# OpenProject is an open source project management software.
# Copyright (C) 2012-2024 the OpenProject GmbH
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 3.
#
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows:
# Copyright (C) 2006-2013 Jean-Philippe Lang
# Copyright (C) 2010-2013 the ChiliProject Team
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# See COPYRIGHT and LICENSE files for more details.
#++

require "spec_helper"
require_module_spec_helper

RSpec.describe Storages::AutomaticallyManagedStorageSyncJob, type: :job do
let(:managed_nextcloud) { create(:nextcloud_storage_configured, :as_automatically_managed) }

describe ".debounce", with_good_job: [described_class] do
context "when has been debounced by other thread" do
it "does not change the number of enqueued jobs" do
expect(GoodJob::Job.count).to eq(0)
expect(described_class.perform_later(managed_nextcloud).successfully_enqueued?).to be(true)
expect(described_class.perform_later(managed_nextcloud)).to be(false)
expect(GoodJob::Job.count).to eq(1)

expect { described_class.debounce(managed_nextcloud) }.not_to change(GoodJob::Job, :count)
end
end

context "when has not been debounced by other thread" do
before { RequestStore.delete("sync-nextcloud-#{managed_nextcloud.id}") }

it "schedules a job" do
expect { described_class.debounce(managed_nextcloud) }.to change(GoodJob::Job, :count).from(0).to(1)
end

# it "tries to schedule once when called 1000 times in a short period of time" do
# expect_any_instance_of(ActiveJob::ConfiguredJob)
# .to receive(:perform_later).once.and_call_original
#
# expect do
# 1000.times { described_class.debounce(managed_nextcloud) }
# end.to change(GoodJob::Job, :count).from(0).to(1)
# end
end
end

describe ".perform" do
subject(:job_instance) { described_class.new }

it "only runs for automatically managed storages" do
unmanaged_nextcloud = create(:nextcloud_storage_configured, :as_not_automatically_managed)

allow(Storages::NextcloudGroupFolderPropertiesSyncService)
.to receive(:call).with(managed_nextcloud).and_return(ServiceResult.success)

job_instance.perform(managed_nextcloud)
job_instance.perform(unmanaged_nextcloud)

expect(Storages::NextcloudGroupFolderPropertiesSyncService).to have_received(:call).with(managed_nextcloud)
expect(Storages::NextcloudGroupFolderPropertiesSyncService).not_to have_received(:call).with(unmanaged_nextcloud)
end

it "marks storage as healthy if sync was successful" do
allow(Storages::NextcloudGroupFolderPropertiesSyncService)
.to receive(:call).with(managed_nextcloud).and_return(ServiceResult.success)

Timecop.freeze("2023-03-14T15:17:00Z") do
expect do
job_instance.perform(managed_nextcloud)
managed_nextcloud.reload
end.to(
change(managed_nextcloud, :health_changed_at).to(Time.now.utc)
.and(change(managed_nextcloud, :health_status).from("pending").to("healthy"))
)
end
end

it "marks storage as unhealthy if sync was unsuccessful" do
job = class_double(Storages::HealthStatusMailerJob)
allow(Storages::HealthStatusMailerJob).to receive(:set).and_return(job)
allow(job).to receive(:perform_later)

allow(Storages::NextcloudGroupFolderPropertiesSyncService)
.to receive(:call)
.with(managed_nextcloud)
.and_return(ServiceResult.failure(errors: Storages::StorageError.new(code: :not_found)))

Timecop.freeze("2023-03-14T15:17:00Z") do
expect do
perform_enqueued_jobs { described_class.perform_later(managed_nextcloud) }
managed_nextcloud.reload
end.to(
change(managed_nextcloud, :health_changed_at).to(Time.now.utc)
.and(change(managed_nextcloud, :health_status).from("pending").to("unhealthy"))
.and(change(managed_nextcloud, :health_reason).from(nil).to("not_found"))
)
end
end

context "when Storages::Errors::IntegrationJobError is raised" do
before do
allow(Storages::NextcloudGroupFolderPropertiesSyncService)
.to receive(:call).with(managed_nextcloud)
.and_return(ServiceResult.failure(errors: Storages::StorageError.new(code: :custom_error)))

allow(OpenProject::Notifications).to receive(:send)
end

it "retries the job" do
perform_enqueued_jobs { described_class.perform_later(managed_nextcloud) }
performed_jobs = described_class.queue_adapter.performed_jobs

expect(performed_jobs.last.dig("exception_executions", "[Storages::Errors::IntegrationJobError]")).to eq(5)
end

it "sends a notification after the maximum number of attempts" do
perform_enqueued_jobs { described_class.perform_later(managed_nextcloud) }

expect(OpenProject::Notifications).to have_received(:send).with(
OpenProject::Events::STORAGE_TURNED_UNHEALTHY,
storage: managed_nextcloud,
reason: "custom_error"
)
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
require "spec_helper"
require_module_spec_helper

RSpec.describe Storages::ManageStorageIntegrationsJob, :webmock, type: :job do
RSpec.describe Storages::ManageStorageIntegrationsJob, type: :job do
describe ".debounce" do
context "when has been debounced by other thread" do
before { ActiveJob::Base.disable_test_adapter }
Expand Down Expand Up @@ -165,11 +165,11 @@
allow(Storages::NextcloudGroupFolderPropertiesSyncService)
.to receive(:call).with(storage1)
.and_return(ServiceResult.failure(errors: Storages::StorageError.new(code: :custom_error)))
end

it "retries the job" do
allow(OpenProject::Notifications).to receive(:send)
end

it "retries the job" do
perform_enqueued_jobs { described_class.perform_later }

expect(described_class
Expand All @@ -178,8 +178,6 @@
end

it "sends a notification after the maximum number of attempts" do
allow(OpenProject::Notifications).to receive(:send)

perform_enqueued_jobs { described_class.perform_later }

expect(OpenProject::Notifications).to have_received(:send).with(
Expand Down

0 comments on commit 4eb1816

Please sign in to comment.