Skip to content

Commit

Permalink
Allow cancel job to fail gracefully if the job has been cleared from …
Browse files Browse the repository at this point in the history
…history

Also disabled error logging for commands that fail but for which we wish to consider not failed for ssh execute.
  • Loading branch information
atruskie committed Feb 5, 2025
1 parent 6c54d00 commit 22e61df
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 13 deletions.
16 changes: 11 additions & 5 deletions lib/gems/pbs/lib/pbs/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class Connection

JOB_ID_REGEX = /(\d+)(\.[-\w]+)?/
JOB_FINISHED_REGEX = /Job has finished/
JOB_FINISHED_STATUS = 35
UNKNOWN_JOB_ID_STATUS = 153
QDEL_GRACEFUL_STATUSES = [0, JOB_FINISHED_STATUS, UNKNOWN_JOB_ID_STATUS].freeze

JSON_PARSER_OPTIONS = {
allow_nan: true,
Expand Down Expand Up @@ -200,7 +203,10 @@ def submit_job(script, working_directory, **options)
.fmap { |stdout, _stderr| stdout.strip }
end

# Deletes a job identified by a job id
# Deletes a job identified by a job id.
# Fails gracefully if the job has already finished or the job history has cleared the ID (unknown).
# Generally we're cancelling here to clean or sync our state with the cluster. So if the job is already
# gone/done we don't care.
# @param job_id [String] the job id
# @param wait [Boolean] whether to wait for the job to be finish
# @param completed [Boolean] whether to delete the job from the completed queue
Expand All @@ -213,10 +219,10 @@ def cancel_job(job_id, wait: false, completed: false, force: false)

command += " && while qstat '#{job_id}' &> /dev/null; do echo 'waiting' ; sleep 0.1; done" if wait

execute_safe(command, fail_message: "deleting job #{job_id}")
# if the job has already finished by the time we get up to cancelling it
# we don't want to consider it an error. Just be graceful - it has ended.
.or { |stdout, stderr| stdout =~ JOB_FINISHED_REGEX ? Success(stdout) : Failure([stdout, stderr]) }
# if the job has already finished by the time we get up to cancelling it
# we don't want to consider it an error. Just be graceful - it has ended.
# Same thing for a job that's been cleared from the cluster's history:
execute_safe(command, fail_message: "deleting job #{job_id}", success_statuses: QDEL_GRACEFUL_STATUSES)
end

# Releases a job identified by a job id
Expand Down
13 changes: 8 additions & 5 deletions lib/gems/pbs/lib/pbs/ssh.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module PBS
module SSH
include Dry::Monads[:result]

SUCCESS_STATUSES = [0].freeze

class TransportError < StandardError; end

private
Expand All @@ -34,9 +36,10 @@ def key_file

# Execute a command
# @param command [String]
# @param success_statuses [Array(Integer)] the statuses that are considered successful. Only affects logging level.
# @return [Array((Integer,nil),String,String)] a tuple of
# status, stdout, stderr. stdout and stderr are NOT split into lines.
def execute(command)
def execute(command, success_statuses: SUCCESS_STATUSES)
stdout = ''
stderr = ''
status = {}
Expand All @@ -56,7 +59,7 @@ def execute(command)
exit_code = status.fetch(:exit_code, nil)

ssh_logger.log(
exit_code&.zero? ? :debug : :error,
success_statuses.include?(status) ? :debug : :error,
command:, exit_code:, stdout:, stderr:
)

Expand All @@ -68,10 +71,10 @@ def execute(command)
# @param fail_message [String] a message to add to the failure if the command fails
# @return [::Dry::Monads::Result<Array(String,String)>] a tuple of
# stdout, stderr, each of which is NOT split into lines.
def execute_safe(command, fail_message: '')
status, stdout, stderr = execute(command)
def execute_safe(command, fail_message: '', success_statuses: SUCCESS_STATUSES)
status, stdout, stderr = execute(command, success_statuses:)

unless status&.zero?
unless success_statuses.include?(status)
fail_message = fail_message.blank? ? '' : " when #{fail_message}"
return Failure("Command failed with status `#{status}`#{fail_message}: \n#{stdout}\n#{stderr}")
end
Expand Down
35 changes: 32 additions & 3 deletions spec/lib/gems/pbs/lib/pbs/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@
expect(log).to include('LOG: TERM trap: job killed or cancelled')
end

it 'can cancel a job that has already finished' do
it 'is graceful when canceling a job that has already finished' do
# arrange
result = connection.submit_job(
'echo "hello tests my pwd is $(pwd)"',
Expand All @@ -358,9 +358,38 @@
# act
result = connection.cancel_job(job_id, wait: true)

# assert
expect(result).to be_success
stdout, = result.value!
expect(stdout).to match(/Job has finished/)
stdout, stderr = result.value!
expect(stderr).to match(/Job has finished/)
expect(stdout).not_to match(/waiting/)
end

it 'is graceful when canceling a job that is not in the job history' do
# arrange
result = connection.submit_job(
'echo "hello tests my pwd is $(pwd)"',
working_directory
)

expect(result).to be_success
job_id = result.value!

job = wait_for_pbs_job(job_id)
expect(job).to be_finished

# we're simulating a completed job being cleared from history
result = connection.cancel_job(job_id, wait: true, force: true, completed: true)
expect(result).to be_success

# act - now we try to cancel the job again
result = connection.cancel_job(job_id, wait: true)

# assert
expect(result).to be_success

stdout, stderr = result.value!
expect(stderr).to match(/Unknown Job Id/)
expect(stdout).not_to match(/waiting/)
end

Expand Down

0 comments on commit 22e61df

Please sign in to comment.