Skip to content

Commit

Permalink
add alias handling and atomic swap and remove of old indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-sc committed Dec 3, 2024
1 parent bd3ab8e commit c069a40
Showing 1 changed file with 140 additions and 23 deletions.
163 changes: 140 additions & 23 deletions Classes/Command/IndexCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,33 +143,46 @@ protected function fullSync(InputInterface $input): void
$collection = new Collection($response->getBody());
$this->bibliographyItems = $collection->pluck('data');
$cursor = 0; // set Cursor to 0, not to bulk size
$index = $this->extConf['elasticIndexName'];
$mappingParams = BibElasticMapping::getMappingParams($index);
// we are working with alias names to swap indexes from zotero_temp to zotero after successfully indexing
$tempIndexAlias = $this->extConf['elasticIndexName'].'_temp';
$indexName = $this->extConf['elasticIndexName'] . '_' . date('Ymd_His');
$tempIndexParams = BibElasticMapping::getMappingParams($indexName);

// add alias name 'zotero_temp to this index
// and add a wildcard alias to find all zotero_* indices with the alias zotero-index
$aliasParams = [
'body' => [
'actions' => [
[
'add' => [
'index' => $indexName,
'alias' => $tempIndexAlias,
],
],
[
'add' => [
'index' => $this->extConf['elasticIndexName'].'_*',
'alias' => $this->extConf['elasticIndexName'].'-index',
],
]
]
]
];

try {
// in older Elasticsearch versions (until 7) exists returns a bool
if ($this->client->indices()->exists(['index' => $index])) {
$this->client->indices()->delete(['index' => $index]);
$this->client->indices()->create($mappingParams);
}
$this->client->indices()->create($tempIndexParams);
$this->client->indices()->updateAliases($aliasParams);
} catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Index: " . $index . " does not exist. Trying to create new index.");
$this->client->indices()->create($mappingParams);
} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error creating elasticsearch index.');
throw new \Exception('Bibliography sync unsuccessful.');
}
}

$apiCounter = self::API_TRIALS;

while ($cursor < $this->total) {
try {
$this->sync($cursor, 0);

$this->sync($indexName, $cursor, 0);
$apiCounter = self::API_TRIALS;
$remainingItems = $this->total - $cursor;
$advanceBy = min($remainingItems, $this->bulkSize);
Expand All @@ -188,6 +201,11 @@ protected function fullSync(InputInterface $input): void
}
}
}

// swap alias for index from zotero_temp to zotero and remove old indexes (keep the last one)
$this->swapIndexAliases($indexName, $tempIndexAlias);
//delete old indexes
$this->deleteOldIndexes($indexName);
$this->io->progressFinish();
}

Expand All @@ -196,7 +214,7 @@ protected function versionedSync(int $version): void
$apiCounter = self::API_TRIALS;
while (true) {
try {
$this->sync(0, $version);
$this->sync( $this->extConf['elasticIndexName'], 0, $version);
$this->io->text('done');
return;
} catch (\Exception $e) {
Expand All @@ -214,13 +232,13 @@ protected function versionedSync(int $version): void
}
}

protected function sync(int $cursor = 0, int $version = 0): void
protected function sync(string $indexName, int $cursor = 0, int $version = 0,): void
{
$this->fetchBibliography($cursor, $version);
$this->fetchCitations($cursor, $version);
$this->fetchTeiData($cursor, $version);
$this->buildDataSets();
$this->commitBibliography();
$this->commitBibliography($indexName);
}

protected function getVersion(InputInterface $input): int
Expand Down Expand Up @@ -344,21 +362,18 @@ protected function buildDataSets(): void
});
}

protected function commitBibliography(): void
protected function commitBibliography(string $indexName): void
{
if ($this->dataSets->count() == 0) {
$this->io->text('no new bibliographic entries');
return;
}
$index = $this->extConf['elasticIndexName'];

$params = [ 'body' => [] ];

$bulkCount = 0;
foreach ($this->dataSets as $document) {
$params['body'][] = [ 'index' =>
[
'_index' => $index,
'_index' => $indexName,
'_id' => $document['key']
]
];
Expand All @@ -372,6 +387,108 @@ protected function commitBibliography(): void
$this->client->bulk($params);
}

protected function swapIndexAliases(string $indexName, string $tempIndexAlias): void
{
// get index with alias = zotero
try {
$aliasesRequest = $this->client->indices()->getAlias(['name' => $this->extConf['elasticIndexName']]);
$aliasesArray = $aliasesRequest->asArray();

foreach ($aliasesArray as $index => $aliasArray) {
$this->io->note('Remove alias "' .$this->extConf['elasticIndexName']. '" from index '. $index . 'and add it to ' . $indexName );
// get index name with alias 'zotero'
if (array_key_exists($this->extConf['elasticIndexName'], $aliasArray['aliases'])) {
//swap alias from old to new index
$aliasParams = [
'body' => [
'actions' => [
[
'remove' => [
'index' => $index,
'alias' => $this->extConf['elasticIndexName'],
],
],
[
'add' => [
'index' => $indexName,
'alias' => $this->extConf['elasticIndexName'],
],
]
]
]
];
$this->client->indices()->updateAliases($aliasParams);
}
}
}
catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Alias: " . $this->extConf['elasticIndexName'] . " does not exist. Move alias to ".$indexName);
// rename alias name from temp index to zotero
$aliasParams = [
'body' => [
'actions' => [
[
'remove' => [
'index' => $indexName,
'alias' => $tempIndexAlias,
],
],
[
'add' => [
'index' => $indexName,
'alias' => $this->extConf['elasticIndexName'],
],
]
]
]
];
$this->client->indices()->updateAliases($aliasParams);

} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error getting alias: ' . $this->extConf['elasticIndexName']);
throw new \Exception('Bibliography sync unsuccessful.', 0, $e);
}
}
}

protected function deleteOldIndexes($indexName): void
{
try {
$aliasesRequest = $this->client->indices()->getAlias(['name' => $this->extConf['elasticIndexName'].'_*']);
$aliasesArray = $aliasesRequest->asArray();

// sort $aliasesArray by key name
ksort($aliasesArray);

// remove current key $indexName from array
unset($aliasesArray[$indexName]);

// remove the last key (we keep the last two indexes)
array_pop($aliasesArray);

foreach ($aliasesArray as $index => $aliasArray) {
$this->io->note("Delete index " . $index);
$this->client->indices()->delete(['index' => $index]);
}

}
catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Nothing to remove, there are no indexes with alias " . $this->extConf['elasticIndexName'].'_*');
} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error getting alias: ' . $this->extConf['elasticIndexName'].'_*');
throw new \Exception('Bibliography sync unsuccessful.', 0, $e);
}
}


}

/* protected function commitLocales(): void
{
$localeIndex = $this->extConf['elasticLocaleIndexName'];
Expand Down

0 comments on commit c069a40

Please sign in to comment.