Skip to content

Commit

Permalink
EDGOAIPMH-99 - Harvesting Across Tenants Orchestrator (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
siarhei-charniak authored Aug 18, 2023
1 parent 386e225 commit 45892c3
Show file tree
Hide file tree
Showing 16 changed files with 589 additions and 51 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ The following schemas used:
### Required Permissions
Institutional users should be granted the following permissions in order to use this edge API:
- `oai-pmh.all`
In case when multi-tenant harvesting is needed, institutional users should be granted additional permission:
- `user-tenants.collection.get`

### Multi-tenant (consortia) harvesting setup
In order to perform harvesting across consortium tenants, central tenant's institutional user should be granted additional permission `user-tenants.collection.get`. Also, additional system users should be created for each consortia member tenant, with same as central tenant's `username` and granted `oai-pmh.all` permission.

### Configuration
Please refer to the [Configuration](https://github.com/folio-org/edge-common/blob/master/README.md#configuration) section in the [edge-common](https://github.com/folio-org/edge-common/blob/master/README.md) documentation to see all available system properties and their default values.
Expand Down
50 changes: 19 additions & 31 deletions src/main/java/org/folio/edge/oaipmh/OaiPmhHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import static org.folio.edge.oaipmh.utils.Constants.RESUMPTION_TOKEN;
import static org.folio.edge.oaipmh.utils.Constants.TENANT_ID;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
Expand Down Expand Up @@ -56,14 +54,12 @@
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
import org.folio.edge.oaipmh.utils.ResponseConverter;
import org.openarchives.oai._2.ListRecordsType;
import org.openarchives.oai._2.OAIPMH;
import org.openarchives.oai._2.RequestType;
import org.openarchives.oai._2.ResumptionTokenType;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;

@Slf4j
public class OaiPmhHandler extends Handler {
/** Expected valid http status codes to be returned by repository logic */
Expand Down Expand Up @@ -221,29 +217,16 @@ protected void handleProxyResponse(RoutingContext ctx, HttpResponse<Buffer> oaiP
Optional<String> encodingHeader = ofNullable(oaiPmhResponse.getHeader(String.valueOf(HttpHeaders.CONTENT_ENCODING)));
encodingHeader.ifPresent(value -> edgeResponse.putHeader(HttpHeaders.CONTENT_ENCODING, value));
Buffer buffer = oaiPmhResponse.body();
var oaipmh = readOAIPMH(buffer);
var oaipmh = ResponseConverter.getInstance().toOaiPmh(buffer.toString());
if (isListRecords(oaipmh) && isResumptionTokenOnly(oaipmh.getListRecords())) {
performCall(ctx, oaiPmhClient, oaipmh.getListRecords().getResumptionToken().getValue());
} else if (isLastResponse(oaipmh)) {
getTenants(okapiClient)
.thenAccept(list -> {
var nextTenant = getNextTenant(list, oaiPmhClient.tenant);
if (nextTenant.isPresent()) {
var newResumptionToken = toResumptionToken(nextTenant.get(), fetchMetadataPrefix(oaipmh.getRequest()));
if (isListRecords(oaipmh)) {
var listRecords = oaipmh.getListRecords();
listRecords.setResumptionToken(listRecords.getResumptionToken().withValue(newResumptionToken));
} else {
var listIdentifiers = oaipmh.getListIdentifiers();
listIdentifiers.setResumptionToken(listIdentifiers.getResumptionToken().withValue(newResumptionToken));
}
try {
var stream = new ByteArrayOutputStream();
JAXBContext.newInstance(OAIPMH.class).createMarshaller().marshal(oaipmh, stream);
edgeResponse.end(BufferImpl.buffer(stream.toByteArray()));
} catch (JAXBException e) {
internalServerError(ctx, "Cannot marshall OAIPMH object");
}
updateResumptionTokenValue(oaipmh, nextTenant.get());
edgeResponse.end(BufferImpl.buffer(ResponseConverter.getInstance().convertToString(oaipmh)));
} else {
edgeResponse.end(buffer);
}
Expand All @@ -263,6 +246,21 @@ protected void handleProxyResponse(RoutingContext ctx, HttpResponse<Buffer> oaiP
}
}

private void updateResumptionTokenValue(OAIPMH oaipmh, String nextTenant) {
var newResumptionTokenValue = toResumptionToken(nextTenant, fetchMetadataPrefix(oaipmh.getRequest()));
if (isListRecords(oaipmh)) {
var listRecords = oaipmh.getListRecords();
listRecords.setResumptionToken(isNull(listRecords.getResumptionToken()) ?
new ResumptionTokenType().withValue(newResumptionTokenValue) :
listRecords.getResumptionToken().withValue(newResumptionTokenValue));
} else {
var listIdentifiers = oaipmh.getListIdentifiers();
listIdentifiers.setResumptionToken(isNull(listIdentifiers.getResumptionToken()) ?
new ResumptionTokenType().withValue(newResumptionTokenValue) :
listIdentifiers.getResumptionToken().withValue(newResumptionTokenValue));
}
}

@Override
protected void accessDenied(RoutingContext ctx, String msg) {
log.error("accessDenied: " + msg);
Expand Down Expand Up @@ -301,16 +299,6 @@ private void notAcceptableResponse(RoutingContext ctx, HttpServerRequest request
"Accept header must be \"text/xml\" for this request, but it is " + "\"" + unsupportedType + "\"" + ", can not send */*");
}

private OAIPMH readOAIPMH(Buffer buffer) {
try {
return (OAIPMH) JAXBContext.newInstance(OAIPMH.class)
.createUnmarshaller()
.unmarshal(new ByteArrayInputStream(buffer.getBytes()));
} catch (JAXBException e) {
return new OAIPMH();
}
}

private boolean isListRecords(OAIPMH oaipmh) {
return nonNull(oaipmh.getListRecords());
}
Expand Down
90 changes: 90 additions & 0 deletions src/main/java/org/folio/edge/oaipmh/utils/ResponseConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.folio.edge.oaipmh.utils;

import com.sun.xml.bind.marshaller.NamespacePrefixMapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openarchives.oai._2.OAIPMH;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

@SuppressWarnings("squid:S1191") //The com.sun.xml.bind.marshaller.NamespacePrefixMapper is part of jaxb logic
public class ResponseConverter {

private static final Logger logger = LogManager.getLogger(ResponseConverter.class);
private static final Map<String, String> NAMESPACE_PREFIX_MAP = new HashMap<>();
private final NamespacePrefixMapper namespacePrefixMapper;

private static ResponseConverter ourInstance;

static {
NAMESPACE_PREFIX_MAP.put("http://www.loc.gov/MARC21/slim", "marc");
NAMESPACE_PREFIX_MAP.put("http://purl.org/dc/elements/1.1/", "dc");
NAMESPACE_PREFIX_MAP.put("http://www.openarchives.org/OAI/2.0/oai_dc/", "oai_dc");
NAMESPACE_PREFIX_MAP.put("http://www.openarchives.org/OAI/2.0/oai-identifier", "oai-identifier");
try {
ourInstance = new ResponseConverter();
} catch (JAXBException e) {
logger.error("The jaxb context could not be initialized.");
throw new IllegalStateException("Marshaller and unmarshaller are not available.", e);
}
}
private JAXBContext jaxbContext;

public static ResponseConverter getInstance() {
return ourInstance;
}

/**
* The main purpose is to initialize JAXB Marshaller and Unmarshaller to use the instances for business logic operations
*/
private ResponseConverter() throws JAXBException {
jaxbContext = JAXBContext.newInstance(OAIPMH.class);
namespacePrefixMapper = new NamespacePrefixMapper() {
@Override
public String getPreferredPrefix(String namespaceUri, String suggestion, boolean requirePrefix) {
return NAMESPACE_PREFIX_MAP.getOrDefault(namespaceUri, suggestion);
}
};
}

/**
* Marshals {@link OAIPMH} object and returns string representation
* @param oaipmh {@link OAIPMH} object to marshal
* @return marshaled {@link OAIPMH} object as string representation
*/
public String convertToString(OAIPMH oaipmh) {
try (var stream = new ByteArrayOutputStream()) {
Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
jaxbMarshaller.setProperty(Marshaller.JAXB_SCHEMA_LOCATION,
"http://www.openarchives.org/OAI/2.0/ http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd");
jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
jaxbMarshaller.setProperty("com.sun.xml.bind.namespacePrefixMapper", namespacePrefixMapper);
jaxbMarshaller.marshal(oaipmh, stream);
return stream.toString();
} catch (JAXBException | IOException e) {
throw new IllegalStateException("The OAI-PMH response cannot be converted to string representation.", e);
}
}

/**
* Unmarshals {@link OAIPMH} object based on passed string
* @param oaipmhResponse the {@link OAIPMH} response in string representation
* @return the {@link OAIPMH} object based on passed string
*/
public OAIPMH toOaiPmh(String oaipmhResponse) {
try (StringReader reader = new StringReader(oaipmhResponse)) {
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
return (OAIPMH) jaxbUnmarshaller.unmarshal(reader);
} catch (JAXBException e) {
throw new IllegalStateException("The string cannot be converted to OAI-PMH response.", e);
}
}
}
6 changes: 4 additions & 2 deletions src/main/resources/ephemeral.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
secureStore.type=Ephemeral
# a comma separated list of tenants
tenants=diku,test_oaipmh,central,tenant1,tenant2,tenant3,tenant4
tenants=diku,test_oaipmh,central,central2,tenant1,tenant2,tenant3,tenant4,tenant5
#######################################################
# For each tenant, the institutional user password...
#
Expand All @@ -9,7 +9,9 @@ tenants=diku,test_oaipmh,central,tenant1,tenant2,tenant3,tenant4
diku=user,password
test_oaipmh=test-user,test
central=user,password
central2=user,password
tenant1=user,password
tenant2=user,password
tenant3=user,password
tenant4=user,password
tenant4=user,password
tenant5=user,password
38 changes: 34 additions & 4 deletions src/test/java/org/folio/edge/oaipmh/OaiPmhTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import io.restassured.RestAssured;
Expand Down Expand Up @@ -74,6 +75,7 @@ static void setUpOnce(Vertx vertx, VertxTestContext context) throws Exception {
knownTenants.add("tenant1");
knownTenants.add("tenant2");
knownTenants.add("tenant3");
knownTenants.add("tenant5");

System.setProperty(SYS_PORT, String.valueOf(serverPort));
System.setProperty(SYS_OKAPI_URL, "http://localhost:" + okapiPort);
Expand Down Expand Up @@ -679,15 +681,43 @@ void shouldContinueHarvestingForCurrentConsortiaTenantIfResumptionTokenIsPresent
assertEquals(expectedMockBody, actualBody);
}

@Test
void shouldAddResumptionTokenForLastResponseWhenNextTenantIsPresent() {
@ParameterizedTest
@CsvSource({"ListRecords,0", "ListIdentifiers,1"})
void shouldAddResumptionTokenForLastResponseWhenNextTenantIsPresent(String verb, int mockIndex) {
log.info("=== Test successful add resumption token if next tenant is present ===");

Path expectedMockPath = Paths.get(OaiPmhMockOkapi.PATH_TO_LIST_RECORDS_WITH_TOKEN_MOCK);
var pathsToMockFiles = List.of(OaiPmhMockOkapi.PATH_TO_LIST_RECORDS_WITH_TOKEN_MOCK,
OaiPmhMockOkapi.PATH_TO_LIST_IDENTIFIERS_WITH_TOKEN_MOCK);

Path expectedMockPath = Paths.get(pathsToMockFiles.get(mockIndex));
String expectedMockBody = OaiPmhMockOkapi.getOaiPmhResponseAsXml(expectedMockPath);

final Response resp = RestAssured
.get(String.format("/oai?verb=%s&resumptionToken=bWV0YWRhdGFQcmVmaXg9b2FpX2RjJnRlbmFudElkPXRlbmFudDQmcGFyYW09cGFyYW0&apikey=%s", verb, ApiKeyUtils.generateApiKey(10, "central", "user")))
.then()
.contentType(TEXT_XML)
.statusCode(HttpStatus.SC_OK)
.header(HttpHeaders.CONTENT_TYPE, TEXT_XML)
.extract()
.response();

String actualBody = resp.body().asString();
assertEquals(expectedMockBody, actualBody);
}

@ParameterizedTest
@CsvSource({"ListRecords,0", "ListIdentifiers,1"})
void shouldAddNewResumptionTokenIfNotPresentForLastResponseWhenNextTenantIsPresent(String verb, int mockIndex) {
log.info("=== Test successful add new resumption token if next tenant is present ===");

var pathsToMockFiles = List.of(OaiPmhMockOkapi.PATH_TO_LIST_RECORDS_WITH_NEW_TOKEN_MOCK,
OaiPmhMockOkapi.PATH_TO_LIST_IDENTIFIERS_WITH_NEW_TOKEN_MOCK);

Path expectedMockPath = Paths.get(pathsToMockFiles.get(mockIndex));
String expectedMockBody = OaiPmhMockOkapi.getOaiPmhResponseAsXml(expectedMockPath);

final Response resp = RestAssured
.get(String.format("/oai?verb=ListRecords&resumptionToken=bWV0YWRhdGFQcmVmaXg9b2FpX2RjJnRlbmFudElkPXRlbmFudDQmcGFyYW09cGFyYW0&apikey=%s", ApiKeyUtils.generateApiKey(10, "central", "user")))
.get(String.format("/oai?verb=%s&metadataPrefix=oai_dc&apiKey=%s", verb, ApiKeyUtils.generateApiKey(10, "central2", "user")))
.then()
.contentType(TEXT_XML)
.statusCode(HttpStatus.SC_OK)
Expand Down
Loading

0 comments on commit 45892c3

Please sign in to comment.