Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add email queue to handle email sending failures #1587

Merged
merged 4 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/components/user/request-delete-user.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { TrashIcon } from "../icons/library";

const Schema = z.object({
email: z.string().email(),
reason: z.string().min(10, "Reason is a required field"),
reason: z.string().min(3, "Reason is a required field"),
});

export const RequestDeleteUser = () => {
Expand Down
63 changes: 63 additions & 0 deletions app/emails/email.worker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { transporter } from "~/emails/transporter.server";
import { QueueNames, scheduler } from "~/utils/scheduler.server";
import type { EmailPayloadType } from "./types";
import { SMTP_FROM } from "../utils/env";
import { ShelfError } from "../utils/error";

// every node will execute 5 jobs(teamSize) every 3 minutes(newJobCheckIntervalSeconds),
// increase teamSize if you need better concurrency
// but keep email provider rate limiting and a potential n/w throughput load on postgress in mind
// teamSize of 20-25 is a good limit if we need to scale email throughput in the future
export const registerEmailWorkers = async () => {
await scheduler.work<EmailPayloadType>(
QueueNames.emailQueue,
{ newJobCheckIntervalSeconds: 60 * 3, teamSize: 5 },
async (job) => {
await triggerEmail(job.data);
}
);
};

export const triggerEmail = async ({
to,
subject,
text,
html,
from,
replyTo,
}: EmailPayloadType) => {
try {
// send mail with defined transport object
await transporter.sendMail({
from: from || SMTP_FROM || `"Shelf" <[email protected]>`, // sender address
replyTo: replyTo || "[email protected]", // reply to
to, // list of receivers
subject, // Subject line
text, // plain text body
html: html || "", // html body
});
} catch (cause) {
throw new ShelfError({
cause,
message: "Unable to send email",
additionalData: { to, subject, from },
label: "Email",
});
}

// verify connection configuration
// transporter.verify(function (error) {
// if (error) {
// // eslint-disable-next-line no-console
// console.log(error);
// } else {
// // eslint-disable-next-line no-console
// console.log("Server is ready to take our messages");
// }
// });

// Message sent: <[email protected]>

// Preview only available when sending through an Ethereal account
// console.log("Preview URL: %s", nodemailer.getTestMessageUrl(info));
};
131 changes: 33 additions & 98 deletions app/emails/mail.server.ts
Original file line number Diff line number Diff line change
@@ -1,104 +1,39 @@
import type { Attachment } from "nodemailer/lib/mailer";
import { transporter } from "~/emails/transporter.server";
import { SMTP_FROM } from "../utils/env";
import { ShelfError } from "../utils/error";

export const sendEmail = async ({
to,
subject,
text,
html,
attachments,
from,
replyTo,
}: {
/** Email address of recipient */
to: string;

/** Subject of email */
subject: string;

/** Text content of email */
text: string;

/** HTML content of email */
html?: string;

attachments?: Attachment[];

/** Override the default sender */
from?: string;
import { Logger } from "~/utils/logger";
import { QueueNames, scheduler } from "~/utils/scheduler.server";
import { triggerEmail } from "./email.worker.server";
import type { EmailPayloadType } from "./types";

export const sendEmail = (payload: EmailPayloadType) => {
// attempt to send email, push to the queue if it fails
triggerEmail(payload).catch((err) => {
Logger.warn({
DonKoko marked this conversation as resolved.
Show resolved Hide resolved
err,
details: {
to: payload.to,
subject: payload.subject,
from: payload.from,
},
message: "email sending failed, pushing to the queue",
});
void addToQueue(payload);
});
};

/** Override the default reply to email address */
replyTo?: string;
}) => {
const addToQueue = async (payload: EmailPayloadType) => {
try {
// send mail with defined transport object
await transporter.sendMail({
from: from || SMTP_FROM || `"Shelf" <[email protected]>`, // sender address
replyTo: replyTo || "[email protected]", // reply to
to, // list of receivers
subject, // Subject line
text, // plain text body
html: html || "", // html body
attachments: [...(attachments || [])],
await scheduler.send(QueueNames.emailQueue, payload, {
retryLimit: 5,
retryDelay: 5,
});
} catch (cause) {
throw new ShelfError({
cause,
message: "Unable to send email",
additionalData: { to, subject, from },
label: "Email",
} catch (err) {
Logger.warn({
err,
details: {
to: payload.to,
subject: payload.subject,
from: payload.from,
},
message: "Failed to push email payload to queue",
});
}

// verify connection configuration
// transporter.verify(function (error) {
// if (error) {
// // eslint-disable-next-line no-console
// console.log(error);
// } else {
// // eslint-disable-next-line no-console
// console.log("Server is ready to take our messages");
// }
// });

// Message sent: <[email protected]>

// Preview only available when sending through an Ethereal account
// console.log("Preview URL: %s", nodemailer.getTestMessageUrl(info));
};

/** Utility function to add delay between operations */
async function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/** Process emails in batches with rate limiting
* @param emails - Array of email configurations to send
* @param batchSize - Number of emails to process per batch (default: 2)
* @param delayMs - Milliseconds to wait between batches (default: 1000ms)
*/
export async function sendEmailsWithRateLimit(
emails: Array<{
to: string;
subject: string;
text: string;
html: string;
}>,
batchSize = 2,
delayMs = 1100
): Promise<void> {
for (let i = 0; i < emails.length; i += batchSize) {
// Process emails in batches of specified size
const batch = emails.slice(i, i + batchSize);

// Send emails in current batch concurrently
await Promise.all(batch.map((email) => sendEmail(email)));

// If there are more emails to process, add delay before next batch
if (i + batchSize < emails.length) {
await delay(delayMs);
}
}
}
20 changes: 20 additions & 0 deletions app/emails/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,23 @@ export type BookingForEmail = Prisma.BookingGetPayload<{
};
};
}>;

export type EmailPayloadType = {
/** Email address of recipient */
to: string;

/** Subject of email */
subject: string;

/** Text content of email */
text: string;

/** HTML content of email */
html?: string;

/** Override the default sender */
from?: string;

/** Override the default reply to email address */
replyTo?: string;
};
10 changes: 10 additions & 0 deletions app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { RemixServer } from "@remix-run/react";
import * as Sentry from "@sentry/remix";
import { isbot } from "isbot";
import { renderToPipeableStream } from "react-dom/server";
import { registerEmailWorkers } from "./emails/email.worker.server";
import { registerBookingWorkers } from "./modules/booking/worker.server";
import { ShelfError } from "./utils/error";
import { Logger } from "./utils/logger";
Expand All @@ -26,6 +27,15 @@ schedulerService
})
);
});
await registerEmailWorkers().catch((cause) => {
Logger.error(
new ShelfError({
cause,
message: "Something went wrong while registering email workers.",
label: "Scheduler",
})
);
});
})
.finally(() => {
// eslint-disable-next-line no-console
Expand Down
3 changes: 0 additions & 3 deletions app/modules/booking/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
export const schedulerKeys = {
bookingQueue: "booking-queue",
};
export enum bookingSchedulerEventsEnum {
checkoutReminder = `booking-checkout-reminder`,
checkinReminder = `booking-checkin-reminder`,
Expand Down
60 changes: 25 additions & 35 deletions app/modules/booking/email-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import type { BookingForEmail } from "~/emails/types";
import { getDateTimeFormatFromHints } from "~/utils/client-hints";
import { getTimeRemainingMessage } from "~/utils/date-fns";
import { SERVER_URL } from "~/utils/env";
import { ShelfError } from "~/utils/error";
import type { ClientHint } from "./types";

/**
Expand Down Expand Up @@ -156,44 +155,35 @@ export const checkinReminderEmailContent = ({
)}.`,
});

export async function sendCheckinReminder(
export function sendCheckinReminder(
DonKoko marked this conversation as resolved.
Show resolved Hide resolved
booking: BookingForEmail,
assetCount: number,
hints: ClientHint
) {
try {
await sendEmail({
to: booking.custodianUser!.email,
subject: `Checkin reminder (${booking.name}) - shelf.nu`,
text: checkinReminderEmailContent({
hints,
bookingName: booking.name,
assetsCount: assetCount,
custodian:
`${booking.custodianUser!.firstName} ${booking.custodianUser
?.lastName}` || (booking.custodianTeamMember?.name as string),
from: booking.from!,
to: booking.to!,
bookingId: booking.id,
}),
html: bookingUpdatesTemplateString({
booking,
heading: `Your booking is due for checkin in ${getTimeRemainingMessage(
new Date(booking.to!),
new Date()
)}.`,
assetCount,
hints,
}),
});
} catch (cause) {
throw new ShelfError({
cause,
message: "Something went wrong while sending the checkin reminder email",
additionalData: { booking },
label: "Booking",
});
}
sendEmail({
DonKoko marked this conversation as resolved.
Show resolved Hide resolved
to: booking.custodianUser!.email,
subject: `🔔 Checkin reminder (${booking.name}) - shelf.nu`,
text: checkinReminderEmailContent({
hints,
bookingName: booking.name,
assetsCount: assetCount,
custodian:
`${booking.custodianUser!.firstName} ${booking.custodianUser
?.lastName}` || (booking.custodianTeamMember?.name as string),
from: booking.from!,
to: booking.to!,
bookingId: booking.id,
}),
html: bookingUpdatesTemplateString({
booking,
heading: `Your booking is due for checkin in ${getTimeRemainingMessage(
new Date(booking.to!),
new Date()
)}.`,
assetCount,
hints,
}),
});
}

/**
Expand Down
Loading
Loading