apps/recallassess/recallassess-api/src/api/shared/scheduled-email/scheduled-email-processor.service.ts
Scheduled Email Processor Service
The hourly cron used to process all PENDING email_log rows whose scheduled_date has elapsed. That cron was removed during cleanup and remains disabled here on purpose — re-enabling needs to be a deliberate decision.
What this service DOES still provide is a single-row send method: sendPendingEmailById(emailLogId)
That method takes an existing PENDING email_log row, loads the right attachments (e.g. per-course PDFs for course.hundred.day.journey.emailN), and sends via SES SendRawEmail when attachments are needed. Wire this to whatever you use to actually trigger sends — admin endpoint, queue worker, manual ops script, or a re-enabled cron.
Properties |
|
Methods |
|
constructor(prisma: BNestPrismaService, emailSender: BNestEmailSenderService, journeyAttachments: JourneyEmailAttachmentsService)
|
||||||||||||
|
Parameters :
|
| Async processScheduledEmails | ||||||||
processScheduledEmails(batchLimit: number)
|
||||||||
|
Find PENDING email logs whose scheduled_date is in the past and send them.
Each row is atomically claimed (PENDING → PROCESSING) before send so
concurrent ticks / API instances don't double-send. The send itself is
delegated to BNestEmailSenderService.sendEmail with
Parameters :
Returns :
Promise<literal type>
|
| Async processScheduledEmailsNow |
processScheduledEmailsNow()
|
|
Manual trigger (e.g. admin debug endpoint or unit-test driver).
Returns :
Promise<literal type>
|
| Private Async resolveLateBoundPlaceholders | |||||||||
resolveLateBoundPlaceholders(content: string, companyId: number | null)
|
|||||||||
|
Resolve placeholders that depend on the current state of the world at SEND time (not creation time). Currently:
The banner injection is intentionally late-bound because journey emails are scheduled days or months in advance; the company's subscription status when the email FIRES is what should drive whether the banner appears, not the status when the email_log row was originally created.
Parameters :
Returns :
Promise<string>
|
| Async runScheduledEmailCron |
runScheduledEmailCron()
|
Decorators :
@Cron('* * * * *')
|
|
Returns :
Promise<void>
|
| Async sendPendingEmailById | ||||||
sendPendingEmailById(emailLogId: number)
|
||||||
|
Send a single PENDING email_log row. This is the integration point for whatever triggers actual sends in your environment (admin button, queue worker, cron). It:
The pre-rendered subject and content from the email_log row are reused (variables/skeleton were already baked in when the row was created).
Parameters :
Returns :
Promise<void>
|
| Private stripUnresolvedPlaceholders | ||||||
stripUnresolvedPlaceholders(content: string)
|
||||||
|
Strip ANY remaining Pattern matches
Parameters :
Returns :
string
|
| Private Static Readonly DEFAULT_BATCH_LIMIT |
Type : number
|
Default value : 50
|
|
Cap per tick so a backlog can't blow up SES quota / event loop in one shot. |
| Private isProcessing |
Type : unknown
|
Default value : false
|
|
Re-entrancy guard so a slow tick doesn't overlap with the next minute's tick. |
| Private Readonly logger |
Type : unknown
|
Default value : new Logger(ScheduledEmailProcessorService.name)
|
import { JourneyEmailAttachmentsService } from "@api/shared/email/services/journey-email-attachments.service";
import {
isCompanySubscriptionActiveForEmailBanner,
loadSubscriptionExpiredBannerHtml,
} from "@api/shared/email/subscription-expired-banner-html.util";
import { BNestEmailSenderService } from "@bish-nest/core";
import { BNestPrismaService } from "@bish-nest/core/services";
import { Injectable, Logger } from "@nestjs/common";
import { Cron } from "@nestjs/schedule";
/**
* Scheduled Email Processor Service
*
* The hourly cron used to process all PENDING email_log rows whose scheduled_date
* has elapsed. That cron was removed during cleanup and remains disabled here on
* purpose — re-enabling needs to be a deliberate decision.
*
* What this service DOES still provide is a single-row send method:
* sendPendingEmailById(emailLogId)
*
* That method takes an existing PENDING email_log row, loads the right attachments
* (e.g. per-course PDFs for course.hundred.day.journey.emailN), and sends via SES
* SendRawEmail when attachments are needed. Wire this to whatever you use to
* actually trigger sends — admin endpoint, queue worker, manual ops script, or a
* re-enabled cron.
*/
@Injectable()
export class ScheduledEmailProcessorService {
private readonly logger = new Logger(ScheduledEmailProcessorService.name);
/** Cap per tick so a backlog can't blow up SES quota / event loop in one shot. */
private static readonly DEFAULT_BATCH_LIMIT = 50;
/** Re-entrancy guard so a slow tick doesn't overlap with the next minute's tick. */
private isProcessing = false;
constructor(
private readonly prisma: BNestPrismaService,
private readonly emailSender: BNestEmailSenderService,
private readonly journeyAttachments: JourneyEmailAttachmentsService,
) {}
@Cron("* * * * *")
async runScheduledEmailCron(): Promise<void> {
if (this.isProcessing) {
this.logger.debug("Scheduled email tick skipped: previous tick still running.");
return;
}
this.isProcessing = true;
try {
await this.processScheduledEmails();
} catch (err) {
this.logger.error(
"Scheduled email processing tick failed",
err instanceof Error ? err.stack : String(err),
);
} finally {
this.isProcessing = false;
}
}
/**
* Find PENDING email logs whose scheduled_date is in the past and send them.
* Each row is atomically claimed (PENDING → PROCESSING) before send so
* concurrent ticks / API instances don't double-send. The send itself is
* delegated to {@link BNestEmailSenderService.sendEmail} with
* `existingEmailLogId`, which updates the row to SENT/FAILED via the audit
* writer.
*/
async processScheduledEmails(
batchLimit: number = ScheduledEmailProcessorService.DEFAULT_BATCH_LIMIT,
): Promise<{ picked: number; sent: number; failed: number }> {
const now = new Date();
const due = await this.prisma.client.emailLog.findMany({
where: {
status: "PENDING",
scheduled_date: { lte: now },
},
orderBy: { scheduled_date: "asc" },
take: batchLimit,
select: {
id: true,
recipient_email: true,
subject: true,
content: true,
email_template_id: true,
metadata: true,
// Needed for journey-email attachment loading and subscription-banner resolution.
course_id: true,
company_id: true,
emailTemplate: { select: { template_key: true } },
},
});
if (due.length === 0) {
this.logger.debug("Scheduled email tick: no PENDING rows due.");
return { picked: 0, sent: 0, failed: 0 };
}
this.logger.log(`Scheduled email tick: ${due.length} PENDING row(s) due for sending.`);
let sent = 0;
let failed = 0;
for (const row of due) {
// Atomic claim: only one worker gets to flip PENDING → PROCESSING.
const claim = await this.prisma.client.emailLog.updateMany({
where: { id: row.id, status: "PENDING" },
data: { status: "PROCESSING" },
});
if (claim.count === 0) {
this.logger.debug(`Email log ${row.id} already claimed by another worker; skipping.`);
continue;
}
try {
// Resolve last-mile, dynamic placeholders that the email-creation paths
// can't bake in (because they depend on company subscription status at
// SEND time, not creation time, which can be days/weeks earlier).
const enrichedContent = await this.resolveLateBoundPlaceholders(
row.content,
(row as { company_id?: number | null }).company_id ?? null,
);
// Load attachments for journey emails (PDFs from Course.hundredDjEmailNDocuments).
// Returns [] for non-journey emails — zero overhead.
const templateKey =
(row as { emailTemplate?: { template_key: string } | null }).emailTemplate?.template_key ?? "";
const attachments = await this.journeyAttachments.loadForTemplateAndCourse(
templateKey,
(row as { course_id?: number | null }).course_id ?? null,
row.metadata,
);
await this.emailSender.sendEmail({
to: row.recipient_email,
subject: row.subject,
content: enrichedContent,
templateId: row.email_template_id ?? undefined,
metadata:
row.metadata && typeof row.metadata === "object" && !Array.isArray(row.metadata)
? (row.metadata as Record<string, unknown>)
: undefined,
// Tells the sender to reuse this row instead of creating a new audit log.
// The sender's audit writer will update status to SENT/FAILED on completion.
existingEmailLogId: row.id,
attachments,
});
sent += 1;
} catch (err) {
// The sender's audit writer already tried to mark the row FAILED with
// error_message. This catch is just for counting and visibility; a
// throw here would abort the whole tick and starve later rows.
failed += 1;
this.logger.warn(
`Scheduled email ${row.id} to ${row.recipient_email} failed: ${
err instanceof Error ? err.message : String(err)
}`,
);
}
}
this.logger.log(
`Scheduled email tick complete. picked=${due.length} sent=${sent} failed=${failed}`,
);
return { picked: due.length, sent, failed };
}
/** Manual trigger (e.g. admin debug endpoint or unit-test driver). */
async processScheduledEmailsNow(): Promise<{ picked: number; sent: number; failed: number }> {
this.logger.log("Manually triggering scheduled email processing...");
return this.processScheduledEmails();
}
/**
* Send a single PENDING email_log row.
*
* This is the integration point for whatever triggers actual sends in your
* environment (admin button, queue worker, cron). It:
* 1. Looks up the email_log row by id.
* 2. If it's a journey email (course.hundred.day.journey.emailN), loads the
* per-course PDF attachments associated with that email number.
* 3. Calls BNestEmailSenderService.sendEmail with attachments — which will
* switch to SES SendRawEmail under the hood when attachments are present.
*
* The pre-rendered subject and content from the email_log row are reused
* (variables/skeleton were already baked in when the row was created).
*/
async sendPendingEmailById(emailLogId: number): Promise<void> {
const log = await this.prisma.client.emailLog.findUnique({
where: { id: emailLogId },
include: {
emailTemplate: { select: { template_key: true } },
},
});
if (!log) {
throw new Error(`email_log #${emailLogId} not found`);
}
if (log.status !== "PENDING") {
this.logger.warn(
`email_log #${emailLogId} is not PENDING (status=${log.status}) — skipping`,
);
return;
}
const templateKey =
(log as { emailTemplate?: { template_key: string } | null }).emailTemplate?.template_key ?? "";
// Load journey-email PDF attachments. Returns [] for non-journey emails.
const attachments = await this.journeyAttachments.loadForTemplateAndCourse(
templateKey,
(log as { course_id?: number | null }).course_id ?? null,
(log as { metadata?: unknown }).metadata,
);
// Resolve {{subscription.expired_banner_html}} based on the current
// subscription status (status may have changed since the email_log row
// was created days/weeks ago).
const enrichedContent = await this.resolveLateBoundPlaceholders(
log.content,
(log as { company_id?: number | null }).company_id ?? null,
);
// Hand off to the sender. Passing existingEmailLogId so the sender updates
// status on the same row instead of creating a new audit row.
await this.emailSender.sendEmail({
to: log.recipient_email,
subject: log.subject,
content: enrichedContent,
templateId: log.email_template_id ?? undefined,
metadata: (log as { metadata?: Record<string, unknown> }).metadata,
existingEmailLogId: emailLogId,
attachments,
});
}
/**
* Resolve placeholders that depend on the current state of the world at SEND time
* (not creation time). Currently:
* - `{{subscription.expired_banner_html}}` — looks up the company's most recent
* subscription. If ACTIVE → empty string (no banner). Else → red expired banner.
*
* The banner injection is intentionally late-bound because journey emails are
* scheduled days or months in advance; the company's subscription status when the
* email FIRES is what should drive whether the banner appears, not the status
* when the email_log row was originally created.
*/
private async resolveLateBoundPlaceholders(
content: string,
companyId: number | null,
): Promise<string> {
// Whitespace-tolerant detection: matches {{subscription.expired_banner_html}},
// {{ subscription.expired_banner_html }}, and any spacing in between.
// The earlier strict-match version (no whitespace allowed) failed to detect
// editor-mangled placeholders, leaving the literal `{{...}}` text visible
// in clients like Outlook for users on Test Three Limited.
const placeholderRegex = /\{\{\s*subscription\.expired_banner_html\s*\}\}/g;
// Decoded-and-tolerant detection covers HTML-entity encoded variants too,
// since some WYSIWYG editors emit `{{` for `{{`. We don't bother
// matching the encoded form by regex — instead, we run a second normalize
// pass at the end that catches any leftover unresolved placeholder.
if (!placeholderRegex.test(content)) {
// Even if our specific placeholder isn't present, run the safety strip
// below so other unresolved {{...}} tokens don't leak to recipients.
return this.stripUnresolvedPlaceholders(content);
}
// Reset lastIndex since /g regex .test() advances it.
placeholderRegex.lastIndex = 0;
let bannerHtml = "";
if (companyId != null) {
try {
const isActive = await isCompanySubscriptionActiveForEmailBanner(
this.prisma.client,
companyId,
);
if (!isActive) {
// Look up contact email (mail.contact_email setting) for the banner's mailto link.
const setting = await (this.prisma.client as any).systemSetting?.findFirst?.({
where: { key: "mail.contact_email" },
select: { value: true },
});
const contactEmail =
(typeof setting?.value === "string" && setting.value.trim()) || "enquiries@recallsolutions.ai";
bannerHtml = await loadSubscriptionExpiredBannerHtml(this.prisma.client, contactEmail);
}
} catch (err) {
this.logger.warn(
`Failed to resolve subscription banner for company ${companyId}: ${
err instanceof Error ? err.message : String(err)
} — sending without banner.`,
);
}
}
let resolved = content.replace(placeholderRegex, bannerHtml);
// Final safety strip — catches any other unresolved {{...}} tokens that
// might have leaked from elsewhere (templates with placeholders we don't
// recognize, missing variables in upstream substitution, encoded variants,
// etc.). Better to send a blank space than literal `{{key}}` text to
// a real recipient.
resolved = this.stripUnresolvedPlaceholders(resolved);
return resolved;
}
/**
* Strip ANY remaining `{{...}}` placeholder tokens from the email content.
* Used as a final safety net before SES dispatch so an unresolved variable
* (regardless of where the resolution failed) is replaced with empty
* string rather than shown literally to the recipient.
*
* Pattern matches `{{ key.with.dots }}` with optional whitespace inside the
* braces. Conditional control tokens like `{{#if}}`, `{{else}}`, `{{/if}}`,
* `{{#unless}}`, `{{/unless}}` are NOT stripped — those are handled by the
* renderEmailTemplate util earlier in the pipeline and must be present for
* proper conditional resolution. If they leak this far, that's a separate
* bug we want to surface (the literal `{{else}}` would be visible and
* loudly broken in tests, prompting investigation).
*/
private stripUnresolvedPlaceholders(content: string): string {
// Match {{ key.with.dots_and_underscores }} with optional whitespace.
// Excludes anything starting with `#`, `/`, or `else` (Handlebars control).
return content.replace(/\{\{\s*([\w][\w.]*)\s*\}\}/g, (match, key: string) => {
const trimmed = key.trim();
// Defensive: skip Handlebars control tokens even though our regex
// shouldn't match them (they have `#` or `/` prefix, or are `else`).
if (
trimmed.startsWith("#") ||
trimmed.startsWith("/") ||
trimmed === "else"
) {
return match;
}
// Anything else — an unresolved placeholder — gets stripped to empty.
this.logger.warn(
`Stripped unresolved email placeholder: ${match}. Investigate why this variable wasn't substituted upstream.`,
);
return "";
});
}
}