-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Custodial copy concurrency and other bugs #233
Conversation
* Download files into a folder called /downloads in the work directory * Delete this folder on container start * Add debug logging for the OCFL library. This is temporary but it'll be useful on prod for a while * Add a recursive method which will pull up to 50 messages from the SQS queue before processing. * Remove the parallel processing for multiple messages for the same IO * Add parallel processing for entities with different IOs * Add a new enum for the Result to replace the fiber Outcome. I don't want to use `Try` * Simplify the logic around deleted/non-deleted in Main * Stop grouping by path id in the OcflService createObjects method. All paths inside here will be for the same IO so it's pointless. * Call stage changes once per path with the semaphore around it. I don't know if this is causing a problem but it can't be helping. * Add a MOVE_SOURCE option so the /downloads folder doesn't fill up. * Move the object lock to a file system backed H2 database. The OCFL usage instructions says to use this if you're processing in parallel. * Update the tests to match the new structure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Just a few comments
- Should we add a test to see if
aggregateMessages
works the way it should, so- Does it call
receiveMessages
again if there are no messages - Does it call
receiveMessages
again if there are 50+ messages - Does it call
receiveMessages
again if there are < 50 messages
- Does it call
- Should we add a test for the
handleErrorWith
in that method? - Could you update the README? Could you also update the "Messages" section as that has gotten a bit out of date?
@@ -45,12 +52,20 @@ class ProcessorTest extends AnyFlatSpec with MockitoSugar { | |||
) | |||
} | |||
|
|||
"process" should "throw an Exception if a CO message has 'deleted' set to 'true'" in { | |||
"process" should "return a failure if a CO message has 'deleted' set to 'true'" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Should we capitalise Failure so that it's clear that it's referring to the object Failure and not a failure (same question for the other tests)?
- Didn't we say that we were going to throw an error in this case, as something like this shouldn't happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Yeah OK
- I'm going to leave it. I don't want special cases for certain errors. If this happens it will end up in the DLQ where we can investigate.
@@ -230,7 +245,7 @@ class ProcessorTest extends AnyFlatSpec with MockitoSugar { | |||
) | |||
) | |||
|
|||
utils.processor.process(utils.duplicatesIoMessageResponse, false).unsafeRunSync() | |||
val res = utils.processor.process(utils.duplicatesIoMessageResponse).unsafeRunSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
res
is unused
private def createDirectory: IO[Path] = { | ||
val path = Path(s"/tmp/${UUID.randomUUID()}/$id") | ||
private def createDirectory(workDir: String): IO[Path] = { | ||
val path = Path(s"$workDir/downloads/${UUID.randomUUID()}/$id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a more descriptive name than downloads
? Something like downloadedObjects
, downloadedUpdates
, newObjects
, newCcObjects
, ioUpdates
, objectUpdates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's ok, it's fairly standard to have a folder for downloaded files called downloads.
@@ -10,10 +10,10 @@ sealed trait CustodialCopyObject { | |||
def id: UUID | |||
def checksums: List[Checksum] | |||
def name: String | |||
def sourceFilePath: IO[Path] = createDirectory.map(cd => Path(s"$cd/$name")) | |||
def sourceFilePath(workDir: String): IO[Path] = createDirectory(workDir).map(cd => Path(s"$cd/$name")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe instead of cd
, it could be dir
, newDir
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dir
is good. I've changed it.
_ <- logger.info(s"${results.count(_.isError)} messages out of ${results.length} unique messages failed") | ||
|
||
_ <- results.parTraverse { | ||
case Failure(e) => logError(e) >> IO.unit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this was in the code before but do we need the >> IO.unit
? since logError(e)
already returns IO.unit
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, good spot, we don't. I've removed it.
I've added in tests for the aggregation. I've put a line in the README about it. I'm not sure what's up with the messages section in there though. |
The Message section in the README says
but this is no longer true, right? doesn't the Queue Creator lambda receive that message and then transforms the string Id into a UUID? The intro of the "Messages" section makes it seem as though it's happening in the Custodial Copy backend Also, the " Messages are deduped before they are processed" part, should that maybe go after the sentence that reads "Each group id corresponds to an OCFL object (which may not exist yet)"? |
We're still sending the messages in the "io:1b9555dd-43b7-4681-9b0d-85ebe951ca02" format. Maybe we could change it to have a "entityType" property but it's not a huge priority. It's a good point about the deduplication though, I've moved that line down a bit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Try