Implement a Distributed State Machine with Redis to Migrate Billions of Records
As a start-up, we began with a simple architecture: all data was stored in a single centralized Postgres instance shared by a few services. It worked well and allowed us to move fast, deliver features, enter the market, acquire clients, and grow exponentially.
Fast forward a couple of years: today we have thousands of clients and billions of records, and it's time to migrate the largest tables from Postgres to a better-suited solution. We chose DynamoDB, a key-value datastore from AWS with high availability and low-latency performance, and expect queries to drop to single-digit milliseconds.
As with any decent migration plan, we carefully analyze usage, define the schema and indexes, start with double-writes, switch read queries to DynamoDB with a fallback to Postgres, import all records into DynamoDB, and finally stop querying Postgres.
This article focuses on the step of importing records from the room_events table: scan all the records of this table and write them to DynamoDB. The goal is a complete migration with no events missed. The migration script must be fast, capable of running in parallel, and able to stop and resume from its last state. It should also be resilient, handling network errors or interruptions and resuming seamlessly.
Let's see how we can do this!
Schema
Here is the simplified schema of the room_events table in Postgres:
| room_events | | |
| --- | --- | --- |
| id | ULID | primary key |
| room_id | UUID | references rooms.id |
| data | JSON | |
- The id column is a ULID, which has a time component.
- The table has billions of records.
A Naive Approach
Let's write a simple version of the migration script: run a single loop, scan all records, and write them to DynamoDB. We also handle a few trivial things now so we don't have to worry about them later:
- Scan with cursor pagination based on the id column.
- For each write, use a batch of at most 25 records (the maximum number of items allowed by BatchWriteItem).
- Scan backward from the double-write date to the earliest date, so that the most recent records are available first.
import "connectly.ai/go/pkgs/ulid"
var startDate = parseTime("2020-01-01T00:00:00Z")
var endDate = parseTime("2024-10-20T00:00:00Z")
var pgBatchSize = 1000
var dynBatchSize = 25
type Migrator struct {
*logger
pg PostgresClient
dyn DynamoClient
}
type QueryRoomEventsRequest struct {
BeforeID ulid.ULID
Limit int
}
type QueryRoomEventsResponse struct {
Items []RoomEvents
lastID ulid.ULID
}
func (m *Migrator) Migrate(ctx context.Context) error {
lastID, count := ulid.FromTime(endDate), 0
for {
req := QueryRoomEventsRequest{
BeforeID: lastID,
Limit: pgBatchSize
}
res, err := m.queryRoomEvents(ctx, req)
m.Logger(ctx).Must(err, "failed to query room events")
count += res.Items
if len(res.Items) == 0 {
m.Logger(ctx).Infof("MIGRATION DONE: count=%v", count)
return nil
}
for i := 0; i < len(res.Items); i += dynBatchSize {
j := min(i + dynBatchSize, len(res.Items))
batch := res.Items[i, j]
_, err = m.batchWriteDynamo(ctx, batch)
m.Logger(ctx).Must(err, "failed to write to dynamodb")
}
}
}
func (m *Migrator) queryRoomEvents( /* ... */ ) { /* ... */ }
func (m *Migrator) batchWriteDynamo(/* ... */ ) { /* ... */ }
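For context, here is one possible shape of the two stubbed helpers. This is only a sketch under assumptions that are not part of the original code: it uses database/sql for Postgres (via an assumed m.db field) and the AWS SDK for Go v2 for DynamoDB (via an assumed m.dynClient field); the column, struct field, and table names are illustrative.

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// Sketch: cursor pagination on the ULID primary key, newest records first.
// m.db is assumed to be a *sql.DB; the RoomEvents fields are assumptions.
func (m *Migrator) queryRoomEvents(ctx context.Context, req QueryRoomEventsRequest) (QueryRoomEventsResponse, error) {
    rows, err := m.db.QueryContext(ctx,
        `SELECT id, room_id, data FROM room_events WHERE id < $1 ORDER BY id DESC LIMIT $2`,
        req.BeforeID.String(), req.Limit)
    if err != nil {
        return QueryRoomEventsResponse{}, err
    }
    defer rows.Close()

    var res QueryRoomEventsResponse
    for rows.Next() {
        var ev RoomEvents
        if err := rows.Scan(&ev.ID, &ev.RoomID, &ev.Data); err != nil {
            return QueryRoomEventsResponse{}, err
        }
        res.Items = append(res.Items, ev)
        res.LastID = ev.ID // the smallest (oldest) ID in this page
    }
    return res, rows.Err()
}

// Sketch: write one batch of at most 25 items. m.dynClient is assumed to be a
// *dynamodb.Client; a production version should also retry UnprocessedItems.
func (m *Migrator) batchWriteDynamo(ctx context.Context, batch []RoomEvents) (int, error) {
    writes := make([]types.WriteRequest, 0, len(batch))
    for _, ev := range batch {
        item, err := attributevalue.MarshalMap(ev)
        if err != nil {
            return 0, err
        }
        writes = append(writes, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
    }
    _, err := m.dynClient.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
        RequestItems: map[string][]types.WriteRequest{"room_events": writes},
    })
    return len(writes), err
}

In practice the marshalling must match the DynamoDB schema defined during the planning step, and any unprocessed items returned by BatchWriteItem need to be retried as well.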
Real-World Approach
This naive script operates under a few assumptions:
- Calls to queryRoomEvents() and batchWriteDynamo() always succeed.
- The number of records is small enough to complete the migration in a single run on a single machine.
However, in the real world, we cannot rely on those assumptions:
- Requests may time out or fail randomly.
- Databases may hit their throughput or connection limits.
- Pods may get restarted due to failures or new deployments.
- The huge number of records makes a single-threaded approach impractical; it would take ages to complete.
To handle these challenges, we need to improve our script and implement a more robust, distributed solution:
Data Splitting and Partitioning:
- Split the data by grouping room events into 1-hour Timeslots. Each slot represents a manageable unit of work that is processed independently, allowing for better load balancing and improved parallelism.
Parallel Processing with Multiple Workers:
- Run multiple workers across multiple pods, with each pod managing multiple goroutines.
- Each worker picks a Timeslot, migrates it, and then moves on to the next available slot, so that all workers operate concurrently.
State Management in Redis:
- Workers acquire a lock for each Timeslot in Redis before processing it, ensuring that only a single worker handles a slot at a time.
- After processing a Timeslot, the worker updates the slot's status to FINISHED.
- If a worker fails, its lock is released or expires, allowing another worker to take over and continue processing.
Keeper for Centralized Progress Tracking:
- Introduce a Keeper to monitor the progress of all workers, maintain a global overview of the migration, and log the overall progress.
- It keeps track of the progress and status of each Timeslot, ensuring that no slot is missed.
Timeout Handling and Retry Mechanism:
- Implement a retry mechanism with exponential backoff for both PostgreSQL reads and DynamoDB writes, making the script resilient to transient network failures or timeouts.
- Add another layer of retry and error handling to catch any panic or unexpected failure, ensuring that the script can gracefully recover with no data loss or corrupted state.
Data Verification:
- After the migration, run a verification script to ensure that all room events have been successfully migrated, with no missing records.
- The verification process should compare the record count and sample data between PostgreSQL and DynamoDB.
Leveraging Idempotency in Writes:
Another important point is that the write step for each record is idempotent. This means that writing the same record to DynamoDB multiple times will simply overwrite the previous version, ensuring no duplicates.
We can use this to our advantage:
- If bugs are found in the script and a new version needs to be run, we can reset all states and start from scratch without deleting existing data in DynamoDB.
- If the script stops for any reason and resumes later, it might overwrite a few records in DynamoDB. This is acceptable, as occasional re-writes are harmless and prevent the need to redo large portions of already completed work.
Architecture
With those concepts in mind, let’s start thinking about the architecture.
Store States in Redis
We have already split the records into Timeslots, each consisting of all room events for one hour. Now we need to keep track of the progress of these slots in Redis:
- Prefix all keys with mgre: so we can scan and delete all of them later.
- mgre:status: the global migration status, one of NOT_STARTED, TO_START, IN_PROGRESS, TO_STOP, STOPPED, or FINISHED.
- mgre:{TIMESLOT}:worker: the worker currently processing the slot. This is an exclusive lock with a short TTL (for example, 15-30 seconds); if a worker fails to release it, the lock expires automatically so another worker can take over later.
- mgre:{TIMESLOT}:states: a JSON value storing the states of the slot, with a long TTL (for example, 1 week) and these properties:
  - .status: empty, IN_PROGRESS, or FINISHED. Note that there is no ERROR status. All room events and all slots must be migrated successfully!
  - .last_id: the ID of the last migrated room event, used to resume progress.
So we need 2 keys for each slot. Multiplied by 3 years of data, that's 52,560 keys (3 * 365 * 24 * 2). That's a lot!
We can optimize the Redis keys by cleaning up each group of consecutive FINISHED slots and replacing it with a single mgre:last_slot key. This way, we only need to keep track of a small number of keys. Of course, this is based on the assumption that the slots contain a similar number of room events, so each worker takes a similar amount of time to finish.
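To make the locking scheme concrete, here is a small sketch of how a worker could claim a slot and mark it finished. It assumes the go-redis client and hard-codes the key format and TTLs from above; the production code goes through the State helpers described later in this post.

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

const lockTTL = 30 * time.Second

// tryLockSlot attempts to become the exclusive worker for a slot.
// SET NX only succeeds if the key does not exist yet, and the TTL ensures
// a crashed worker releases the slot automatically.
func tryLockSlot(ctx context.Context, rdb *redis.Client, slot, workerID string) (bool, error) {
    key := fmt.Sprintf("mgre:%v:worker", slot)
    return rdb.SetNX(ctx, key, workerID, lockTTL).Result()
}

// markSlotFinished stores the final state of a slot with a long TTL.
func markSlotFinished(ctx context.Context, rdb *redis.Client, slot string) error {
    key := fmt.Sprintf("mgre:%v:states", slot)
    return rdb.Set(ctx, key, `{"status":"FINISHED"}`, 7*24*time.Hour).Err()
}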
Manager, Keepers, and Workers
The script runs in multiple pods. Each pod consists of a Manager, a Keeper, and multiple Workers, each running in its own goroutine.
A Manager manages all workers in a pod:
- Each pod has a single Manager.
- It loops through all the Timeslots.
- For each slot, it verifies the corresponding status and lock in Redis by checking mgre:{TIMESLOT}:states and mgre:{TIMESLOT}:worker.
- It then dispatches the slot to workers by sending it to a Go channel, slotCh.
A Worker migrates room events one Timeslot at a time:
- There are multiple workers, each running in a separate goroutine.
- A worker receives Timeslots from the channel slotCh, one at a time.
- It verifies the status in mgre:{TIMESLOT}:states again.
- It acquires the lock with a SET NX command on mgre:{TIMESLOT}:worker.
- If all of this succeeds, it starts working on the slot.
- It occasionally saves its progress by updating mgre:{TIMESLOT}:states.
- It should be able to resume from the saved progress.
- If it fails, the lock will eventually expire and another worker can take over.
A Keeper oversees the whole migration progress:
- There should be only one active Keeper across all pods.
- We keep a lock mgre:keeper with a short TTL, so if the Keeper on one pod fails, a Keeper on another pod can take over.
- A failed Keeper returns to a stand-by state and keeps monitoring the lock to see if it can become active again.
- Every few seconds, the active Keeper scans from mgre:last_slot to find consecutive FINISHED Timeslots. It then moves mgre:last_slot to the latest consecutive FINISHED slot and cleans those slots up.
- When all remaining slots are FINISHED, it marks the global mgre:status as FINISHED. All the managers watch that status and will then stop.
func (m *Migrator) runManager(ctx context.Context) {
    // ... recover, logs, metrics ...
    // ... loop and call runManagerStep (more on this later) ...
}

func (m *Migrator) runManagerStep(ctx context.Context) Status {
    // ... recover, logs, metrics ...
    for {
        mustRefresh(ctx, m, m.globalStatus, m.lastSlot)
        if m.globalStatus.Load().Is(SKIPPED, FINISHED) { return /*...*/ }

        // endSlot is the earliest slot to migrate, i.e. newTimeSlot(startDate)
        for slot := m.lastSlot.Load(); !slot.Before(endSlot.Time); slot = slot.Next() {
            slotStates := m.getSlotStates(slot)
            mustRefresh(ctx, m, slotStates.States)
            if slotStates.States.Load().Status.Is(SKIPPED, FINISHED) { continue }

            slotWorkerID := mustAcquireLock(ctx, m, slotStates.Worker)
            if !slotWorkerID.Load().IsPod(m.PodID) { continue }

            select {
            case <-ctx.Done():
                return IN_PROGRESS // stop when the pod stops
            case m.slotCh <- slot:
                continue // send to the slot channel
            }
        }
    }

    // still IN_PROGRESS, only the Keeper can verify that all slots are FINISHED
    return IN_PROGRESS
}

func (m *Migrator) runWorker(ctx context.Context) {
    // ... recover, logs, metrics ...
    for {
        select {
        case <-ctx.Done():
            return
        case slot := <-m.slotCh:
            m.runTimeslot(ctx, slot)
        }
    }
}

func (m *Migrator) runTimeslot(ctx context.Context, slot Timeslot) {
    // ... recover, logs, metrics ...
    t := time.NewTimer(0)
    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            status := m.runTimeslotStep(ctx, slot)
            if status.Is(SKIPPED, FINISHED) { return }
            t.Reset(3 * time.Second)
        }
    }
}

func (m *Migrator) runTimeslotStep(ctx context.Context, slot Timeslot) Status {
    // ... recover, logs, metrics ...
    st := m.getSlotStates(slot)
    for {
        // refresh the states
        mustRefresh(ctx, m, st.States)

        // verify the status, return if already FINISHED
        if st.States.Load().Status.Is(FINISHED) { return FINISHED }

        // another worker is working on the slot: SKIPPED
        if !mustAcquireLock(ctx, m, st.Worker, workerID) { return SKIPPED }

        startID, _ := slot.Range()
        lastID := st.States.Load().LastID
        // ... if lastID is empty, start from the end of the slot's ULID range
        for startID.Before(lastID) {
            req := QueryRoomEventsRequest{BeforeID: lastID, Limit: pgBatchSize}
            res := mustRetry(ctx, m.strategy, msgf("query room events"),
                func() (QueryRoomEventsResponse, error) {
                    return m.queryRoomEvents(ctx, req)
                })
            // ... save to DynamoDB
            // ... save progress to Redis
            lastID = res.LastID
        }

        // ... mark the slot status as FINISHED
        return FINISHED
    }
}
func (m *Migrator) runKeeper(ctx context.Context) {
    // ... recover, logs, metrics ...
    // ... loop and call runKeeperStep (more on this later) ...
}

func (m *Migrator) runKeeperStep(ctx context.Context) Status {
    // ... recover, logs, metrics ...
    for {
        // acquire the lock and refresh states:
        // there is only a single active Keeper across all pods
        if !mustAcquireLock(ctx, m, m.keeper) { return SKIPPED }
        mustRefresh(ctx, m, m.globalStatus, m.lastSlot)
        if m.globalStatus.Load().Is(FINISHED) { return FINISHED }

        // find the last consecutive FINISHED slot
        lastSlot := m.lastSlot.Load()
        newLastSlot := lastSlot
        for slot := lastSlot; !slot.Before(endSlot.Time); slot = slot.Next() {
            slotStates := m.getSlotStates(slot)
            err := tryRefresh(ctx, m, slotStates.States)
            if err != nil { break }
            if !slotStates.States.Load().Status.Is(FINISHED) { break }
            newLastSlot = slot
            tryClean(ctx, m, slotStates) // clean up the FINISHED slot
        }

        // save the mgre:last_slot state
        if newLastSlot != lastSlot {
            mustUpdate(ctx, m, m.lastSlot, newLastSlot)
        }
    }
}
Retry Mechanism
The plan looks good, right? No, not yet. What happens if any of the above steps fails?
Each Manager, Worker, and Keeper runs in a goroutine and always retries itself:
- Always recover(), because any unrecovered panic in a goroutine can bring down the whole process.
- There are 2 main loops, so we can restart whenever there is a problem:
  - The outer loop is responsible for restarting the inner loop. It only contains simple statements, to ensure that it never panics.
  - The inner loop is responsible for handling the logic: load states, query data, etc.
- For a Worker, there is one extra outermost loop that receives slots and passes them to the outer loop, then to the inner loop, to get executed.
- And a retry function executes and retries every query a few more times.
func (m *Migrator) initAndRun(ctx context.Context) {
    go m.runManager(ctx)
    go m.runKeeper(ctx)
    for i := 0; i < numWorkers; i++ {
        go m.runWorker(ctx)
    }
}

func (m *Migrator) runWorker(ctx context.Context) {
    defer func() {
        if r := recover(); r != nil {
            log(ctx).Errorf("panic in the outermost layer, will stop")
        }
    }()

    // the outermost loop to receive the next slot;
    // it only contains simple statements to ensure that it never panics
    for {
        select {
        case <-ctx.Done():
            return
        case slot := <-m.slotCh: // receive slots from the channel
            m.runTimeslot(ctx, slot) // and execute them one by one
        }
    }
}

func (m *Migrator) runTimeslot(ctx context.Context, slot Timeslot) {
    defer func() {
        if r := recover(); r != nil {
            log(ctx).Errorf("panic in the outer layer, will stop")
        }
    }()

    // the outer loop to retry the migration logic;
    // it only contains simple statements to ensure that it never panics
    t := time.NewTimer(0)
    for {
        select {
        case <-ctx.Done():
            return // stop when the pod stops
        case <-t.C:
            status := m.runTimeslotStep(ctx, slot)
            if status.Is(FINISHED, SKIPPED) {
                return // stop when FINISHED or SKIPPED
            }
            t.Reset(3 * time.Second) // retry after a few seconds
        }
    }
}

func (m *Migrator) runTimeslotStep(ctx context.Context, slot Timeslot) Status {
    defer func() {
        if r := recover(); r != nil {
            log(ctx).Errorf("panic in the inner layer, will retry")
        }
    }()

    // the inner loop to execute the migration logic
    for {
        // ... load states, progress, acquire lock ...

        // query the database
        res, err := retry(ctx, m.strategy, msgf("query room events"),
            func() (QueryRoomEventsResponse, error) {
                return m.queryRoomEvents( /* ... */ )
            })

        // even if there is a panic, runTimeslotStep will recover and stop,
        // and the outer loop (runTimeslot) will retry after a few seconds
        must(err)

        // ... save states, progress, refresh lock
        // ... break the loop when the slot is done
    }

    // ... save the slot status as FINISHED
    return FINISHED // tell the outer loop to stop
}
When any error happens, for example a network timeout:
- The retry function retries a few times.
- If it still fails, the inner loop stops and gives control back to the outer loop.
- The outer loop waits a few seconds and starts the inner loop again.
- The inner loop then loads the previous states and progress, acquires the lock, and continues.
This ensures that the code keeps running until all records are migrated or the pod restarts. In the latter case, the migration resumes the next time the pod starts.
Other Notes
API to control the migration:
- We should expose an API to control the migration: start and stop all managers, keepers, and workers across all pods (a sketch follows this list).
- When it receives a START command, the API sets mgre:status to TO_START or IN_PROGRESS.
- When the managers on all pods see the status, they start or resume the migration.
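A minimal sketch of such a control endpoint, built on the State helper described later in this post. The route, query parameter, and handler name are illustrative, not the actual API:

// Sketch: a tiny HTTP control endpoint. It only flips the global status key;
// the managers on every pod pick up the change on their next refresh.
func (m *Migrator) handleMigrationControl(w http.ResponseWriter, r *http.Request) {
    var newStatus Status
    switch r.URL.Query().Get("cmd") {
    case "start":
        newStatus = TO_START
    case "stop":
        newStatus = TO_STOP
    default:
        http.Error(w, "unknown command", http.StatusBadRequest)
        return
    }
    if err := m.globalStatus.Save(r.Context(), m.redisClient, newStatus); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    fmt.Fprintf(w, "mgre:status set to %v\n", newStatus)
}

This would be registered on an internal-only port, for example with http.HandleFunc("/internal/migration", m.handleMigrationControl).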
Stop everything using context.WithCancel():
- Use a single context.Context and pass it to the manager, keeper, and all workers in the same pod.
- When the managers see the status change to TO_STOP or STOPPED, they cancel the context, making the keeper and all workers stop gracefully (see the sketch after this list).
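A sketch of how each pod could wire this up. The polling interval and the run wrapper are assumptions; the key point is that one shared, cancellable context fans out to every goroutine in the pod:

// Sketch: one shared context per pod. This loop cancels it when the global
// status switches to TO_STOP or STOPPED, which makes the keeper and all
// workers return from their ctx.Done() branches.
func (m *Migrator) run(parent context.Context) {
    ctx, cancel := context.WithCancel(parent)
    defer cancel()

    m.initAndRun(ctx) // starts the manager, keeper, and workers (see above)

    t := time.NewTicker(5 * time.Second)
    defer t.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            mustRefresh(ctx, m, m.globalStatus)
            if m.globalStatus.Load().Is(TO_STOP, STOPPED) {
                cancel() // every goroutine sees ctx.Done() and stops gracefully
                return
            }
        }
    }
}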
Save progress as states and refresh the lock periodically:
- While each worker or keeper is working on its slot, it should save its progress into Redis and refresh its lock, so it can resume later and other workers cannot take over the slot (a small sketch follows).
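For example, a worker could call a helper like the one below every few batches. This is a sketch only: m.workerID, the mustAcquireLock signature, and the SlotProgress type (introduced with SlotStates later in this post) are assumptions layered on top of the design above.

// Sketch: extend the exclusive lock and persist the latest cursor, so the
// slot can resume from last_id after a crash or restart instead of redoing
// the whole hour.
func (m *Migrator) saveProgress(ctx context.Context, st *SlotStates, lastID ulid.ULID) {
    // Re-acquiring with the same worker ID refreshes the short TTL on the lock.
    mustAcquireLock(ctx, m, st.Worker, m.workerID)
    // Persist {"status":"IN_PROGRESS","last_id":...} under mgre:{TIMESLOT}:states.
    mustSet(ctx, m, st.States, SlotProgress{Status: IN_PROGRESS, LastID: lastID})
}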
Manager periodically checks mgre:last_slot in Redis:
- Even if a slot is already taken by a worker, there is a chance that the worker stops and the slot never becomes FINISHED.
- While each Manager loops through all the slots, after a while it needs to restart the loop, check the mgre:last_slot status in Redis, and resume from there. This ensures that we won't leave any slot unfinished.
Log and report the progress:
- The Keeper is responsible for overseeing the progress, cleaning up finished records, and updating mgre:last_slot.
- During the job, the Keeper should periodically log and report progress, to provide visibility into the migration status and allow real-time tracking of the overall progress.
Monitor the resources and rate limit:
- The migration may consume all the resources of the databases. We may want to monitor them, add a rate limit, and fine-tune parameters when necessary, to ensure everything goes as smoothly as possible (see the sketch below).
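For example, a simple way to cap the read pressure on Postgres is a token-bucket limiter from golang.org/x/time/rate wrapped around the scan query. The numbers and the wrapper name below are illustrative and would need tuning against real metrics:

import (
    "context"

    "golang.org/x/time/rate"
)

// Allow at most ~50 scan queries per second per pod, with a small burst.
var pgLimiter = rate.NewLimiter(rate.Limit(50), 10)

// Sketch: a rate-limited wrapper around the scan query. Wait blocks until the
// limiter allows one more call or the context is cancelled.
func (m *Migrator) queryRoomEventsLimited(ctx context.Context, req QueryRoomEventsRequest) (QueryRoomEventsResponse, error) {
    if err := pgLimiter.Wait(ctx); err != nil {
        return QueryRoomEventsResponse{}, err
    }
    return m.queryRoomEvents(ctx, req)
}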
Implementation
Timeslot
Each Timeslot represents all room events in one hour. We can implement it as a time.Time and store it in Redis as a string in the format 20241020.02.
type Timeslot struct{ time.Time }

const slotDuration = time.Hour

func newTimeSlot(t time.Time) Timeslot {
    if t.IsZero() { return Timeslot{t} }
    // each slot is one hour
    t = t.In(time.UTC).Truncate(slotDuration)
    return Timeslot{t}
}

func (t Timeslot) String() string {
    if t.Time.IsZero() { return "" }
    return t.Time.Format("20060102.15")
}

func (t Timeslot) Range() (start, end ulid.ULID) {
    return ulid.FromTime(t.Time), ulid.FromTime(t.Add(1).Time)
}

func (t Timeslot) Add(i int) Timeslot {
    return Timeslot{t.Time.Add(time.Duration(i) * slotDuration)}
}

func (t Timeslot) Next() Timeslot {
    return t.Add(-1) // we are going from latest to earliest
}

func (t Timeslot) Sub(x Timeslot) int {
    return int(t.Time.Sub(x.Time) / slotDuration)
}
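As a small usage sketch, iterating every slot from the double-write date back to the earliest date (with startDate and endDate as defined earlier) looks like this, which is essentially what the manager loop does:

// Walk from the most recent slot back to the earliest one.
earliest := newTimeSlot(startDate)
for slot := newTimeSlot(endDate); !slot.Before(earliest.Time); slot = slot.Next() {
    start, end := slot.Range()
    fmt.Printf("slot %v covers ULIDs [%v, %v)\n", slot, start, end)
}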
Retry() Queries
As discussed above, we already have two loops to handle panics, retries, and resuming from saved states. So the retry() function itself only needs to retry each query a couple of times, to tolerate occasional network failures:
// this will retry 3 times
strategy := NewSimpleStrategy(
    100*time.Millisecond, 200*time.Millisecond, 500*time.Millisecond)

// call queryRoomEvents with retry-ability
res, err := retry(ctx, strategy, msgf("query database"),
    func() (QueryRoomEventsResponse, error) {
        return m.queryRoomEvents(ctx, req)
    })

// if all retries fail, stop and give control back to the outer loop
// to try again after a few seconds
func mustRetry( /* ... */ ) { /* ... */ }
We can implement a simple retry logic:
func retry[T any](
ctx context.Context, strategy RetryStrategy,
msg fmt.Stringer, fn func() (T, error),
) (T, error) {
for count := 0; ; count++ {
x, err := fn()
if err == nil { return x, err }
if next := strategy.Next(); next > 0 {
logger(ctx).Warnf("failed to %v (attempt %v)", msg, count)
time.Sleep(next)
} else {
logger(ctx).Errorf("failed to %v (attempt %v)", msg, count)
return x, err
}
}
}
An example implementation of a retry strategy, in which each retry happens after a pre-defined duration:
type RetryStrategy func() time.Duration

func (f RetryStrategy) Next() time.Duration { return f() }

func NewSimpleStrategy(at ...time.Duration) RetryStrategy {
    i := -1
    return func() time.Duration {
        i++
        if i < len(at) { return at[i] }
        return -1 // no more retries
    }
}
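The plan above mentions exponential backoff; in the same shape, one possible exponential strategy could look like this (the base delay, cap, and retry count are illustrative):

// NewExponentialStrategy retries up to maxRetries times, doubling the delay
// each attempt (base, 2*base, 4*base, ...) and capping it at max.
// Example: NewExponentialStrategy(100*time.Millisecond, 2*time.Second, 5)
func NewExponentialStrategy(base, max time.Duration, maxRetries int) RetryStrategy {
    attempt := 0
    return func() time.Duration {
        if attempt >= maxRetries {
            return -1 // no more retries
        }
        d := base << attempt
        if d > max {
            d = max
        }
        attempt++
        return d
    }
}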
And an implementation of the msgf() helper, which can be used to quickly build a fmt.Stringer:

type StringFunc func() string

func (f StringFunc) String() string { return f() }

func msgf(msg string, args ...any) StringFunc {
    return func() string {
        return fmt.Sprintf(msg, args...)
    }
}
States
There are many states: the global status, global progress, slot statuses, slot progress, locks, etc. For each state, we need to refresh, update, or delete it, and each action also needs to be retryable:

globalStatus := mustRetry(ctx, m.strategy, msgf("load status"),
    func() (Status, error) {
        str, err := m.redisClient.GetString(ctx, "mgre:status")
        if err != nil { return 0, err }
        if str == "" { return NOT_STARTED, nil }
        return parseStatus(str)
    })

slotWorker := mustRetry(ctx, m.strategy, msgf("load slot worker"),
    func() (string, error) {
        return m.redisClient.GetString(ctx, kSlotWorker(slot))
    })

mustRetry(ctx, m.strategy, msgf("save status"),
    func() (int, error) {
        str := encodeStatus(newStatus)
        err := m.redisClient.SetStringTTL(ctx, "mgre:status", str, longTTL)
        return 0, err
    })

func kSlotWorker(slot fmt.Stringer) string {
    return fmt.Sprintf("mgre:%v:worker", slot)
}
The code quickly becomes too verbose. We can encapsulate the key, the encoding logic, and other configs in a State struct:
type State[T any] struct {
    v   atomic.Value
    key string
    ttl time.Duration

    retry  RetryStrategy
    parse  func(string) (T, error)
    encode func(T) (string, error)
}

func NewState[T any](
    key string, ttl time.Duration, strategy RetryStrategy,
    parse func(string) (T, error),
    encode func(T) (string, error),
) *State[T] {
    var zero T
    st := &State[T]{ /* ... */ }
    st.v.Store(zero)
    return st
}

func (s *State[T]) Load() T {
    return s.v.Load().(T)
}

func (s *State[T]) Refresh(ctx context.Context, redis RedisClient) error {
    str, err := retry(ctx, s.retry, msgf("get key %q", s.key),
        func() (string, error) {
            return redis.Get(ctx, s.key)
        })
    if err != nil { return err }
    v, err := s.parse(str)
    if err != nil { return err }
    s.v.Store(v)
    return nil
}

func (s *State[T]) Save(ctx context.Context, redis RedisClient, v T) error {
    str, err := s.encode(v)
    if err != nil { return err }
    _, err = retry(ctx, s.retry, msgf("set key %q", s.key),
        func() (int, error) {
            _, err := redis.Set(ctx, s.key, str, s.ttl)
            return 0, err
        })
    return err
}

func (s *State[T]) AcquireLock(ctx context.Context, redis RedisClient, v T) (bool, error) {
    // similar to Save(), but use SetNX instead ...
}
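Expanding that stub, AcquireLock could look like the sketch below. It assumes the RedisClient interface exposes a SetNX-style method, which is not shown in this post:

// AcquireLock sets the key only if it does not exist yet (SET NX) and reports
// whether this caller now owns the lock. The TTL releases crashed owners.
func (s *State[T]) AcquireLock(ctx context.Context, redis RedisClient, v T) (bool, error) {
    str, err := s.encode(v)
    if err != nil {
        return false, err
    }
    return retry(ctx, s.retry, msgf("setnx key %q", s.key),
        func() (bool, error) {
            return redis.SetNX(ctx, s.key, str, s.ttl)
        })
}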
Then implement a few helpers to quickly access them:
type DepsI interface { _redis() RedisClient }
type StateI interface { _key() string; _ttl() time.Duration; /* ... */ }
func mustRefresh(ctx context.Context, deps DepsI, states ...StateI) {
    /* ... */
}
func mustSet[T any](ctx context.Context, deps DepsI, state *State[T], v T) {
    /* ... */
}
Finally, we can simplify the usage:
func (m *Migrator) exampleInit(slot Timeslot) {
    m.globalStatus = NewState(kStatus, longTTL, m.strategy, parseStatus, encodeStatus)
    m.slotWorker = NewState(kSlotWorker(slot), shortTTL, m.strategy, parseStr, encodeStr)
    // ...
}

func (m *Migrator) exampleRefreshStates(ctx context.Context) {
    mustRefresh(ctx, m, m.globalStatus, m.lastSlot)
    mustSet(ctx, m, m.globalStatus, FINISHED)
    // ...
}
That’s much better!
SlotStates
The Timeslot struct only contains the definition of a time slot. We need another struct to encapsulate its states (plus a small SlotProgress type mirroring the JSON stored under mgre:{TIMESLOT}:states):
type SlotProgress struct {
    Status Status    `json:"status"`
    LastID ulid.ULID `json:"last_id"`
}

type SlotStates struct {
    Timeslot // embedded Timeslot to quickly access its methods

    States *State[SlotProgress] // mgre:{TIMESLOT}:states (.status and .last_id)
    Worker *State[string]       // mgre:{TIMESLOT}:worker (the exclusive lock)
}

func newSlotStates(slot Timeslot) *SlotStates {
    return &SlotStates{
        Timeslot: slot,
        States:   NewState(kSlotStates(slot), longTTL, /* ... */),
        Worker:   NewState(kSlotWorker(slot), shortTTL, /* ... */),
    }
}
There should be only a single SlotStates for each Timeslot in a pod, shared among the manager, keeper, and workers. So it's better to have a centralized place to init and store them:
type Migrator struct {
    // ...
    slots map[string]*SlotStates
    mu    sync.RWMutex
}

func (m *Migrator) getSlotStates(slot Timeslot) *SlotStates {
    if st := m._getSlotStates(slot); st != nil { return st }

    m.mu.Lock()
    defer m.mu.Unlock()
    if st := m.slots[slot.String()]; st != nil { return st }

    st := newSlotStates(slot)
    m.slots[slot.String()] = st
    return st
}

func (m *Migrator) _getSlotStates(slot Timeslot) *SlotStates {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.slots[slot.String()]
}
Conclusion
Phew! That’s a lot!! I’m happy that you are still here!
Migrating large volumes of data in a real-world environment is far more complex than a simple script can handle. By carefully designing and implementing the migration code with partitioning, parallel processing, state management, fault tolerance, idempotency, and centralized progress tracking, we can achieve a reliable migration process, maintain data integrity, and minimize downtime.
And sleep well at night too!
Let's stay connected!
Author
I'm Oliver Nguyen. A software maker working mostly in Go and JavaScript. I enjoy learning and seeing a better version of myself each day. Occasionally spin off new open source projects. Share knowledge and thoughts during my journey. Connect with me or subscribe to my posts.