Trigger.dev:TypeScriptバックグラウンドジョブ実行ガイド


Trigger.dev:TypeScriptバックグラウンドジョブ実行ガイド

モダンなWebアプリケーションでは、重い処理をバックグラウンドで実行することが不可欠です。メール送信、画像処理、データ同期、レポート生成など、ユーザーを待たせたくない処理を非同期で実行する必要があります。

Trigger.devは、TypeScriptでバックグラウンドジョブを簡単に構築できるプラットフォームです。この記事では、実践的なジョブ実装パターン、エラーハンドリング、スケジューリング、Webhook連携について詳しく解説します。

Trigger.devの強み

サーバーレスファースト

// インフラ管理不要で即座にスケール
import { TriggerClient } from "@trigger.dev/sdk";

const client = new TriggerClient({
  id: "my-app",
  apiKey: process.env.TRIGGER_API_KEY!,
});

// コードを書くだけでジョブが動く
client.defineJob({
  id: "send-welcome-email",
  name: "ウェルカムメール送信",
  version: "1.0.0",
  trigger: eventTrigger({
    name: "user.signed_up",
  }),
  run: async (payload, io, ctx) => {
    await io.sendEmail("send-email", {
      to: payload.email,
      subject: "ようこそ!",
      body: "登録ありがとうございます",
    });
  },
});

自動リトライとエラーハンドリング

// リトライ戦略を細かく設定可能
client.defineJob({
  id: "process-payment",
  name: "決済処理",
  version: "1.0.0",
  trigger: eventTrigger({ name: "payment.requested" }),
  retry: {
    maxAttempts: 5,
    minTimeoutInSeconds: 1,
    maxTimeoutInSeconds: 300,
    factor: 2, // 指数バックオフ
  },
  run: async (payload, io, ctx) => {
    // io.runTaskは自動的にリトライされる
    const charge = await io.runTask("charge-card", async () => {
      return await stripe.charges.create({
        amount: payload.amount,
        currency: "jpy",
        source: payload.token,
      });
    });

    if (charge.status !== "succeeded") {
      // エラーをスローするとリトライされる
      throw new Error(`決済失敗: ${charge.failure_message}`);
    }

    return { chargeId: charge.id };
  },
});

リアルタイム監視

ダッシュボードで全ジョブの実行状況をリアルタイムで確認できます。

  • 実行中のジョブ
  • 失敗したジョブとエラー詳細
  • リトライ状況
  • 実行時間とパフォーマンス

実践的なジョブパターン

1. スケジュールジョブ

定期的に実行するジョブはcronTriggerまたはintervalTriggerを使用します。

import { cronTrigger, intervalTrigger } from "@trigger.dev/sdk";

// Cron式でスケジューリング
client.defineJob({
  id: "daily-report",
  name: "日次レポート生成",
  version: "1.0.0",
  trigger: cronTrigger({
    cron: "0 9 * * *", // 毎日9時
  }),
  run: async (payload, io, ctx) => {
    // 昨日のデータを集計
    const stats = await io.runTask("fetch-stats", async () => {
      const yesterday = new Date();
      yesterday.setDate(yesterday.getDate() - 1);
      return await db.analytics.aggregate({
        date: yesterday,
      });
    });

    // レポート生成
    const report = await io.runTask("generate-report", async () => {
      return generatePDF(stats);
    });

    // 管理者にメール送信
    await io.sendEmail("send-report", {
      to: "admin@example.com",
      subject: `日次レポート ${new Date().toLocaleDateString()}`,
      attachments: [{ filename: "report.pdf", content: report }],
    });

    return { statsCount: stats.length };
  },
});

// インターバルでスケジューリング
client.defineJob({
  id: "sync-inventory",
  name: "在庫同期",
  version: "1.0.0",
  trigger: intervalTrigger({
    seconds: 300, // 5分ごと
  }),
  run: async (payload, io, ctx) => {
    const products = await io.runTask("fetch-products", async () => {
      return await externalAPI.getProducts();
    });

    await io.runTask("update-database", async () => {
      return await db.products.bulkUpdate(products);
    });

    return { updated: products.length };
  },
});

2. Webhook連携

外部サービスからのWebhookをトリガーにジョブを実行します。

import { webhookEvent } from "@trigger.dev/sdk";

// Stripe Webhookの処理
client.defineJob({
  id: "stripe-webhook-handler",
  name: "Stripe Webhook処理",
  version: "1.0.0",
  trigger: webhookEvent({
    service: "stripe",
    eventName: "payment_intent.succeeded",
  }),
  integrations: { stripe },
  run: async (payload, io, ctx) => {
    const paymentIntent = payload.data.object;

    // ユーザーのサブスクリプションを更新
    await io.runTask("update-subscription", async () => {
      return await db.subscriptions.create({
        userId: paymentIntent.metadata.userId,
        stripePaymentIntentId: paymentIntent.id,
        amount: paymentIntent.amount,
        status: "active",
      });
    });

    // 確認メール送信
    await io.sendEmail("send-confirmation", {
      to: paymentIntent.receipt_email,
      subject: "お支払いが完了しました",
      template: "payment-confirmation",
      data: { amount: paymentIntent.amount },
    });

    // Slackに通知
    await io.slack.postMessage("notify-team", {
      channel: "#sales",
      text: `新規課金: ¥${paymentIntent.amount / 100}`,
    });

    return { success: true };
  },
});

// GitHub Webhookの処理
client.defineJob({
  id: "github-webhook-handler",
  name: "GitHub PR作成時処理",
  version: "1.0.0",
  trigger: webhookEvent({
    service: "github",
    eventName: "pull_request.opened",
  }),
  run: async (payload, io, ctx) => {
    const pr = payload.pull_request;

    // コードレビュー分析
    const analysis = await io.runTask("analyze-pr", async () => {
      const diff = await github.pulls.get({
        owner: payload.repository.owner.login,
        repo: payload.repository.name,
        pull_number: pr.number,
      });
      return analyzeDiff(diff.data);
    });

    // コメント投稿
    await io.runTask("post-comment", async () => {
      return await github.issues.createComment({
        owner: payload.repository.owner.login,
        repo: payload.repository.name,
        issue_number: pr.number,
        body: formatAnalysis(analysis),
      });
    });

    return { analyzed: true };
  },
});

3. バッチ処理

大量のデータを処理する場合は、チャンク分割とバッチ処理を活用します。

import { batchTrigger } from "@trigger.dev/sdk";

// 大量ユーザーへのメール送信
client.defineJob({
  id: "bulk-email-sender",
  name: "一括メール送信",
  version: "1.0.0",
  trigger: eventTrigger({ name: "campaign.send" }),
  run: async (payload, io, ctx) => {
    // 全ユーザーを取得
    const users = await io.runTask("fetch-users", async () => {
      return await db.users.findMany({
        where: { subscribed: true },
      });
    });

    // 1000件ずつバッチ処理
    const batchSize = 1000;
    const batches = Math.ceil(users.length / batchSize);

    for (let i = 0; i < batches; i++) {
      const batch = users.slice(i * batchSize, (i + 1) * batchSize);

      await io.runTask(`send-batch-${i}`, async () => {
        return await Promise.all(
          batch.map((user) =>
            sendEmail({
              to: user.email,
              subject: payload.subject,
              body: payload.body,
            })
          )
        );
      });

      // レート制限対策で少し待つ
      if (i < batches - 1) {
        await io.wait("rate-limit-wait", 5);
      }
    }

    return { sent: users.length };
  },
});

// 画像一括処理
client.defineJob({
  id: "bulk-image-processor",
  name: "画像一括最適化",
  version: "1.0.0",
  trigger: eventTrigger({ name: "images.optimize" }),
  run: async (payload, io, ctx) => {
    const images = await io.runTask("fetch-images", async () => {
      return await db.images.findMany({
        where: { optimized: false },
      });
    });

    // 並列処理(最大10並列)
    const concurrency = 10;
    const results = [];

    for (let i = 0; i < images.length; i += concurrency) {
      const chunk = images.slice(i, i + concurrency);

      const chunkResults = await Promise.all(
        chunk.map((image, index) =>
          io.runTask(`optimize-${i + index}`, async () => {
            const optimized = await sharp(image.url)
              .resize(1920, 1080, { fit: "inside" })
              .webp({ quality: 80 })
              .toBuffer();

            const uploadedUrl = await uploadToS3(optimized);

            await db.images.update({
              where: { id: image.id },
              data: { optimized: true, optimizedUrl: uploadedUrl },
            });

            return { id: image.id, url: uploadedUrl };
          })
        )
      );

      results.push(...chunkResults);
    }

    return { processed: results.length };
  },
});

4. チェーンジョブ

複数のジョブを連鎖させて実行します。

// ジョブ1: データ収集
const collectDataJob = client.defineJob({
  id: "collect-data",
  name: "データ収集",
  version: "1.0.0",
  trigger: eventTrigger({ name: "pipeline.start" }),
  run: async (payload, io, ctx) => {
    const data = await io.runTask("scrape-data", async () => {
      return await scrapeWebsite(payload.url);
    });

    // 次のジョブをトリガー
    await io.sendEvent("trigger-processing", {
      name: "data.collected",
      payload: { data, sourceUrl: payload.url },
    });

    return { itemCount: data.length };
  },
});

// ジョブ2: データ処理
const processDataJob = client.defineJob({
  id: "process-data",
  name: "データ処理",
  version: "1.0.0",
  trigger: eventTrigger({ name: "data.collected" }),
  run: async (payload, io, ctx) => {
    const processed = await io.runTask("process", async () => {
      return payload.data.map(cleanAndTransform);
    });

    // データベース保存
    await io.runTask("save", async () => {
      return await db.items.createMany({ data: processed });
    });

    // 次のジョブをトリガー
    await io.sendEvent("trigger-notification", {
      name: "data.processed",
      payload: { count: processed.length },
    });

    return { processed: processed.length };
  },
});

// ジョブ3: 通知送信
const notifyJob = client.defineJob({
  id: "notify",
  name: "完了通知",
  version: "1.0.0",
  trigger: eventTrigger({ name: "data.processed" }),
  run: async (payload, io, ctx) => {
    await io.slack.postMessage("notify", {
      channel: "#data-pipeline",
      text: `データパイプライン完了: ${payload.count}件処理しました`,
    });

    return { notified: true };
  },
});

エラーハンドリング

カスタムリトライロジック

client.defineJob({
  id: "api-with-custom-retry",
  name: "カスタムリトライAPI呼び出し",
  version: "1.0.0",
  trigger: eventTrigger({ name: "api.call" }),
  run: async (payload, io, ctx) => {
    const result = await io.runTask(
      "call-api",
      async () => {
        return await externalAPI.call(payload.endpoint);
      },
      {
        retry: {
          maxAttempts: 3,
          factor: 1.5,
          minTimeoutInSeconds: 2,
          maxTimeoutInSeconds: 30,
        },
      }
    );

    return result;
  },
});

条件付きリトライ

client.defineJob({
  id: "conditional-retry",
  name: "条件付きリトライ",
  version: "1.0.0",
  trigger: eventTrigger({ name: "task.execute" }),
  run: async (payload, io, ctx) => {
    try {
      const result = await io.runTask("risky-operation", async () => {
        return await riskyOperation();
      });

      return result;
    } catch (error) {
      // 特定のエラーのみリトライ
      if (error.code === "RATE_LIMIT") {
        // レート制限の場合は60秒待ってリトライ
        await io.wait("rate-limit-wait", 60);
        throw error; // リトライされる
      } else if (error.code === "TEMPORARY_ERROR") {
        // 一時的なエラーはリトライ
        throw error;
      } else {
        // その他のエラーはリトライしない
        await io.logger.error("致命的エラー", { error });
        return { success: false, error: error.message };
      }
    }
  },
});

Dead Letter Queue(DLQ)パターン

// メインジョブ
client.defineJob({
  id: "main-job",
  name: "メインジョブ",
  version: "1.0.0",
  trigger: eventTrigger({ name: "task.process" }),
  retry: { maxAttempts: 3 },
  onFailure: async (payload, error, io, ctx) => {
    // 失敗時にDLQに送信
    await io.sendEvent("send-to-dlq", {
      name: "dlq.failed_job",
      payload: {
        originalPayload: payload,
        error: error.message,
        attempts: ctx.attempts,
      },
    });
  },
  run: async (payload, io, ctx) => {
    // メイン処理
    return await processTask(payload);
  },
});

// DLQ処理ジョブ
client.defineJob({
  id: "dlq-handler",
  name: "失敗ジョブ処理",
  version: "1.0.0",
  trigger: eventTrigger({ name: "dlq.failed_job" }),
  run: async (payload, io, ctx) => {
    // 失敗をログに記録
    await io.runTask("log-failure", async () => {
      return await db.failedJobs.create({
        data: {
          payload: payload.originalPayload,
          error: payload.error,
          attempts: payload.attempts,
          timestamp: new Date(),
        },
      });
    });

    // 管理者に通知
    await io.sendEmail("notify-admin", {
      to: "admin@example.com",
      subject: "ジョブ失敗通知",
      body: `ジョブが失敗しました: ${payload.error}`,
    });

    return { logged: true };
  },
});

テストとデバッグ

ローカルテスト

// test/jobs.test.ts
import { createTestClient } from "@trigger.dev/sdk/test";

describe("ジョブテスト", () => {
  it("メール送信ジョブが正しく動作する", async () => {
    const testClient = createTestClient();

    const result = await testClient.runJob("send-welcome-email", {
      email: "test@example.com",
      name: "テストユーザー",
    });

    expect(result.ok).toBe(true);
    expect(result.output).toMatchObject({
      sent: true,
    });
  });
});

本番環境でのデバッグ

client.defineJob({
  id: "debug-job",
  name: "デバッグ付きジョブ",
  version: "1.0.0",
  trigger: eventTrigger({ name: "task.debug" }),
  run: async (payload, io, ctx) => {
    // ログ出力
    await io.logger.info("ジョブ開始", { payload });

    const step1 = await io.runTask("step-1", async () => {
      await io.logger.debug("ステップ1実行中");
      return await doStep1();
    });

    await io.logger.info("ステップ1完了", { result: step1 });

    const step2 = await io.runTask("step-2", async () => {
      await io.logger.debug("ステップ2実行中");
      return await doStep2(step1);
    });

    await io.logger.info("ジョブ完了", { step1, step2 });

    return { step1, step2 };
  },
});

まとめ

Trigger.devを使えば、以下のようなバックグラウンドジョブを簡単に実装できます。

  • スケジュールジョブ: Cron式やインターバルで定期実行
  • Webhook連携: 外部サービスのイベントに反応
  • バッチ処理: 大量データの効率的な処理
  • チェーンジョブ: 複数ジョブの連鎖実行
  • エラーハンドリング: 自動リトライとDLQパターン

TypeScriptの型安全性、充実した開発ツール、リアルタイム監視により、信頼性の高いバックグラウンド処理を構築できます。

次のステップとして、実際のプロジェクトでTrigger.devを導入し、ユーザー体験を向上させるバックグラウンド処理を実装してみてください。