Amazon EventBridgeは、イベント駆動アーキテクチャの中核となるサービスです。本記事では、運用記事で触れなかった高度な機能を解説します。
EventBridgeアーキテクチャ
flowchart TB
subgraph EventBridge["EventBridge"]
EventBus["イベントバス"]
Rules["ルール"]
Scheduler["Scheduler"]
Pipes["Pipes"]
Schema["Schema Registry"]
Archive["アーカイブ"]
end
subgraph Sources["ソース"]
AWS["AWSサービス"]
SaaS["SaaS"]
Custom["カスタムアプリ"]
end
subgraph Targets["ターゲット"]
Lambda["Lambda"]
StepFunctions["Step Functions"]
SQS["SQS"]
API["API Destination"]
end
Sources --> EventBus
EventBus --> Rules
Rules --> Targets
Scheduler --> Targets
Pipes --> Targets
style EventBridge fill:#3b82f6,color:#fff
イベントパターンマッチング
基本パターン
# 完全一致
BasicMatchRule:
Type: AWS::Events::Rule
Properties:
EventBusName: !Ref CustomEventBus
EventPattern:
source:
- "orders.api"
detail-type:
- "Order Created"
detail:
status:
- "pending"
# 配列内のいずれかにマッチ
ArrayMatchRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
source:
- "orders.api"
- "inventory.api"
detail:
priority:
- "high"
- "critical"
高度なパターン
# プレフィックスマッチ
PrefixMatchRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
source:
- prefix: "aws."
detail:
eventName:
- prefix: "Create"
# サフィックスマッチ
SuffixMatchRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
detail:
resourceArn:
- suffix: "-production"
# ワイルドカード
WildcardRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
detail:
instanceId:
- wildcard: "i-*"
# 数値比較
NumericMatchRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
detail:
amount:
- numeric:
- ">="
- 1000
quantity:
- numeric:
- ">"
- 0
- "<="
- 100
# 存在チェック
ExistsRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
detail:
errorCode:
- exists: true
successMessage:
- exists: false
# 否定マッチ
NegationRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
detail:
status:
- anything-but: "test"
environment:
- anything-but:
prefix: "dev-"
複合パターン
# OR条件($or)
OrConditionRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
source:
- "orders.api"
"$or":
- detail:
status:
- "failed"
- detail:
retryCount:
- numeric:
- ">="
- 3
# 複雑な条件の組み合わせ
ComplexRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
source:
- "payment.service"
detail-type:
- "Transaction Completed"
detail:
transactionType:
- "purchase"
- "refund"
amount:
- numeric:
- ">="
- 100
currency:
- "USD"
- "JPY"
metadata:
region:
- prefix: "ap-"
EventBridge Scheduler
概要
flowchart LR
subgraph Scheduler["EventBridge Scheduler"]
Schedule["スケジュール定義"]
Flexible["フレキシブルウィンドウ"]
Target["ターゲット"]
end
Schedule --> |"cron/rate"| Flexible
Flexible --> Target
style Scheduler fill:#f59e0b,color:#000
スケジュール設定
# 1回限りのスケジュール
OneTimeSchedule:
Type: AWS::Scheduler::Schedule
Properties:
Name: one-time-report
ScheduleExpression: "at(2024-12-31T23:59:00)"
ScheduleExpressionTimezone: Asia/Tokyo
FlexibleTimeWindow:
Mode: "OFF"
Target:
Arn: !GetAtt ReportFunction.Arn
RoleArn: !GetAtt SchedulerRole.Arn
Input: '{"type": "year-end-report"}'
# 定期実行スケジュール
RecurringSchedule:
Type: AWS::Scheduler::Schedule
Properties:
Name: daily-cleanup
ScheduleExpression: "cron(0 2 * * ? *)" # 毎日AM2時
ScheduleExpressionTimezone: Asia/Tokyo
FlexibleTimeWindow:
Mode: FLEXIBLE
MaximumWindowInMinutes: 15
Target:
Arn: !GetAtt CleanupFunction.Arn
RoleArn: !GetAtt SchedulerRole.Arn
RetryPolicy:
MaximumEventAgeInSeconds: 3600
MaximumRetryAttempts: 3
# Rate式スケジュール
RateSchedule:
Type: AWS::Scheduler::Schedule
Properties:
Name: health-check
ScheduleExpression: "rate(5 minutes)"
FlexibleTimeWindow:
Mode: FLEXIBLE
MaximumWindowInMinutes: 2
Target:
Arn: !GetAtt HealthCheckFunction.Arn
RoleArn: !GetAtt SchedulerRole.Arn
State: ENABLED
スケジュールグループ
ScheduleGroup:
Type: AWS::Scheduler::ScheduleGroup
Properties:
Name: production-schedules
Tags:
- Key: Environment
Value: Production
# グループ内のスケジュール
GroupedSchedule:
Type: AWS::Scheduler::Schedule
Properties:
GroupName: !Ref ScheduleGroup
Name: batch-job
ScheduleExpression: "cron(0 */6 * * ? *)"
FlexibleTimeWindow:
Mode: FLEXIBLE
MaximumWindowInMinutes: 30
Target:
Arn: !Ref BatchJobStateMachine
RoleArn: !GetAtt SchedulerRole.Arn
EventBridge Pipes
概要
flowchart LR
subgraph Pipes["EventBridge Pipes"]
Source["ソース<br/>SQS/Kinesis/DynamoDB"]
Filter["フィルタリング"]
Enrichment["エンリッチメント"]
Target["ターゲット"]
end
Source --> Filter
Filter --> Enrichment
Enrichment --> Target
style Pipes fill:#8b5cf6,color:#fff
Pipe設定
# SQSからLambdaへのPipe
SQSToLambdaPipe:
Type: AWS::Pipes::Pipe
Properties:
Name: orders-processing-pipe
RoleArn: !GetAtt PipeRole.Arn
Source: !GetAtt OrdersQueue.Arn
SourceParameters:
SqsQueueParameters:
BatchSize: 10
MaximumBatchingWindowInSeconds: 30
FilterCriteria:
Filters:
- Pattern: '{"body": {"priority": ["high", "critical"]}}'
Enrichment: !GetAtt EnrichmentFunction.Arn
EnrichmentParameters:
InputTemplate: '{"orderId": <$.body.orderId>, "customerId": <$.body.customerId>}'
Target: !GetAtt ProcessingFunction.Arn
TargetParameters:
LambdaFunctionParameters:
InvocationType: REQUEST_RESPONSE
# DynamoDB StreamsからStep Functionsへ
DynamoDBToStepFunctionsPipe:
Type: AWS::Pipes::Pipe
Properties:
Name: order-state-change-pipe
RoleArn: !GetAtt PipeRole.Arn
Source: !GetAtt OrdersTable.StreamArn
SourceParameters:
DynamoDBStreamParameters:
BatchSize: 100
MaximumBatchingWindowInSeconds: 5
StartingPosition: LATEST
MaximumRetryAttempts: 3
DeadLetterConfig:
Arn: !GetAtt DLQ.Arn
FilterCriteria:
Filters:
- Pattern: |
{
"eventName": ["MODIFY"],
"dynamodb": {
"NewImage": {
"status": {"S": ["completed", "cancelled"]}
}
}
}
Target: !Ref OrderWorkflow
TargetParameters:
StepFunctionStateMachineParameters:
InvocationType: FIRE_AND_FORGET
# Kinesis Data StreamsからEventBridge API Destinationへ
KinesisToApiPipe:
Type: AWS::Pipes::Pipe
Properties:
Name: events-to-external-api
RoleArn: !GetAtt PipeRole.Arn
Source: !GetAtt EventsStream.Arn
SourceParameters:
KinesisStreamParameters:
BatchSize: 50
StartingPosition: LATEST
MaximumBatchingWindowInSeconds: 10
Enrichment: !GetAtt TransformFunction.Arn
Target: !GetAtt ExternalApiDestination.Arn
TargetParameters:
HttpParameters:
HeaderParameters:
Content-Type: application/json
X-Api-Key: !Sub "{{resolve:secretsmanager:api-key}}"
Schema Registry
スキーマ検出
SchemaDiscoverer:
Type: AWS::EventSchemas::Discoverer
Properties:
SourceArn: !GetAtt CustomEventBus.Arn
Description: Auto-discover schemas from custom events
# カスタムスキーマ定義
OrderSchema:
Type: AWS::EventSchemas::Schema
Properties:
RegistryName: discovered-schemas
SchemaName: orders@OrderCreated
Type: OpenApi3
Content: |
{
"openapi": "3.0.0",
"info": {
"title": "OrderCreated",
"version": "1.0.0"
},
"paths": {},
"components": {
"schemas": {
"OrderCreated": {
"type": "object",
"required": ["orderId", "customerId", "items", "total"],
"properties": {
"orderId": {"type": "string"},
"customerId": {"type": "string"},
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"productId": {"type": "string"},
"quantity": {"type": "integer"},
"price": {"type": "number"}
}
}
},
"total": {"type": "number"},
"currency": {"type": "string", "default": "JPY"},
"createdAt": {"type": "string", "format": "date-time"}
}
}
}
}
}
コード生成
# スキーマからコード生成
aws schemas get-code-binding-source \
--registry-name discovered-schemas \
--schema-name orders@OrderCreated \
--language Python36 \
--schema-version 1 \
--output code.zip
# TypeScript用
aws schemas get-code-binding-source \
--registry-name discovered-schemas \
--schema-name orders@OrderCreated \
--language TypeScript3 \
--schema-version 1 \
--output code.zip
アーカイブとリプレイ
アーカイブ設定
EventArchive:
Type: AWS::Events::Archive
Properties:
ArchiveName: orders-archive
SourceArn: !GetAtt CustomEventBus.Arn
Description: Archive all order events
EventPattern:
source:
- "orders.api"
RetentionDays: 90
# 全イベントアーカイブ
FullArchive:
Type: AWS::Events::Archive
Properties:
ArchiveName: full-archive
SourceArn: !GetAtt CustomEventBus.Arn
RetentionDays: 365
リプレイ実行
# リプレイの開始
aws events start-replay \
--replay-name order-replay-2024-01-15 \
--event-source-arn arn:aws:events:ap-northeast-1:123456789012:archive/orders-archive \
--destination '{"Arn": "arn:aws:events:ap-northeast-1:123456789012:event-bus/orders"}' \
--event-start-time 2024-01-15T00:00:00Z \
--event-end-time 2024-01-15T23:59:59Z
# リプレイ状態確認
aws events describe-replay --replay-name order-replay-2024-01-15
API Destinations
外部API連携
# コネクション(認証設定)
ApiConnection:
Type: AWS::Events::Connection
Properties:
Name: external-api-connection
AuthorizationType: API_KEY
AuthParameters:
ApiKeyAuthParameters:
ApiKeyName: X-API-Key
ApiKeyValue: !Sub "{{resolve:secretsmanager:external-api-key}}"
# OAuth認証
OAuthConnection:
Type: AWS::Events::Connection
Properties:
Name: oauth-connection
AuthorizationType: OAUTH_CLIENT_CREDENTIALS
AuthParameters:
OAuthParameters:
AuthorizationEndpoint: https://auth.example.com/oauth/token
ClientParameters:
ClientID: !Sub "{{resolve:secretsmanager:oauth-client-id}}"
ClientSecret: !Sub "{{resolve:secretsmanager:oauth-client-secret}}"
HttpMethod: POST
OAuthHttpParameters:
BodyParameters:
- Key: grant_type
Value: client_credentials
IsValueSecret: false
# API Destination
ApiDestination:
Type: AWS::Events::ApiDestination
Properties:
Name: external-webhook
ConnectionArn: !GetAtt ApiConnection.Arn
InvocationEndpoint: https://api.example.com/webhook
HttpMethod: POST
InvocationRateLimitPerSecond: 100
# ルール
WebhookRule:
Type: AWS::Events::Rule
Properties:
EventBusName: !Ref CustomEventBus
EventPattern:
source:
- "orders.api"
detail-type:
- "Order Completed"
Targets:
- Id: ExternalWebhook
Arn: !GetAtt ApiDestination.Arn
RoleArn: !GetAtt EventBridgeRole.Arn
HttpParameters:
HeaderParameters:
Content-Type: application/json
QueryStringParameters:
source: aws
InputTransformer:
InputPathsMap:
orderId: "$.detail.orderId"
total: "$.detail.total"
InputTemplate: |
{
"event": "order_completed",
"data": {
"order_id": "<orderId>",
"amount": <total>
}
}
RetryPolicy:
MaximumEventAgeInSeconds: 3600
MaximumRetryAttempts: 3
DeadLetterConfig:
Arn: !GetAtt DLQ.Arn
クロスアカウント/リージョン
クロスアカウントイベント
# ソースアカウント
CrossAccountRule:
Type: AWS::Events::Rule
Properties:
EventBusName: default
EventPattern:
source:
- "aws.ec2"
detail-type:
- "EC2 Instance State-change Notification"
Targets:
- Id: CentralEventBus
Arn: arn:aws:events:ap-northeast-1:999999999999:event-bus/central
RoleArn: !GetAtt CrossAccountRole.Arn
# ターゲットアカウントのイベントバスポリシー
CentralEventBusPolicy:
Type: AWS::Events::EventBusPolicy
Properties:
EventBusName: central
StatementId: AllowCrossAccount
Statement:
Effect: Allow
Principal:
AWS:
- "111111111111"
- "222222222222"
Action: events:PutEvents
Resource: !GetAtt CentralEventBus.Arn
Condition:
StringEquals:
events:source:
- "aws.ec2"
ベストプラクティス
flowchart TB
subgraph BestPractices["Best Practices"]
Schema["スキーマ定義でイベント契約"]
Archive["重要イベントのアーカイブ"]
DLQ["DLQで失敗イベント捕捉"]
Pipes["Pipesで変換処理統合"]
end
style BestPractices fill:#22c55e,color:#fff
| カテゴリ | 項目 |
|---|---|
| 設計 | Schema Registryでイベント契約 |
| 信頼性 | アーカイブで監査・リプレイ対応 |
| 運用 | DLQで失敗イベント管理 |
| 効率化 | Pipesでストリーム処理統合 |
まとめ
| 機能 | 用途 |
|---|---|
| イベントパターン | 柔軟なマッチング |
| Scheduler | 時間ベースのトリガー |
| Pipes | ストリーム処理の統合 |
| API Destinations | 外部API連携 |
EventBridgeの高度な機能を活用することで、柔軟なイベント駆動アーキテクチャを構築できます。