DynamoDB Streams
Process DynamoDB stream events in AWS Lambda functions with type-safe entity deserialization and fluent event handling.
Installation
dotnet add package Oproto.FluentDynamoDb.Streams
Overview
The Streams package provides a fluent API for processing DynamoDB stream records. It integrates with the source generator for type-safe entity deserialization and supports both single-entity and multi-entity (single-table design) processing.
Single-Entity Processing
Use Process<TEntity>() to handle stream records for a single entity type:
using Oproto.FluentDynamoDb.Streams;
using Amazon.Lambda.DynamoDBEvents;
public class OrderStreamHandler
{
public async Task HandleAsync(DynamoDBEvent dynamoEvent)
{
foreach (var record in dynamoEvent.Records)
{
await record.Process<Order>()
.OnInsert(async (_, newOrder) =>
{
await _emailService.SendOrderConfirmation(newOrder);
})
.OnUpdate(async (oldOrder, newOrder) =>
{
if (oldOrder.Status != newOrder.Status)
{
await _notificationService.NotifyStatusChange(newOrder);
}
})
.OnDelete(async (deletedOrder, _) =>
{
await _searchIndex.RemoveOrder(deletedOrder.OrderId);
})
.ProcessAsync();
}
}
}
Event Handlers
| Handler | Event | Parameters |
|---|---|---|
OnInsert | New item added | (TEntity? old, TEntity new) — old is always null |
OnUpdate | Item modified | (TEntity old, TEntity new) — both populated |
OnDelete | Item removed | (TEntity old, TEntity? new) — new is always null |
OnTtlDelete | TTL expiration | (TEntity old, TEntity? new) — TTL-triggered only |
OnNonTtlDelete | Manual deletion | (TEntity old, TEntity? new) — non-TTL only |
Filtering
Entity-Level Filtering (After Deserialization)
Use Where() to filter based on entity properties:
await record.Process<Order>()
.Where(o => o.Status == "active")
.Where(o => o.Total > 100) // Multiple filters use AND logic
.OnInsert(async (_, order) => await ProcessHighValueOrder(order))
.ProcessAsync();
Key-Level Filtering (Before Deserialization)
Use WhereKey() for performance — filters are evaluated before deserialization:
await record.Process<Order>()
.WhereKey(keys => keys["pk"].S.StartsWith("ORDER#"))
.WhereKey(keys => keys["sk"].S.StartsWith("2024"))
.Where(o => o.Total > 100) // Entity filter applied after deserialization
.OnInsert(async (_, order) => await ProcessOrder(order))
.ProcessAsync();
TTL vs Manual Delete Detection
Distinguish between items deleted by TTL expiration and manual deletions:
await record.Process<Session>()
.OnTtlDelete(async (session, _) =>
{
// Session expired automatically (DynamoDB TTL)
await _auditLog.LogSessionExpiry(session.UserId);
})
.OnNonTtlDelete(async (session, _) =>
{
// User explicitly logged out
await _auditLog.LogLogout(session.UserId);
await _notificationService.SendLogoutConfirmation(session.UserId);
})
.ProcessAsync();
TTL deletions are identified by UserIdentity.Type == "Service" and UserIdentity.PrincipalId == "dynamodb.amazonaws.com".
Multi-Entity Processing (Single-Table Design)
For tables with multiple entity types, use discriminator-based routing:
await record.Process()
.WithDiscriminator("EntityType")
.For<User>("User")
.Where(u => u.Status == "active")
.OnInsert(async (_, user) => await IndexUser(user))
.OnUpdate(async (oldUser, newUser) =>
{
if (oldUser.Email != newUser.Email)
await _emailService.SendEmailChangeNotification(newUser);
})
.For<Order>("Order")
.Where(o => o.Total > 100)
.OnInsert(async (_, order) => await ProcessOrder(order))
.OnUnknownType(async unknownRecord =>
{
_logger.LogWarning("Unknown entity type in stream");
})
.ProcessAsync();
Pattern-Based Discriminators
Use wildcards for sort key-based discrimination:
await record.Process()
.WithDiscriminator("SK")
.For<User>("USER#*") // Prefix match
.OnInsert(async (_, user) => await IndexUser(user))
.For<Order>("ORDER#*") // Prefix match
.OnInsert(async (_, order) => await ProcessOrder(order))
.For<Invoice>("*#INVOICE") // Suffix match
.OnInsert(async (_, invoice) => await ProcessInvoice(invoice))
.ProcessAsync();
Pattern matching rules:
"USER"— Exact match"USER#*"— Prefix match (starts with "USER#")"*#USER"— Suffix match (ends with "#USER")"*#USER#*"— Contains match
Table-Integrated Processing
When using the generated table class, discriminator values are resolved automatically:
// Discriminator looked up from entity configuration
await _myTable.OnStream(record)
.For<User>() // No discriminator value needed
.OnInsert(async (_, user) => await ProcessUser(user))
.For<Order>() // Resolved from [DynamoDbTable] attribute
.OnInsert(async (_, order) => await ProcessOrder(order))
.ProcessAsync();
Entity Requirements
Entities must have the [GenerateStreamConversion] attribute for stream deserialization:
[DynamoDbTable("orders")]
[GenerateStreamConversion]
public partial class Order
{
[PartitionKey]
[DynamoDbAttribute("pk")]
public string Pk { get; set; } = string.Empty;
[SortKey]
[DynamoDbAttribute("sk")]
public string Sk { get; set; } = string.Empty;
[DynamoDbAttribute("status")]
public string Status { get; set; } = string.Empty;
}
This generates a FromDynamoDbStream method that maps Lambda AttributeValue types to your entity.
Complete Lambda Function Example
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Oproto.FluentDynamoDb.Streams;
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
public class Function
{
private readonly IEmailService _emailService;
private readonly ISearchIndex _searchIndex;
public Function()
{
_emailService = new EmailService();
_searchIndex = new SearchIndex();
}
public async Task FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
{
foreach (var record in dynamoEvent.Records)
{
try
{
await record.Process<Order>()
.WhereKey(keys => keys["pk"].S.StartsWith("ORDER#"))
.OnInsert(async (_, order) =>
{
await _emailService.SendOrderConfirmation(order);
await _searchIndex.IndexOrder(order);
})
.OnUpdate(async (oldOrder, newOrder) =>
{
if (oldOrder.Status != newOrder.Status)
{
await _emailService.SendStatusUpdate(newOrder);
}
await _searchIndex.UpdateOrder(newOrder);
})
.OnDelete(async (order, _) =>
{
await _searchIndex.RemoveOrder(order.OrderId);
})
.ProcessAsync();
}
catch (Exception ex)
{
context.Logger.LogError($"Error processing record: {ex.Message}");
throw; // Re-throw to trigger Lambda retry
}
}
}
}
See Also
- CRUD Operations — Basic DynamoDB operations
- TTL Support — Automatic item expiration with TTL delete detection
- Transactions — Transactional write operations