diff --git a/README.md b/README.md index d2627da9e..5ad39b0c9 100644 --- a/README.md +++ b/README.md @@ -302,6 +302,7 @@ The special E2E client settings should be defined in `env.yml`: | TRIP_INSTRUCTION_UPCOMING_RADIUS | integer | Optional | 10 | The radius in meters under which an upcoming instruction is given. | | TRIP_SURVEY_ID | string | Optional | abcdef123y | The ID of a survey (on the platform of your choice) for trip-related feedback. | | TRIP_SURVEY_SUBDOMAIN | string | Optional | abcabc12a | The subdomain of a website where the trip-related surveys are administered. | +| TRIP_SURVEY_API_TOKEN | string | Optional | abcdef123y | The token for the survey API for downloading responses. | | TWILIO_ACCOUNT_SID | string | Optional | your-account-sid | Twilio settings available at: https://twilio.com/user/account | | TRUSTED_COMPANION_CONFIRMATION_PAGE_URL | string | Optional | https://otp-server.example.com/trusted/confirmation | URL to the trusted companion confirmation page. This page should support handling an error URL parameter. | | TWILIO_AUTH_TOKEN | string | Optional | your-auth-token | Twilio settings available at: https://twilio.com/user/account | diff --git a/configurations/default/env.yml.tmp b/configurations/default/env.yml.tmp index c7996a62e..c9d61dfc0 100644 --- a/configurations/default/env.yml.tmp +++ b/configurations/default/env.yml.tmp @@ -103,6 +103,8 @@ TRIP_INSTRUCTION_UPCOMING_RADIUS: 10 # Survey ID and domain that is offered after users complete certain trips. TRIP_SURVEY_ID: abcdef123y TRIP_SURVEY_SUBDOMAIN: abcabc12a +# Survey API credentials to download responses +TRIP_SURVEY_API_TOKEN: token-12345c US_RIDE_GWINNETT_BUS_OPERATOR_NOTIFIER_API_URL: https://bus.notifier.example.com US_RIDE_GWINNETT_BUS_OPERATOR_NOTIFIER_API_KEY: your-key diff --git a/src/main/java/org/opentripplanner/middleware/OtpMiddlewareMain.java b/src/main/java/org/opentripplanner/middleware/OtpMiddlewareMain.java index 212d54f0a..a15523d3b 100644 --- a/src/main/java/org/opentripplanner/middleware/OtpMiddlewareMain.java +++ b/src/main/java/org/opentripplanner/middleware/OtpMiddlewareMain.java @@ -24,6 +24,7 @@ import org.opentripplanner.middleware.persistence.Persistence; import org.opentripplanner.middleware.tripmonitor.jobs.MonitorAllTripsJob; import org.opentripplanner.middleware.triptracker.TripSurveySenderJob; +import org.opentripplanner.middleware.triptracker.TripSurveyUploadJob; import org.opentripplanner.middleware.utils.ConfigUtils; import org.opentripplanner.middleware.utils.HttpUtils; import org.opentripplanner.middleware.utils.Scheduler; @@ -54,7 +55,7 @@ public class OtpMiddlewareMain { public static final String API_PREFIX = "/api/"; public static boolean inTestEnvironment = false; - public static void main(String[] args) throws IOException, InterruptedException { + public static void main(String[] args) throws IOException { // Load configuration. ConfigUtils.loadConfig(args); @@ -77,8 +78,9 @@ public static void main(String[] args) throws IOException, InterruptedException // Schedule trip history uploads. ConnectedDataManager.scheduleTripHistoryUploadJob(); - // Schedule recurring Monitor All Trips Job. + // Schedule recurring jobs. // TODO: Determine whether this should go in some other process. 
+ MonitorAllTripsJob monitorAllTripsJob = new MonitorAllTripsJob(); Scheduler.scheduleJob( monitorAllTripsJob, @@ -87,8 +89,6 @@ public static void main(String[] args) throws IOException, InterruptedException TimeUnit.MINUTES ); - // Schedule recurring job for post-trip surveys, once every half-hour to catch recently completed trips. - // TODO: Determine whether this should go in some other process. TripSurveySenderJob tripSurveySenderJob = new TripSurveySenderJob(); Scheduler.scheduleJob( tripSurveySenderJob, @@ -96,10 +96,21 @@ public static void main(String[] args) throws IOException, InterruptedException 30, TimeUnit.MINUTES ); + + if (TripSurveyUploadJob.isConfigured()) { + LOG.info("Scheduling trip survey upload every day"); + TripSurveyUploadJob tripSurveyUploadJob = new TripSurveyUploadJob(); + Scheduler.scheduleJob( + tripSurveyUploadJob, + 0, + 1, + TimeUnit.DAYS + ); + } } } - private static void initializeHttpEndpoints() throws IOException, InterruptedException { + private static void initializeHttpEndpoints() throws IOException { // Must start spark explicitly to use spark-swagger. // https://github.com/manusant/spark-swagger#endpoints-binding Service spark = Service.ignite().port(Service.SPARK_DEFAULT_PORT); @@ -120,7 +131,7 @@ private static void initializeHttpEndpoints() throws IOException, InterruptedExc new CDPFilesController(API_PREFIX), new OtpRequestProcessor("/otp", OtpVersion.OTP2), new OtpRequestProcessor("/otp2", OtpVersion.OTP2) - // TODO Add other models. + // Add other endpoints as needed. )) // Spark-swagger auto-generates a swagger document at localhost:4567/doc.yaml. // (That path is not configurable.) @@ -138,7 +149,7 @@ private static void initializeHttpEndpoints() throws IOException, InterruptedExc return Files.readString(publicDocPath); }); - /** + /* * End point to receive project errors as soon as they are processed by Bugsnag. 
Information on Bugsnag's * webhook can be found here: https://docs.bugsnag.com/product/integrations/data-forwarding/webhook/ * @@ -153,7 +164,7 @@ private static void initializeHttpEndpoints() throws IOException, InterruptedExc return ""; }); - /** + /* * End point to handle redirecting to the correct registration page from Auth0 as described here: * * https://auth0.com/docs/auth0-email-services/customize-email-templates#dynamic-redirect-to-urls diff --git a/src/main/java/org/opentripplanner/middleware/connecteddataplatform/ConnectedDataManager.java b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/ConnectedDataManager.java index c38bf17a1..fc1feec7b 100644 --- a/src/main/java/org/opentripplanner/middleware/connecteddataplatform/ConnectedDataManager.java +++ b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/ConnectedDataManager.java @@ -9,6 +9,7 @@ import org.bson.conversions.Bson; import org.opentripplanner.middleware.bugsnag.BugsnagReporter; import org.opentripplanner.middleware.controllers.api.OtpRequestProcessor; +import org.opentripplanner.middleware.models.IntervalUpload; import org.opentripplanner.middleware.models.TripHistoryUpload; import org.opentripplanner.middleware.models.TripRequest; import org.opentripplanner.middleware.models.TripSummary; @@ -19,17 +20,14 @@ import org.opentripplanner.middleware.utils.DateTimeUtils; import org.opentripplanner.middleware.utils.FileUtils; import org.opentripplanner.middleware.utils.JsonUtils; -import org.opentripplanner.middleware.utils.S3Utils; import org.opentripplanner.middleware.utils.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.time.DayOfWeek; import java.time.LocalDate; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.List; @@ -133,7 +131,9 @@ public static void removeUsersTripHistory(String userId) { } // Get all hourly windows that have already been earmarked for uploading. Set incompleteUploadHours = new HashSet<>(); - getIncompleteUploads().forEach(tripHistoryUpload -> incompleteUploadHours.add(tripHistoryUpload.uploadHour)); + IntervalUpload + .getIncompleteUploads(Persistence.tripHistoryUploads) + .forEach(tripHistoryUpload -> incompleteUploadHours.add(tripHistoryUpload.uploadHour)); // Save all new hourly windows for uploading. Set newHourlyWindows = Sets.difference(userTripHourlyWindows, incompleteUploadHours); TripHistoryUpload first = TripHistoryUpload.getFirst(); @@ -383,26 +383,25 @@ public static int compileAndUploadTripHistory( // Not null because ReportedEntities only contains entries that correspond to persistenceMap. TypedPersistence typedPersistence = ReportedEntities.persistenceMap.get(entityName); - String filePrefix = getFilePrefix(reportingInterval, periodStart, entityName); - String tempFileFolder = FileUtils.getTempDirectory().getAbsolutePath(); - - String zipFileName = String.join(".", filePrefix, ZIP_FILE_EXTENSION); - String tempZipFile = String.join(File.separator, tempFileFolder, zipFileName); + String coreFileName = entityName; + boolean anonymize = isAnonymizedInterval(reportingMode); + boolean isTripRequest = "TripRequest".equals(entityName); + if (isTripRequest && anonymize) { + // Anonymized trip requests are stored under a special file name. 
+ coreFileName = ANON_TRIP_FILE_NAME; + } - String jsonFileName = String.join(".", filePrefix, JSON_FILE_EXTENSION); - String tempDataFile = String.join(File.separator, tempFileFolder, jsonFileName); + IntervalUploadFiles uploadFiles = new IntervalUploadFiles( + getFilePrefix(reportingInterval, periodStart, coreFileName), + JSON_FILE_EXTENSION, + isTest + ); - try { + try (uploadFiles) { + String tempDataFile = uploadFiles.getTempDataFile(); int recordsWritten = Integer.MIN_VALUE; - if ("TripRequest".equals(entityName)) { - // Anonymized trip requests are stored under a special file name. - boolean anonymize = isAnonymizedInterval(reportingMode); - if (anonymize) { - tempDataFile = tempDataFile.replace("TripRequest.json", ANON_TRIP_JSON_FILE_NAME); - tempZipFile = tempZipFile.replace("TripRequest.zip", ANON_TRIP_ZIP_FILE_NAME); - } - + if (isTripRequest) { // TripRequests must be processed separately because they must be combined, one per batchId. // Note: Anonymized trips include TripRequest and TripSummary in the same entity. recordsWritten = streamTripsToFile(tempDataFile, periodStart, reportingInterval, anonymize); @@ -423,43 +422,19 @@ public static int compileAndUploadTripHistory( if (recordsWritten > 0 || "true".equals(CONNECTED_DATA_PLATFORM_UPLOAD_BLANK_FILES)) { // Upload the file if records were written or config setting requires uploading blank files. - FileUtils.addSingleFileToZip(tempDataFile, tempZipFile); - S3Utils.putObject( - CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME, - String.format( - "%s/%s", - getUploadFolderName( - CONNECTED_DATA_PLATFORM_S3_FOLDER_NAME, - CONNECTED_DATA_PLATFORM_FOLDER_GROUPING, - periodStart.toLocalDate() - ), - zipFileName - ), - new File(tempZipFile) - ); + uploadFiles.compressAndUpload(getUploadFolderName( + CONNECTED_DATA_PLATFORM_S3_FOLDER_NAME, + CONNECTED_DATA_PLATFORM_FOLDER_GROUPING, + periodStart.toLocalDate() + )); } allRecordsWritten += recordsWritten; - } catch (Exception e) { + } catch (IOException e) { BugsnagReporter.reportErrorToBugsnag( - String.format("Failed to process trip data for (%s)", periodStart), + String.format("Failed to write trip data for (%s)", periodStart), e ); return Integer.MIN_VALUE; - } finally { - // Delete the temporary files. This is done here in case the S3 upload fails. - try { - LOG.error("Deleting CDP zip file {} as an error occurred while processing the data it was supposed to contain.", tempZipFile); - FileUtils.deleteFile(tempDataFile); - if (!isTest) { - FileUtils.deleteFile(tempZipFile); - } else { - LOG.warn("In test mode, temp zip file {} not deleted. This is expected to be deleted by the calling test.", - tempZipFile - ); - } - } catch (IOException e) { - LOG.error("Failed to delete temp files", e); - } } } @@ -471,37 +446,25 @@ public static boolean isAnonymizedInterval(String reportingMode) { return reportingModes.contains("interval") && reportingModes.contains("anonymized"); } - /** - * Get all incomplete trip history uploads. 
- */ - public static List getIncompleteUploads() { - FindIterable tripHistoryUploads = Persistence.tripHistoryUploads.getFiltered( - Filters.ne("status", TripHistoryUploadStatus.COMPLETED.getValue()) - ); - return tripHistoryUploads.into(new ArrayList<>()); + public static String getDailyFileName(LocalDateTime date, String fileNameSuffix) { + return formatFileName(date, "yyyy-MM-dd", fileNameSuffix); } public static String getHourlyFileName(LocalDateTime date, String fileNameSuffix) { - final String DEFAULT_DATE_FORMAT_PATTERN = "yyyy-MM-dd-HH"; - return String.format( - "%s-%s", - getStringFromDate(date, DEFAULT_DATE_FORMAT_PATTERN), - fileNameSuffix - ); + return formatFileName(date, "yyyy-MM-dd-HH", fileNameSuffix); } /** * Produce file name without path or extension. */ public static String getFilePrefix(ReportingInterval reportingInterval, LocalDateTime date, String entityName) { - final String DEFAULT_DATE_FORMAT_PATTERN = isReportingDaily(reportingInterval) - ? "yyyy-MM-dd" - : "yyyy-MM-dd-HH"; - return String.format( - "%s-%s", - getStringFromDate(date, DEFAULT_DATE_FORMAT_PATTERN), - entityName - ); + return isReportingDaily(reportingInterval) + ? getDailyFileName(date, entityName) + : getHourlyFileName(date, entityName); + } + + public static String formatFileName(LocalDateTime date, String datePattern, String suffix) { + return String.format("%s-%s", getStringFromDate(date, datePattern), suffix); } /** diff --git a/src/main/java/org/opentripplanner/middleware/connecteddataplatform/IntervalUploadFiles.java b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/IntervalUploadFiles.java new file mode 100644 index 000000000..43e15c61f --- /dev/null +++ b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/IntervalUploadFiles.java @@ -0,0 +1,80 @@ +package org.opentripplanner.middleware.connecteddataplatform; + +import org.opentripplanner.middleware.bugsnag.BugsnagReporter; +import org.opentripplanner.middleware.utils.FileUtils; +import org.opentripplanner.middleware.utils.S3Exception; +import org.opentripplanner.middleware.utils.S3Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME; +import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.ZIP_FILE_EXTENSION; + +/** + * Helper class for upload job file handling. + * When used in try-with-resource blocks, temp files will be automatically deleted. + */ +public class IntervalUploadFiles implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(IntervalUploadFiles.class); + + private final boolean isTest; + + private final String zipFileName; + + private final String tempZipFile; + + private final String tempDataFile; + + public IntervalUploadFiles(String filePrefix, String coreExtension, boolean isTest) { + this.isTest = isTest; + + zipFileName = String.join(".", filePrefix, ZIP_FILE_EXTENSION); + String tempFileFolder = FileUtils.getTempDirectory().getAbsolutePath(); + tempZipFile = String.join(File.separator, tempFileFolder, zipFileName); + tempDataFile = String.join(File.separator, tempFileFolder, String.join(".", filePrefix, coreExtension)); + } + + public String getTempDataFile() { + return tempDataFile; + } + + /** + * Compress and upload the data file. 
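+     * If the S3 upload fails, the error is logged and reported to Bugsnag rather than rethrown.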
+ */ + public void compressAndUpload(String folder) throws IOException { + FileUtils.addSingleFileToZip(tempDataFile, tempZipFile); + try { + S3Utils.putObject( + CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME, + String.format("%s/%s", folder, zipFileName), + new File(tempZipFile) + ); + } catch (S3Exception e) { + String message = String.format("Error uploading (%s) to S3", zipFileName); + LOG.error(message); + BugsnagReporter.reportErrorToBugsnag(message, e); + } + } + + @Override + public void close() throws IOException { + // Delete the temporary files here, to cover S3 upload success or failure. + try { + LOG.info("Deleting zip file {}.", tempZipFile); + FileUtils.deleteFile(tempDataFile); + if (!isTest) { + FileUtils.deleteFile(tempZipFile); + } else { + LOG.warn("In test mode, temp zip file {} not deleted. This is expected to be deleted by the calling test.", + tempZipFile + ); + } + } catch (IOException e) { + LOG.error("Failed to delete temp files", e); + throw e; + } + } +} diff --git a/src/main/java/org/opentripplanner/middleware/connecteddataplatform/IntervalUploadJob.java b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/IntervalUploadJob.java new file mode 100644 index 000000000..8536b8966 --- /dev/null +++ b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/IntervalUploadJob.java @@ -0,0 +1,123 @@ +package org.opentripplanner.middleware.connecteddataplatform; + +import org.opentripplanner.middleware.models.IntervalUpload; +import org.opentripplanner.middleware.persistence.TypedPersistence; +import org.opentripplanner.middleware.utils.DateTimeUtils; +import org.slf4j.Logger; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.List; + +import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.isReportingDaily; + +/** + * This job is responsible for keeping the uploads held on S3 up-to-date by defining the hours/days which should be + * uploaded and triggering the upload process. + */ +public abstract class IntervalUploadJob implements Runnable { + + private static final int HISTORIC_UPLOAD_HOURS_BACK_STOP = 24; + + protected final ReportingInterval reportingInterval; + private final Logger logger; + private final TypedPersistence persistence; + + protected IntervalUploadJob(Logger logger, ReportingInterval reportingInterval, TypedPersistence persistence) { + this.logger = logger; + this.reportingInterval = reportingInterval; + this.persistence = persistence; + } + + protected abstract void processInterval(T intervalUpload); + + protected abstract void createUpload(LocalDateTime time); + + public void run() { + logger.info("{} started", this.getClass().getSimpleName()); + if (isReportingDaily(reportingInterval)) { + stageUploadDays(); + } else { + stageUploadHours(); + } + runInnerLogic(); + } + + public void runInnerLogic() { + IntervalUpload.getIncompleteUploads(persistence).forEach(this::processInterval); + } + + /** + * Add to the upload list any hours between the previous whole hour and the last created (pending or + * completed) upload. This will cover any hours missed due to downtime and add the latest upload hour + * if not already accounted for. + */ + public void stageUploadHours() { + stageUploadTimes(DateTimeUtils.getPreviousWholeHourFrom(LocalDateTime.now()), ChronoUnit.HOURS); + } + + /** + * Add to the upload list any days between the previous day and the last created (pending or + * completed) upload. 
This will cover any days missed due to downtime and add the latest upload day
+     * if not already accounted for.
+     */
+    public void stageUploadDays() {
+        stageUploadTimes(DateTimeUtils.getPreviousDayFrom(LocalDateTime.now()), ChronoUnit.DAYS);
+    }
+
+    /**
+     * Add to the upload list any hours/days between the previous whole hour/day and the last created
+     * (pending or completed) upload. This will cover any hours/days missed due to downtime,
+     * up to HISTORIC_UPLOAD_HOURS_BACK_STOP hours, and add the latest upload hour/day if not already accounted for.
+     */
+    private void stageUploadTimes(LocalDateTime previousTime, ChronoUnit chronoUnit) {
+        IntervalUpload lastCreated = getLastUploadCreated();
+        if (lastCreated == null) {
+            // Stage first ever upload hour/day (will use 'hour' throughout whether referring to hours or days).
+            createUpload(previousTime);
+            logger.debug("Staging first ever upload hour: {}.", previousTime);
+            return;
+        }
+        // Stage all times between the last upload time and an hour/day ago.
+        List<LocalDateTime> intermediateTimes = DateTimeUtils.getTimeUnitsBetween(
+            lastCreated.uploadHour,
+            previousTime,
+            chronoUnit
+        );
+        intermediateTimes.forEach(uploadHour -> {
+            if (uploadHour.isAfter(getHistoricDateTimeBackStop())) {
+                logger.debug(
+                    "Staging hour: {} that is between last created: {} and the previous whole hour: {}",
+                    uploadHour,
+                    lastCreated,
+                    previousTime
+                );
+                createUpload(uploadHour);
+            }
+        });
+        if (!lastCreated.uploadHour.isEqual(previousTime)) {
+            // Last created is not the latest upload hour, so stage the previous hour/day.
+            createUpload(previousTime);
+            logger.debug("Last created {} is older than the latest {}, so staging.", lastCreated, previousTime);
+        }
+    }
+
+    public T getLastUploadCreated() {
+        return IntervalUpload.getLastUploadCreated(persistence);
+    }
+
+    /**
+     * This is the earliest date/time for which data will be uploaded. It assumes that the service will not be
+     * offline longer than this period; if it is, this backstop prevents a potentially large amount of data from
+     * being uploaded on start-up, which would impact performance.
+     */
+    private static LocalDateTime getHistoricDateTimeBackStop() {
+        return LocalDateTime.now().minusHours(HISTORIC_UPLOAD_HOURS_BACK_STOP);
+    }
+
+    /** Marks an upload record as completed. */
+    protected void markAsCompleted(T upload) {
+        upload.status = IntervalUploadStatus.COMPLETED;
+        persistence.replace(upload.id, upload);
+    }
+}
diff --git a/src/main/java/org/opentripplanner/middleware/connecteddataplatform/TripHistoryUploadStatus.java b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/IntervalUploadStatus.java
similarity index 70%
rename from src/main/java/org/opentripplanner/middleware/connecteddataplatform/TripHistoryUploadStatus.java
rename to src/main/java/org/opentripplanner/middleware/connecteddataplatform/IntervalUploadStatus.java
index fc5452b5f..8e7853ca3 100644
--- a/src/main/java/org/opentripplanner/middleware/connecteddataplatform/TripHistoryUploadStatus.java
+++ b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/IntervalUploadStatus.java
@@ -1,10 +1,10 @@
 package org.opentripplanner.middleware.connecteddataplatform;
 
 /**
- * Used to define a trip data upload status. Trip uploads will remain 'pending' until successfully uploaded, at which
+ * Used to define an upload status. Uploads will remain 'pending' until successfully uploaded, at which
  * point the status is set to 'completed'.
*/ -public enum TripHistoryUploadStatus { +public enum IntervalUploadStatus { /** * Once a trip data upload for a day has been completed it's status is set to completed. */ @@ -16,7 +16,7 @@ public enum TripHistoryUploadStatus { private final String value; - TripHistoryUploadStatus(String value) { + IntervalUploadStatus(String value) { this.value = value; } diff --git a/src/main/java/org/opentripplanner/middleware/connecteddataplatform/TripHistoryUploadJob.java b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/TripHistoryUploadJob.java index cfc98cb63..198be0110 100644 --- a/src/main/java/org/opentripplanner/middleware/connecteddataplatform/TripHistoryUploadJob.java +++ b/src/main/java/org/opentripplanner/middleware/connecteddataplatform/TripHistoryUploadJob.java @@ -2,116 +2,51 @@ import org.opentripplanner.middleware.models.TripHistoryUpload; import org.opentripplanner.middleware.persistence.Persistence; -import org.opentripplanner.middleware.utils.DateTimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; -import java.time.temporal.ChronoUnit; -import java.util.List; import java.util.Map; +import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.CONNECTED_DATA_PLATFORM_REPORTING_INTERVAL; + /** * This job is responsible for keeping the trip history held on s3 up-to-date by defining the hours which should be * uploaded and triggering the upload process. */ -public class TripHistoryUploadJob implements Runnable { +public class TripHistoryUploadJob extends IntervalUploadJob { private static final Logger LOG = LoggerFactory.getLogger(TripHistoryUploadJob.class); - private static final int HISTORIC_UPLOAD_HOURS_BACK_STOP = 24; - public void run() { - if (ConnectedDataManager.isReportingDaily()) { - stageUploadDays(); - } else { - stageUploadHours(); - } - processTripHistory(ConnectedDataManager.CONNECTED_DATA_PLATFORM_REPORTING_INTERVAL, null); - } + private final Map reportedEntities; - /** - * Add to the trip history upload list any hours between the previous whole hour and the last created (pending or - * completed) trip history upload. This will cover any hours missed due to downtime and add the latest upload hour - * if not already accounted for. - */ - public static void stageUploadHours() { - stageUploadTimes(DateTimeUtils.getPreviousWholeHourFrom(LocalDateTime.now()), ChronoUnit.HOURS); + public TripHistoryUploadJob() { + super(LOG, CONNECTED_DATA_PLATFORM_REPORTING_INTERVAL, Persistence.tripHistoryUploads); + this.reportedEntities = null; } - /** - * Add to the trip history upload list any days between the previous day and the last created (pending or - * completed) trip history upload. This will cover any days missed due to downtime and add the latest upload day - * if not already accounted for. - */ - public static void stageUploadDays() { - stageUploadTimes(DateTimeUtils.getPreviousDayFrom(LocalDateTime.now()), ChronoUnit.DAYS); + public TripHistoryUploadJob(ReportingInterval reportingInterval, Map reportedEntities) { + super(LOG, reportingInterval, Persistence.tripHistoryUploads); + this.reportedEntities = reportedEntities; } - /** - * Add to the trip history upload list any hours/days between the previous whole hour/day and the last created - * (pending or completed) trip history upload. This will cover any hours/days missed due to downtime, - * up to HISTORIC_UPLOAD_HOURS_BACK_STOP hours, and add the latest upload hour/day if not already accounted for. 
- */ - private static void stageUploadTimes(LocalDateTime previousTime, ChronoUnit chronoUnit) { - TripHistoryUpload lastCreated = TripHistoryUpload.getLastCreated(); - if (lastCreated == null) { - // Stage first ever upload hour. - Persistence.tripHistoryUploads.create(new TripHistoryUpload(previousTime)); - LOG.debug("Staging first ever upload hour: {}.", previousTime); - return; - } - // Stage all time between the last time uploaded and an hour/day ago. - List intermediateTimes = DateTimeUtils.getTimeUnitsBetween( - lastCreated.uploadHour, - previousTime, - chronoUnit + @Override + protected void processInterval(TripHistoryUpload upload) { + int numRecordsToUpload = ConnectedDataManager.compileAndUploadTripHistory( + upload.uploadHour, + reportingInterval, + reportedEntities ); - intermediateTimes.forEach(uploadHour -> { - if (uploadHour.isAfter(getHistoricDateTimeBackStop())) { - LOG.debug( - "Staging hour: {} that is between last created: {} and the previous whole hour: {}", - lastCreated, - previousTime, - uploadHour - ); - Persistence.tripHistoryUploads.create(new TripHistoryUpload(uploadHour)); - } - }); - if (!lastCreated.uploadHour.isEqual(previousTime)) { - // Last created is not the latest upload hour, so stage an hour ago. - Persistence.tripHistoryUploads.create(new TripHistoryUpload(previousTime)); - LOG.debug("Last created {} is older than the latest {}, so staging.", lastCreated, previousTime); + if (numRecordsToUpload != Integer.MIN_VALUE) { + // If successfully compiled and updated, update the status to 'completed' and record the number of trip + // requests uploaded (if any). + upload.numTripRequestsUploaded = numRecordsToUpload; + markAsCompleted(upload); } } - /** - * This is the absolute historic date/time which trip history will be uploaded. This assumes that the service will - * not be offline longer than this period, but if it is, it will prevent potentially a lot of data being uploaded on - * start-up which will impact performance. - */ - private static LocalDateTime getHistoricDateTimeBackStop() { - return LocalDateTime.now().minusHours(HISTORIC_UPLOAD_HOURS_BACK_STOP); - } - - /** - * Process incomplete upload dates. This will be uploads which are flagged as 'pending'. If the upload date is - * compiled and uploaded successfully, it is flagged as 'complete'. - */ - public static void processTripHistory(ReportingInterval reportingInterval, Map reportedEntities) { - List incompleteUploads = ConnectedDataManager.getIncompleteUploads(); - incompleteUploads.forEach(tripHistoryUpload -> { - int numRecordsToUpload = ConnectedDataManager.compileAndUploadTripHistory( - tripHistoryUpload.uploadHour, - reportingInterval, - reportedEntities - ); - if (numRecordsToUpload != Integer.MIN_VALUE) { - // If successfully compiled and updated, update the status to 'completed' and record the number of trip - // requests uploaded (if any). 
- tripHistoryUpload.status = TripHistoryUploadStatus.COMPLETED.getValue(); - tripHistoryUpload.numTripRequestsUploaded = numRecordsToUpload; - Persistence.tripHistoryUploads.replace(tripHistoryUpload.id, tripHistoryUpload); - } - }); + @Override + protected void createUpload(LocalDateTime time) { + Persistence.tripHistoryUploads.create(new TripHistoryUpload(time)); } } diff --git a/src/main/java/org/opentripplanner/middleware/models/IntervalUpload.java b/src/main/java/org/opentripplanner/middleware/models/IntervalUpload.java new file mode 100644 index 000000000..b747205ee --- /dev/null +++ b/src/main/java/org/opentripplanner/middleware/models/IntervalUpload.java @@ -0,0 +1,89 @@ +package org.opentripplanner.middleware.models; + +import com.mongodb.client.FindIterable; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.Sorts; +import org.bson.codecs.pojo.annotations.BsonIgnore; +import org.bson.conversions.Bson; +import org.opentripplanner.middleware.connecteddataplatform.IntervalUploadStatus; +import org.opentripplanner.middleware.persistence.TypedPersistence; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * An interval upload represents an historic interval (e.g. hour, day) when data was or is planned to be uploaded. + * If the status is 'pending', the data is waiting to be uploaded. If the status is 'complete' the data has been + * uploaded. + */ +public class IntervalUpload extends Model { + public static final String STATUS_FIELD_NAME = "status"; + + public LocalDateTime uploadHour; // Regardless of whether dealing with hours or days. + + public IntervalUploadStatus status = IntervalUploadStatus.PENDING; + + public IntervalUpload() { + // Empty constructor for deserialization. + } + + public IntervalUpload(LocalDateTime uploadHour) { + this.uploadHour = uploadHour; + this.status = IntervalUploadStatus.PENDING; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + IntervalUpload that = (IntervalUpload) o; + return Objects.equals(uploadHour, that.uploadHour) && Objects.equals(status, that.status); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), uploadHour, status); + } + + /** + * Get the last created upload regardless of status. + */ + @BsonIgnore + public static U getLastUploadCreated(TypedPersistence persistence) { + return getOneOrdered(persistence, Sorts.descending("dateCreated")); + } + + /** + * Get the first created upload regardless of status. + */ + @BsonIgnore + public static U getFirst(TypedPersistence persistence) { + return getOneOrdered(persistence, Sorts.ascending("dateCreated")); + } + + /** + * Get one upload based on the sort order. + */ + public static U getOneOrdered(TypedPersistence persistence, Bson sortBy) { + return persistence.getOneFiltered( + Filters.or( + Filters.eq(STATUS_FIELD_NAME, IntervalUploadStatus.COMPLETED.getValue()), + Filters.eq(STATUS_FIELD_NAME, IntervalUploadStatus.PENDING.getValue()) + ), + sortBy + ); + } + + /** + * Get all incomplete uploads. 
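+     * @param persistence the typed persistence collection to search, e.g. trip history uploads or trip survey uploads.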
+ */ + public static List getIncompleteUploads(TypedPersistence persistence) { + FindIterable incompleteUploads = persistence.getFiltered( + Filters.ne(IntervalUpload.STATUS_FIELD_NAME, IntervalUploadStatus.COMPLETED.getValue()) + ); + return incompleteUploads.into(new ArrayList<>()); + } +} diff --git a/src/main/java/org/opentripplanner/middleware/models/OtpUser.java b/src/main/java/org/opentripplanner/middleware/models/OtpUser.java index 061a31326..1c9e32a02 100644 --- a/src/main/java/org/opentripplanner/middleware/models/OtpUser.java +++ b/src/main/java/org/opentripplanner/middleware/models/OtpUser.java @@ -6,6 +6,7 @@ import org.opentripplanner.middleware.auth.Auth0Users; import org.opentripplanner.middleware.auth.RequestingUser; import org.opentripplanner.middleware.persistence.Persistence; +import org.opentripplanner.middleware.utils.NotificationUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,6 +136,10 @@ public boolean delete(boolean deleteAuth0User) { } } + // Delete push devices + NotificationUtils.deletePushDevices(email); + + // If a related user, invalidate relationship with all dependents. for (String userId : dependents) { OtpUser dependent = Persistence.otpUsers.getById(userId); diff --git a/src/main/java/org/opentripplanner/middleware/models/TripHistoryUpload.java b/src/main/java/org/opentripplanner/middleware/models/TripHistoryUpload.java index 14b2cbc83..924820a0e 100644 --- a/src/main/java/org/opentripplanner/middleware/models/TripHistoryUpload.java +++ b/src/main/java/org/opentripplanner/middleware/models/TripHistoryUpload.java @@ -1,24 +1,17 @@ package org.opentripplanner.middleware.models; -import com.mongodb.client.model.Filters; -import com.mongodb.client.model.Sorts; import org.bson.codecs.pojo.annotations.BsonIgnore; -import org.bson.conversions.Bson; -import org.opentripplanner.middleware.connecteddataplatform.TripHistoryUploadStatus; import org.opentripplanner.middleware.persistence.Persistence; import java.time.LocalDateTime; -import java.util.Objects; /** * A trip history upload represents an historic hour when trip history was or is planned to be uploaded to S3. If the * status is 'pending' the trip history is waiting to be uploaded. If the status is 'complete' the trip history has been * uploaded. */ -public class TripHistoryUpload extends Model { +public class TripHistoryUpload extends IntervalUpload { - public LocalDateTime uploadHour; - public String status = TripHistoryUploadStatus.PENDING.getValue(); public int numTripRequestsUploaded = 0; /** This no-arg constructor exists to make MongoDB happy. */ @@ -26,16 +19,7 @@ public TripHistoryUpload() { } public TripHistoryUpload(LocalDateTime uploadHour) { - this.uploadHour = uploadHour; - this.status = TripHistoryUploadStatus.PENDING.getValue(); - } - - /** - * Get the last created trip history upload regardless of status. - */ - @BsonIgnore - public static TripHistoryUpload getLastCreated() { - return getOneOrdered(Sorts.descending("dateCreated")); + super(uploadHour); } /** @@ -43,33 +27,6 @@ public static TripHistoryUpload getLastCreated() { */ @BsonIgnore public static TripHistoryUpload getFirst() { - return getOneOrdered(Sorts.ascending("dateCreated")); - } - - /** - * Get one upload based on the sort order. 
- */ - private static TripHistoryUpload getOneOrdered(Bson sortBy) { - return Persistence.tripHistoryUploads.getOneFiltered( - Filters.or( - Filters.eq("status", TripHistoryUploadStatus.COMPLETED.getValue()), - Filters.eq("status", TripHistoryUploadStatus.PENDING.getValue()) - ), - sortBy - ); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - TripHistoryUpload that = (TripHistoryUpload) o; - return Objects.equals(uploadHour, that.uploadHour) && Objects.equals(status, that.status); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), uploadHour, status); + return IntervalUpload.getFirst(Persistence.tripHistoryUploads); } } diff --git a/src/main/java/org/opentripplanner/middleware/models/TripSurveyUpload.java b/src/main/java/org/opentripplanner/middleware/models/TripSurveyUpload.java new file mode 100644 index 000000000..72bcad7ff --- /dev/null +++ b/src/main/java/org/opentripplanner/middleware/models/TripSurveyUpload.java @@ -0,0 +1,28 @@ +package org.opentripplanner.middleware.models; + +import org.opentripplanner.middleware.connecteddataplatform.IntervalUploadStatus; + +import java.time.LocalDateTime; + +/** + * A survey upload represents an historic day when surveys are uploaded to S3. If the + * status is 'pending' surveys are waiting to be uploaded. If the status is 'complete' the surveys have been + * uploaded. + */ +public class TripSurveyUpload extends IntervalUpload { + + public TripSurveyUpload() { + // No-arg constructor for deserialization. + } + + public TripSurveyUpload(LocalDateTime uploadHour) { + super(uploadHour); + } + + public TripSurveyUpload(String id, LocalDateTime uploadHour, IntervalUploadStatus status) { + super(uploadHour); + this.id = id; + this.uploadHour = uploadHour; + this.status = status; + } +} diff --git a/src/main/java/org/opentripplanner/middleware/persistence/Persistence.java b/src/main/java/org/opentripplanner/middleware/persistence/Persistence.java index 01a41299c..544c01635 100644 --- a/src/main/java/org/opentripplanner/middleware/persistence/Persistence.java +++ b/src/main/java/org/opentripplanner/middleware/persistence/Persistence.java @@ -16,6 +16,7 @@ import org.opentripplanner.middleware.models.MonitoredComponent; import org.opentripplanner.middleware.models.MonitoredTrip; import org.opentripplanner.middleware.models.OtpUser; +import org.opentripplanner.middleware.models.TripSurveyUpload; import org.opentripplanner.middleware.models.TrackedJourney; import org.opentripplanner.middleware.models.TripHistoryUpload; import org.opentripplanner.middleware.models.TripRequest; @@ -49,6 +50,7 @@ public class Persistence { public static TypedPersistence apiUsers; public static TypedPersistence cdpUsers; public static TypedPersistence tripHistoryUploads; + public static TypedPersistence tripSurveyUploads; public static TypedPersistence trackedJourneys; public static TypedPersistence tripRequests; public static TypedPersistence tripSummaries; @@ -98,6 +100,7 @@ public static void initialize () { apiUsers = new TypedPersistence(mongoDatabase, ApiUser.class); cdpUsers = new TypedPersistence(mongoDatabase, CDPUser.class); tripHistoryUploads = new TypedPersistence(mongoDatabase, TripHistoryUpload.class); + tripSurveyUploads = new TypedPersistence(mongoDatabase, TripSurveyUpload.class); trackedJourneys = new TypedPersistence(mongoDatabase, TrackedJourney.class); tripRequests = new 
TypedPersistence(mongoDatabase, TripRequest.class);
         tripSummaries = new TypedPersistence(mongoDatabase, TripSummary.class);
diff --git a/src/main/java/org/opentripplanner/middleware/triptracker/TripSurveyUploadJob.java b/src/main/java/org/opentripplanner/middleware/triptracker/TripSurveyUploadJob.java
new file mode 100644
index 000000000..080e9e96a
--- /dev/null
+++ b/src/main/java/org/opentripplanner/middleware/triptracker/TripSurveyUploadJob.java
@@ -0,0 +1,93 @@
+package org.opentripplanner.middleware.triptracker;
+
+import org.apache.logging.log4j.util.Strings;
+import org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager;
+import org.opentripplanner.middleware.connecteddataplatform.IntervalUploadFiles;
+import org.opentripplanner.middleware.connecteddataplatform.IntervalUploadJob;
+import org.opentripplanner.middleware.connecteddataplatform.ReportingInterval;
+import org.opentripplanner.middleware.models.TripSurveyUpload;
+import org.opentripplanner.middleware.typeform.Responses;
+import org.opentripplanner.middleware.persistence.Persistence;
+import org.opentripplanner.middleware.typeform.TypeFormDispatcher;
+import org.opentripplanner.middleware.utils.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME;
+
+/**
+ * This job downloads responses to post-trip surveys for each pending day and uploads them, as a zipped CSV file,
+ * to the Connected Data Platform S3 bucket. Downloaded responses are then deleted from the survey platform.
+ */
+public class TripSurveyUploadJob extends IntervalUploadJob<TripSurveyUpload> {
+    private static final Logger LOG = LoggerFactory.getLogger(TripSurveyUploadJob.class);
+
+    public static final String SURVEY_ZIP_FILE_PREFIX = "merged";
+
+    private final Function<LocalDateTime, Responses> surveyApiResponseProvider;
+
+    private final String csvHeaders;
+
+    public TripSurveyUploadJob() {
+        this(TypeFormDispatcher::downloadSurveyResponses, TypeFormDispatcher::downloadSurveyHeaders);
+    }
+
+    public TripSurveyUploadJob(
+        Function<LocalDateTime, Responses> surveyApiResponseProvider,
+        Supplier<String> surveyCsvHeaderProvider
+    ) {
+        super(LOG, ReportingInterval.DAILY, Persistence.tripSurveyUploads);
+        this.surveyApiResponseProvider = surveyApiResponseProvider;
+        this.csvHeaders = surveyCsvHeaderProvider.get();
+    }
+
+    @Override
+    protected void processInterval(TripSurveyUpload upload) {
+        Responses apiResponse = surveyApiResponseProvider.apply(upload.uploadHour);
+
+        // Dump responses to temp CSV/Zip file and upload to S3.
+        // TODO: Add support if more than 1000 responses are submitted the same day.
+        if (apiResponse != null && processSurveyHistory(upload, apiResponse)) {
+            // If successfully compiled and updated, update the status to 'completed'.
+ markAsCompleted(upload); + + // Attempt to delete the responses that were downloaded above + List ids = apiResponse.items.stream().map(i -> i.response_id).collect(Collectors.toList()); + TypeFormDispatcher.deleteSurveyResponses(ids); + } + } + + @Override + protected void createUpload(LocalDateTime time) { + Persistence.tripSurveyUploads.create(new TripSurveyUpload(time)); + } + + public boolean processSurveyHistory(TripSurveyUpload upload, Responses responses) { + IntervalUploadFiles uploadFiles = new IntervalUploadFiles( + ConnectedDataManager.getFilePrefix(reportingInterval, upload.uploadHour, SURVEY_ZIP_FILE_PREFIX), + "csv", + responses.isTest + ); + + // Dump CSV content to file + String tempDataFile = uploadFiles.getTempDataFile(); + try (uploadFiles) { + FileUtils.writeToFile(tempDataFile, false, responses.toCsv(csvHeaders)); + uploadFiles.compressAndUpload("trip-survey-responses"); + return true; + } catch (IOException e) { + LOG.warn("Error writing survey results to {}", tempDataFile); + return false; + } + } + + public static boolean isConfigured() { + return !Strings.isBlank(CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME) && TypeFormDispatcher.checkSurveyIdAndToken(); + } +} diff --git a/src/main/java/org/opentripplanner/middleware/typeform/Field.java b/src/main/java/org/opentripplanner/middleware/typeform/Field.java new file mode 100644 index 000000000..ca4513d02 --- /dev/null +++ b/src/main/java/org/opentripplanner/middleware/typeform/Field.java @@ -0,0 +1,24 @@ +package org.opentripplanner.middleware.typeform; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +/** Relevant fields info in surveys. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class Field { + public String id; + + public String title; + + public Field() { + // Empty constructor for deserialization + } + + public Field(String id) { + this.id = id; + } + + public Field(String id, String title) { + this.id = id; + this.title = title; + } +} diff --git a/src/main/java/org/opentripplanner/middleware/typeform/Form.java b/src/main/java/org/opentripplanner/middleware/typeform/Form.java new file mode 100644 index 000000000..1d8e97204 --- /dev/null +++ b/src/main/java/org/opentripplanner/middleware/typeform/Form.java @@ -0,0 +1,26 @@ +package org.opentripplanner.middleware.typeform; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** Data structure for relevant TypeForm Form data. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class Form { + + public List fields; + + public List hidden; + + public String toCsvHeader() { + List standardHeaders = List.of("id", "status", "started", "completed"); + List allFields = new ArrayList<>(); + allFields.addAll(standardHeaders); + allFields.addAll(hidden); + allFields.addAll(fields.stream().map(f -> String.format("\"%s\"", f.title)).collect(Collectors.toList())); + + return String.join(",", allFields); + } +} diff --git a/src/main/java/org/opentripplanner/middleware/typeform/Response.java b/src/main/java/org/opentripplanner/middleware/typeform/Response.java new file mode 100644 index 000000000..9136682c2 --- /dev/null +++ b/src/main/java/org/opentripplanner/middleware/typeform/Response.java @@ -0,0 +1,90 @@ +package org.opentripplanner.middleware.typeform; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.util.List; +import java.util.stream.Collectors; + +/** Data structure for TypeForm survey responses. 
Only including relevant fields. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class Response { + public String response_id; + + public String response_type; + + public String landed_at; + + public String submitted_at; + + public Hidden hidden; + + public List answers; + + /** Relevant hidden fields in surveys. */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Hidden { + public String notification_id; + + public String user_id; + + public String trip_id; + } + + /** Relevant answer fields in surveys. */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Answer { + public Field field; + + public String type; + + public Choice choice; + + public Choices choices; + + public String text; + + public String toRawContent() { + if ("choices".equals(type) && choices != null && choices.labels != null) { + return String.join(";", choices.labels); + } else if ("choice".equals(type) && choice != null && choice.label != null) { + return choice.label; + } else if ("text".equals(type) && text != null) { + return text; + } + return ""; + } + + public String toCsvContent() { + // Surround answers with quotes as answers may contain commas. + return String.format("\"%s\"", toRawContent()); + } + } + + /** Relevant choice fields in surveys. */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Choice { + public String label; + } + + /** Relevant multiple choice fields in surveys. */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Choices { + public List labels; + } + + public String toCsvRow() { + // id, type/state, landed, submitted, hidden fields, textual responses in order they appear. + return String.join( + ",", + response_id, + response_type, + landed_at, + submitted_at, + hidden.notification_id, + hidden.trip_id, + hidden.user_id, + answers.stream().map(Answer::toCsvContent).collect(Collectors.joining(",")) + ); + } +} + diff --git a/src/main/java/org/opentripplanner/middleware/typeform/Responses.java b/src/main/java/org/opentripplanner/middleware/typeform/Responses.java new file mode 100644 index 000000000..32000f230 --- /dev/null +++ b/src/main/java/org/opentripplanner/middleware/typeform/Responses.java @@ -0,0 +1,27 @@ +package org.opentripplanner.middleware.typeform; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.util.List; + +/** Data structure for TypeForm survey API response. 
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Responses {
+    public List<Response> items;
+
+    /** Populated in tests only. */
+    public boolean isTest;
+
+    public String toCsv(String headers) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(headers);
+        builder.append(System.lineSeparator());
+        if (items != null) {
+            for (Response response : items) {
+                builder.append(response.toCsvRow());
+                builder.append(System.lineSeparator());
+            }
+        }
+        return builder.toString();
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/opentripplanner/middleware/typeform/TypeFormDispatcher.java b/src/main/java/org/opentripplanner/middleware/typeform/TypeFormDispatcher.java
new file mode 100644
index 000000000..554820896
--- /dev/null
+++ b/src/main/java/org/opentripplanner/middleware/typeform/TypeFormDispatcher.java
@@ -0,0 +1,111 @@
+package org.opentripplanner.middleware.typeform;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.logging.log4j.util.Strings;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpStatus;
+import org.opentripplanner.middleware.utils.DateTimeUtils;
+import org.opentripplanner.middleware.utils.HttpResponseValues;
+import org.opentripplanner.middleware.utils.HttpUtils;
+import org.opentripplanner.middleware.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.time.LocalDateTime;
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.Map;
+
+import static org.eclipse.jetty.http.HttpMethod.DELETE;
+import static org.eclipse.jetty.http.HttpMethod.GET;
+import static org.opentripplanner.middleware.utils.ConfigUtils.getConfigPropertyAsText;
+
+/**
+ * Dispatches requests to the TypeForm API: retrieving survey form metadata (for CSV headers), downloading
+ * survey responses, and deleting responses that have been processed.
+ */
+public class TypeFormDispatcher {
+    private static final Logger LOG = LoggerFactory.getLogger(TypeFormDispatcher.class);
+
+    public static final String TRIP_SURVEY_API_TOKEN = getConfigPropertyAsText("TRIP_SURVEY_API_TOKEN");
+
+    public static final String TRIP_SURVEY_ID = getConfigPropertyAsText("TRIP_SURVEY_ID");
+
+    private TypeFormDispatcher() {
+        // Hide the default constructor because this class only has static methods.
+ } + + public static HttpResponseValues apiRequest(HttpMethod method, String subPath, String queryParams, String topic) { + if (checkSurveyIdAndToken()) { + HttpResponseValues response = HttpUtils.httpRequestRawResponse( + URI.create(String.format("https://api.typeform.com/forms/%s%s%s", TRIP_SURVEY_ID, subPath, queryParams)), + 30, + method, + Map.of("Authorization", String.format("Bearer %s", TRIP_SURVEY_API_TOKEN)), + null + ); + + if (response.status != HttpStatus.OK_200) { + LOG.warn("Error {}-ing {}: [{}] {}", method, topic, response.status, response.responseBody); + } + + return response; + } + return null; + } + + public static Responses downloadSurveyResponses(LocalDateTime day) { + HttpResponseValues response = apiRequest(GET, "/responses", responsesParams(day), "survey responses"); + if (response != null && response.status == HttpStatus.OK_200) { + try { + return JsonUtils.getPOJOFromJSON(response.responseBody, Responses.class); + } catch (JsonProcessingException e) { + LOG.warn("Error parsing survey responses", e); + } + } + + return null; + } + + public static String downloadSurveyHeaders() { + HttpResponseValues response = apiRequest(GET, "", "", "survey headers"); + if (response != null && response.status == HttpStatus.OK_200) { + try { + Form form = JsonUtils.getPOJOFromJSON(response.responseBody, Form.class); + return form.toCsvHeader(); + } catch (JsonProcessingException e) { + LOG.warn("Error parsing survey headers", e); + } + } + + return null; + } + + public static void deleteSurveyResponses(List ids) { + if (!ids.isEmpty()) { + String idParam = String.format("?included_response_ids=%s", String.join(",", ids)); + apiRequest(DELETE, "/responses", idParam, "survey responses"); + } + } + + /** Assembles the query params for retrieving TypeForm survey responses. */ + public static String responsesParams(LocalDateTime day) { + ZonedDateTime zonedDay = day.atZone(DateTimeUtils.getOtpZoneId()); + // The page_size param needs to be passed. Without it, only up to 25 responses are returned by TypeForm. + // TypeForm can return up to 1000 responses in one query, see + // https://www.typeform.com/developers/responses/reference/retrieve-responses/. + return String.format( + "?page_size=1000&since=%d&until=%d", + zonedDay.toEpochSecond(), + zonedDay.plusDays(1).minusSeconds(1).toEpochSecond() + ); + } + + public static boolean checkSurveyIdAndToken() { + boolean idAndTokenPresent = !Strings.isBlank(TRIP_SURVEY_API_TOKEN) && !Strings.isBlank(TRIP_SURVEY_ID); + if (!idAndTokenPresent) { + LOG.warn("Survey ID or survey response API token was not provided."); + } + return idAndTokenPresent; + } +} diff --git a/src/main/java/org/opentripplanner/middleware/utils/NotificationUtils.java b/src/main/java/org/opentripplanner/middleware/utils/NotificationUtils.java index b1635d778..251f39240 100644 --- a/src/main/java/org/opentripplanner/middleware/utils/NotificationUtils.java +++ b/src/main/java/org/opentripplanner/middleware/utils/NotificationUtils.java @@ -421,6 +421,35 @@ public static int getPushInfo(String toUser) { return 0; } + /** + * Deletes devices registered for push notifications, typically when a user deletes their account. + * Calls Push API's DELETE endpoint. + * @param toUser email address of user for which to delete device registration. + */ + public static void deletePushDevices(String toUser) { + // If Push API config properties aren't set, no info can be obtained. 
+ if (PUSH_API_KEY == null || PUSH_API_URL == null) return; + try { + Map headers = Map.of("Accept", "application/json"); + var httpResponse = HttpUtils.httpRequestRawResponse( + URI.create(getPushDevicesUrl(String.format( + "%s/device/deregister?api_key=%s&user=", + PUSH_API_URL, + PUSH_API_KEY + ), toUser)), + 1000, + HttpMethod.DELETE, + headers, + null + ); + if (httpResponse.status != 200) { + LOG.error("Error {} deleting push notification devices for {}", httpResponse.status, toUser); + } + } catch (Exception e) { + LOG.error("Error deleting push notification devices for {}", toUser, e); + } + } + /** * Return the number of unique, non null, device names. */ diff --git a/src/main/resources/env.schema.json b/src/main/resources/env.schema.json index f40de5414..d5cc8fea9 100644 --- a/src/main/resources/env.schema.json +++ b/src/main/resources/env.schema.json @@ -329,6 +329,11 @@ "examples": ["abcabc12a"], "description": "The subdomain of a website where the trip-related surveys are administered." }, + "TRIP_SURVEY_API_TOKEN": { + "type": "string", + "examples": ["abcdef123y"], + "description": "The token for the survey API for downloading responses." + }, "TWILIO_ACCOUNT_SID": { "type": "string", "examples": ["your-account-sid"], diff --git a/src/test/java/org/opentripplanner/middleware/connecteddataplatform/ConnectedDataPlatformTest.java b/src/test/java/org/opentripplanner/middleware/connecteddataplatform/ConnectedDataPlatformTest.java index c6f8fa31e..cc8abda58 100644 --- a/src/test/java/org/opentripplanner/middleware/connecteddataplatform/ConnectedDataPlatformTest.java +++ b/src/test/java/org/opentripplanner/middleware/connecteddataplatform/ConnectedDataPlatformTest.java @@ -146,7 +146,8 @@ public static void tearDown() { */ @Test void canStageFirstUpload() { - TripHistoryUploadJob.stageUploadHours(); + TripHistoryUploadJob job = new TripHistoryUploadJob(); + job.stageUploadHours(); TripHistoryUpload tripHistoryUpload = TripHistoryUpload.getFirst(); assertNotNull(tripHistoryUpload); assertTrue(PREVIOUS_WHOLE_HOUR_FROM_NOW.isEqual(tripHistoryUpload.uploadHour)); @@ -165,8 +166,9 @@ void canCreateZipFileWithContent() throws Exception { String batchId = "783726"; tripRequest = PersistenceTestUtils.createTripRequest(userId, batchId, PREVIOUS_WHOLE_HOUR_FROM_NOW); tripSummary = PersistenceTestUtils.createTripSummary(tripRequest.id, batchId, PREVIOUS_WHOLE_HOUR_FROM_NOW); - TripHistoryUploadJob.stageUploadHours(); - TripHistoryUploadJob.processTripHistory(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES); + TripHistoryUploadJob job = new TripHistoryUploadJob(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES); + job.stageUploadHours(); + job.runInnerLogic(); zipFileName = getHourlyFileName(PREVIOUS_WHOLE_HOUR_FROM_NOW, ConnectedDataManager.ANON_TRIP_ZIP_FILE_NAME); tempFile = String.join( "/", @@ -209,8 +211,9 @@ void canCreateZipFileForTripSummaryWithError() throws Exception { String batchId = "783726"; tripRequest = PersistenceTestUtils.createTripRequest(userId, batchId, PREVIOUS_WHOLE_HOUR_FROM_NOW); tripSummary = PersistenceTestUtils.createTripSummaryWithError(tripRequest.id, batchId, PREVIOUS_WHOLE_HOUR_FROM_NOW); - TripHistoryUploadJob.stageUploadHours(); - TripHistoryUploadJob.processTripHistory(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES); + TripHistoryUploadJob job = new TripHistoryUploadJob(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES); + job.stageUploadHours(); + job.runInnerLogic(); zipFileName = getHourlyFileName(PREVIOUS_WHOLE_HOUR_FROM_NOW, 
ConnectedDataManager.ANON_TRIP_ZIP_FILE_NAME); tempFile = String.join( "/", @@ -255,8 +258,9 @@ void canCreateContentWithTripRequestWithMaxModes() throws Exception { tripRequests.add(tripRequestOne); tripRequests.add(tripRequestTwo); tripSummary = PersistenceTestUtils.createTripSummary(tripRequestOne.id, batchId, PREVIOUS_WHOLE_HOUR_FROM_NOW); - TripHistoryUploadJob.stageUploadHours(); - TripHistoryUploadJob.processTripHistory(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES); + TripHistoryUploadJob job = new TripHistoryUploadJob(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES); + job.stageUploadHours(); + job.runInnerLogic(); zipFileName = getHourlyFileName(PREVIOUS_WHOLE_HOUR_FROM_NOW, ConnectedDataManager.ANON_TRIP_ZIP_FILE_NAME); tempFile = String.join( "/", @@ -294,8 +298,9 @@ void canRemoveUsersTripDataFromFile() throws Exception { tripRequests.add(tripRequestOne); tripSummary = PersistenceTestUtils.createTripSummary(tripRequestOne.id, batchIdTwo, PREVIOUS_WHOLE_HOUR_FROM_NOW); - TripHistoryUploadJob.stageUploadHours(); - TripHistoryUploadJob.processTripHistory(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES); + TripHistoryUploadJob job = new TripHistoryUploadJob(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES); + job.stageUploadHours(); + job.runInnerLogic(); zipFileName = getHourlyFileName(PREVIOUS_WHOLE_HOUR_FROM_NOW, ConnectedDataManager.ANON_TRIP_ZIP_FILE_NAME); tempFile = String.join( "/", @@ -313,7 +318,9 @@ void canRemoveUsersTripDataFromFile() throws Exception { assertTrue(anonymizedTripRequests.stream().anyMatch(anonymizedTripRequest -> anonymizedTripRequest.requestId.equals(batchIdTwo))); ConnectedDataManager.removeUsersTripHistory(userIdOne); - TripHistoryUploadJob.processTripHistory(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES); + + job.stageUploadHours(); + job.runInnerLogic(); fileContents = getContentsOfFileInZip( tempFile, getHourlyFileName(PREVIOUS_WHOLE_HOUR_FROM_NOW, ConnectedDataManager.ANON_TRIP_JSON_FILE_NAME) @@ -333,8 +340,9 @@ void canRemoveUsersTripDataFromFile() throws Exception { void canCorrectlyStageHours() { LocalDateTime sevenHoursAgo = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(7); List betweenHours = DateTimeUtils.getHoursBetween(sevenHoursAgo, PREVIOUS_WHOLE_HOUR_FROM_NOW); - createTripHistoryUpload(sevenHoursAgo, TripHistoryUploadStatus.PENDING); - TripHistoryUploadJob.stageUploadHours(); + createTripHistoryUpload(sevenHoursAgo, IntervalUploadStatus.PENDING); + TripHistoryUploadJob job = new TripHistoryUploadJob(); + job.stageUploadHours(); assertEquals( betweenHours.size() + 1, // plus one for an hour ago. Persistence.tripHistoryUploads.getCountFiltered(Filters.gt("uploadHour", sevenHoursAgo)) @@ -348,8 +356,9 @@ void canCorrectlyStageHours() { @Test void canCorrectlyStageDays() { LocalDateTime fourDaysAgo = LocalDateTime.now().truncatedTo(ChronoUnit.DAYS).minusDays(4); - createTripHistoryUpload(fourDaysAgo, TripHistoryUploadStatus.PENDING); - TripHistoryUploadJob.stageUploadDays(); + createTripHistoryUpload(fourDaysAgo, IntervalUploadStatus.PENDING); + TripHistoryUploadJob job = new TripHistoryUploadJob(); + job.stageUploadDays(); assertEquals( 1, // If system is down, it will only upload the previous day. Persistence.tripHistoryUploads.getCountFiltered(Filters.gt("uploadHour", fourDaysAgo)) @@ -368,7 +377,7 @@ void canRemoveTripHistoryViaAPI() throws Exception { // Set backstop. This allows dates after this to trigger an upload. 
@@ -368,7 +377,7 @@ void canRemoveTripHistoryViaAPI() throws Exception {
        // Set backstop. This allows dates after this to trigger an upload.
        createTripHistoryUpload(
            LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(20),
-            TripHistoryUploadStatus.COMPLETED
+            IntervalUploadStatus.COMPLETED
        );

        // Create OTP user and trip data.
@@ -403,13 +412,14 @@ void canStreamTheCorrectNumberOfTripsHourly() throws Exception {
        // Set backstop. This allows dates after this to trigger an upload.
        createTripHistoryUpload(
            LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(20),
-            TripHistoryUploadStatus.COMPLETED
+            IntervalUploadStatus.COMPLETED
        );

        // Create trip history upload for required date.
-        createTripHistoryUpload(PREVIOUS_WHOLE_HOUR_FROM_NOW, TripHistoryUploadStatus.PENDING);
+        createTripHistoryUpload(PREVIOUS_WHOLE_HOUR_FROM_NOW, IntervalUploadStatus.PENDING);

-        TripHistoryUploadJob.processTripHistory(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES);
+        TripHistoryUploadJob job = new TripHistoryUploadJob(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES);
+        job.runInnerLogic();
        zipFileName = getHourlyFileName(PREVIOUS_WHOLE_HOUR_FROM_NOW, ConnectedDataManager.ANON_TRIP_ZIP_FILE_NAME);
        tempFile = String.join(
            "/",
@@ -436,24 +446,25 @@ void canStreamTheCorrectNumberOfEntitiesDaily() throws Exception {
        // Set backstop. This allows dates after this to trigger an upload.
        createTripHistoryUpload(
            LocalDateTime.now().truncatedTo(ChronoUnit.DAYS).minusDays(2),
-            TripHistoryUploadStatus.COMPLETED
+            IntervalUploadStatus.COMPLETED
        );

        // Create trip history upload for required date.
-        createTripHistoryUpload(PREVIOUS_DAY, TripHistoryUploadStatus.PENDING);
+        createTripHistoryUpload(PREVIOUS_DAY, IntervalUploadStatus.PENDING);

-        TripHistoryUploadJob.processTripHistory(
+        TripHistoryUploadJob job = new TripHistoryUploadJob(
            ReportingInterval.DAILY,
            Map.of("TripRequest", "interval", "TripSummary", "interval")
        );
+        job.runInnerLogic();

-        String tripFileName = ConnectedDataManager.getFilePrefix(ReportingInterval.DAILY, PREVIOUS_DAY, "TripRequest");
+        String tripFileName = ConnectedDataManager.getDailyFileName(PREVIOUS_DAY, "TripRequest");
        zipFileName = String.join(".", tripFileName, ZIP_FILE_EXTENSION);
        tempFile = String.join("/", FileUtils.getTempDirectory().getAbsolutePath(), zipFileName);
        String fileContents = getContentsOfFileInZip(tempFile, String.join(".", tripFileName, JSON_FILE_EXTENSION));
        MatcherAssert.assertThat(fileContents, matchesSnapshot());

-        String summaryFileName = ConnectedDataManager.getFilePrefix(ReportingInterval.DAILY, PREVIOUS_DAY, "TripSummary");
+        String summaryFileName = ConnectedDataManager.getDailyFileName(PREVIOUS_DAY, "TripSummary");
        summaryZipFileName = String.join(".", summaryFileName, ZIP_FILE_EXTENSION);
        summaryTempFile = String.join("/", FileUtils.getTempDirectory().getAbsolutePath(), summaryZipFileName);
@@ -473,23 +484,24 @@ void canStreamTheCorrectNumberOfEntities() throws Exception {
        // Set backstop. This allows dates after this to trigger an upload.
        createTripHistoryUpload(
            LocalDateTime.now().truncatedTo(ChronoUnit.DAYS).minusDays(2),
-            TripHistoryUploadStatus.COMPLETED
+            IntervalUploadStatus.COMPLETED
        );

        // Create trip history upload for required date.
-        createTripHistoryUpload(PREVIOUS_DAY, TripHistoryUploadStatus.PENDING);
+        createTripHistoryUpload(PREVIOUS_DAY, IntervalUploadStatus.PENDING);

-        TripHistoryUploadJob.processTripHistory(
+        TripHistoryUploadJob job = new TripHistoryUploadJob(
            ReportingInterval.DAILY,
            Map.of("TripSummary", "all")
        );
+        job.runInnerLogic();

-        String tripFileName = ConnectedDataManager.getFilePrefix(ReportingInterval.DAILY, PREVIOUS_DAY, "TripRequest");
+        String tripFileName = ConnectedDataManager.getDailyFileName(PREVIOUS_DAY, "TripRequest");
        zipFileName = String.join(".", tripFileName, ZIP_FILE_EXTENSION);
        tempFile = String.join("/", FileUtils.getTempDirectory().getAbsolutePath(), zipFileName);
        // Trips file should not exist because trips are not requested in the report for this test.
        assertFalse(new File(tempFile).exists());

-        String summaryFileName = ConnectedDataManager.getFilePrefix(ReportingInterval.DAILY, PREVIOUS_DAY, "TripSummary");
+        String summaryFileName = ConnectedDataManager.getDailyFileName(PREVIOUS_DAY, "TripSummary");
        summaryZipFileName = String.join(".", summaryFileName, ZIP_FILE_EXTENSION);
        summaryTempFile = String.join("/", FileUtils.getTempDirectory().getAbsolutePath(), summaryZipFileName);
@@ -500,9 +512,9 @@ void canStreamTheCorrectNumberOfEntities() throws Exception {
    }

    /** Create trip history upload for required date. */
-    private static void createTripHistoryUpload(LocalDateTime time, TripHistoryUploadStatus status) {
+    private static void createTripHistoryUpload(LocalDateTime time, IntervalUploadStatus status) {
        TripHistoryUpload upload = new TripHistoryUpload(time);
-        upload.status = status.getValue();
+        upload.status = status;
        Persistence.tripHistoryUploads.create(upload);
    }
@@ -564,8 +576,9 @@ void canHandleMissingPlaceCoordinates() throws Exception {
        tripSummary = new TripSummary(planResponse.plan, planResponse.error, tripRequestOne.id, batchId);
        Persistence.tripSummaries.create(tripSummary);
-        TripHistoryUploadJob.stageUploadHours();
-        TripHistoryUploadJob.processTripHistory(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES);
+        TripHistoryUploadJob job = new TripHistoryUploadJob(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES);
+        job.stageUploadHours();
+        job.runInnerLogic();
        zipFileName = getHourlyFileName(PREVIOUS_WHOLE_HOUR_FROM_NOW, ConnectedDataManager.ANON_TRIP_ZIP_FILE_NAME);
        tempFile = String.join(
            "/",
@@ -601,8 +614,9 @@ void canHandleMissingModes() throws Exception {
        tripRequests.clear();
        tripRequests.add(tripRequestOne);
        tripSummary = PersistenceTestUtils.createTripSummary(tripRequestOne.id, batchId, PREVIOUS_WHOLE_HOUR_FROM_NOW);
-        TripHistoryUploadJob.stageUploadHours();
-        TripHistoryUploadJob.processTripHistory(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES);
+        TripHistoryUploadJob job = new TripHistoryUploadJob(ReportingInterval.HOURLY, ANON_TRIP_REQ_ENTITIES);
+        job.stageUploadHours();
+        job.runInnerLogic();
        zipFileName = getHourlyFileName(PREVIOUS_WHOLE_HOUR_FROM_NOW, ConnectedDataManager.ANON_TRIP_ZIP_FILE_NAME);
        tempFile = String.join(
            "/",
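Across these test changes, the static `TripHistoryUploadJob.stageUploadHours()` / `processTripHistory(...)` calls are replaced by an instance that is constructed with a reporting interval and the entities to report, then staged and run. A hedged reconstruction of the entry points implied purely by the test usage (the constructor defaults, interfaces, and method bodies are assumptions, not the actual class):

```java
// Hypothetical outline inferred from how the tests construct and drive the job.
// ReportingInterval is the project enum used above; java.util.Map is assumed imported.
public class TripHistoryUploadJob implements Runnable {
    private final ReportingInterval reportingInterval;
    private final Map<String, String> entitiesToReport;

    public TripHistoryUploadJob() {
        this(ReportingInterval.HOURLY, Map.of()); // assumed defaults
    }

    public TripHistoryUploadJob(ReportingInterval reportingInterval, Map<String, String> entitiesToReport) {
        this.reportingInterval = reportingInterval;
        this.entitiesToReport = entitiesToReport;
    }

    public void stageUploadHours() { /* record pending hourly upload windows */ }

    public void stageUploadDays() { /* record pending daily upload windows */ }

    public void runInnerLogic() { /* process pending uploads for the configured interval and entities */ }

    @Override
    public void run() {
        // Assumed wiring: the scheduled entry point stages windows, then processes them.
        stageUploadHours();
        runInnerLogic();
    }
}
```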
diff --git a/src/test/java/org/opentripplanner/middleware/triptracker/TripSurveyUploadJobTest.java b/src/test/java/org/opentripplanner/middleware/triptracker/TripSurveyUploadJobTest.java
new file mode 100644
index 000000000..19ff2f3ef
--- /dev/null
+++ b/src/test/java/org/opentripplanner/middleware/triptracker/TripSurveyUploadJobTest.java
@@ -0,0 +1,168 @@
+package org.opentripplanner.middleware.triptracker;
+
+import jersey.repackaged.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager;
+import org.opentripplanner.middleware.connecteddataplatform.IntervalUploadStatus;
+import org.opentripplanner.middleware.models.IntervalUpload;
+import org.opentripplanner.middleware.models.TripSurveyUpload;
+import org.opentripplanner.middleware.typeform.Responses;
+import org.opentripplanner.middleware.typeform.ResponsesTest;
+import org.opentripplanner.middleware.typeform.Response;
+import org.opentripplanner.middleware.persistence.Persistence;
+import org.opentripplanner.middleware.testutils.OtpMiddlewareTestEnvironment;
+import org.opentripplanner.middleware.utils.DateTimeUtils;
+import org.opentripplanner.middleware.utils.FileUtils;
+import org.opentripplanner.middleware.utils.S3Utils;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.getDailyFileName;
+import static org.opentripplanner.middleware.typeform.ResponseTest.makeResponse;
+import static org.opentripplanner.middleware.typeform.ResponsesTest.EXPECTED_HEADER;
+import static org.opentripplanner.middleware.utils.FileUtils.getContentsOfFileInZip;
+
+class TripSurveyUploadJobTest extends OtpMiddlewareTestEnvironment {
+
+    private static TripSurveyUpload surveyUploadTwoDaysAgo;
+    private static TripSurveyUpload surveyUploadThreeDaysAgo;
+    private static List<TripSurveyUpload> surveyUploads;
+
+    private static final LocalDateTime PREVIOUS_WHOLE_DAY_FROM_NOW = DateTimeUtils.getPreviousDayFrom(LocalDateTime.now());
+
+    private String tempFile;
+    private String zipFileName;
+
+    @BeforeAll
+    public static void setUp() {
+        assumeTrue(IS_END_TO_END);
+
+        // Create records of previous trip survey uploads.
+        LocalDateTime twoDaysAgo = LocalDateTime.ofInstant(Instant.now().minus(2, ChronoUnit.DAYS), DateTimeUtils.getOtpZoneId());
+        LocalDateTime threeDaysAgo = LocalDateTime.ofInstant(Instant.now().minus(3, ChronoUnit.DAYS), DateTimeUtils.getOtpZoneId());
+
+        surveyUploadTwoDaysAgo = new TripSurveyUpload("upload-2", twoDaysAgo, IntervalUploadStatus.PENDING);
+        surveyUploadThreeDaysAgo = new TripSurveyUpload("upload-3", threeDaysAgo, IntervalUploadStatus.COMPLETED);
+
+        surveyUploads = Lists.newArrayList(surveyUploadTwoDaysAgo, surveyUploadThreeDaysAgo);
+
+        Persistence.tripSurveyUploads.create(surveyUploadTwoDaysAgo);
+        Persistence.tripSurveyUploads.create(surveyUploadThreeDaysAgo);
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        assumeTrue(IS_END_TO_END);
+
+        // Delete trip survey upload entries
+        surveyUploads.forEach(upload -> {
+            TripSurveyUpload storedUpload = Persistence.tripSurveyUploads.getById(upload.id);
+            if (storedUpload != null) {
+                Persistence.tripSurveyUploads.removeById(storedUpload.id);
+            }
+        });
+    }
+
+    @AfterEach
+    void afterEach() throws Exception {
+        assumeTrue(IS_END_TO_END);
+
+        // Delete trip survey upload entries that were added.
+        for (TripSurveyUpload upload : Persistence.tripSurveyUploads.getAll()) {
+            if (!upload.id.equals(surveyUploadTwoDaysAgo.id) && !upload.id.equals(surveyUploadThreeDaysAgo.id)) {
+                Persistence.tripSurveyUploads.removeById(upload.id);
+            }
+        }
+
+        // Delete any files we created
+        if (tempFile != null) {
+            FileUtils.deleteFile(tempFile);
+            tempFile = null;
+        }
+        if (zipFileName != null) {
+            S3Utils.deleteObject(
+                ConnectedDataManager.CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME,
+                String.format("%s/%s", ConnectedDataManager.CONNECTED_DATA_PLATFORM_S3_FOLDER_NAME, zipFileName)
+            );
+            zipFileName = null;
+        }
+    }
+
+    /**
+     * Make sure that the first upload is created and contains the correct upload date.
+     */
+    @Test
+    void canStageFirstUpload() {
+        // Just for this test, delete the previous days.
+        Persistence.tripSurveyUploads.removeById(surveyUploadTwoDaysAgo.id);
+        Persistence.tripSurveyUploads.removeById(surveyUploadThreeDaysAgo.id);
+
+        TripSurveyUploadJob job = new TripSurveyUploadJob();
+        job.stageUploadDays();
+        TripSurveyUpload upload = IntervalUpload.getFirst(Persistence.tripSurveyUploads);
+        assertNotNull(upload);
+        assertTrue(PREVIOUS_WHOLE_DAY_FROM_NOW.isEqual(upload.uploadHour));
+    }
+
+    @Test
+    void canRunJob() {
+        assumeTrue(IS_END_TO_END);
+
+        // After running the job, assuming that API calls were successful,
+        // the upload records should be updated.
+
+        TripSurveyUploadJob job = new TripSurveyUploadJob(localDateTime -> createSurveyApiResponse(), () -> EXPECTED_HEADER);
+        job.run();
+
+        TripSurveyUpload upload = job.getLastUploadCreated();
+        assertNotNull(upload);
+        assertTrue(PREVIOUS_WHOLE_DAY_FROM_NOW.isEqual(upload.uploadHour));
+        assertEquals(IntervalUploadStatus.COMPLETED, upload.status);
+    }
+
+    /**
+     * Confirm that a single zip file is created which contains the compiled survey responses (CSV). Also confirm that the contents
+     * written to the CSV file are correct and cover a single day's worth of responses.
+     */
+    @Test
+    void canCreateZipFileWithContent() throws Exception {
+        assumeTrue(IS_END_TO_END);
+
+        Responses apiResponse = createSurveyApiResponse();
+
+        TripSurveyUploadJob job = new TripSurveyUploadJob(localDateTime -> apiResponse, () -> EXPECTED_HEADER);
+        job.stageUploadDays();
+        assertTrue(job.processSurveyHistory(job.getLastUploadCreated(), apiResponse));
+        zipFileName = getDailyFileName(PREVIOUS_WHOLE_DAY_FROM_NOW, TripSurveyUploadJob.SURVEY_ZIP_FILE_PREFIX + ".zip");
+        tempFile = String.join(
+            "/",
+            FileUtils.getTempDirectory().getAbsolutePath(),
+            zipFileName
+        );
+        String fileContents = getContentsOfFileInZip(
+            tempFile,
+            getDailyFileName(PREVIOUS_WHOLE_DAY_FROM_NOW, TripSurveyUploadJob.SURVEY_ZIP_FILE_PREFIX + ".csv")
+        );
+        assertEquals(ResponsesTest.getExpectedCsv(), fileContents);
+    }
+
+    private static Responses createSurveyApiResponse() {
+        Response response1 = makeResponse();
+        Response response2 = makeResponse();
+        Responses apiResponse = new Responses();
+        apiResponse.items = List.of(response1, response2);
+        apiResponse.isTest = true;
+        return apiResponse;
+    }
+}
+
diff --git a/src/test/java/org/opentripplanner/middleware/typeform/FormTest.java b/src/test/java/org/opentripplanner/middleware/typeform/FormTest.java
new file mode 100644
index 000000000..6bd989b06
--- /dev/null
+++ b/src/test/java/org/opentripplanner/middleware/typeform/FormTest.java
@@ -0,0 +1,24 @@
+package org.opentripplanner.middleware.typeform;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class FormTest {
+    @Test
+    void toCsvHeader() {
+        Form form = new Form();
+        form.hidden = List.of("user_id", "notification_id");
+        form.fields = List.of(
+            new Field("field-1", "Hi, what is your name?"), // Questions can include commas
+            new Field("field-2", "Where do you live?")
+        );
+
+        assertEquals(
+            "id,status,started,completed,user_id,notification_id,\"Hi, what is your name?\",\"Where do you live?\"",
+            form.toCsvHeader()
+        );
+    }
+}
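FormTest pins down the CSV header layout: four fixed columns, then the hidden field names, then each question title quoted so that embedded commas stay within one column. One way `Form.toCsvHeader()` could produce that header, assuming `Field` exposes a `title` and `Form` has `hidden` (`List<String>`) and `fields` (`List<Field>`) as used in the test (a sketch, not necessarily the actual implementation):

```java
// Sketch only: reproduces the header asserted in FormTest under the assumptions above.
public String toCsvHeader() {
    StringBuilder header = new StringBuilder("id,status,started,completed");
    for (String hiddenField : hidden) {
        header.append(',').append(hiddenField);
    }
    for (Field field : fields) {
        // Quote titles (and escape embedded quotes) so commas inside questions don't split columns.
        header.append(",\"").append(field.title.replace("\"", "\"\"")).append('"');
    }
    return header.toString();
}
```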
diff --git a/src/test/java/org/opentripplanner/middleware/typeform/ResponseTest.java b/src/test/java/org/opentripplanner/middleware/typeform/ResponseTest.java
new file mode 100644
index 000000000..821b7ba9a
--- /dev/null
+++ b/src/test/java/org/opentripplanner/middleware/typeform/ResponseTest.java
@@ -0,0 +1,51 @@
+package org.opentripplanner.middleware.typeform;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ResponseTest {
+
+    public static final String EXPECTED_CSV_ROW = "response-id-0,completed,2024-10-25T15:37:42Z,2024-10-25T15:46:27Z,notification-id-1,trip-id-2,user-id-3,\"Field1 choice\",\"Field2, ChoiceA;Field2, ChoiceB\",\"Field3 answer\"";
+
+    @Test
+    void toCsvRow() {
+        Response response = makeResponse();
+        assertEquals(EXPECTED_CSV_ROW, response.toCsvRow());
+    }
+
+    public static Response makeResponse() {
+        Response response = new Response();
+        response.landed_at = "2024-10-25T15:37:42Z";
+        response.submitted_at = "2024-10-25T15:46:27Z";
+        response.response_id = "response-id-0";
+        response.response_type = "completed";
+        response.hidden = new Response.Hidden();
+        response.hidden.notification_id = "notification-id-1";
+        response.hidden.trip_id = "trip-id-2";
+        response.hidden.user_id = "user-id-3";
+
+        Response.Answer answer1 = new Response.Answer();
+        answer1.type = "choice";
+        answer1.choice = new Response.Choice();
+        answer1.choice.label = "Field1 choice";
+        answer1.field = new Field("field-id-1");
+
+        Response.Answer answer2 = new Response.Answer();
+        answer2.type = "choices";
+        answer2.choices = new Response.Choices();
+        answer2.choices.labels = List.of("Field2, ChoiceA", "Field2, ChoiceB"); // Answers can include commas
+        answer2.field = new Field("field-id-2");
+
+        Response.Answer answer3 = new Response.Answer();
+        answer3.type = "text";
+        answer3.text = "Field3 answer";
+        answer3.field = new Field("field-id-3");
+
+        response.answers = List.of(answer1, answer2, answer3);
+        return response;
+    }
+}
diff --git a/src/test/java/org/opentripplanner/middleware/typeform/ResponsesTest.java b/src/test/java/org/opentripplanner/middleware/typeform/ResponsesTest.java
new file mode 100644
index 000000000..c422f2d17
--- /dev/null
+++ b/src/test/java/org/opentripplanner/middleware/typeform/ResponsesTest.java
@@ -0,0 +1,28 @@
+package org.opentripplanner.middleware.typeform;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.opentripplanner.middleware.typeform.ResponseTest.EXPECTED_CSV_ROW;
+import static org.opentripplanner.middleware.typeform.ResponseTest.makeResponse;
+
+public class ResponsesTest {
+
+    public static final String EXPECTED_HEADER = "id,status,started,completed,notification_id,trip_id,user_id,field1,field2";
+
+    @Test
+    void toCsv() {
+        Response response1 = makeResponse();
+        Response response2 = makeResponse();
+        Responses apiResponse = new Responses();
+        apiResponse.items = List.of(response1, response2);
+
+        assertEquals(getExpectedCsv(), apiResponse.toCsv(EXPECTED_HEADER));
+    }
+
+    public static String getExpectedCsv() {
+        return String.format("%s%n%s%n%s%n", EXPECTED_HEADER, EXPECTED_CSV_ROW, EXPECTED_CSV_ROW);
+    }
+}
diff --git a/src/test/java/org/opentripplanner/middleware/typeform/TypeFormDispatcherTest.java b/src/test/java/org/opentripplanner/middleware/typeform/TypeFormDispatcherTest.java
new file mode 100644
index 000000000..f3855db41
--- /dev/null
+++ b/src/test/java/org/opentripplanner/middleware/typeform/TypeFormDispatcherTest.java
@@ -0,0 +1,23 @@
+package org.opentripplanner.middleware.typeform;
+
+import org.junit.jupiter.api.Test;
+import org.opentripplanner.middleware.testutils.OtpMiddlewareTestEnvironment;
+
+import java.time.LocalDateTime;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TypeFormDispatcherTest extends OtpMiddlewareTestEnvironment {
+    @Test
+    void canMakeResponsesParams() {
+        // October 25, 2024 00:00, US Pacific Daylight Time.
+        // CI runs OTP-middleware in that timezone.
+        // The timestamps in the expected string below correspond to 00:00:00 and 23:59:59 on that day, also in that timezone.
+        LocalDateTime date = LocalDateTime.of(2024, 10, 25, 0, 0);
+
+        String s = TypeFormDispatcher.responsesParams(date);
+
+        // The expected values are epoch timestamps, which avoids converting from local time to the UTC that TypeForm requires.
+        assertEquals("?page_size=1000&since=1729839600&until=1729925999", s);
+    }
+}
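For reference, the expected epoch values in `TypeFormDispatcherTest` (1729839600 and 1729925999) are 2024-10-25 00:00:00 and 23:59:59 in US Pacific time. A hedged sketch of a conversion that reproduces those values, assuming the configured OTP zone (via the `DateTimeUtils.getOtpZoneId()` used elsewhere in these tests) is applied; this mirrors what the test expects, not necessarily the production code:

```java
// Sketch only: converts a local calendar day to the inclusive epoch-second range used in the expected query string.
// Assumes java.time imports and the project's DateTimeUtils helper.
static String responsesParams(LocalDateTime day) {
    ZoneId zone = DateTimeUtils.getOtpZoneId();
    long since = day.truncatedTo(ChronoUnit.DAYS).atZone(zone).toEpochSecond();
    long until = day.truncatedTo(ChronoUnit.DAYS).plusDays(1).atZone(zone).toEpochSecond() - 1;
    return String.format("?page_size=1000&since=%d&until=%d", since, until);
}
```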