
DynamoDB Streams

Process DynamoDB stream events in AWS Lambda functions with type-safe entity deserialization and fluent event handling.

Installation

dotnet add package Oproto.FluentDynamoDb.Streams

Overview

The Streams package provides a fluent API for processing DynamoDB stream records. It integrates with the source generator for type-safe entity deserialization and supports both single-entity and multi-entity (single-table design) processing.

Single-Entity Processing

Use Process<TEntity>() to handle stream records for a single entity type:

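using System.Threading.Tasks;
using Amazon.Lambda.DynamoDBEvents;
using Oproto.FluentDynamoDb.Streams;

public class OrderStreamHandler
{
    // Application services used by the handlers below. These are illustrative
    // types from your own code, not part of the Streams package.
    private readonly IEmailService _emailService;
    private readonly INotificationService _notificationService;
    private readonly ISearchIndex _searchIndex;

    public OrderStreamHandler(
        IEmailService emailService,
        INotificationService notificationService,
        ISearchIndex searchIndex)
    {
        _emailService = emailService;
        _notificationService = notificationService;
        _searchIndex = searchIndex;
    }

    public async Task HandleAsync(DynamoDBEvent dynamoEvent)
    {
        foreach (var record in dynamoEvent.Records)
        {
            await record.Process<Order>()
                .OnInsert(async (_, newOrder) =>
                {
                    await _emailService.SendOrderConfirmation(newOrder);
                })
                .OnUpdate(async (oldOrder, newOrder) =>
                {
                    // Only notify when the status actually changed
                    if (oldOrder.Status != newOrder.Status)
                    {
                        await _notificationService.NotifyStatusChange(newOrder);
                    }
                })
                .OnDelete(async (deletedOrder, _) =>
                {
                    await _searchIndex.RemoveOrder(deletedOrder.OrderId);
                })
                .ProcessAsync();
        }
    }
}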

Event Handlers

Handler          Event             Parameters
OnInsert         New item added    (TEntity? old, TEntity new); old is always null
OnUpdate         Item modified     (TEntity old, TEntity new); both populated
OnDelete         Item removed      (TEntity old, TEntity? new); new is always null
OnTtlDelete      TTL expiration    (TEntity old, TEntity? new); TTL-triggered only
OnNonTtlDelete   Manual deletion   (TEntity old, TEntity? new); non-TTL only

Filtering

Entity-Level Filtering (After Deserialization)

Use Where() to filter based on entity properties:

await record.Process<Order>()
    .Where(o => o.Status == "active")
    .Where(o => o.Total > 100) // Multiple filters use AND logic
    .OnInsert(async (_, order) => await ProcessHighValueOrder(order))
    .ProcessAsync();

Key-Level Filtering (Before Deserialization)

Use WhereKey() to filter on key attributes before deserialization, so records that do not match skip the deserialization cost entirely:

await record.Process<Order>()
    .WhereKey(keys => keys["pk"].S.StartsWith("ORDER#"))
    .WhereKey(keys => keys["sk"].S.StartsWith("2024"))
    .Where(o => o.Total > 100) // Entity filter applied after deserialization
    .OnInsert(async (_, order) => await ProcessOrder(order))
    .ProcessAsync();
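
To make the ordering concrete, here is a minimal, self-contained sketch of a two-stage filter pipeline. It is not the library's implementation: FilterPipelineSketch, OrderSketch, and the DeserializeCount counter are hypothetical names invented for illustration, and keys are modeled as plain strings rather than AttributeValue objects. It only shows why a failed key filter means the record is never deserialized.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a deserialized entity (not a library type).
public sealed record OrderSketch(string Status, decimal Total);

// Hypothetical two-stage filter: key predicates run first, entity predicates second.
public sealed class FilterPipelineSketch
{
    private readonly List<Func<IReadOnlyDictionary<string, string>, bool>> _keyFilters = new();
    private readonly List<Func<OrderSketch, bool>> _entityFilters = new();

    // Counts how many times deserialization was actually performed.
    public int DeserializeCount { get; private set; }

    public FilterPipelineSketch WhereKey(Func<IReadOnlyDictionary<string, string>, bool> predicate)
    {
        _keyFilters.Add(predicate);
        return this;
    }

    public FilterPipelineSketch Where(Func<OrderSketch, bool> predicate)
    {
        _entityFilters.Add(predicate);
        return this;
    }

    // Returns true only if every key filter AND every entity filter passes.
    public bool Matches(IReadOnlyDictionary<string, string> keys, Func<OrderSketch> deserialize)
    {
        // Stage 1: cheap checks on raw key values; bail out before deserializing.
        if (!_keyFilters.All(f => f(keys)))
        {
            return false;
        }

        // Stage 2: deserialize once, then apply entity-level predicates.
        DeserializeCount++;
        var entity = deserialize();
        return _entityFilters.All(f => f(entity));
    }
}
```

In this sketch a record whose partition key does not start with "ORDER#" is rejected in stage 1, so the deserialize delegate is never invoked for it.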

TTL vs Manual Delete Detection

Distinguish between items deleted by TTL expiration and manual deletions:

await record.Process<Session>()
    .OnTtlDelete(async (session, _) =>
    {
        // Session expired automatically (DynamoDB TTL)
        await _auditLog.LogSessionExpiry(session.UserId);
    })
    .OnNonTtlDelete(async (session, _) =>
    {
        // User explicitly logged out
        await _auditLog.LogLogout(session.UserId);
        await _notificationService.SendLogoutConfirmation(session.UserId);
    })
    .ProcessAsync();

TTL deletions are identified by UserIdentity.Type == "Service" and UserIdentity.PrincipalId == "dynamodb.amazonaws.com".
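
The check described above can be sketched as a small predicate. This is an illustration under stated assumptions, not the package's internal code: StreamIdentitySketch is a hypothetical stand-in for the identity block on a stream record, and TtlDetectionSketch.IsTtlDelete is an invented helper. The sketch also assumes the check only applies to REMOVE events and that the comparison is case-sensitive.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for the user identity block of a stream record.
public sealed record StreamIdentitySketch(string? Type, string? PrincipalId);

public static class TtlDetectionSketch
{
    // True when a REMOVE event carries the identity DynamoDB attaches to TTL deletions.
    public static bool IsTtlDelete(string eventName, StreamIdentitySketch? identity) =>
        string.Equals(eventName, "REMOVE", StringComparison.Ordinal)
        && identity is not null
        && string.Equals(identity.Type, "Service", StringComparison.Ordinal)
        && string.Equals(identity.PrincipalId, "dynamodb.amazonaws.com", StringComparison.Ordinal);
}
```

A REMOVE record with no identity block, such as one produced by an application's own delete call, would fall through to the non-TTL path in this sketch.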

Multi-Entity Processing (Single-Table Design)

For tables with multiple entity types, use discriminator-based routing:

await record.Process()
    .WithDiscriminator("EntityType")
    .For<User>("User")
        .Where(u => u.Status == "active")
        .OnInsert(async (_, user) => await IndexUser(user))
        .OnUpdate(async (oldUser, newUser) =>
        {
            if (oldUser.Email != newUser.Email)
                await _emailService.SendEmailChangeNotification(newUser);
        })
    .For<Order>("Order")
        .Where(o => o.Total > 100)
        .OnInsert(async (_, order) => await ProcessOrder(order))
    .OnUnknownType(async unknownRecord =>
    {
        _logger.LogWarning("Unknown entity type in stream");
    })
    .ProcessAsync();

Pattern-Based Discriminators

Use wildcards for sort key-based discrimination:

await record.Process()
    .WithDiscriminator("SK")
    .For<User>("USER#*") // Prefix match
        .OnInsert(async (_, user) => await IndexUser(user))
    .For<Order>("ORDER#*") // Prefix match
        .OnInsert(async (_, order) => await ProcessOrder(order))
    .For<Invoice>("*#INVOICE") // Suffix match
        .OnInsert(async (_, invoice) => await ProcessInvoice(invoice))
    .ProcessAsync();

Pattern matching rules:

  • "USER" — Exact match
  • "USER#*" — Prefix match (starts with "USER#")
  • "*#USER" — Suffix match (ends with "#USER")
  • "*#USER#*" — Contains match
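
The four rules above can be sketched as a single matching function. This is a hypothetical illustration, not the library's matcher: DiscriminatorPatternSketch is an invented name, and the sketch assumes case-sensitive (ordinal) comparison and that a wildcard only appears at the start or end of a pattern.

```csharp
using System;

public static class DiscriminatorPatternSketch
{
    // Applies the exact / prefix / suffix / contains rules to a discriminator value.
    public static bool Matches(string pattern, string value)
    {
        bool leading = pattern.StartsWith('*');
        bool trailing = pattern.EndsWith('*');

        // "*#USER#*": wildcard on both sides means the core must appear anywhere.
        if (leading && trailing && pattern.Length >= 2)
        {
            string core = pattern.Substring(1, pattern.Length - 2);
            return value.Contains(core, StringComparison.Ordinal);
        }

        // "USER#*": trailing wildcard means the value must start with the prefix.
        if (trailing)
        {
            string prefix = pattern.Substring(0, pattern.Length - 1);
            return value.StartsWith(prefix, StringComparison.Ordinal);
        }

        // "*#USER": leading wildcard means the value must end with the suffix.
        if (leading)
        {
            return value.EndsWith(pattern.Substring(1), StringComparison.Ordinal);
        }

        // "USER": no wildcard means an exact match.
        return string.Equals(pattern, value, StringComparison.Ordinal);
    }
}
```

For example, under these assumptions `Matches("USER#*", "USER#123")` is true, while `Matches("USER", "USER#123")` is false because a pattern without a wildcard must match exactly.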

Table-Integrated Processing

When using the generated table class, discriminator values are resolved automatically:

// Discriminator looked up from entity configuration
await _myTable.OnStream(record)
    .For<User>() // No discriminator value needed
        .OnInsert(async (_, user) => await ProcessUser(user))
    .For<Order>() // Resolved from [DynamoDbTable] attribute
        .OnInsert(async (_, order) => await ProcessOrder(order))
    .ProcessAsync();

Entity Requirements

Entities must have the [GenerateStreamConversion] attribute for stream deserialization:

[DynamoDbTable("orders")]
[GenerateStreamConversion]
public partial class Order
{
    [PartitionKey]
    [DynamoDbAttribute("pk")]
    public string Pk { get; set; } = string.Empty;

    [SortKey]
    [DynamoDbAttribute("sk")]
    public string Sk { get; set; } = string.Empty;

    [DynamoDbAttribute("status")]
    public string Status { get; set; } = string.Empty;
}

This generates a FromDynamoDbStream method that maps Lambda AttributeValue types to your entity.

Complete Lambda Function Example

using System;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Oproto.FluentDynamoDb.Streams;

[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

public class Function
{
    private readonly IEmailService _emailService;
    private readonly ISearchIndex _searchIndex;

    public Function()
    {
        _emailService = new EmailService();
        _searchIndex = new SearchIndex();
    }

    public async Task FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                await record.Process<Order>()
                    // Skip non-order records before deserialization
                    .WhereKey(keys => keys["pk"].S.StartsWith("ORDER#"))
                    .OnInsert(async (_, order) =>
                    {
                        await _emailService.SendOrderConfirmation(order);
                        await _searchIndex.IndexOrder(order);
                    })
                    .OnUpdate(async (oldOrder, newOrder) =>
                    {
                        if (oldOrder.Status != newOrder.Status)
                        {
                            await _emailService.SendStatusUpdate(newOrder);
                        }
                        await _searchIndex.UpdateOrder(newOrder);
                    })
                    .OnDelete(async (order, _) =>
                    {
                        await _searchIndex.RemoveOrder(order.OrderId);
                    })
                    .ProcessAsync();
            }
            catch (Exception ex)
            {
                context.Logger.LogError($"Error processing record: {ex.Message}");
                throw; // Re-throw to trigger Lambda retry
            }
        }
    }
}

See Also