Skip to content

Commit

Permalink
Persist Globus file size
Browse files Browse the repository at this point in the history
Modify the FetchGlobusJob to use GlobusClient.list_files instead of
GlobusClient.get_filenames so that it can get access to the file sizes
as well as the file names.

GlobusService.download_chunk needed to be defined as a no-op or else the
call to attach the blob throws a NotImplementedError when it tries to
identify the content type of a blob with a non-zero size.

Fixes #3230
  • Loading branch information
edsu committed Jul 18, 2023
1 parent 7e895c3 commit bef5e6c
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 20 deletions.
26 changes: 14 additions & 12 deletions app/jobs/fetch_globus_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,43 @@ class FetchGlobusJob < BaseDepositJob

def perform(work_version)
work_version.attached_files.destroy_all
filepaths = filepaths_for(work_version)
files = files_for(work_version)

# Since it can take a while (hours) to get the filepaths from Globus API for large
# deposits we need to ensure that we still have an active database connection
# before trying to use it again or else we can get an error:
# PG::UnableToSend: SSL SYSCALL error: EOF detected
ActiveRecord::Base.clear_active_connections!

filepaths.each do |path|
next if ignore?(path)
files.each do |file|
next if ignore?(file.name)

work_version.attached_files << new_attached_file(path, work_version)
work_version.attached_files << new_attached_file(file, work_version)
end
work_version.upload_type = "browser"
work_version.fetch_globus_complete!
end

def filepaths_for(work_version)
def files_for(work_version)
GlobusClient
.get_filenames(path: work_version.globus_endpoint, user_id: work_version.work.owner.email)
.map { |filepath| filepath.delete_prefix(work_version.globus_endpoint_fullpath) }
.list_files(path: work_version.globus_endpoint, user_id: work_version.work.owner.email)
.map do |file|
file.tap { file.name = file.name.delete_prefix(work_version.globus_endpoint_fullpath) }
end
end

def ignore?(path)
path.start_with?("__MACOSX") || path.end_with?(".DS_Store")
end

def new_attached_file(path, work_version)
AttachedFile.new(path:, work_version:).tap do |attached_file|
def new_attached_file(file, work_version)
AttachedFile.new(path: file.name, work_version:).tap do |attached_file|
blob = ActiveStorage::Blob.create_before_direct_upload!(
key: attached_file.create_globus_active_storage_key,
filename: path,
filename: file.name,
service_name: ActiveStorage::Service::GlobusService::SERVICE_NAME,
byte_size: 0,
checksum: path
byte_size: file.size,
checksum: file.name
)
attached_file.file.attach(blob)
end
Expand Down
8 changes: 8 additions & 0 deletions app/services/active_storage/service/globus_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ def download(key, &)
raise NotImplementedError
end

def download_chunk(key, range)
# This is called by ActiveStorage::Blob::Identifiable when an
# ActiveStorage::Blob is being attached to an AttachedFile to identify the
# content type of the file. Since we don't have access to the content
# here we don't return anything. If we didn't have this here we would get a
# NotImplementedError exception.
end

def delete(key)
# This is called by ActiveSupport when #destroy is called on AttachedFile due to our use
# of ActiveStorage for file storage. This can happen during a decommission of a work.
Expand Down
59 changes: 59 additions & 0 deletions lib/tasks/cleanup.rake
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require "logger"

namespace :cleanup do
desc "Remove unattached files"
task uploads: :environment do
Expand All @@ -8,4 +10,61 @@ namespace :cleanup do
.where("DATE(active_storage_blobs.created_at) = ?", 7.days.ago.to_date)
.find_each(&:purge_later)
end

desc "Update zero length files"
task file_sizes: :environment do
logger = Logger.new($stdout)

# find druids that have zero length files in active storage
sql =
<<-SQL
SELECT
druid,
work_versions.id AS work_version_id,
active_storage_blobs.filename AS filename,
active_storage_blobs.id AS blob_id
FROM works
JOIN work_versions ON work_versions.work_id = works.id
JOIN attached_files ON attached_files.work_version_id = work_versions.id
JOIN active_storage_attachments ON active_storage_attachments.record_id = attached_files.id
JOIN active_storage_blobs ON active_storage_blobs.id = active_storage_attachments.blob_id
WHERE active_storage_blobs.byte_size = 0
AND druid IS NOT NULL;
SQL

# update the blob with the filesize from the SDR
objects = {}
ActiveRecord::Base.connection.execute(sql).each do |result|
# look up the druid if we haven't seen it already
if !objects.has_key?(result["druid"])
begin
objects[result["druid"]] = Repository.find(result["druid"])
rescue RuntimeError
logger.error("Unable to lookup %{result['druid']} in SDR")
next
end
end
object = objects[result["druid"]]

# find the file in the structural metadata
sdr_file = nil
object.structural.contains.each do |fileset|
sdr_file ||= fileset.structural.contains.find { |file| file.filename == result["filename"] }
end

# update the blob!
if sdr_file
blob = ActiveStorage::Blob.find(result["blob_id"])
if blob.byte_size == 0
blob.byte_size = sdr_file.size
blob.save
logger.info("updated blob #{blob.id} size to #{sdr_file.size}")
else
logger.error(%(blob #{blob.id} for #{result["druid"]} doesn't have zero byte size!))
end
else
logger.error(%(couldn't find #{result["filename"]} for #{result["druid"]}))
end
end
end
end
19 changes: 11 additions & 8 deletions spec/jobs/fetch_globus_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@

let(:work) { build(:work) }

let(:file_info) { GlobusClient::Endpoint::FileInfo }

before do
allow(GlobusClient).to receive(:get_filenames).and_return(
allow(GlobusClient).to receive(:list_files).and_return(
[
"/uploads/jstanford/work333/version1/file1.txt",
"/uploads/jstanford/work333/version1/__MACOSX/._file1.txt",
"/uploads/jstanford/work333/version1/dir1/file2.txt",
"/uploads/jstanford/work333/version1/__MACOSX/dir1/._file2.txt",
"/uploads/jstanford/work333/version1/dir2/.DS_Store",
"/uploads/jstanford/work333/version1/__MACOSX/dir2/._.DS_Store"
file_info.new("/uploads/jstanford/work333/version1/file1.txt", 24601),
file_info.new("/uploads/jstanford/work333/version1/__MACOSX/._file1.txt", 1),
file_info.new("/uploads/jstanford/work333/version1/dir1/file2.txt", 1),
file_info.new("/uploads/jstanford/work333/version1/__MACOSX/dir1/._file2.txt", 1),
file_info.new("/uploads/jstanford/work333/version1/dir2/.DS_Store", 1),
file_info.new("/uploads/jstanford/work333/version1/__MACOSX/dir2/._.DS_Store", 1)
]
)
work.update!(head: first_work_version)
Expand All @@ -36,9 +38,10 @@
expect(AttachedFile.find_by(id: attached_file.id)).to be_nil
attached_file = first_work_version.reload.attached_files.first
expect(attached_file.path).to eq("file1.txt")
expect(attached_file.byte_size).to eq(24601)
expect(attached_file.blob.service_name).to eq("globus")
expect(attached_file.blob.key).to eq("#{first_work_version.work.id}/1/file1.txt")
expect(GlobusClient).to have_received(:get_filenames).with(path: "jstanford/work333/version1",
expect(GlobusClient).to have_received(:list_files).with(path: "jstanford/work333/version1",
user_id: work.owner.email)
end
end

0 comments on commit bef5e6c

Please sign in to comment.