From 68ff595a4a37d466e7c6bf6e8450fee6fb938e2a Mon Sep 17 00:00:00 2001 From: Justin Littman Date: Thu, 24 Oct 2024 10:29:54 -0400 Subject: [PATCH] Adds publish robot. refs #5197 --- Gemfile | 5 ++ Gemfile.lock | 24 +++++++++ README.md | 15 ++++++ app/controllers/objects_controller.rb | 2 +- app/jobs/publish_job.rb | 24 +++------ app/jobs/robots/dor_repo/accession/publish.rb | 21 ++++++++ app/jobs/robots/robot.rb | 18 +++++++ .../publish/metadata_transfer_service.rb | 12 ++--- config/initializers/sidekiq.rb | 11 ++++- config/settings.yml | 2 + openapi.yml | 9 ---- spec/jobs/publish_job_spec.rb | 49 +++++-------------- .../robots/dor_repo/accession/publish_spec.rb | 37 ++++++++++++++ spec/rails_helper.rb | 2 + spec/requests/publish_object_spec.rb | 35 +++---------- .../publish/metadata_transfer_service_spec.rb | 4 +- 16 files changed, 166 insertions(+), 104 deletions(-) create mode 100644 app/jobs/robots/dor_repo/accession/publish.rb create mode 100644 app/jobs/robots/robot.rb create mode 100644 spec/jobs/robots/dor_repo/accession/publish_spec.rb diff --git a/Gemfile b/Gemfile index ee5de817f..c13fe9afc 100644 --- a/Gemfile +++ b/Gemfile @@ -12,6 +12,7 @@ gem 'dor-workflow-client', '~> 7.3' gem 'druid-tools', '~> 2.2' gem 'folio_client', '~> 0.8' gem 'graphql' +gem 'lyber-core' # For robots gem 'mais_orcid_client' gem 'marc' gem 'marc-vocab', '~> 0.3.0' # for indexing @@ -21,6 +22,10 @@ gem 'purl_fetcher-client', '~> 2.1' gem 'stanford-mods' # for indexing gem 'sul_orcid_client', '~> 0.3' +source 'https://gems.contribsys.com/' do + gem 'sidekiq-pro' +end + # Ruby general dependencies gem 'bootsnap', '>= 1.4.2', require: false gem 'bunny', '~> 2.17' # Send messages to RabbitMQ diff --git a/Gemfile.lock b/Gemfile.lock index e69ef85ce..b7c58fca8 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,3 +1,9 @@ +GEM + remote: https://gems.contribsys.com/ + specs: + sidekiq-pro (7.3.2) + sidekiq (>= 7.3.0, < 8) + GEM remote: https://rubygems.org/ specs: @@ -173,6 +179,13 @@ GEM ed25519 docile (1.4.1) domain_name (0.6.20240107) + dor-services-client (15.1.0) + activesupport (>= 4.2, < 8) + cocina-models (~> 0.99.0) + deprecation + faraday (~> 2.0) + faraday-retry + zeitwerk (~> 2.1) dor-workflow-client (7.5.0) activesupport (>= 3.2.1, < 8) deprecation (>= 0.99.0) @@ -278,6 +291,15 @@ GEM loofah (2.22.0) crass (~> 1.0.2) nokogiri (>= 1.12.0) + lyber-core (7.6.0) + activesupport + config + dor-services-client (~> 15.0) + dor-workflow-client (>= 7.4) + druid-tools + honeybadger + sidekiq (~> 7.0) + zeitwerk mail (2.8.1) mini_mime (>= 0.1.1) net-imap @@ -611,6 +633,7 @@ DEPENDENCIES jsonpath (~> 1.1) jwt lograge + lyber-core mais_orcid_client marc marc-vocab (~> 0.3.0) @@ -637,6 +660,7 @@ DEPENDENCIES rubocop-rspec_rails ruby-cache (~> 0.3.0) sidekiq (~> 7.0) + sidekiq-pro! simplecov sneakers (~> 2.11) stanford-mods diff --git a/README.md b/README.md index 11976d1dc..fdc280cfe 100644 --- a/README.md +++ b/README.md @@ -159,6 +159,21 @@ $ sudo systemctl restart rolling-index **NOTE 3**: The rolling indexer logs to `{capistrano_shared_dir}/log/rolling_indexer.log` +## Robots + +DSA hosts robots that perform DSA actions. This replaces the previous pattern in which a common accessioning robot which would invoke a DSA endpoint that would start a DSA job that would perform the action and then update the workflow status. + +Robots are in `jobs/robots/*`. All DSA robots must be added to Workflow Server Rails' `QueueService` so that the workflow jobs are handled by DSA robots (instead of normal robots). + +There also must be a sidekiq process to handle the DSA robot queues. For example: +``` +:labels: + - robot +:concurrency: 5 +:queues: + - [accessionWF_default_dsa, 2] + - accessionWF_low_dsa +``` ## Other tools diff --git a/app/controllers/objects_controller.rb b/app/controllers/objects_controller.rb index dba113737..2845d0a9f 100644 --- a/app/controllers/objects_controller.rb +++ b/app/controllers/objects_controller.rb @@ -107,7 +107,7 @@ def accession def publish result = BackgroundJobResult.create EventFactory.create(druid: params[:id], event_type: 'publish_request_received', data: { background_job_result_id: result.id }) - PublishJob.set(queue: publish_queue).perform_later(druid: params[:id], background_job_result: result, workflow: params[:workflow]) + PublishJob.set(queue: publish_queue).perform_later(druid: params[:id], background_job_result: result) head :created, location: result end diff --git a/app/jobs/publish_job.rb b/app/jobs/publish_job.rb index d4ca60cd0..bd8699a41 100644 --- a/app/jobs/publish_job.rb +++ b/app/jobs/publish_job.rb @@ -8,33 +8,21 @@ class PublishJob < ApplicationJob # @param [String] druid the identifier of the item to be published # @param [Integer,nil] user_version the version of the item to be published. If nil, the latest version will be published. # @param [BackgroundJobResult] background_job_result identifier of a background job result to store status info - # @param [String,nil] workflow workflow to report to. If nil, no workflow will be reported to. - # @param [Boolean] log_success whether success should be logged - def perform(druid:, background_job_result:, workflow: nil, user_version: nil, log_success: true) + def perform(druid:, background_job_result:, user_version: nil) background_job_result.processing! cocina_object = CocinaObjectStore.find(druid) # Note that LogFailureJob / LogSuccessJob will update the BackgroundJobResult. # If workflow is nil, no workflow will be reported to. if cocina_object.admin_policy? - return LogFailureJob.perform_later(druid:, - background_job_result:, - workflow:, - workflow_process: workflow_process_for(workflow), - output: { errors: [{ title: 'Publishing error', detail: 'Cannot publish an admin policy' }] }) + background_job_result.output = { errors: [{ title: 'Publishing error', detail: 'Cannot publish an admin policy' }] } + background_job_result.complete! + return end - Publish::MetadataTransferService.publish(druid:, user_version:, workflow:) + Publish::MetadataTransferService.publish(druid:, user_version:) EventFactory.create(druid:, event_type: 'publishing_complete', data: { background_job_result_id: background_job_result.id }) - return unless log_success - LogSuccessJob.perform_later(druid:, - background_job_result:, - workflow:, - workflow_process: workflow_process_for(workflow)) - end - - def workflow_process_for(workflow) - workflow == 'releaseWF' ? 'release-publish' : 'publish' + background_job_result.complete! end end diff --git a/app/jobs/robots/dor_repo/accession/publish.rb b/app/jobs/robots/dor_repo/accession/publish.rb new file mode 100644 index 000000000..31ad4244e --- /dev/null +++ b/app/jobs/robots/dor_repo/accession/publish.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module Robots + module DorRepo + module Accession + # Publishing metadata and shelving files for object. + class Publish < Robots::Robot + def initialize + super('accessionWF', 'publish') + end + + def perform_work + return LyberCore::ReturnState.new(status: :skipped, note: 'Admin policy objects are not published') if cocina_object.admin_policy? + + ::Publish::MetadataTransferService.publish(druid:) + EventFactory.create(druid:, event_type: 'publishing_complete', data: {}) + end + end + end + end +end diff --git a/app/jobs/robots/robot.rb b/app/jobs/robots/robot.rb new file mode 100644 index 000000000..d182507e0 --- /dev/null +++ b/app/jobs/robots/robot.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Robots + # Base class for DSA robots. + class Robot < LyberCore::Robot + def cocina_object + @cocina_object ||= CocinaObjectStore.find(druid) + end + + def object_client + raise 'Object Client should not be used from a DSA robot' + end + + def workflow_service + @workflow_service ||= ::WorkflowClientFactory.build + end + end +end diff --git a/app/services/publish/metadata_transfer_service.rb b/app/services/publish/metadata_transfer_service.rb index 359549b87..c1f6f3cd8 100644 --- a/app/services/publish/metadata_transfer_service.rb +++ b/app/services/publish/metadata_transfer_service.rb @@ -8,13 +8,11 @@ module Publish class MetadataTransferService # @param [String] druid for the object to be published # @param [Integer] user_version if a specific version is to be published - # @param [String] workflow (optional) the workflow used for reporting back status to (defaults to 'accessionWF') - def self.publish(druid:, user_version: nil, workflow: 'accessionWF') - new(druid:, workflow:, user_version:).publish + def self.publish(druid:, user_version: nil) + new(druid:, user_version:).publish end - def initialize(druid:, workflow:, user_version:) - @workflow = workflow + def initialize(druid:, user_version:) @public_cocina = PublicCocina.new(druid:, user_version:) end @@ -39,13 +37,13 @@ def republish_collection_members! Array.wrap( MemberService.for(druid, publishable: true) ).each do |member_druid| - PublishJob.set(queue: :publish_low).perform_later(druid: member_druid, background_job_result: BackgroundJobResult.create, workflow:, log_success: false) + PublishJob.set(queue: :publish_low).perform_later(druid: member_druid, background_job_result: BackgroundJobResult.create) end end def republish_virtual_object_constituents! VirtualObjectService.constituents(cocina_object, publishable: true).each do |constituent_druid| - PublishJob.set(queue: :publish_low).perform_later(druid: constituent_druid, background_job_result: BackgroundJobResult.create, workflow:, log_success: false) + PublishJob.set(queue: :publish_low).perform_later(druid: constituent_druid, background_job_result: BackgroundJobResult.create) end end diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 9e1691a3a..6f5c047c8 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -1,7 +1,16 @@ # frozen_string_literal: true Sidekiq.configure_server do |config| - config.redis = { url: Settings.redis_url } + # Add the following to a sidekiq.yml to have it handle robot jobs. + # :labels: + # - robot + if config[:labels].include?('robot') + config.redis = { url: Settings.robots_redis_url } + # For Sidekiq Pro + config.super_fetch! + else + config.redis = { url: Settings.redis_url } + end end Sidekiq.configure_client do |config| diff --git a/config/settings.yml b/config/settings.yml index 80615390b..dc5327e28 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -44,6 +44,8 @@ solr: dor_indexing: url: 'https://dor-indexing-app.example.edu/dor' redis_url: 'redis://localhost:6379/' +# This is the redis used by all robots, including the robots running inside DSA. +robots_redis_url: 'redis://localhost:6379/1' workflow_url: 'https://workflow.example.com/workflow' sdr: diff --git a/openapi.yml b/openapi.yml index de589314d..25df92dd9 100644 --- a/openapi.yml +++ b/openapi.yml @@ -128,15 +128,6 @@ paths: required: true schema: $ref: "#/components/schemas/Druid" - - name: workflow - in: query - description: which workflow should this be reported to - schema: - type: string - enum: - - accessionWF - - releaseWF - example: releaseWF - name: lane-id in: query description: Lane for prioritizing the work diff --git a/spec/jobs/publish_job_spec.rb b/spec/jobs/publish_job_spec.rb index a3743a572..4549994d8 100644 --- a/spec/jobs/publish_job_spec.rb +++ b/spec/jobs/publish_job_spec.rb @@ -4,26 +4,25 @@ RSpec.describe PublishJob do subject(:perform) do - described_class.perform_now(druid:, background_job_result: result, workflow:) + described_class.perform_now(druid:, background_job_result: result) end let(:druid) { 'druid:mk420bs7601' } let(:result) { create(:background_job_result) } let(:item) { instance_double(Cocina::Models::DRO, admin_policy?: false) } - let(:workflow) { 'accessionWF' } let(:valid) { true } let(:invalid_filenames) { [] } before do allow(CocinaObjectStore).to receive(:find).with(druid).and_return(item) allow(result).to receive(:processing!) + allow(result).to receive(:complete!) allow(EventFactory).to receive(:create) + allow(Publish::MetadataTransferService).to receive(:publish) end context 'with no errors' do before do - allow(Publish::MetadataTransferService).to receive(:publish) - allow(LogSuccessJob).to receive(:perform_later) perform end @@ -32,41 +31,13 @@ end it 'invokes the Publish::MetadataTransferService' do - expect(Publish::MetadataTransferService).to have_received(:publish).with(druid:, workflow:, user_version: nil).once + expect(Publish::MetadataTransferService).to have_received(:publish).with(druid:, user_version: nil).once end it 'marks the job as complete' do expect(EventFactory).to have_received(:create) - expect(LogSuccessJob).to have_received(:perform_later) - .with(druid:, background_job_result: result, workflow: 'accessionWF', workflow_process: 'publish') - end - - context 'when log_success is set to false' do - subject(:perform) do - described_class.perform_now(druid:, user_version: 4, background_job_result: result, workflow:, log_success: false) - end - - it 'does not mark the job as complete' do - expect(Publish::MetadataTransferService).to have_received(:publish).with(druid:, workflow:, user_version: 4).once - expect(EventFactory).to have_received(:create) - - expect(LogSuccessJob).not_to have_received(:perform_later) - .with(druid:, background_job_result: result, workflow: 'accessionWF', workflow_process: 'publish') - end - end - - context 'when log_success is set to true' do - subject(:perform) do - described_class.perform_now(druid:, background_job_result: result, workflow:, log_success: true) - end - - it 'mark the job as complete' do - expect(EventFactory).to have_received(:create) - - expect(LogSuccessJob).to have_received(:perform_later) - .with(druid:, background_job_result: result, workflow: 'accessionWF', workflow_process: 'publish') - end + expect(result).to have_received(:complete!).once end end @@ -74,13 +45,17 @@ let(:item) { instance_double(Cocina::Models::AdminPolicy, admin_policy?: true) } before do - allow(LogFailureJob).to receive(:perform_later) + allow(result).to receive(:output=) perform end it 'marks the job as a failure' do - expect(LogFailureJob).to have_received(:perform_later) - .with(druid:, background_job_result: result, workflow: 'accessionWF', workflow_process: 'publish', output: Hash) + expect(result).to have_received(:output=).with({ errors: [{ title: 'Publishing error', detail: 'Cannot publish an admin policy' }] }) + expect(result).to have_received(:complete!).once + end + + it 'does not transfer' do + expect(Publish::MetadataTransferService).not_to have_received(:publish) end end end diff --git a/spec/jobs/robots/dor_repo/accession/publish_spec.rb b/spec/jobs/robots/dor_repo/accession/publish_spec.rb new file mode 100644 index 000000000..96c0d6ea6 --- /dev/null +++ b/spec/jobs/robots/dor_repo/accession/publish_spec.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Robots::DorRepo::Accession::Publish, type: :robot do + subject(:perform) { test_perform(robot, druid) } + + let(:druid) { 'druid:zz000zz0001' } + let(:robot) { described_class.new } + + before do + allow(CocinaObjectStore).to receive(:find).with(druid).and_return(object) + allow(Publish::MetadataTransferService).to receive(:publish) + allow(EventFactory).to receive(:create) + end + + context 'when the object is an admin policy' do + let(:object) { build(:admin_policy, id: druid) } + + it 'skips the object' do + expect(perform.status).to eq 'skipped' + expect(perform.note).to eq 'Admin policy objects are not published' + expect(Publish::MetadataTransferService).not_to have_received(:publish) + expect(EventFactory).not_to have_received(:create) + end + end + + context 'when the object is not an admin policy' do + let(:object) { build(:dro, id: druid) } + + it 'publishes the object' do + expect(perform).to be_nil # no return state defaults to completed. + expect(Publish::MetadataTransferService).to have_received(:publish).with(druid: druid) + expect(EventFactory).to have_received(:create).with(druid: druid, event_type: 'publishing_complete', data: {}) + end + end +end diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index b2eaa8686..3cc007c49 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -62,6 +62,8 @@ # https://relishapp.com/rspec/rspec-rails/docs config.infer_spec_type_from_file_location! + config.include LyberCore::Rspec, type: :robot + # Filter lines from Rails gems in backtraces. config.filter_rails_from_backtrace! # arbitrary gems may also be filtered via: diff --git a/spec/requests/publish_object_spec.rb b/spec/requests/publish_object_spec.rb index 49f1b2ea9..7322ef3e2 100644 --- a/spec/requests/publish_object_spec.rb +++ b/spec/requests/publish_object_spec.rb @@ -11,35 +11,12 @@ allow(PublishJob).to receive(:set).and_return(job) end - context 'with a workflow provided' do - it 'calls Publish::MetadataTransferService and returns 201' do - post "/v1/objects/#{druid}/publish?workflow=releaseWF&lane-id=low", headers: { 'Authorization' => "Bearer #{jwt}" } + # This happens when Argo invokes the API + it 'calls Publish::MetadataTransferService and returns 201' do + post "/v1/objects/#{druid}/publish", headers: { 'Authorization' => "Bearer #{jwt}" } - expect(PublishJob).to have_received(:set).with(queue: :publish_low) - expect(job).to have_received(:perform_later) - .with(druid:, background_job_result: BackgroundJobResult, workflow: 'releaseWF') - expect(response).to have_http_status(:created) - end - end - - context 'with an invalid workflow provided' do - let(:error) { JSON.parse(response.body)['errors'][0]['detail'] } # rubocop:disable Rails/ResponseParsedBody - - it 'is a bad request' do - post "/v1/objects/#{druid}/publish?workflow=badWF", headers: { 'Authorization' => "Bearer #{jwt}" } - expect(response).to have_http_status(:bad_request) - expect(error).to eq("\"badWF\" isn't part of the enum in #/paths/~1v1~1objects~1{id}~1publish/post/parameters/1/schema") - end - end - - context 'without a workflow provided' do - # This happens when Argo invokes the API - it 'calls Publish::MetadataTransferService and returns 201' do - post "/v1/objects/#{druid}/publish", headers: { 'Authorization' => "Bearer #{jwt}" } - - expect(job).to have_received(:perform_later) - .with(druid:, background_job_result: BackgroundJobResult, workflow: nil) - expect(response).to have_http_status(:created) - end + expect(job).to have_received(:perform_later) + .with(druid:, background_job_result: BackgroundJobResult) + expect(response).to have_http_status(:created) end end diff --git a/spec/services/publish/metadata_transfer_service_spec.rb b/spec/services/publish/metadata_transfer_service_spec.rb index c969f2c9e..2c7409212 100644 --- a/spec/services/publish/metadata_transfer_service_spec.rb +++ b/spec/services/publish/metadata_transfer_service_spec.rb @@ -40,7 +40,7 @@ described_class.publish(druid:) expect(MemberService).to have_received(:for).with(druid, publishable: true) - expect(publish_job).to have_received(:perform_later).once.with(druid: member_druid, background_job_result: BackgroundJobResult, workflow: 'accessionWF', log_success: false) + expect(publish_job).to have_received(:perform_later).once.with(druid: member_druid, background_job_result: BackgroundJobResult) expect(PurlFetcher::Client::Publish).to have_received(:publish).with(cocina: public_cocina, file_uploads: {}, version: 1, must_version: false, version_date: closed_at) expect(Publish::TransferStager).not_to have_received(:copy) @@ -246,7 +246,7 @@ described_class.publish(druid:) expect(VirtualObjectService).to have_received(:constituents).with(Cocina::Models::DROWithMetadata, publishable: true) - expect(publish_job).to have_received(:perform_later).once.with(druid: constituent_druid, background_job_result: BackgroundJobResult, workflow: 'accessionWF', log_success: false) + expect(publish_job).to have_received(:perform_later).once.with(druid: constituent_druid, background_job_result: BackgroundJobResult) expect(PurlFetcher::Client::Publish).to have_received(:publish).with(cocina: public_cocina, file_uploads: {}, version: 1, must_version: false, version_date: closed_at) expect(Publish::TransferStager).not_to have_received(:copy)