Implement a Distributed State Machine with Redis to Migrate Billions of Records

published at Connectly.ai

As a start-up company, we started with a simple architecture: all data was stored in a single centralized Postgres instance shared by a few services. It worked well and allowed us to move fast, deliver features, enter the market, acquire clients, and grow exponentially.

Fast forward a couple of years: today we have thousands of clients and billions of records, and it's time to migrate the top tables from Postgres to a better solution. We chose DynamoDB, a key-value datastore from AWS with high availability and low-latency performance, and saw our queries drop to single-digit milliseconds.

As with any decent migration plan, we carefully analyze usage, define the schema and indexes, start with double-writes, switch read queries to DynamoDB with a fallback to Postgres, import all records to DynamoDB, and finally stop querying Postgres.

This article focuses on the step of importing records for the room_events table: Scan all the records of this table and write them to DynamoDB. The goal is to ensure a complete migration, with no events missed. The migration script must be fast, capable of running in parallel, and able to stop and resume from its last state. It should also be resilient, handling network errors or interruptions and resuming seamlessly.

Let's see how we can do this! 🥸


Schema

Here is the simplified schema of the room_events table in Postgres:

room_events
    id        ULID    primary key
    room_id   UUID    references rooms.id
    data      JSON
  • The id column is a ULID, which has a time component.
  • It has billions of records.
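
Because the id is a ULID, its first 48 bits encode the record's creation time in milliseconds, which is what makes time-based cursors (and, later, 1-hour slots) possible. A minimal sketch of the idea, using the public github.com/oklog/ulid/v2 package in place of the internal connectly.ai/go/pkgs/ulid used in the code below:

import (
    "time"

    "github.com/oklog/ulid/v2"
)

// ulidFromTime returns the smallest ULID for a given timestamp (zero entropy),
// which is exactly what a pagination boundary needs: "everything before this time".
func ulidFromTime(t time.Time) ulid.ULID {
    var id ulid.ULID
    _ = id.SetTime(ulid.Timestamp(t)) // only the time bits are set; entropy stays zero
    return id
}

// Because the timestamp is the ULID prefix, lexicographic order on id
// is also chronological order.
func isBefore(a, b ulid.ULID) bool {
    return a.Compare(b) < 0
}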

A Naive Approach

Let's write a simple version of the migration script: run in a single loop, scan all records, and write them to DynamoDB. We also handle a few trivial things up front so we don't have to worry about them later:

  • Scan with cursor pagination based on the id column.
  • For each write, use a batch of max 25 records (the maximum number of items allowed by BatchWriteItem).
  • Scan backward from the double-write date to the earliest date, so the most recent records are available first.
import "connectly.ai/go/pkgs/ulid"

var startDate = parseTime("2020-01-01T00:00:00Z")
var endDate   = parseTime("2024-10-20T00:00:00Z")
var pgBatchSize  = 1000
var dynBatchSize = 25

type Migrator struct {
    *logger
    pg  PostgresClient
    dyn DynamoClient
}
type QueryRoomEventsRequest struct {
    BeforeID ulid.ULID
    Limit    int
}
type QueryRoomEventsResponse struct {
    Items  []RoomEvents
    LastID ulid.ULID
}

func (m *Migrator) Migrate(ctx context.Context) error {
    lastID, count := ulid.FromTime(endDate), 0
    for {
        req := QueryRoomEventsRequest{
            BeforeID: lastID,
            Limit:    pgBatchSize,
        }
        res, err := m.queryRoomEvents(ctx, req)
        m.Logger(ctx).Must(err, "failed to query room events")

        count += len(res.Items)
        if len(res.Items) == 0 {
            m.Logger(ctx).Infof("MIGRATION DONE: count=%v", count)
            return nil
        }
        for i := 0; i < len(res.Items); i += dynBatchSize {
            j := min(i+dynBatchSize, len(res.Items))
            batch := res.Items[i:j]
            _, err = m.batchWriteDynamo(ctx, batch)
            m.Logger(ctx).Must(err, "failed to write to dynamodb")
        }
        lastID = res.LastID // advance the cursor for the next page
    }
}
func (m *Migrator) queryRoomEvents( /* ... */ ) { /* ... */ }
func (m *Migrator) batchWriteDynamo(/* ... */ ) { /* ... */ }
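
For illustration, here is one way batchWriteDynamo could look with the AWS SDK for Go v2, assuming m.dyn wraps the SDK's DynamoDB client and that RoomEvents exposes RoomID, ID, and Data fields. The table and attribute names are placeholders, and a production version would also back off before retrying unprocessed items:

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

func (m *Migrator) batchWriteDynamo(ctx context.Context, events []RoomEvents) (int, error) {
    writes := make([]types.WriteRequest, 0, len(events))
    for _, e := range events {
        writes = append(writes, types.WriteRequest{
            PutRequest: &types.PutRequest{Item: map[string]types.AttributeValue{
                "room_id": &types.AttributeValueMemberS{Value: e.RoomID},
                "id":      &types.AttributeValueMemberS{Value: e.ID.String()},
                "data":    &types.AttributeValueMemberS{Value: string(e.Data)},
            }},
        })
    }
    // A single call accepts at most 25 items; DynamoDB may return some of them
    // as unprocessed, so keep retrying until the map is empty.
    unprocessed := map[string][]types.WriteRequest{"room_events": writes}
    for len(unprocessed) > 0 {
        out, err := m.dyn.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
            RequestItems: unprocessed,
        })
        if err != nil {
            return 0, err
        }
        unprocessed = out.UnprocessedItems
    }
    return len(events), nil
}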

Real-World Approach

This naive script will operate under some assumptions:

  • Calls to queryRoomEvents() and batchWriteDynamo() always succeed.
  • The number of records is small enough to complete in a single run on a single machine.

However, in the real world, we cannot rely on those assumptions:

  • Requests may time out or fail randomly.
  • Databases may reach their throughput or connection limits.
  • Pods may get restarted due to failures or new deployments.
  • The huge number of records makes the single-threaded approach impractical, taking ages to complete.

To handle these challenges, we need to improve our script and implement a more robust, distributed solution:

Data Splitting and Partitioning:

  • Split the data by grouping room events into 1-hour Timeslots. Each slot represents a manageable unit of work that is processed independently. This allows for better load balancing and improved parallelism.

Parallel Processing with Multiple Workers:

  • Run multiple workers across multiple pods, with each pod managing multiple goroutines.
  • Each worker will pick a Timeslot, migrate it, and then move on to the next available slot, ensuring that all workers operate concurrently.

States Management in Redis:

  • Workers will acquire a lock for each Timeslot in Redis before processing it, to ensure that only a single worker can handle a slot at a time.
  • After processing a Timeslot, the worker will update the slot's status to FINISHED.
  • If a worker fails, the lock will be released or expire, allowing another worker to take over and continue processing.

Keeper for Centralized Progress Tracking:

  • Introduce a Keeper to monitor the progress of all workers, maintain a global overview of the migration, and log the global progress.
  • It will keep track of the progress and status of each Timeslot, ensuring that no slot is missed.

Timeout Handling and Retry Mechanism:

  • Implement a retry mechanism with exponential backoff for both PostgreSQL reads and DynamoDB writes, making the script resilient to transient network failures or timeouts.
  • And another layer of retry and error handling to prevent any panic or unexpected failure, ensuring that the script can gracefully recover, with no data loss or corrupted states.

Data Verification:

  • After the migration, run a verification script to ensure that all room events have been successfully migrated, with no missing records.
  • The verification process should compare the record count and sample data between PostgreSQL and DynamoDB.
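
A rough sketch of what a per-slot verification pass could look like, reusing queryRoomEvents and the Timeslot helpers defined later in this post, and checking each record with GetItem from the AWS SDK for Go v2. The table name and the room_id/id key schema are assumptions; for billions of records, you would likely verify a random sample of slots rather than every record:

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

func (m *Migrator) verifySlot(ctx context.Context, slot Timeslot) error {
    startID, endID := slot.Range()
    lastID, checked := endID, 0
    for startID.Before(lastID) {
        // Re-read a page of the slot from Postgres...
        res, err := m.queryRoomEvents(ctx, QueryRoomEventsRequest{BeforeID: lastID, Limit: pgBatchSize})
        if err != nil {
            return err
        }
        if len(res.Items) == 0 {
            break
        }
        // ...and confirm every record exists in DynamoDB.
        for _, e := range res.Items {
            out, err := m.dyn.GetItem(ctx, &dynamodb.GetItemInput{
                TableName: aws.String("room_events"),
                Key: map[string]types.AttributeValue{
                    "room_id": &types.AttributeValueMemberS{Value: e.RoomID},
                    "id":      &types.AttributeValueMemberS{Value: e.ID.String()},
                },
            })
            if err != nil {
                return err
            }
            if out.Item == nil {
                return fmt.Errorf("room event %v is missing in DynamoDB", e.ID)
            }
        }
        checked += len(res.Items)
        lastID = res.LastID
    }
    m.Logger(ctx).Infof("slot %v verified: %v records", slot, checked)
    return nil
}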

Leveraging Idempotency in Writes:

Another important point is that the write step for each record is idempotent. This means that writing the same record to DynamoDB multiple times will simply overwrite the previous version, ensuring no duplicates.

We can use this to our advantage:

  • If bugs are found in the script and a new version needs to be run, we can reset all states and start from scratch without deleting existing data in DynamoDB.
  • If the script stops for any reason and resumes later, it might overwrite a few records in DynamoDB. This is acceptable, as occasional re-writes are harmless and prevent the need to redo large portions of already completed work.

Architecture

With those concepts in mind, let's start thinking about the architecture.

Store States in Redis

We have already split the records into Timeslots, each consisting of all room events for one hour. We need to keep track of the progress of these slots in Redis:

  • Prefix all keys with mgre: → to be able to scan and delete all of them.
  • mgre:status → the global migration status: NOT_STARTED, TO_START, IN_PROGRESS, TO_STOP, STOPPED, FINISHED.
  • mgre:{TIMESLOT}:worker → the current worker working on the slot, an exclusive lock with short TTL (for example 15-30 sec). When a worker fails to release the lock, it will automatically expire so another worker can take over later.
  • mgre:{TIMESLOT}:states → a JSON storing states of the slot, with long TTL (for example, 1 week) and properties:
    • .status → empty, IN_PROGRESS or FINISHED. Note that there is no ERROR status. All room events and all slots must be migrated successfully! 😎
    • .last_id → the last migrated room event id, to be able to resume the progress.

So we need 2 keys for each slot. Multiplied by 3 years of data, that is 52,560 keys (3 * 365 * 24 * 2). That's a lot!

We can optimize the Redis keys by cleaning up each group of consecutive FINISHED slots and replacing it with a single mgre:last_slot key. This way, we only need to keep track of a small number of keys. Of course, this is based on the assumption that the slots contain a similar number of room events, so each worker takes a similar amount of time to finish.

Manager, Keepers, and Workers

The script runs in multiple pods. Each pod consists of a Manager, a Keeper, and multiple Workers. Each runs in its own goroutine.

A Manager manages all workers in a pod:

  • Each pod has a single Manager.
  • It loops through all the Timeslots.
  • For each slot, it verifies the corresponding status and lock in Redis by checking mgre:{TIMESLOT}:states and mgre:{TIMESLOT}:worker.
  • Then dispatch the slot to workers by sending it to a Go channel slotCh.

A Worker migrates room events one Timeslot at a time:

  • There are multiple workers, each running in a separate goroutine.
  • A worker receives Timeslots from the channel slotCh, one at a time.
  • It verifies the status in mgre:{TIMESLOT}:states again.
  • And acquires the lock with a SET NX command on mgre:{TIMESLOT}:worker (see the lock sketch after this list).
  • If both succeed, it starts working on the slot.
  • It occasionally saves its progress by updating mgre:{TIMESLOT}:states.
  • And it should be able to resume from the saved progress.
  • If it fails, the lock will eventually expire and another worker can take over.
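
Here is roughly what the per-slot lock could look like with the go-redis client (github.com/redis/go-redis/v9). The key name follows the mgre:{TIMESLOT}:worker convention above; the TTL is illustrative, and a production lock would verify ownership (for example with a Lua script) before extending it:

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

const workerLockTTL = 30 * time.Second

// acquireSlotLock returns true only if no other worker currently holds the slot.
func acquireSlotLock(ctx context.Context, rdb *redis.Client, slot Timeslot, workerID string) (bool, error) {
    key := fmt.Sprintf("mgre:%v:worker", slot)
    return rdb.SetNX(ctx, key, workerID, workerLockTTL).Result()
}

// refreshSlotLock extends the TTL while the owner is still making progress.
func refreshSlotLock(ctx context.Context, rdb *redis.Client, slot Timeslot) error {
    return rdb.Expire(ctx, fmt.Sprintf("mgre:%v:worker", slot), workerLockTTL).Err()
}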

A Keeper oversees the whole migration progress:

  • There should be only one active Keeper for all pods.
  • We should have a lock mgre:keeper with short TTL, so if a Keeper from a pod fails, another Keeper can take over.
  • A failed Keeper will return to stand-by state, and continue monitoring the lock to see if it can become active again.
  • Every few seconds, the active Keeper scans from mgre:last_slot to find consecutive FINISHED Timeslots. It then moves mgre:last_slot to the latest consecutive FINISHED slot and cleans those slots up.
  • When all remaining slots are FINISHED, it marks the global mgre:status as FINISHED. All the managers look at that status and will then stop.
func (m *Migrator) runManager(ctx context.Context) {
    // ... recover, logs, metrics ...
    // ... loop and call runManagerStep (more on this later) ...
}
func (m *Migrator) runManagerStep(ctx context.Context) Status {
    // ... recover, logs, metrics ...
    
    for {
        mustRefresh(ctx, m, m.globalStatus, m.lastSlot)
        if m.globalStatus.Load().Is(SKIPPED, FINISHED) { return /*...*/ }
        
        for slot := m.lastSlot.Load(); slot.Before(endSlot); slot = slot.Next() {
            slotStates := m.getSlotStates(slot)
            mustRefresh(ctx, m, slotStates)
            if slotStates.Status.Load().Is(SKIPPED, FINISHED) { continue }
            
            slotWorkerID := mustAcquireLock(ctx, m, slot.Worker)
            if !slotWorkerID.Load().IsPod(m.PodID) { continue }

            select {
                case <- ctx.Done():
                    return IN_PROGRESS  // 👈 stop when the pod stops
                case m.slotCh <- slot:
                    continue            // 👈 send to slot channel
            }
        }
    }
    // 👇 still IN_PROGRESS, only Keeper can verify all slots are FINISHED
    return IN_PROGRESS
}

func (m *Migrator) runWorker(ctx context.Context) {
    // ... recover, logs, metrics ...

    for {
        select {
            case <- ctx.Done():
                return
            case slot := <- m.slotCh:
                m.runTimeslot(ctx, slot)
        }
    }
}

func (m *Migrator) runTimeslot(ctx context.Context, slot Timeslot) {
    // ... recover, logs, metrics ...

    t := time.NewTimer(0)
    for {
        select {
            case <- ctx.Done():
                return
            case <- t.C:
                status := m.runTimeslotStep(ctx, slot)
                if status.Is(SKIPPED, FINISHED) { return }
                t.Reset(3 * time.Second)
        }
    }
}

func (m *Migrator) runTimeslotStep(ctx context.Context, slot Timeslot) Status {
    // ... recover, logs, metrics ...

    st := m.getSlotStates(slot)
    for {
        // 👇 refresh the states
        mustRefresh(ctx, m, st.States)
        // 👇 verify the status, return if already FINISHED
        if st.States.Status.Load().Is(FINISHED) { return FINISHED }
        // 👇 another worker is working on the slot, SKIPPED
        if !mustAcquireLock(ctx, m, st.Worker, workerID) { return SKIPPED }

        lastID := st.States.LastID
        // 👇 scan backward until we reach the start of the slot
        for startID.Before(lastID) {
            req := QueryRoomEventsRequest{ BeforeID: lastID, Limit: pgBatchSize }
            res := mustRetry(ctx, m.strategy, msgf("query room events"),
                func() (QueryRoomEventsResponse, error) {
                    return m.queryRoomEvents(ctx, req)
                })
            // 👉 ... save to DynamoDB
            // 👉 ... save progress to Redis
            lastID = res.LastID
        }
    }
}

func (m *Migrator) runKeeper(ctx context.Context) {
    // ... recover, logs, metrics ...
    // ... loop and call runKeeperStep (more on this later) ...
}
							 
func (m *Migrator) runKeeperStep(ctx context.Context) Status {
    // ... recover, logs, metrics ...

    for {
        // 👇 acquire lock and refresh states
        //    only a single active Keeper across all pods
        if !mustAcquireLock(ctx, m, m.keeper) { return SKIPPED }
        mustRefresh(ctx, m, m.globalStatus, m.lastSlot)
        if m.globalStatus.Load().Is(FINISHED) { return FINISHED }

        // 👇 find the last consecutive FINISHED slot
        lastSlot := m.lastSlot.Load()
        newLastSlot := lastSlot
        for slot := lastSlot; slot.Before(endSlot); slot = slot.Next() {
            slotStates := m.getSlotStates(slot)
            err := tryRefresh(ctx, m, slotStates)
            if err != nil { break }
            if !slotStates.Status.Load().Is(FINISHED) { break }

            newLastSlot = slot
            tryClean(ctx, m, slotStates) // 👈 clean FINISHED slot
        }
        // 👇 save the mgre:last_slot state
        if newLastSlot != lastSlot {
            mustUpdate(ctx, m, m.lastSlot, newLastSlot)
        }
    }
}

Retry Mechanism

The plan looks good, right? No, not yet. What happens if any of the above steps fail?

Each Manager, Worker, and Keeper runs in a goroutine and always retries itself:

  • Always have recover(), because any non-recovered panic in a goroutine can stop the whole process.
  • There are 2 main loops so the logic can restart whenever there is a problem:
    • The outer loop is responsible for restarting the inner loop. It only contains simple statements to ensure that it will never panic.
    • The inner loop is responsible for handling the logic: load states, query data, etc.
  • For the Worker, there is one extra outermost loop to receive slots and pass them to the outer and inner loops for execution.
  • And a retry function to execute and retry every query a few more times.
func (m *Migrator) initAndRun(ctx context.Context) {
    go m.runManager(ctx)
    go m.runKeeper(ctx)
    for i := 0; i < numWorkers; i++ {
        go m.runWorker(ctx)
    }
}

func (m *Migrator) runWorker(ctx context.Context) {
    defer func() {
        r := recover()
        if r != nil { log(ctx).Errorf("panic in the outermost layer, will stop") }
    }()
    // 👇 the outermost loop to receive the next slot
    //    it only contains simple statements to ensure that it never panics
    for {
        select {
            case <- ctx.Done():
                return
            case slot := <- m.slotCh: // 👈 receive slots from channel
                m.runTimeslot(ctx, slot) // and execute them one by one
        }
    }
}

func (m *Migrator) runTimeslot(ctx context.Context, slot Timeslot) {
    defer func() {
        r := recover()
        if r != nil { log(ctx).Errorf("panic in the outer layer, will stop") }
    }()
    // 👇 the outer loop to retry the migration logic
    //    it only contains simple statements to ensure that it never panics
    t := time.NewTimer(0)
    for {
        select {
            case <-ctx.Done():
                return                // 👈 stop when the pod stops
            case <-t.C:
                status := m.runTimeslotStep(ctx, slot)
                if status.Is(FINISHED, SKIPPED) {
                    return            // 👈 stop when FINISHED or SKIPPED
                }
                t.Reset(3 * time.Second) // 👈 retry after a few sec
        }
    }
}

func (m *Migrator) runTimeslotStep(ctx context.Context, slot Timeslot) Status {
    defer func() {
        r := recover()
        if r != nil { logger(ctx).Errorf("panic in the inner layer, will retry") }
    }()

    // 👇 the inner loop to execute the migration logic
    for {
        // ... load states, progress, acquire lock...

        // 👇 query database
        res, err := retry(ctx, m.strategy, msgf("query room events"),
            func() (QueryRoomEventsResponse, error) {
                return m.queryRoomEvents(/* ... */)
            })
        // 👇 even if there is a panic, runTimeslotStep will recover and stop,
        //    and the outer loop (runTimeslot) will retry after a few sec
        must(err)

        // ... save states, progress, refresh lock
    }
    // ... save status as FINISHED
    return FINISHED // 👈 tell the outer loop to stop
}

When any error happens, for example, network timeout:

  • The retry function will retry a few times.
  • If it fails, the inner loop will stop and give control back to the outer loop.
  • The outer loop then waits for a few seconds and starts the inner loop again.
  • The inner loop then loads the previous states, progress, acquires lock, and continues.

This ensures that the code keeps running until all records are migrated, or the pod restarts. In the latter case, it will resume the migration progress next time.

Other Notes

API to control the migration:

  • We should expose an API to control the migration: start and stop all managers, keepers, workers from all pods.
  • When it receives a START command, the API sets mgre:status to TO_START or IN_PROGRESS.
  • When managers from all pods see the status, they will start or resume the migration.
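
A sketch of what such an endpoint might look like. The handler path, the query parameter, and the setGlobalStatus helper are illustrative, since all the endpoint really needs to do is flip mgre:status in Redis:

import (
    "fmt"
    "net/http"
)

func (m *Migrator) handleMigrationControl(w http.ResponseWriter, r *http.Request) {
    var newStatus Status
    switch r.URL.Query().Get("action") {
    case "start":
        newStatus = TO_START
    case "stop":
        newStatus = TO_STOP
    default:
        http.Error(w, "unknown action", http.StatusBadRequest)
        return
    }
    // setGlobalStatus (hypothetical) encodes the status and writes it to mgre:status.
    if err := m.setGlobalStatus(r.Context(), newStatus); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    fmt.Fprintf(w, "mgre:status set to %v\n", newStatus)
}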

Stop all using context.WithCancel():

  • Use a single context.Context and pass it to the manager, keeper, and all workers in the same pod.
  • When the manager sees the status change to TO_STOP or STOPPED, it cancels the context, making the keeper and all workers stop gracefully.
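
A sketch of this wiring, assuming a hypothetical loadGlobalStatus helper that reads mgre:status from Redis:

func (m *Migrator) run(parent context.Context) {
    ctx, cancel := context.WithCancel(parent)
    defer cancel()

    go m.runManager(ctx)
    go m.runKeeper(ctx)
    for i := 0; i < numWorkers; i++ {
        go m.runWorker(ctx)
    }

    // Watch the global status and cancel the shared context on TO_STOP/STOPPED.
    t := time.NewTicker(5 * time.Second)
    defer t.Stop()
    for {
        select {
        case <-parent.Done():
            return
        case <-t.C:
            status, err := m.loadGlobalStatus(ctx) // hypothetical: reads mgre:status
            if err == nil && status.Is(TO_STOP, STOPPED) {
                cancel() // 👈 keeper and workers stop gracefully
                return
            }
        }
    }
}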

Save progress as states and refresh the lock periodically:

  • While each worker or keeper is doing its work, it should periodically save its progress to Redis and refresh its lock, so it can resume later and other workers do not take over the slot. A sketch of this checkpoint pattern is shown below.
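
A self-contained sketch of the pattern; the step, saveProgress, and refreshLock callbacks stand in for the real Postgres, DynamoDB, and Redis calls, and the interval is illustrative:

import (
    "context"
    "time"
)

func checkpointLoop(
    ctx context.Context,
    step func() (done bool), // migrate one batch; true when the slot is finished
    saveProgress func(),     // write .last_id into mgre:{TIMESLOT}:states
    refreshLock func(),      // extend the TTL of mgre:{TIMESLOT}:worker
) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            saveProgress()
            refreshLock()
        default:
            if step() {
                saveProgress() // final checkpoint; the caller then marks the slot FINISHED
                return
            }
        }
    }
}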

Manager periodically checks mgre:last_slot in Redis:

  • Even if a slot is already taken by a worker, there is a chance that the worker stops and the slot never becomes FINISHED.
  • After each Manager loops through all the slots, it needs to restart the loop, check the mgre:last_slot status in Redis, and resume from there. This ensures that we won't leave any slots unfinished.

Log and report the progress:

  • The Keeper is responsible for overseeing the progress, cleaning up finished records, and updating mgre:last_slot.
  • During the job, the Keeper should periodically log and report progress to provide visibility into the migration status and allow real-time tracking of overall progress.

Monitor resources and rate limits:

  • The migration may consume all the resources of the database. We should monitor it, add rate limits, and fine-tune parameters when necessary, to ensure everything goes as smoothly as possible.
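
For example, a shared limiter from golang.org/x/time/rate can cap how hard the workers hit Postgres; the numbers are placeholders to be tuned against real metrics:

import "golang.org/x/time/rate"

// ~50 queries/sec across all workers in a pod, with a small burst.
var pgLimiter = rate.NewLimiter(rate.Limit(50), 10)

func (m *Migrator) queryRoomEventsLimited(ctx context.Context, req QueryRoomEventsRequest) (QueryRoomEventsResponse, error) {
    // Wait blocks until a token is available or the context is cancelled.
    if err := pgLimiter.Wait(ctx); err != nil {
        return QueryRoomEventsResponse{}, err
    }
    return m.queryRoomEvents(ctx, req)
}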

Implementation

Timeslot

Each Timeslot represents all room events in one hour. We can implement it as a time.Time and store it in Redis as a string with the format 20241020.02.

type Timeslot struct { time.Time }

const slotDuration = time.Hour

func newTimeSlot(t time.Time) Timeslot {
    if t.IsZero() {	return Timeslot{t} }
    // 👉 each slot is an hour
    t = t.In(time.UTC).Truncate(slotDuration)
    return Timeslot{t}
}
func (t Timeslot) String() string {  
    if t.Time.IsZero() { return "" }  
    return t.Time.Format("20060102.15")  
}
func (t Timeslot) Range() (start, end ulid.ULID) {
    return ulid.FromTime(t.Time), ulid.FromTime(t.Time.Add(slotDuration))
}
func (t Timeslot) Add(i int) Timeslot {
    return Timeslot{t.Time.Add(time.Duration(i) * slotDuration)}
}
func (t Timeslot) Next() Timeslot {
    return t.Add(-1) // 👈 we are going from latest to earliest
}
func (t Timeslot) Sub(x Timeslot) int {
    return int(t.Time.Sub(x.Time) / slotDuration)
}
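
A quick usage example of these helpers (parseTime is the same helper used in the naive script above):

// The slot containing 2024-10-20 02:15 UTC:
slot := newTimeSlot(parseTime("2024-10-20T02:15:00Z"))
fmt.Println(slot)          // "20241020.02"
start, end := slot.Range() // smallest ULID at 02:00 and at 03:00
fmt.Println(start, end)
fmt.Println(slot.Next())   // "20241020.01" (we walk from latest to earliest)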

Retry() Queries

As discussed before, we already have 2 loops to handle panics, retries, and resuming state. So this retry() function only needs to retry each query a couple of times, to tolerate transient network failures:

// 👉 this will retry 3 times
strategy := NewSimpleStrategy(
    100*time.Millisecond, 200*time.Millisecond, 500*time.Millisecond)

// 👉 call queryRoomEvents with retry-ability
res, err := retry(ctx, strategy, msgf("query database"),
    func() (QueryRoomEventsResponse, error) {
        return m.queryRoomEvents(ctx, req)
    })

// 👉 if all retries fail, stop, and give control back to the outer loop
//    to try again after a few sec
func mustRetry( /* ... */ ) { /* ... */ }

We can implement a simple retry logic:

func retry[T any](
    ctx context.Context, strategy RetryStrategy,
    msg fmt.Stringer, fn func() (T, error),
) (T, error) {
    for count := 0; ; count++ {
        x, err := fn()
        if err == nil { return x, err }
        if next := strategy.Next(); next > 0 {
            logger(ctx).Warnf("failed to %v (attempt %v)", msg, count)
            time.Sleep(next)
        } else {
            logger(ctx).Errorf("failed to %v (attempt %v)", msg, count)
            return x, err
        }
    }
}

An example implementation of a retry strategy, in which each retry happens after a pre-defined duration:

type RetryStrategy func() time.Duration

func (f RetryStrategy) Next() time.Duration { return f() }

func NewSimpleStrategy(at ...time.Duration) RetryStrategy {
    i := -1
    return func() time.Duration {
        i++
        if i < len(at) { return at[i] }
        return -1 // no more retries
    }
}

And an implementation of the msgf() func, which can be used to quickly create a fmt.Stringer:

type StringFunc func() string

func (f StringFunc) String() string { return f() }

func msgf(msg string, args ...any) StringFunc {
   return func() string {
       return fmt.Sprintf(msg, args...)
   }
}

States

There are many states: global status, global progress, slot status, slot progress, locks, etc. For each state, we need to refresh, update, or delete it. Each action also needs to be retryable:

globalStatus := mustRetry(ctx, m.strategy, msgf("load status"),
	func() (Status, error) {
	    str, err := m.redisClient.GetString(ctx, "mgre:status")
	    if err != nil { return 0, err }
	    if str == "" { return NOT_STARTED, nil }
	    return parseStatus(str)
	})
slotWorker := mustRetry(ctx, m.strategy, msgf("load slot worker"),
	func() (string, error) {
	    return m.redisClient.GetString(ctx, kSlotWorker(slot))
	})
mustRetry(ctx, m.strategy, msgf("save status"),
	func() (int, error) {
	    str := encodeStatus(newStatus)
	    err := m.redisClient.SetStringTTL(ctx, "mgre:status", str)
	    return 0, err
	})

func kSlotWorker(slot fmt.Stringer) string {
    return fmt.Sprintf("mgre:%v:worker", slot)
}

The code will quickly become too verbose. We can encapsulate the key, the encoding logic, and other configs in a State struct:

type State[T any] struct {
    v      atomic.Value
    key    string
    ttl    time.Duration
    retry  RetryStrategy
    parse  func(string) (T, error)
    encode func(T) (string, error)
}

func NewState[T any](
    key string, ttl time.Duration, strategy RetryStrategy,
    parse  func(string) (T, error), 
    encode func(T) (string, error),
) *State[T] {
    var zero T
    st := &State[T]{ /* ... */ }
    st.v.Store(zero)
    return st
}
func (s *State[T]) Load() T {
    return s.v.Load().(T)
}
func (s *State[T]) Refresh(ctx context.Context, redis RedisClient) error {
    str, err := retry(ctx, s.retry, msgf("get key %q", s.key),
        func() (string, error) {
            return redis.Get(ctx, s.key)
        })
    if err != nil { return err }
    v, err := s.parse(str)
    if err != nil { return err }
    s.v.Store(v)
    return nil
}
func (s *State[T]) Save(ctx context.Context, redis RedisClient, v T) error {
    str, err := s.encode(v)
    if err != nil { return err }
    _, err = retry(ctx, s.retry, msgf("set key %q", s.key),
        func() (int, error) {
            _, err := redis.Set(ctx, s.key, str, s.ttl)
            return 0, err
        })
    return err
}
func (s *State[T]) AcquireLock(ctx context.Context, redis RedisClient, v T) (bool, error) {
    // 👉 similar to Save(), use SetNX instead and report whether the lock was acquired ...
}
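
For completeness, a possible body for AcquireLock, assuming the RedisClient interface exposes a SetNX(ctx, key, value, ttl) (bool, error) method. It follows the same retry shape as Save() and reports whether the lock was actually acquired, which is what the mustAcquireLock helper below checks:

func (s *State[T]) AcquireLock(ctx context.Context, redis RedisClient, v T) (bool, error) {
    str, err := s.encode(v)
    if err != nil { return false, err }
    return retry(ctx, s.retry, msgf("setnx key %q", s.key),
        func() (bool, error) {
            // SetNX only writes when the key is absent, so the bool tells us
            // whether this worker now owns the lock
            return redis.SetNX(ctx, s.key, str, s.ttl)
        })
}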

Then implement a few helpers to quickly access them:

type DepsI  interface { _redis() RedisClient }
type StateI interface { _key() string; _ttl() time.Duration; /* ... */ }

func mustRefresh(ctx context.Context, deps DepsI, states StateI, msg fmt.Stringer) {
    /* ... */
}
func mustSet[T any](ctx context.Context, deps DepsI, state State[T], v T) {
    /* ... */
}

Finally, we can simplify the usage:

func (m *Migrator) exampleInit(slot Timeslot) {
    m.globalStatus = NewState(kStatus, longTTL, m.strategy, parseStatus, encodeStatus)
    m.slotWorker = NewState(kSlotWorker(slot), shortTTL, m.strategy, parseStr, encodeStr)
    // ...
}

func (m *Migrator) exampleRefreshStates(ctx context.Context) {
    mustRefresh(ctx, m, []StateI{m.globalStatus, m.lastSlot}, msgf("load states"))
    mustSet(ctx, m, m.globalStatus, FINISHED, msgf("save status"))
    // ...
}

That's much better!

SlotStates

The Timeslot struct only contains the definition of a time slot. We need another struct to encapsulate its states:

type SlotStates struct {
    Timeslot             // 👉 embedded Timeslot to quickly access methods
    status State[Status] // 👉 mgre:TIMESLOT:status
    worker State[string] // 👉 mgre:TIMESLOT:worker
}
func newSlotStates(slot Timeslot) *SlotStates {
    return &SlotStates{
        Timeslot: slot,
        status: NewState(kStatus, longTTL,  /* ... */),
        worker: NewState(kWorker, shortTTL, /* ... */),
    }
}

There should be only a single SlotStates for each Timeslot in a pod, shared among manager, keeper, and workers. So it's better to have a centralized place to init and store them:

type Migrator struct {
    // ...
    slots map[string]*SlotStates
    mu    sync.RWMutex
}
func (m *Migrator) getSlotStates(slot Timeslot) *SlotStates {
    if st := m._getSlotStates(slot); st != nil { return st }
    m.mu.Lock()
    defer m.mu.Unlock()
    if st := m.slots[slot.String()]; st != nil { return st }
    st := newSlotStates(slot)
    m.slots[slot.String()] = st
    return st
}
func (m *Migrator) _getSlotStates(slot Timeslot) *SlotStates {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.slots[slot.String()]
}

Conclusion

Phew! That's a lot!! I'm happy that you are still here!

Migrating large volumes of data in a real-world environment is far more complex than a simple script can handle. By carefully designing and implementing the migration code with partitioning, parallel processing, state management, fault tolerance, idempotency, and centralized progress tracking, we can achieve a reliable migration process, maintain data integrity, and minimize downtime.

And sleep well at night too! 😋



Author

I'm Oliver Nguyen. A software maker working mostly in Go and JavaScript. I enjoy learning and seeing a better version of myself each day. Occasionally I spin off new open source projects and share knowledge and thoughts during my journey.