Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer Trip Surveys to S3 #267

Merged
merged 47 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
defbc4e
docs: Introduce new survey API config params.
binh-dam-ibigroup Nov 6, 2024
629de0c
refactor(IntervalUploadJob): Introduce abstract classes for periodic …
binh-dam-ibigroup Nov 7, 2024
a5705e5
refactor(TripHistoryUploadJob): Reuse IntervalUploadJob.
binh-dam-ibigroup Nov 7, 2024
6298eb1
refactor(TripHistoryUpload): Extend from IntervalUpload.
binh-dam-ibigroup Nov 7, 2024
505cfba
refactor(TripSurveyUpload): Introduce class for trip survey upload jo…
binh-dam-ibigroup Nov 7, 2024
9a9ffde
refactor(Persistence): Add new persistence class for trip survey uplo…
binh-dam-ibigroup Nov 7, 2024
67b30bd
refactor(TypeFormTripSurveyResponse): Add classes for survey responses.
binh-dam-ibigroup Nov 7, 2024
99ef868
refactor(TypeFormTripSurveyResponse): Add CSV output.
binh-dam-ibigroup Nov 7, 2024
524b2f9
refactor(TypeFormTripSurveyApiResponse): Add CSV output.
binh-dam-ibigroup Nov 7, 2024
0d10cc9
test(TripSurveyUploadJob): Add tests for TripSurveyUploadJob.
binh-dam-ibigroup Nov 8, 2024
f21cf6a
refactor(TripSurveyUploadJob): Integrate CSV dumping into inner logic.
binh-dam-ibigroup Nov 8, 2024
0fb1f2d
refactor(TripSurveyUploadJob): Rework logic for download vs tests.
binh-dam-ibigroup Nov 11, 2024
256a4ab
refactor(TripSurveyUploadJob): Reuse getFilePrefix function.
binh-dam-ibigroup Nov 11, 2024
6f11275
refactor(IntervalUploadJob): Reuse getFirst, getLast helpers.
binh-dam-ibigroup Nov 11, 2024
d94a681
refactor(IntervalUploadJob): Orient code toward processing intervals.
binh-dam-ibigroup Nov 11, 2024
a2b9c97
fix(TripSurveyUploadJob): Generate correct TypeForm URL.
binh-dam-ibigroup Nov 12, 2024
733edda
refactor(typeform.Form): Introduce typeform package with Form response.
binh-dam-ibigroup Nov 13, 2024
aa15498
refactor(typeform.Response): Move TypeForm response classes to that p…
binh-dam-ibigroup Nov 13, 2024
355100b
refactor(TripSurveyUploadJob): Support downloading survey headers.
binh-dam-ibigroup Nov 13, 2024
a275729
refactor(TripSurveyUploadJob): Refactor id and token check.
binh-dam-ibigroup Nov 13, 2024
bbb5810
fix(typeform.Response): Surround answers with quotes.
binh-dam-ibigroup Nov 13, 2024
dcddc13
fix(typeform.Response): Support text answers.
binh-dam-ibigroup Nov 13, 2024
0b56b4e
feat(OtpMiddlewareMain): Schedule trip survey upload job.
binh-dam-ibigroup Nov 13, 2024
e1d73fd
refactor(TripSurveyUploadJo): Extract api request code.
binh-dam-ibigroup Nov 13, 2024
c19bb81
refactor(ConnectedDataManager): Perform light refactoring.
binh-dam-ibigroup Nov 14, 2024
9ca61b2
fix(TripSurveyUploadJob): Pass midnight-23:59:59 local as timestamps.
binh-dam-ibigroup Nov 14, 2024
5efcc81
refactor(IntervalUploadFiles): Extract logic handling file names, zip…
binh-dam-ibigroup Nov 14, 2024
49dd51f
Merge branch 'dev' into transfer-trip-surveys
binh-dam-ibigroup Nov 14, 2024
33d3ab6
docs: Update config variables.
binh-dam-ibigroup Nov 14, 2024
359b844
refactor(ConnectedDataManager): Remove class prefixes.
binh-dam-ibigroup Nov 14, 2024
3ace3db
refactor(IntervalUploadJob): Inline getFilePrefix method.
binh-dam-ibigroup Nov 14, 2024
4173abe
style(IntervalUploadFiles): Remove uneeded line breaks.
binh-dam-ibigroup Nov 14, 2024
3704d01
docs(env.yml.tmp): Update config variables.
binh-dam-ibigroup Nov 14, 2024
b2190fc
test(TripSurveyUploadJob): Harden test with uploads from previous days.
binh-dam-ibigroup Nov 14, 2024
b6c8a08
refactor(IntervalUploadStatus): Rename from TripHistoryUploadStatus, …
binh-dam-ibigroup Nov 14, 2024
2e30270
refactor(IntervalUploadJob): Perform light refactorings.
binh-dam-ibigroup Nov 14, 2024
e2c2982
refactor(IntervalUploadJob): Add method for marking uploads complete.
binh-dam-ibigroup Nov 14, 2024
ed17dbb
refactor(IntervalUpload): Rename method.
binh-dam-ibigroup Nov 14, 2024
d746c8d
refactor(TripHistoryUpload): Remove redundant code
binh-dam-ibigroup Nov 14, 2024
8c5ecf0
style(ConnectedDataPlatformTest): Fix indent.
binh-dam-ibigroup Dec 2, 2024
6accf8a
refactor(ConnectedDataPlatformTest): Add comments regarding TypeForm …
binh-dam-ibigroup Dec 2, 2024
0b10963
refactor(IntervalUpload): Fix comment typo.
binh-dam-ibigroup Dec 2, 2024
1323648
refactor: Move TypeForm types to higher-level package.
binh-dam-ibigroup Dec 2, 2024
c27a5de
refactor: Move TypeForm-specific logic to TypeFormDispatcher.
binh-dam-ibigroup Dec 2, 2024
79b4c7a
refactor(TypeFormDispatcherTest): Clarify comments about expected tim…
binh-dam-ibigroup Dec 2, 2024
18ecd9f
refactor(TypeFormDispatcherTest): Inherit from OTPMiddlewareTestEnvir…
binh-dam-ibigroup Dec 3, 2024
65b64c6
refactor(TypeFormDispatcherTest): Add comment for supporting over 100…
binh-dam-ibigroup Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ The special E2E client settings should be defined in `env.yml`:
| TRIP_INSTRUCTION_UPCOMING_RADIUS | integer | Optional | 10 | The radius in meters under which an upcoming instruction is given. |
| TRIP_SURVEY_ID | string | Optional | abcdef123y | The ID of a survey (on the platform of your choice) for trip-related feedback. |
| TRIP_SURVEY_SUBDOMAIN | string | Optional | abcabc12a | The subdomain of a website where the trip-related surveys are administered. |
| TRIP_SURVEY_API_TOKEN | string | Optional | abcdef123y | The token for the survey API for downloading responses. |
| TWILIO_ACCOUNT_SID | string | Optional | your-account-sid | Twilio settings available at: https://twilio.com/user/account |
| TRUSTED_COMPANION_CONFIRMATION_PAGE_URL | string | Optional | https://otp-server.example.com/trusted/confirmation | URL to the trusted companion confirmation page. This page should support handling an error URL parameter. |
| TWILIO_AUTH_TOKEN | string | Optional | your-auth-token | Twilio settings available at: https://twilio.com/user/account |
Expand Down
2 changes: 2 additions & 0 deletions configurations/default/env.yml.tmp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ TRIP_INSTRUCTION_UPCOMING_RADIUS: 10
# Survey ID and domain that is offered after users complete certain trips.
TRIP_SURVEY_ID: abcdef123y
TRIP_SURVEY_SUBDOMAIN: abcabc12a
# Survey API credentials to download responses
TRIP_SURVEY_API_TOKEN: token-12345c

US_RIDE_GWINNETT_BUS_OPERATOR_NOTIFIER_API_URL: https://bus.notifier.example.com
US_RIDE_GWINNETT_BUS_OPERATOR_NOTIFIER_API_KEY: your-key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opentripplanner.middleware.persistence.Persistence;
import org.opentripplanner.middleware.tripmonitor.jobs.MonitorAllTripsJob;
import org.opentripplanner.middleware.triptracker.TripSurveySenderJob;
import org.opentripplanner.middleware.triptracker.TripSurveyUploadJob;
import org.opentripplanner.middleware.utils.ConfigUtils;
import org.opentripplanner.middleware.utils.HttpUtils;
import org.opentripplanner.middleware.utils.Scheduler;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class OtpMiddlewareMain {
public static final String API_PREFIX = "/api/";
public static boolean inTestEnvironment = false;

public static void main(String[] args) throws IOException, InterruptedException {
public static void main(String[] args) throws IOException {
// Load configuration.
ConfigUtils.loadConfig(args);

Expand All @@ -77,8 +78,9 @@ public static void main(String[] args) throws IOException, InterruptedException
// Schedule trip history uploads.
ConnectedDataManager.scheduleTripHistoryUploadJob();

// Schedule recurring Monitor All Trips Job.
// Schedule recurring jobs.
// TODO: Determine whether this should go in some other process.

MonitorAllTripsJob monitorAllTripsJob = new MonitorAllTripsJob();
Scheduler.scheduleJob(
monitorAllTripsJob,
Expand All @@ -87,19 +89,28 @@ public static void main(String[] args) throws IOException, InterruptedException
TimeUnit.MINUTES
);

// Schedule recurring job for post-trip surveys, once every half-hour to catch recently completed trips.
// TODO: Determine whether this should go in some other process.
TripSurveySenderJob tripSurveySenderJob = new TripSurveySenderJob();
Scheduler.scheduleJob(
tripSurveySenderJob,
0,
30,
TimeUnit.MINUTES
);

JymDyerIBI marked this conversation as resolved.
Show resolved Hide resolved
if (TripSurveyUploadJob.isConfigured()) {
LOG.info("Scheduling trip survey upload every day");
TripSurveyUploadJob tripSurveyUploadJob = new TripSurveyUploadJob();
Scheduler.scheduleJob(
tripSurveyUploadJob,
0,
1,
TimeUnit.DAYS
);
}
}
}

private static void initializeHttpEndpoints() throws IOException, InterruptedException {
private static void initializeHttpEndpoints() throws IOException {
// Must start spark explicitly to use spark-swagger.
// https://github.com/manusant/spark-swagger#endpoints-binding
Service spark = Service.ignite().port(Service.SPARK_DEFAULT_PORT);
Expand All @@ -120,7 +131,7 @@ private static void initializeHttpEndpoints() throws IOException, InterruptedExc
new CDPFilesController(API_PREFIX),
new OtpRequestProcessor("/otp", OtpVersion.OTP2),
new OtpRequestProcessor("/otp2", OtpVersion.OTP2)
// TODO Add other models.
// Add other endpoints as needed.
))
// Spark-swagger auto-generates a swagger document at localhost:4567/doc.yaml.
// (That path is not configurable.)
Expand All @@ -138,7 +149,7 @@ private static void initializeHttpEndpoints() throws IOException, InterruptedExc
return Files.readString(publicDocPath);
});

/**
/*
* End point to receive project errors as soon as they are processed by Bugsnag. Information on Bugsnag's
* webhook can be found here: https://docs.bugsnag.com/product/integrations/data-forwarding/webhook/
*
Expand All @@ -153,7 +164,7 @@ private static void initializeHttpEndpoints() throws IOException, InterruptedExc
return "";
});

/**
/*
* End point to handle redirecting to the correct registration page from Auth0 as described here:
*
* https://auth0.com/docs/auth0-email-services/customize-email-templates#dynamic-redirect-to-urls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.bson.conversions.Bson;
import org.opentripplanner.middleware.bugsnag.BugsnagReporter;
import org.opentripplanner.middleware.controllers.api.OtpRequestProcessor;
import org.opentripplanner.middleware.models.IntervalUpload;
import org.opentripplanner.middleware.models.TripHistoryUpload;
import org.opentripplanner.middleware.models.TripRequest;
import org.opentripplanner.middleware.models.TripSummary;
Expand All @@ -19,17 +20,14 @@
import org.opentripplanner.middleware.utils.DateTimeUtils;
import org.opentripplanner.middleware.utils.FileUtils;
import org.opentripplanner.middleware.utils.JsonUtils;
import org.opentripplanner.middleware.utils.S3Utils;
import org.opentripplanner.middleware.utils.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -133,7 +131,9 @@ public static void removeUsersTripHistory(String userId) {
}
// Get all hourly windows that have already been earmarked for uploading.
Set<LocalDateTime> incompleteUploadHours = new HashSet<>();
getIncompleteUploads().forEach(tripHistoryUpload -> incompleteUploadHours.add(tripHistoryUpload.uploadHour));
IntervalUpload
.getIncompleteUploads(Persistence.tripHistoryUploads)
.forEach(tripHistoryUpload -> incompleteUploadHours.add(tripHistoryUpload.uploadHour));
// Save all new hourly windows for uploading.
Set<LocalDateTime> newHourlyWindows = Sets.difference(userTripHourlyWindows, incompleteUploadHours);
TripHistoryUpload first = TripHistoryUpload.getFirst();
Expand Down Expand Up @@ -383,26 +383,25 @@ public static int compileAndUploadTripHistory(

// Not null because ReportedEntities only contains entries that correspond to persistenceMap.
TypedPersistence<?> typedPersistence = ReportedEntities.persistenceMap.get(entityName);
String filePrefix = getFilePrefix(reportingInterval, periodStart, entityName);
String tempFileFolder = FileUtils.getTempDirectory().getAbsolutePath();

String zipFileName = String.join(".", filePrefix, ZIP_FILE_EXTENSION);
String tempZipFile = String.join(File.separator, tempFileFolder, zipFileName);
String coreFileName = entityName;
boolean anonymize = isAnonymizedInterval(reportingMode);
boolean isTripRequest = "TripRequest".equals(entityName);
if (isTripRequest && anonymize) {
// Anonymized trip requests are stored under a special file name.
coreFileName = ANON_TRIP_FILE_NAME;
}

String jsonFileName = String.join(".", filePrefix, JSON_FILE_EXTENSION);
String tempDataFile = String.join(File.separator, tempFileFolder, jsonFileName);
IntervalUploadFiles uploadFiles = new IntervalUploadFiles(
getFilePrefix(reportingInterval, periodStart, coreFileName),
JSON_FILE_EXTENSION,
isTest
);

try {
try (uploadFiles) {
String tempDataFile = uploadFiles.getTempDataFile();
int recordsWritten = Integer.MIN_VALUE;

if ("TripRequest".equals(entityName)) {
// Anonymized trip requests are stored under a special file name.
boolean anonymize = isAnonymizedInterval(reportingMode);
if (anonymize) {
tempDataFile = tempDataFile.replace("TripRequest.json", ANON_TRIP_JSON_FILE_NAME);
tempZipFile = tempZipFile.replace("TripRequest.zip", ANON_TRIP_ZIP_FILE_NAME);
}

if (isTripRequest) {
// TripRequests must be processed separately because they must be combined, one per batchId.
// Note: Anonymized trips include TripRequest and TripSummary in the same entity.
recordsWritten = streamTripsToFile(tempDataFile, periodStart, reportingInterval, anonymize);
Expand All @@ -423,43 +422,19 @@ public static int compileAndUploadTripHistory(

if (recordsWritten > 0 || "true".equals(CONNECTED_DATA_PLATFORM_UPLOAD_BLANK_FILES)) {
// Upload the file if records were written or config setting requires uploading blank files.
FileUtils.addSingleFileToZip(tempDataFile, tempZipFile);
S3Utils.putObject(
CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME,
String.format(
"%s/%s",
getUploadFolderName(
CONNECTED_DATA_PLATFORM_S3_FOLDER_NAME,
CONNECTED_DATA_PLATFORM_FOLDER_GROUPING,
periodStart.toLocalDate()
),
zipFileName
),
new File(tempZipFile)
);
uploadFiles.compressAndUpload(getUploadFolderName(
CONNECTED_DATA_PLATFORM_S3_FOLDER_NAME,
CONNECTED_DATA_PLATFORM_FOLDER_GROUPING,
periodStart.toLocalDate()
));
}
allRecordsWritten += recordsWritten;
} catch (Exception e) {
} catch (IOException e) {
BugsnagReporter.reportErrorToBugsnag(
String.format("Failed to process trip data for (%s)", periodStart),
String.format("Failed to write trip data for (%s)", periodStart),
e
);
return Integer.MIN_VALUE;
} finally {
// Delete the temporary files. This is done here in case the S3 upload fails.
try {
LOG.error("Deleting CDP zip file {} as an error occurred while processing the data it was supposed to contain.", tempZipFile);
FileUtils.deleteFile(tempDataFile);
if (!isTest) {
FileUtils.deleteFile(tempZipFile);
} else {
LOG.warn("In test mode, temp zip file {} not deleted. This is expected to be deleted by the calling test.",
tempZipFile
);
}
} catch (IOException e) {
LOG.error("Failed to delete temp files", e);
}
}
}

Expand All @@ -471,37 +446,25 @@ public static boolean isAnonymizedInterval(String reportingMode) {
return reportingModes.contains("interval") && reportingModes.contains("anonymized");
}

/**
* Get all incomplete trip history uploads.
*/
public static List<TripHistoryUpload> getIncompleteUploads() {
FindIterable<TripHistoryUpload> tripHistoryUploads = Persistence.tripHistoryUploads.getFiltered(
Filters.ne("status", TripHistoryUploadStatus.COMPLETED.getValue())
);
return tripHistoryUploads.into(new ArrayList<>());
public static String getDailyFileName(LocalDateTime date, String fileNameSuffix) {
return formatFileName(date, "yyyy-MM-dd", fileNameSuffix);
}

public static String getHourlyFileName(LocalDateTime date, String fileNameSuffix) {
final String DEFAULT_DATE_FORMAT_PATTERN = "yyyy-MM-dd-HH";
return String.format(
"%s-%s",
getStringFromDate(date, DEFAULT_DATE_FORMAT_PATTERN),
fileNameSuffix
);
return formatFileName(date, "yyyy-MM-dd-HH", fileNameSuffix);
}

/**
* Produce file name without path or extension.
*/
public static String getFilePrefix(ReportingInterval reportingInterval, LocalDateTime date, String entityName) {
final String DEFAULT_DATE_FORMAT_PATTERN = isReportingDaily(reportingInterval)
? "yyyy-MM-dd"
: "yyyy-MM-dd-HH";
return String.format(
"%s-%s",
getStringFromDate(date, DEFAULT_DATE_FORMAT_PATTERN),
entityName
);
return isReportingDaily(reportingInterval)
? getDailyFileName(date, entityName)
: getHourlyFileName(date, entityName);
}

public static String formatFileName(LocalDateTime date, String datePattern, String suffix) {
return String.format("%s-%s", getStringFromDate(date, datePattern), suffix);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.opentripplanner.middleware.connecteddataplatform;

import org.opentripplanner.middleware.bugsnag.BugsnagReporter;
import org.opentripplanner.middleware.utils.FileUtils;
import org.opentripplanner.middleware.utils.S3Exception;
import org.opentripplanner.middleware.utils.S3Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME;
import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.ZIP_FILE_EXTENSION;

/**
* Helper class for upload job file handling.
* When used in try-with-resource blocks, temp files will be automatically deleted.
*/
public class IntervalUploadFiles implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(IntervalUploadFiles.class);

private final boolean isTest;

private final String zipFileName;

private final String tempZipFile;

private final String tempDataFile;

public IntervalUploadFiles(String filePrefix, String coreExtension, boolean isTest) {
this.isTest = isTest;

zipFileName = String.join(".", filePrefix, ZIP_FILE_EXTENSION);
String tempFileFolder = FileUtils.getTempDirectory().getAbsolutePath();
tempZipFile = String.join(File.separator, tempFileFolder, zipFileName);
tempDataFile = String.join(File.separator, tempFileFolder, String.join(".", filePrefix, coreExtension));
}

public String getTempDataFile() {
return tempDataFile;
}

/**
* Compress and upload the data file.
*/
public void compressAndUpload(String folder) throws IOException {
FileUtils.addSingleFileToZip(tempDataFile, tempZipFile);
try {
S3Utils.putObject(
CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME,
String.format("%s/%s", folder, zipFileName),
new File(tempZipFile)
);
} catch (S3Exception e) {
String message = String.format("Error uploading (%s) to S3", zipFileName);
LOG.error(message);
BugsnagReporter.reportErrorToBugsnag(message, e);
}
}

@Override
public void close() throws IOException {
// Delete the temporary files here, to cover S3 upload success or failure.
try {
LOG.info("Deleting zip file {}.", tempZipFile);
FileUtils.deleteFile(tempDataFile);
if (!isTest) {
FileUtils.deleteFile(tempZipFile);
} else {
LOG.warn("In test mode, temp zip file {} not deleted. This is expected to be deleted by the calling test.",
tempZipFile
);
}
} catch (IOException e) {
LOG.error("Failed to delete temp files", e);
throw e;
}
}
}
Loading
Loading