NATS メッセージングシステム入門 - 軽量高速な分散通信基盤
NATSは、クラウドネイティブアプリケーション向けの軽量・高速・シンプルなメッセージングシステムです。マイクロサービス間の通信、イベント駆動アーキテクチャ、リアルタイムデータ配信に最適です。
本記事では、NATSの基本概念からPub/Sub、Request/Reply、JetStreamによる永続化、実際のNode.js/Go実装まで詳しく解説します。
NATSとは?
NATSは、**Cloud Native Computing Foundation (CNCF)**のインキュベーティングプロジェクトである、オープンソースのメッセージングシステムです。
主な特徴
- 超軽量: 単一バイナリ、メモリ使用量わずか数MB
- 高速: 100万メッセージ/秒以上の処理能力
- シンプル: 学習曲線が緩やか、設定がほぼ不要
- クラウドネイティブ: Kubernetes、Docker環境に最適化
- 多言語対応: 40以上のクライアントライブラリ
- At-Most-Once / At-Least-Once: 配信保証の選択が可能
他のメッセージングシステムとの比較
| 機能 | NATS | RabbitMQ | Kafka |
|---|---|---|---|
| 速度 | ✅ 最高速 | ⚠️ 高速 | ⚠️ 高速 |
| シンプルさ | ✅ 非常に簡単 | ⚠️ 中程度 | ❌ 複雑 |
| メモリ | ✅ 数MB | ⚠️ 数百MB | ❌ 数GB |
| 永続化 | ✅ JetStream | ✅ デフォルト | ✅ デフォルト |
| スケーラビリティ | ✅ 優秀 | ⚠️ 良好 | ✅ 最高 |
NATSはシンプルさと速度を重視し、Kafkaは大規模データストリームに最適化されています。
NATSのセットアップ
Docker での起動
最も簡単な方法:
# NATS サーバーを起動
docker run -p 4222:4222 -p 8222:8222 nats:latest
# JetStream有効化
docker run -p 4222:4222 -p 8222:8222 nats:latest -js
Docker Compose
docker-compose.yml:
version: '3.8'
services:
nats:
image: nats:2.10-alpine
container_name: nats
ports:
- "4222:4222" # クライアント接続
- "8222:8222" # HTTPモニタリング
- "6222:6222" # クラスタリング
command:
- "-js" # JetStream有効化
- "-m" # モニタリング有効化
- "8222"
- "--store_dir=/data"
volumes:
- nats_data:/data
restart: unless-stopped
volumes:
nats_data:
起動:
docker compose up -d
バイナリインストール
macOS:
brew install nats-server
nats-server -js
Linux:
# ダウンロード
wget https://github.com/nats-io/nats-server/releases/download/v2.10.7/nats-server-v2.10.7-linux-amd64.tar.gz
tar -xzf nats-server-v2.10.7-linux-amd64.tar.gz
sudo mv nats-server-v2.10.7-linux-amd64/nats-server /usr/local/bin/
# 起動
nats-server -js
接続確認
ブラウザで http://localhost:8222 にアクセスすると、NATSのモニタリング画面が表示されます。
基本パターン
1. Publish/Subscribe (Pub/Sub)
1対多の配信。1つのメッセージを複数の購読者が受信します。
Node.js実装
// publisher.js
import { connect } from 'nats';
const nc = await connect({ servers: 'localhost:4222' });
// メッセージを配信
nc.publish('news.updates', JSON.stringify({
title: 'Breaking News',
content: 'Something happened!',
timestamp: Date.now()
}));
console.log('メッセージを配信しました');
await nc.close();
// subscriber.js
import { connect, StringCodec } from 'nats';
const nc = await connect({ servers: 'localhost:4222' });
const sc = StringCodec();
// サブスクライバー1
const sub1 = nc.subscribe('news.updates');
(async () => {
for await (const msg of sub1) {
const data = JSON.parse(sc.decode(msg.data));
console.log('[購読者1] 受信:', data);
}
})();
// サブスクライバー2
const sub2 = nc.subscribe('news.updates');
(async () => {
for await (const msg of sub2) {
const data = JSON.parse(sc.decode(msg.data));
console.log('[購読者2] 受信:', data);
}
})();
console.log('ニュースを購読中...');
Go実装
// publisher.go
package main
import (
"encoding/json"
"log"
"github.com/nats-io/nats.go"
)
type NewsUpdate struct {
Title string `json:"title"`
Content string `json:"content"`
Timestamp int64 `json:"timestamp"`
}
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
news := NewsUpdate{
Title: "Breaking News",
Content: "Something happened!",
Timestamp: time.Now().Unix(),
}
data, _ := json.Marshal(news)
nc.Publish("news.updates", data)
log.Println("メッセージを配信しました")
}
// subscriber.go
package main
import (
"encoding/json"
"log"
"github.com/nats-io/nats.go"
)
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
nc.Subscribe("news.updates", func(msg *nats.Msg) {
var news NewsUpdate
json.Unmarshal(msg.Data, &news)
log.Printf("[購読] %s: %s\n", news.Title, news.Content)
})
log.Println("ニュースを購読中...")
select {} // 無限待機
}
2. Request/Reply
同期的なリクエスト・レスポンスパターン。RPCのような通信を実現します。
Node.js実装
// server.js (レスポンダー)
import { connect, StringCodec } from 'nats';
const nc = await connect({ servers: 'localhost:4222' });
const sc = StringCodec();
// リクエストに応答
nc.subscribe('user.get', {
callback: (err, msg) => {
const userId = sc.decode(msg.data);
console.log(`[サーバー] ユーザーID ${userId} のリクエストを受信`);
const user = {
id: userId,
name: 'John Doe',
email: 'john@example.com'
};
msg.respond(sc.encode(JSON.stringify(user)));
}
});
console.log('ユーザーサービスが起動しました');
// client.js (リクエスター)
import { connect, StringCodec } from 'nats';
const nc = await connect({ servers: 'localhost:4222' });
const sc = StringCodec();
// リクエストを送信(タイムアウト: 5秒)
const response = await nc.request('user.get', sc.encode('123'), {
timeout: 5000
});
const user = JSON.parse(sc.decode(response.data));
console.log('[クライアント] ユーザー情報:', user);
await nc.close();
Go実装
// server.go
package main
import (
"encoding/json"
"log"
"github.com/nats-io/nats.go"
)
type User struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
nc.Subscribe("user.get", func(msg *nats.Msg) {
userId := string(msg.Data)
log.Printf("[サーバー] ユーザーID %s のリクエスト\n", userId)
user := User{
ID: userId,
Name: "John Doe",
Email: "john@example.com",
}
data, _ := json.Marshal(user)
msg.Respond(data)
})
log.Println("ユーザーサービスが起動しました")
select {}
}
3. Queue Groups
負荷分散。複数のワーカーが1つのキューを共有し、メッセージが均等に分散されます。
Node.js実装
// worker.js
import { connect, StringCodec } from 'nats';
const nc = await connect({ servers: 'localhost:4222' });
const sc = StringCodec();
const workerName = process.env.WORKER_NAME || 'worker';
// キューグループ "workers" に参加
nc.subscribe('tasks.process', {
queue: 'workers',
callback: (err, msg) => {
const task = JSON.parse(sc.decode(msg.data));
console.log(`[${workerName}] タスク処理中:`, task);
// 処理をシミュレート
setTimeout(() => {
console.log(`[${workerName}] タスク完了:`, task.id);
}, Math.random() * 3000);
}
});
console.log(`${workerName} が起動しました`);
// producer.js
import { connect, StringCodec } from 'nats';
const nc = await connect({ servers: 'localhost:4222' });
const sc = StringCodec();
// タスクを複数送信
for (let i = 1; i <= 10; i++) {
const task = {
id: i,
type: 'data-processing',
payload: `Task ${i}`
};
nc.publish('tasks.process', sc.encode(JSON.stringify(task)));
console.log(`タスク ${i} を送信`);
}
await nc.close();
複数のワーカーを起動:
WORKER_NAME=worker1 node worker.js &
WORKER_NAME=worker2 node worker.js &
WORKER_NAME=worker3 node worker.js &
node producer.js
各ワーカーがタスクを均等に分散して処理します。
JetStream: 永続化とストリーミング
JetStreamは、NATSの永続化・ストリーミング機能です。
Streamの作成
import { connect, AckPolicy } from 'nats';
const nc = await connect({ servers: 'localhost:4222' });
const jsm = await nc.jetstreamManager();
// Streamを作成
await jsm.streams.add({
name: 'ORDERS',
subjects: ['orders.*'],
retention: 'limits',
max_age: 86400000000000, // 1日 (ナノ秒)
max_msgs: 10000,
storage: 'file'
});
console.log('Stream "ORDERS" を作成しました');
メッセージの永続化
// producer.js
const js = nc.jetstream();
// 永続化メッセージを送信
const pa = await js.publish('orders.created', JSON.stringify({
orderId: '12345',
amount: 9999,
timestamp: Date.now()
}));
console.log('メッセージが永続化されました:', pa.seq);
Consumer (購読者)
// consumer.js
const js = nc.jetstream();
// Consumerを作成
const consumer = await js.consumers.get('ORDERS', 'order-processor');
// メッセージを消費
const messages = await consumer.consume();
for await (const msg of messages) {
const order = JSON.parse(msg.string());
console.log('注文を処理:', order);
// 処理完了を確認
msg.ack();
}
At-Least-Once 配信保証
await jsm.consumers.add('ORDERS', {
durable_name: 'processor',
ack_policy: AckPolicy.Explicit,
max_deliver: 3, // 最大3回再配信
ack_wait: 30000000000 // 30秒待機 (ナノ秒)
});
実践例: マイクロサービス通信
サービス構成
User Service (Port 3001)
↓ (nats)
Order Service (Port 3002)
↓ (nats)
Notification Service (Port 3003)
User Service
// user-service.js
import { connect, StringCodec } from 'nats';
import express from 'express';
const app = express();
app.use(express.json());
const nc = await connect({ servers: 'localhost:4222' });
const sc = StringCodec();
const js = nc.jetstream();
// ユーザー作成
app.post('/users', async (req, res) => {
const user = {
id: Date.now().toString(),
...req.body
};
// JetStreamでイベント配信
await js.publish('users.created', sc.encode(JSON.stringify(user)));
res.json(user);
});
app.listen(3001, () => console.log('User Service on port 3001'));
Order Service
// order-service.js
import { connect, StringCodec } from 'nats';
import express from 'express';
const app = express();
app.use(express.json());
const nc = await connect({ servers: 'localhost:4222' });
const sc = StringCodec();
const js = nc.jetstream();
// ユーザー作成イベントを購読
const consumer = await js.consumers.get('USERS', 'order-service');
(async () => {
const messages = await consumer.consume();
for await (const msg of messages) {
const user = JSON.parse(sc.decode(msg.data));
console.log('[Order Service] 新規ユーザー:', user);
msg.ack();
}
})();
// 注文作成
app.post('/orders', async (req, res) => {
const order = {
id: Date.now().toString(),
...req.body
};
// イベント配信
await js.publish('orders.created', sc.encode(JSON.stringify(order)));
res.json(order);
});
app.listen(3002, () => console.log('Order Service on port 3002'));
Notification Service
// notification-service.js
import { connect, StringCodec } from 'nats';
const nc = await connect({ servers: 'localhost:4222' });
const sc = StringCodec();
const js = nc.jetstream();
// 複数のイベントを購読
const orderConsumer = await js.consumers.get('ORDERS', 'notifier');
(async () => {
const messages = await orderConsumer.consume();
for await (const msg of messages) {
const order = JSON.parse(sc.decode(msg.data));
console.log('[通知] 新規注文:', order.id);
// メール送信などの処理
msg.ack();
}
})();
console.log('Notification Service が起動しました');
モニタリングとデバッグ
NATS CLI
# NATS CLIのインストール
brew install nats-io/nats-tools/nats
# サーバー情報
nats server info
# Streamの確認
nats stream ls
nats stream info ORDERS
# Consumerの確認
nats consumer ls ORDERS
# メッセージの送信
nats pub orders.created '{"orderId": "123"}'
# メッセージの購読
nats sub orders.created
HTTPモニタリング
ブラウザで http://localhost:8222 にアクセス:
/varz: サーバー変数/connz: 接続情報/routez: ルート情報/subsz: サブスクリプション情報
まとめ
NATSは軽量・高速・シンプルなメッセージングシステムとして、マイクロサービスやイベント駆動アーキテクチャに最適です。
NATSの強み
- 超高速: 100万メッセージ/秒以上
- 軽量: メモリ使用量わずか数MB
- シンプル: 学習コストが低い
- 多様なパターン: Pub/Sub、Request/Reply、Queue Groups
- 永続化: JetStreamで信頼性の高い配信
適用場面
- マイクロサービス間の通信
- イベント駆動アーキテクチャ
- リアルタイムデータ配信
- IoTデバイスの通信
- チャットアプリケーション
学習リソース
NATSは、複雑なメッセージングを驚くほどシンプルに実現します。次のプロジェクトでぜひ活用してみてください。