File

apps/recallassess/recallassess-api/src/api/shared/scheduled-email/scheduled-email-processor.service.ts

Description

Scheduled Email Processor Service

The hourly cron used to process all PENDING email_log rows whose scheduled_date has elapsed. That cron was removed during cleanup and remains disabled here on purpose — re-enabling needs to be a deliberate decision.

What this service DOES still provide is a single-row send method: sendPendingEmailById(emailLogId)

That method takes an existing PENDING email_log row, loads the right attachments (e.g. per-course PDFs for course.hundred.day.journey.emailN), and sends via SES SendRawEmail when attachments are needed. Wire this to whatever you use to actually trigger sends — admin endpoint, queue worker, manual ops script, or a re-enabled cron.

Index

Properties
Methods

Constructor

constructor(prisma: BNestPrismaService, emailSender: BNestEmailSenderService, journeyAttachments: JourneyEmailAttachmentsService)
Parameters :
Name Type Optional
prisma BNestPrismaService No
emailSender BNestEmailSenderService No
journeyAttachments JourneyEmailAttachmentsService No

Methods

Async processScheduledEmails
processScheduledEmails(batchLimit: number)

Find PENDING email logs whose scheduled_date is in the past and send them. Each row is atomically claimed (PENDING → PROCESSING) before send so concurrent ticks / API instances don't double-send. The send itself is delegated to BNestEmailSenderService.sendEmail with existingEmailLogId, which updates the row to SENT/FAILED via the audit writer.

Parameters :
Name Type Optional Default value
batchLimit number No ScheduledEmailProcessorService.DEFAULT_BATCH_LIMIT
Returns : Promise<literal type>
Async processScheduledEmailsNow
processScheduledEmailsNow()

Manual trigger (e.g. admin debug endpoint or unit-test driver).

Returns : Promise<literal type>
Private Async resolveLateBoundPlaceholders
resolveLateBoundPlaceholders(content: string, companyId: number | null)

Resolve placeholders that depend on the current state of the world at SEND time (not creation time). Currently:

  • {{subscription.expired_banner_html}} — looks up the company's most recent subscription. If ACTIVE → empty string (no banner). Else → red expired banner.

The banner injection is intentionally late-bound because journey emails are scheduled days or months in advance; the company's subscription status when the email FIRES is what should drive whether the banner appears, not the status when the email_log row was originally created.

Parameters :
Name Type Optional
content string No
companyId number | null No
Returns : Promise<string>
Async runScheduledEmailCron
runScheduledEmailCron()
Decorators :
@Cron('* * * * *')
Returns : Promise<void>
Async sendPendingEmailById
sendPendingEmailById(emailLogId: number)

Send a single PENDING email_log row.

This is the integration point for whatever triggers actual sends in your environment (admin button, queue worker, cron). It:

  1. Looks up the email_log row by id.
  2. If it's a journey email (course.hundred.day.journey.emailN), loads the per-course PDF attachments associated with that email number.
  3. Calls BNestEmailSenderService.sendEmail with attachments — which will switch to SES SendRawEmail under the hood when attachments are present.

The pre-rendered subject and content from the email_log row are reused (variables/skeleton were already baked in when the row was created).

Parameters :
Name Type Optional
emailLogId number No
Returns : Promise<void>
Private stripUnresolvedPlaceholders
stripUnresolvedPlaceholders(content: string)

Strip ANY remaining {{...}} placeholder tokens from the email content. Used as a final safety net before SES dispatch so an unresolved variable (regardless of where the resolution failed) is replaced with empty string rather than shown literally to the recipient.

Pattern matches {{ key.with.dots }} with optional whitespace inside the braces. Conditional control tokens like {{#if}}, {{else}}, {{/if}}, {{#unless}}, {{/unless}} are NOT stripped — those are handled by the renderEmailTemplate util earlier in the pipeline and must be present for proper conditional resolution. If they leak this far, that's a separate bug we want to surface (the literal {{else}} would be visible and loudly broken in tests, prompting investigation).

Parameters :
Name Type Optional
content string No
Returns : string

Properties

Private Static Readonly DEFAULT_BATCH_LIMIT
Type : number
Default value : 50

Cap per tick so a backlog can't blow up SES quota / event loop in one shot.

Private isProcessing
Type : unknown
Default value : false

Re-entrancy guard so a slow tick doesn't overlap with the next minute's tick.

Private Readonly logger
Type : unknown
Default value : new Logger(ScheduledEmailProcessorService.name)
import { JourneyEmailAttachmentsService } from "@api/shared/email/services/journey-email-attachments.service";
import {
  isCompanySubscriptionActiveForEmailBanner,
  loadSubscriptionExpiredBannerHtml,
} from "@api/shared/email/subscription-expired-banner-html.util";
import { BNestEmailSenderService } from "@bish-nest/core";
import { BNestPrismaService } from "@bish-nest/core/services";
import { Injectable, Logger } from "@nestjs/common";
import { Cron } from "@nestjs/schedule";

/**
 * Scheduled Email Processor Service
 *
 * The hourly cron used to process all PENDING email_log rows whose scheduled_date
 * has elapsed. That cron was removed during cleanup and remains disabled here on
 * purpose — re-enabling needs to be a deliberate decision.
 *
 * What this service DOES still provide is a single-row send method:
 *   sendPendingEmailById(emailLogId)
 *
 * That method takes an existing PENDING email_log row, loads the right attachments
 * (e.g. per-course PDFs for course.hundred.day.journey.emailN), and sends via SES
 * SendRawEmail when attachments are needed. Wire this to whatever you use to
 * actually trigger sends — admin endpoint, queue worker, manual ops script, or a
 * re-enabled cron.
 */
@Injectable()
export class ScheduledEmailProcessorService {
  private readonly logger = new Logger(ScheduledEmailProcessorService.name);

  /** Cap per tick so a backlog can't blow up SES quota / event loop in one shot. */
  private static readonly DEFAULT_BATCH_LIMIT = 50;

  /** Re-entrancy guard so a slow tick doesn't overlap with the next minute's tick. */
  private isProcessing = false;

  constructor(
    private readonly prisma: BNestPrismaService,
    private readonly emailSender: BNestEmailSenderService,
    private readonly journeyAttachments: JourneyEmailAttachmentsService,
  ) {}

  @Cron("* * * * *")
  async runScheduledEmailCron(): Promise<void> {
    if (this.isProcessing) {
      this.logger.debug("Scheduled email tick skipped: previous tick still running.");
      return;
    }
    this.isProcessing = true;
    try {
      await this.processScheduledEmails();
    } catch (err) {
      this.logger.error(
        "Scheduled email processing tick failed",
        err instanceof Error ? err.stack : String(err),
      );
    } finally {
      this.isProcessing = false;
    }
  }

  /**
   * Find PENDING email logs whose scheduled_date is in the past and send them.
   * Each row is atomically claimed (PENDING → PROCESSING) before send so
   * concurrent ticks / API instances don't double-send. The send itself is
   * delegated to {@link BNestEmailSenderService.sendEmail} with
   * `existingEmailLogId`, which updates the row to SENT/FAILED via the audit
   * writer.
   */
  async processScheduledEmails(
    batchLimit: number = ScheduledEmailProcessorService.DEFAULT_BATCH_LIMIT,
  ): Promise<{ picked: number; sent: number; failed: number }> {
    const now = new Date();

    const due = await this.prisma.client.emailLog.findMany({
      where: {
        status: "PENDING",
        scheduled_date: { lte: now },
      },
      orderBy: { scheduled_date: "asc" },
      take: batchLimit,
      select: {
        id: true,
        recipient_email: true,
        subject: true,
        content: true,
        email_template_id: true,
        metadata: true,
        // Needed for journey-email attachment loading and subscription-banner resolution.
        course_id: true,
        company_id: true,
        emailTemplate: { select: { template_key: true } },
      },
    });

    if (due.length === 0) {
      this.logger.debug("Scheduled email tick: no PENDING rows due.");
      return { picked: 0, sent: 0, failed: 0 };
    }

    this.logger.log(`Scheduled email tick: ${due.length} PENDING row(s) due for sending.`);

    let sent = 0;
    let failed = 0;

    for (const row of due) {
      // Atomic claim: only one worker gets to flip PENDING → PROCESSING.
      const claim = await this.prisma.client.emailLog.updateMany({
        where: { id: row.id, status: "PENDING" },
        data: { status: "PROCESSING" },
      });
      if (claim.count === 0) {
        this.logger.debug(`Email log ${row.id} already claimed by another worker; skipping.`);
        continue;
      }

      try {
        // Resolve last-mile, dynamic placeholders that the email-creation paths
        // can't bake in (because they depend on company subscription status at
        // SEND time, not creation time, which can be days/weeks earlier).
        const enrichedContent = await this.resolveLateBoundPlaceholders(
          row.content,
          (row as { company_id?: number | null }).company_id ?? null,
        );

        // Load attachments for journey emails (PDFs from Course.hundredDjEmailNDocuments).
        // Returns [] for non-journey emails — zero overhead.
        const templateKey =
          (row as { emailTemplate?: { template_key: string } | null }).emailTemplate?.template_key ?? "";
        const attachments = await this.journeyAttachments.loadForTemplateAndCourse(
          templateKey,
          (row as { course_id?: number | null }).course_id ?? null,
          row.metadata,
        );

        await this.emailSender.sendEmail({
          to: row.recipient_email,
          subject: row.subject,
          content: enrichedContent,
          templateId: row.email_template_id ?? undefined,
          metadata:
            row.metadata && typeof row.metadata === "object" && !Array.isArray(row.metadata)
              ? (row.metadata as Record<string, unknown>)
              : undefined,
          // Tells the sender to reuse this row instead of creating a new audit log.
          // The sender's audit writer will update status to SENT/FAILED on completion.
          existingEmailLogId: row.id,
          attachments,
        });
        sent += 1;
      } catch (err) {
        // The sender's audit writer already tried to mark the row FAILED with
        // error_message. This catch is just for counting and visibility; a
        // throw here would abort the whole tick and starve later rows.
        failed += 1;
        this.logger.warn(
          `Scheduled email ${row.id} to ${row.recipient_email} failed: ${
            err instanceof Error ? err.message : String(err)
          }`,
        );
      }
    }

    this.logger.log(
      `Scheduled email tick complete. picked=${due.length} sent=${sent} failed=${failed}`,
    );
    return { picked: due.length, sent, failed };
  }

  /** Manual trigger (e.g. admin debug endpoint or unit-test driver). */
  async processScheduledEmailsNow(): Promise<{ picked: number; sent: number; failed: number }> {
    this.logger.log("Manually triggering scheduled email processing...");
    return this.processScheduledEmails();
  }

  /**
   * Send a single PENDING email_log row.
   *
   * This is the integration point for whatever triggers actual sends in your
   * environment (admin button, queue worker, cron). It:
   *   1. Looks up the email_log row by id.
   *   2. If it's a journey email (course.hundred.day.journey.emailN), loads the
   *      per-course PDF attachments associated with that email number.
   *   3. Calls BNestEmailSenderService.sendEmail with attachments — which will
   *      switch to SES SendRawEmail under the hood when attachments are present.
   *
   * The pre-rendered subject and content from the email_log row are reused
   * (variables/skeleton were already baked in when the row was created).
   */
  async sendPendingEmailById(emailLogId: number): Promise<void> {
    const log = await this.prisma.client.emailLog.findUnique({
      where: { id: emailLogId },
      include: {
        emailTemplate: { select: { template_key: true } },
      },
    });

    if (!log) {
      throw new Error(`email_log #${emailLogId} not found`);
    }
    if (log.status !== "PENDING") {
      this.logger.warn(
        `email_log #${emailLogId} is not PENDING (status=${log.status}) — skipping`,
      );
      return;
    }

    const templateKey =
      (log as { emailTemplate?: { template_key: string } | null }).emailTemplate?.template_key ?? "";

    // Load journey-email PDF attachments. Returns [] for non-journey emails.
    const attachments = await this.journeyAttachments.loadForTemplateAndCourse(
      templateKey,
      (log as { course_id?: number | null }).course_id ?? null,
      (log as { metadata?: unknown }).metadata,
    );

    // Resolve {{subscription.expired_banner_html}} based on the current
    // subscription status (status may have changed since the email_log row
    // was created days/weeks ago).
    const enrichedContent = await this.resolveLateBoundPlaceholders(
      log.content,
      (log as { company_id?: number | null }).company_id ?? null,
    );

    // Hand off to the sender. Passing existingEmailLogId so the sender updates
    // status on the same row instead of creating a new audit row.
    await this.emailSender.sendEmail({
      to: log.recipient_email,
      subject: log.subject,
      content: enrichedContent,
      templateId: log.email_template_id ?? undefined,
      metadata: (log as { metadata?: Record<string, unknown> }).metadata,
      existingEmailLogId: emailLogId,
      attachments,
    });
  }

  /**
   * Resolve placeholders that depend on the current state of the world at SEND time
   * (not creation time). Currently:
   *   - `{{subscription.expired_banner_html}}` — looks up the company's most recent
   *      subscription. If ACTIVE → empty string (no banner). Else → red expired banner.
   *
   * The banner injection is intentionally late-bound because journey emails are
   * scheduled days or months in advance; the company's subscription status when the
   * email FIRES is what should drive whether the banner appears, not the status
   * when the email_log row was originally created.
   */
  private async resolveLateBoundPlaceholders(
    content: string,
    companyId: number | null,
  ): Promise<string> {
    // Whitespace-tolerant detection: matches {{subscription.expired_banner_html}},
    // {{ subscription.expired_banner_html }}, and any spacing in between.
    // The earlier strict-match version (no whitespace allowed) failed to detect
    // editor-mangled placeholders, leaving the literal `{{...}}` text visible
    // in clients like Outlook for users on Test Three Limited.
    const placeholderRegex = /\{\{\s*subscription\.expired_banner_html\s*\}\}/g;

    // Decoded-and-tolerant detection covers HTML-entity encoded variants too,
    // since some WYSIWYG editors emit `&#123;&#123;` for `{{`. We don't bother
    // matching the encoded form by regex — instead, we run a second normalize
    // pass at the end that catches any leftover unresolved placeholder.

    if (!placeholderRegex.test(content)) {
      // Even if our specific placeholder isn't present, run the safety strip
      // below so other unresolved {{...}} tokens don't leak to recipients.
      return this.stripUnresolvedPlaceholders(content);
    }
    // Reset lastIndex since /g regex .test() advances it.
    placeholderRegex.lastIndex = 0;

    let bannerHtml = "";
    if (companyId != null) {
      try {
        const isActive = await isCompanySubscriptionActiveForEmailBanner(
          this.prisma.client,
          companyId,
        );
        if (!isActive) {
          // Look up contact email (mail.contact_email setting) for the banner's mailto link.
          const setting = await (this.prisma.client as any).systemSetting?.findFirst?.({
            where: { key: "mail.contact_email" },
            select: { value: true },
          });
          const contactEmail =
            (typeof setting?.value === "string" && setting.value.trim()) || "enquiries@recallsolutions.ai";
          bannerHtml = await loadSubscriptionExpiredBannerHtml(this.prisma.client, contactEmail);
        }
      } catch (err) {
        this.logger.warn(
          `Failed to resolve subscription banner for company ${companyId}: ${
            err instanceof Error ? err.message : String(err)
          } — sending without banner.`,
        );
      }
    }

    let resolved = content.replace(placeholderRegex, bannerHtml);

    // Final safety strip — catches any other unresolved {{...}} tokens that
    // might have leaked from elsewhere (templates with placeholders we don't
    // recognize, missing variables in upstream substitution, encoded variants,
    // etc.). Better to send a blank space than literal `{{key}}` text to
    // a real recipient.
    resolved = this.stripUnresolvedPlaceholders(resolved);
    return resolved;
  }

  /**
   * Strip ANY remaining `{{...}}` placeholder tokens from the email content.
   * Used as a final safety net before SES dispatch so an unresolved variable
   * (regardless of where the resolution failed) is replaced with empty
   * string rather than shown literally to the recipient.
   *
   * Pattern matches `{{ key.with.dots }}` with optional whitespace inside the
   * braces. Conditional control tokens like `{{#if}}`, `{{else}}`, `{{/if}}`,
   * `{{#unless}}`, `{{/unless}}` are NOT stripped — those are handled by the
   * renderEmailTemplate util earlier in the pipeline and must be present for
   * proper conditional resolution. If they leak this far, that's a separate
   * bug we want to surface (the literal `{{else}}` would be visible and
   * loudly broken in tests, prompting investigation).
   */
  private stripUnresolvedPlaceholders(content: string): string {
    // Match {{ key.with.dots_and_underscores }} with optional whitespace.
    // Excludes anything starting with `#`, `/`, or `else` (Handlebars control).
    return content.replace(/\{\{\s*([\w][\w.]*)\s*\}\}/g, (match, key: string) => {
      const trimmed = key.trim();
      // Defensive: skip Handlebars control tokens even though our regex
      // shouldn't match them (they have `#` or `/` prefix, or are `else`).
      if (
        trimmed.startsWith("#") ||
        trimmed.startsWith("/") ||
        trimmed === "else"
      ) {
        return match;
      }
      // Anything else — an unresolved placeholder — gets stripped to empty.
      this.logger.warn(
        `Stripped unresolved email placeholder: ${match}. Investigate why this variable wasn't substituted upstream.`,
      );
      return "";
    });
  }
}

results matching ""

    No results matching ""