Temporalでワークフロー自動化 - 複雑な非同期処理を確実に実行【2026年版】


Temporalは、複雑で長時間実行される非同期処理を確実に実行するためのワークフローエンジンです。Netflix、Stripe、Datadog、Snap、HashiCorpなど、多くの企業が本番環境で採用しています。

Temporalとは

従来の問題

非同期処理で以下のような問題に直面したことはありませんか?

  • APIコールが途中で失敗したらどうする?
  • サーバーが再起動したらジョブは?
  • 1時間後に実行したい処理は?
  • 複数のマイクロサービスを連携させるには?

従来の解決策(Redis Queue、Celery、Bull等)では、これらを完全に解決するのは困難です。

Temporalの解決策

Temporalは、これらの問題をワークフローという概念で解決します。

主な特徴:

  • 自動リトライ: 失敗しても自動で再試行
  • 永続性: サーバー再起動してもワークフロー継続
  • タイムアウト管理: 複雑なタイムアウト処理が簡単
  • 可視性: 全ワークフローの状態をWeb UIで確認
  • バージョニング: ワークフローコードの安全な更新

セットアップ

Temporal Serverの起動

開発環境ではDocker Composeを使用:

git clone https://github.com/temporalio/docker-compose.git temporal
cd temporal
docker-compose up

Web UI: http://localhost:8233

TypeScript SDKのインストール

npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity

基本概念

1. Workflow(ワークフロー)

ビジネスロジックのオーケストレーション。決定論的(deterministic)である必要があります。

2. Activity(アクティビティ)

外部APIコール、DB操作など、非決定論的な処理。失敗する可能性のある処理はすべてActivityに。

3. Worker(ワーカー)

WorkflowとActivityを実行するプロセス。

4. Client(クライアント)

Workflowを開始したり、状態を取得するためのクライアント。

最初のワークフロー

Activityの定義

// src/activities.ts
export async function sendEmail(to: string, subject: string, body: string) {
  console.log(`Sending email to ${to}: ${subject}`);

  // 実際のメール送信処理(例: SendGrid)
  const response = await fetch("https://api.sendgrid.com/v3/mail/send", {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${process.env.SENDGRID_API_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      personalizations: [{ to: [{ email: to }] }],
      from: { email: "noreply@example.com" },
      subject,
      content: [{ type: "text/plain", value: body }],
    }),
  });

  if (!response.ok) {
    throw new Error(`Failed to send email: ${response.statusText}`);
  }

  return { success: true, sentAt: new Date().toISOString() };
}

export async function chargeCustomer(customerId: string, amount: number) {
  console.log(`Charging customer ${customerId} for ${amount}`);

  // Stripe APIを使った課金処理
  const stripe = new Stripe(process.env.STRIPE_SECRET_KEY!);

  const paymentIntent = await stripe.paymentIntents.create({
    amount: amount * 100, // セント単位
    currency: "jpy",
    customer: customerId,
  });

  return { paymentIntentId: paymentIntent.id };
}

Workflowの定義

// src/workflows.ts
import { proxyActivities, sleep } from "@temporalio/workflow";
import type * as activities from "./activities";

// Activityのプロキシを作成(タイムアウト設定)
const { sendEmail, chargeCustomer } = proxyActivities<typeof activities>({
  startToCloseTimeout: "1 minute",
  retry: {
    maximumAttempts: 3,
  },
});

export async function subscriptionSignupWorkflow(
  email: string,
  customerId: string
): Promise<string> {
  // 1. ウェルカムメール送信
  await sendEmail(
    email,
    "ご登録ありがとうございます",
    "サービスへようこそ!"
  );

  // 2. 7日間のトライアル期間
  await sleep("7 days");

  // 3. トライアル終了メール
  await sendEmail(
    email,
    "トライアル期間終了のお知らせ",
    "明日から課金が開始されます。"
  );

  // 4. 1日待機
  await sleep("1 day");

  // 5. 初回課金
  const result = await chargeCustomer(customerId, 1000);

  // 6. 課金完了メール
  await sendEmail(
    email,
    "お支払いありがとうございます",
    `お支払いが完了しました。領収書ID: ${result.paymentIntentId}`
  );

  return "Subscription workflow completed";
}

注目ポイント:

  • sleep("7 days") で7日間待機できる
  • サーバーが再起動しても、7日後に自動で再開される
  • 各ステップが失敗しても自動リトライ

Workerの起動

// src/worker.ts
import { Worker } from "@temporalio/worker";
import * as activities from "./activities";

async function run() {
  const worker = await Worker.create({
    workflowsPath: require.resolve("./workflows"),
    activities,
    taskQueue: "default",
  });

  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
npm run worker

Workflowの開始

// src/start-workflow.ts
import { Client } from "@temporalio/client";
import { subscriptionSignupWorkflow } from "./workflows";

async function run() {
  const client = new Client();

  const handle = await client.workflow.start(subscriptionSignupWorkflow, {
    args: ["user@example.com", "cus_xxxxx"],
    taskQueue: "default",
    workflowId: "subscription-user-123",
  });

  console.log(`Started workflow ${handle.workflowId}`);

  // ワークフローの完了を待つ(オプション)
  const result = await handle.result();
  console.log("Workflow result:", result);
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
npm run start-workflow

実践例: 注文処理ワークフロー

ECサイトの注文処理を例にします。

// src/workflows/order-workflow.ts
import { proxyActivities, sleep, condition } from "@temporalio/workflow";
import type * as activities from "../activities";

const {
  reserveInventory,
  chargePayment,
  sendOrderConfirmation,
  shipOrder,
  sendShippingNotification,
  refundPayment,
  restoreInventory,
} = proxyActivities<typeof activities>({
  startToCloseTimeout: "2 minutes",
  retry: {
    maximumAttempts: 5,
    backoffCoefficient: 2,
  },
});

export async function orderWorkflow(orderId: string): Promise<string> {
  let inventoryReserved = false;
  let paymentCharged = false;

  try {
    // 1. 在庫確保
    await reserveInventory(orderId);
    inventoryReserved = true;

    // 2. 決済処理
    await chargePayment(orderId);
    paymentCharged = true;

    // 3. 注文確認メール送信
    await sendOrderConfirmation(orderId);

    // 4. 発送処理(最大48時間待機)
    await sleep("48 hours");
    await shipOrder(orderId);

    // 5. 発送通知メール
    await sendShippingNotification(orderId);

    return "Order completed successfully";

  } catch (error) {
    // エラー発生時のロールバック処理
    console.error("Order workflow failed:", error);

    if (paymentCharged) {
      await refundPayment(orderId);
    }

    if (inventoryReserved) {
      await restoreInventory(orderId);
    }

    throw error;
  }
}

シグナルとクエリ

シグナル: 外部からワークフローに通知

// workflows.ts
import { defineSignal, setHandler } from "@temporalio/workflow";

export const cancelOrderSignal = defineSignal("cancelOrder");

export async function orderWorkflow(orderId: string): Promise<string> {
  let shouldCancel = false;

  setHandler(cancelOrderSignal, () => {
    shouldCancel = true;
  });

  // ... 処理 ...

  if (shouldCancel) {
    // キャンセル処理
    return "Order cancelled";
  }

  // ... 続き ...
}
// クライアントからシグナル送信
const handle = client.workflow.getHandle("order-123");
await handle.signal(cancelOrderSignal);

クエリ: ワークフローの状態を取得

// workflows.ts
import { defineQuery, setHandler } from "@temporalio/workflow";

export const orderStatusQuery = defineQuery<string>("orderStatus");

export async function orderWorkflow(orderId: string): Promise<string> {
  let currentStatus = "pending";

  setHandler(orderStatusQuery, () => currentStatus);

  currentStatus = "processing";
  await reserveInventory(orderId);

  currentStatus = "payment";
  await chargePayment(orderId);

  currentStatus = "shipping";
  await shipOrder(orderId);

  currentStatus = "completed";
  return "Order completed";
}
// クライアントからクエリ
const handle = client.workflow.getHandle("order-123");
const status = await handle.query(orderStatusQuery);
console.log("Current status:", status);

子ワークフローとパラレル実行

import { executeChild, ParentClosePolicy } from "@temporalio/workflow";

export async function bulkEmailWorkflow(userIds: string[]): Promise<void> {
  // 並列で子ワークフローを実行
  const promises = userIds.map((userId) =>
    executeChild(sendEmailWorkflow, {
      args: [userId],
      parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
    })
  );

  await Promise.all(promises);
}

スケジュール実行

定期実行(Cron的な使い方):

const handle = await client.workflow.start(dailyReportWorkflow, {
  taskQueue: "default",
  workflowId: "daily-report",
  cronSchedule: "0 9 * * *", // 毎日9時
});

まとめ

Temporalは、以下のようなユースケースで特に威力を発揮します:

  • 長時間実行ジョブ: 数時間〜数日かかる処理
  • 複雑なステートマシン: 多段階の承認フロー
  • マイクロサービス連携: 複数サービスをまたぐ処理
  • 確実性が必要な処理: 決済、注文、メール送信など

学習コストはありますが、一度理解すれば、非同期処理の実装が劇的に楽になります。公式ドキュメントとサンプルコードも充実しているので、ぜひ試してみてください。