Tinybirdでリアルタイムアナリティクスパイプライン構築


Tinybirdとは

Tinybirdは、リアルタイムデータアナリティクスを簡単に構築できるデータプラットフォームです。SQLでデータパイプラインを定義するだけで、高速なAPIエンドポイントを自動生成し、数ミリ秒のレスポンスタイムでデータを提供できます。

Tinybirdの主な特徴

  • SQLベースのデータパイプライン: 学習コストが低く、すぐに始められる
  • リアルタイム取り込み: イベントストリーミングとバッチ両方に対応
  • 高速クエリ: ClickHouseベースで大量データも高速処理
  • 自動API生成: SQLクエリが即座にRESTful APIエンドポイントに
  • バージョン管理: Git統合でCI/CD可能
  • スケーラブル: 自動スケーリングで大規模データに対応

なぜTinybirdを使うのか

従来のアナリティクス構築では、データウェアハウス、ETLツール、APIサーバー、キャッシュレイヤーなど、複数のコンポーネントが必要でした。Tinybirdはこれらを統合し、SQLだけでエンドツーエンドのアナリティクスパイプラインを構築できます。

従来の構成:
データソース → Kafka → Spark → データウェアハウス → APIサーバー → キャッシュ → クライアント

Tinybirdの構成:
データソース → Tinybird (SQLパイプライン) → クライアント

セットアップ

アカウント作成

# Tinybirdにサインアップ
# https://www.tinybird.co/signup

# CLIツールをインストール
curl https://cli.tinybird.co/install.sh | sh

# または npm経由で
npm install -g @tinybirdco/cli

# または pipenv経由で
pip install tinybird-cli

# 認証
tb auth

# ワークスペース作成
tb workspace create my-analytics

プロジェクト初期化

# プロジェクトディレクトリを作成
mkdir my-analytics-project
cd my-analytics-project

# Tinybirdプロジェクトを初期化
tb init

# 生成されるファイル構造:
# .
# ├── .tinyb
# ├── datasources/
# ├── pipes/
# └── endpoints/

データソースの定義

Tinybirdでは、まずデータソース(Data Source)を定義します。

イベントトラッキング用データソース

-- datasources/events.datasource

SCHEMA >
    `timestamp` DateTime,
    `event_name` String,
    `user_id` String,
    `session_id` String,
    `properties` String,
    `page_url` String,
    `user_agent` String,
    `ip_address` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp, user_id, event_name"

アプリケーションログ用データソース

-- datasources/app_logs.datasource

SCHEMA >
    `timestamp` DateTime,
    `level` LowCardinality(String),
    `service` LowCardinality(String),
    `message` String,
    `error_code` Nullable(String),
    `user_id` Nullable(String),
    `request_id` String,
    `duration_ms` UInt32,
    `metadata` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp, service, level"
ENGINE_TTL "timestamp + INTERVAL 30 DAY"

CSVファイルからデータソース作成

# CSVファイルをアップロード
tb datasource append users_data ./data/users.csv

# または、データソース定義とともに
tb push datasources/users.datasource --csv ./data/users.csv

パイプラインの構築

パイプ(Pipe)は、データソースを変換・集計するSQLクエリです。

基本的なパイプ

-- pipes/events_last_hour.pipe

SELECT
    event_name,
    count() as count,
    uniq(user_id) as unique_users
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY event_name
ORDER BY count DESC

複雑な集計パイプ

-- pipes/user_analytics.pipe

SELECT
    toStartOfHour(timestamp) as hour,
    event_name,
    count() as event_count,
    uniq(user_id) as unique_users,
    uniq(session_id) as unique_sessions,
    avg(JSONExtractFloat(properties, 'duration')) as avg_duration
FROM events
WHERE
    timestamp >= {{DateTime(start_date, '2024-01-01 00:00:00')}}
    AND timestamp < {{DateTime(end_date, '2024-01-31 23:59:59')}}
    {% if defined(event_filter) %}
        AND event_name = {{String(event_filter)}}
    {% end %}
GROUP BY hour, event_name
ORDER BY hour DESC, event_count DESC

マテリアライズドビュー

頻繁に使うクエリはマテリアライズドビューとして事前計算できます。

-- pipes/hourly_metrics.pipe

MATERIALIZED VIEW hourly_metrics
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, event_name)
AS SELECT
    toStartOfHour(timestamp) as hour,
    event_name,
    count() as count,
    uniq(user_id) as unique_users
FROM events
GROUP BY hour, event_name

APIエンドポイントの作成

パイプをAPIエンドポイントとして公開します。

基本的なエンドポイント

-- endpoints/events_stats.endpoint

SELECT * FROM events_last_hour

DESCRIPTION >
    過去1時間のイベント統計を取得
# デプロイ
tb push endpoints/events_stats.endpoint

# 生成されるURL:
# https://api.tinybird.co/v0/pipes/events_stats.json?token=YOUR_TOKEN

パラメータ付きエンドポイント

-- endpoints/user_analytics_api.endpoint

SELECT * FROM user_analytics

DESCRIPTION >
    ユーザーアナリティクスデータを期間指定で取得

PARAMS >
    start_date: DateTime = '2024-01-01 00:00:00',
    end_date: DateTime = '2024-01-31 23:59:59',
    event_filter: String = ''
# APIリクエスト例
curl "https://api.tinybird.co/v0/pipes/user_analytics_api.json?token=YOUR_TOKEN&start_date=2024-02-01&end_date=2024-02-05&event_filter=page_view"

実践例: Webアナリティクスダッシュボード

実際のWebアナリティクスシステムを構築してみましょう。

1. イベント収集スクリプト

// frontend/analytics.js

class TinybirdAnalytics {
  constructor(dataSourceUrl, token) {
    this.url = dataSourceUrl;
    this.token = token;
    this.sessionId = this.generateSessionId();
  }

  generateSessionId() {
    return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
  }

  async track(eventName, properties = {}) {
    const event = {
      timestamp: new Date().toISOString(),
      event_name: eventName,
      user_id: this.getUserId(),
      session_id: this.sessionId,
      properties: JSON.stringify(properties),
      page_url: window.location.href,
      user_agent: navigator.userAgent,
      ip_address: '', // サーバー側で付与
    };

    try {
      await fetch(this.url, {
        method: 'POST',
        headers: {
          'Authorization': `Bearer ${this.token}`,
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(event),
      });
    } catch (error) {
      console.error('Analytics tracking error:', error);
    }
  }

  getUserId() {
    let userId = localStorage.getItem('analytics_user_id');
    if (!userId) {
      userId = `user-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
      localStorage.setItem('analytics_user_id', userId);
    }
    return userId;
  }

  // ページビュートラッキング
  trackPageView() {
    this.track('page_view', {
      title: document.title,
      referrer: document.referrer,
    });
  }

  // クリックイベント
  trackClick(element, label) {
    this.track('click', {
      element: element,
      label: label,
    });
  }

  // カスタムイベント
  trackCustom(eventName, data) {
    this.track(eventName, data);
  }
}

// 初期化
const analytics = new TinybirdAnalytics(
  'https://api.tinybird.co/v0/events?name=events',
  'YOUR_INGESTION_TOKEN'
);

// 自動ページビュートラッキング
analytics.trackPageView();

2. パイプライン定義

-- pipes/page_views_by_page.pipe

SELECT
    page_url,
    count() as views,
    uniq(user_id) as unique_visitors,
    uniq(session_id) as unique_sessions,
    avg(if(properties != '', JSONExtractFloat(properties, 'load_time'), 0)) as avg_load_time
FROM events
WHERE
    event_name = 'page_view'
    AND timestamp >= now() - INTERVAL {{Int32(days, 7)}} DAY
GROUP BY page_url
ORDER BY views DESC
LIMIT {{Int32(limit, 100)}}
-- pipes/user_journey.pipe

SELECT
    user_id,
    session_id,
    arraySort(groupArray(tuple(timestamp, event_name, page_url))) as journey
FROM events
WHERE
    user_id = {{String(user_id)}}
    AND timestamp >= now() - INTERVAL 30 DAY
GROUP BY user_id, session_id
ORDER BY session_id DESC
-- pipes/realtime_dashboard.pipe

SELECT
    toStartOfMinute(timestamp) as minute,
    count() as events,
    uniq(user_id) as active_users,
    countIf(event_name = 'page_view') as page_views,
    countIf(event_name = 'click') as clicks,
    countIf(event_name = 'conversion') as conversions
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute DESC

3. Next.jsダッシュボード

// app/dashboard/page.tsx

import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card';

interface AnalyticsData {
  page_url: string;
  views: number;
  unique_visitors: number;
  unique_sessions: number;
  avg_load_time: number;
}

async function getAnalytics(days: number = 7): Promise<AnalyticsData[]> {
  const response = await fetch(
    `https://api.tinybird.co/v0/pipes/page_views_by_page.json?token=${process.env.TINYBIRD_TOKEN}&days=${days}`,
    { next: { revalidate: 60 } } // 1分キャッシュ
  );

  const data = await response.json();
  return data.data;
}

export default async function DashboardPage() {
  const analytics = await getAnalytics(7);

  const totalViews = analytics.reduce((sum, item) => sum + item.views, 0);
  const totalVisitors = analytics.reduce((sum, item) => sum + item.unique_visitors, 0);

  return (
    <div className="space-y-6">
      <div className="grid gap-4 md:grid-cols-3">
        <Card>
          <CardHeader>
            <CardTitle>総ページビュー</CardTitle>
          </CardHeader>
          <CardContent>
            <div className="text-3xl font-bold">{totalViews.toLocaleString()}</div>
          </CardContent>
        </Card>

        <Card>
          <CardHeader>
            <CardTitle>ユニークビジター</CardTitle>
          </CardHeader>
          <CardContent>
            <div className="text-3xl font-bold">{totalVisitors.toLocaleString()}</div>
          </CardContent>
        </Card>

        <Card>
          <CardHeader>
            <CardTitle>平均読み込み時間</CardTitle>
          </CardHeader>
          <CardContent>
            <div className="text-3xl font-bold">
              {(analytics.reduce((sum, item) => sum + item.avg_load_time, 0) / analytics.length).toFixed(2)}ms
            </div>
          </CardContent>
        </Card>
      </div>

      <Card>
        <CardHeader>
          <CardTitle>人気ページ</CardTitle>
        </CardHeader>
        <CardContent>
          <table className="w-full">
            <thead>
              <tr>
                <th className="text-left">ページ</th>
                <th className="text-right">ビュー</th>
                <th className="text-right">ユニーク訪問者</th>
              </tr>
            </thead>
            <tbody>
              {analytics.slice(0, 10).map((item) => (
                <tr key={item.page_url}>
                  <td>{item.page_url}</td>
                  <td className="text-right">{item.views}</td>
                  <td className="text-right">{item.unique_visitors}</td>
                </tr>
              ))}
            </tbody>
          </table>
        </CardContent>
      </Card>
    </div>
  );
}

リアルタイムデータ取り込み

Events API経由

# 単一イベント
curl -X POST "https://api.tinybird.co/v0/events?name=events" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -d '{
    "timestamp": "2024-02-05 10:30:00",
    "event_name": "purchase",
    "user_id": "user123",
    "session_id": "session456",
    "properties": "{\"amount\": 99.99, \"product_id\": \"prod789\"}",
    "page_url": "https://example.com/checkout",
    "user_agent": "Mozilla/5.0...",
    "ip_address": "192.168.1.1"
  }'

# バッチイベント(NDJSON形式)
curl -X POST "https://api.tinybird.co/v0/events?name=events" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -d '{"timestamp": "2024-02-05 10:30:00", "event_name": "page_view", "user_id": "user1"}
{"timestamp": "2024-02-05 10:31:00", "event_name": "click", "user_id": "user2"}
{"timestamp": "2024-02-05 10:32:00", "event_name": "conversion", "user_id": "user3"}'

Kafkaからのストリーミング取り込み

-- datasources/kafka_events.datasource

CONNECTOR "kafka"
CONNECTOR_TOPIC "analytics-events"
CONNECTOR_BOOTSTRAP_SERVERS "kafka.example.com:9092"

SCHEMA >
    `timestamp` DateTime,
    `event_name` String,
    `user_id` String,
    `properties` String

ENGINE "MergeTree"
ENGINE_SORTING_KEY "timestamp, user_id"

パフォーマンス最適化

1. 適切なソーティングキー

-- 悪い例: よく使うフィルタがソーティングキーに含まれていない
ENGINE_SORTING_KEY "timestamp"

-- 良い例: フィルタ条件を含める
ENGINE_SORTING_KEY "timestamp, user_id, event_name"

2. パーティショニング

-- 月次パーティション(長期保存用)
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"

-- 日次パーティション(リアルタイム分析用)
ENGINE_PARTITION_KEY "toYYYYMMDD(timestamp)"

3. TTL設定

-- 30日後に自動削除
ENGINE_TTL "timestamp + INTERVAL 30 DAY"

-- 段階的削除(7日後に詳細データ削除、30日後に完全削除)
ENGINE_TTL "
    properties + INTERVAL 7 DAY,
    timestamp + INTERVAL 30 DAY
"

CI/CDパイプライン

# .github/workflows/tinybird.yml

name: Deploy Tinybird

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Install Tinybird CLI
        run: curl https://cli.tinybird.co/install.sh | sh

      - name: Deploy to Tinybird
        env:
          TB_TOKEN: ${{ secrets.TINYBIRD_ADMIN_TOKEN }}
        run: |
          tb auth --token $TB_TOKEN
          tb push --force

      - name: Run tests
        run: tb test

まとめ

Tinybirdは、SQLだけでリアルタイムアナリティクスパイプラインを構築できる強力なプラットフォームです。以下のようなユースケースに最適です:

  • Webアナリティクス: ページビュー、ユーザー行動分析
  • プロダクトアナリティクス: 機能利用状況、ユーザーエンゲージメント
  • アプリケーションモニタリング: ログ集計、エラートラッキング
  • IoTデータ処理: センサーデータのリアルタイム分析
  • 広告レポーティング: インプレッション、クリック、コンバージョン追跡

従来のデータウェアハウスと比較して、セットアップが簡単で、リアルタイム性が高く、コストも抑えられます。SQLの知識があれば今すぐ始められるので、ぜひTinybirdを試してみてください!