Clear last position on replica sync error

This commit is contained in:
Ben Johnson
2021-01-16 07:45:08 -07:00
parent cbc2dce6dc
commit 25fec29e1a
3 changed files with 26 additions and 7 deletions

12
db.go
View File

@@ -713,7 +713,7 @@ func (db *DB) Sync() (err error) {
if checkpoint {
changed = true
if err := db.checkpointAndInit(info, checkpointMode); err != nil {
if err := db.checkpointAndInit(info.shadowWALPath, checkpointMode); err != nil {
return fmt.Errorf("checkpoint: mode=%v err=%w", checkpointMode, err)
}
}
@@ -1199,7 +1199,7 @@ func (db *DB) checkpoint(mode string) (err error) {
// checkpointAndInit performs a checkpoint on the WAL file and initializes a
// new shadow WAL file.
func (db *DB) checkpointAndInit(info syncInfo, mode string) error {
func (db *DB) checkpointAndInit(shadowWALPath string, mode string) error {
// Read WAL header before checkpoint to check if it has been restarted.
hdr, err := readWALHeader(db.WALPath())
if err != nil {
@@ -1222,18 +1222,18 @@ func (db *DB) checkpointAndInit(info syncInfo, mode string) error {
}
// Copy the end of the previous WAL before starting a new shadow WAL.
if _, err := db.copyToShadowWAL(info.shadowWALPath); err != nil {
if _, err := db.copyToShadowWAL(shadowWALPath); err != nil {
return fmt.Errorf("cannot copy to end of shadow wal: %w", err)
}
// Parse index of current shadow WAL file.
index, _, _, _, err := ParseWALPath(info.shadowWALPath)
index, _, _, _, err := ParseWALPath(shadowWALPath)
if err != nil {
return fmt.Errorf("cannot parse shadow wal filename: %s", info.shadowWALPath)
return fmt.Errorf("cannot parse shadow wal filename: %s", shadowWALPath)
}
// Start a new shadow WAL file with next index.
newShadowWALPath := filepath.Join(filepath.Dir(info.shadowWALPath), FormatWALPath(index+1))
newShadowWALPath := filepath.Join(filepath.Dir(shadowWALPath), FormatWALPath(index+1))
if err := db.initShadowWALFile(newShadowWALPath); err != nil {
return fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err)
}

View File

@@ -566,6 +566,15 @@ func (r *FileReplica) snapshotN(generation string) (int, error) {
}
func (r *FileReplica) Sync(ctx context.Context) (err error) {
// Clear last position if if an error occurs during sync.
defer func() {
if err != nil {
r.mu.Lock()
r.pos = Pos{}
r.mu.Unlock()
}
}()
// Find current position of database.
dpos, err := r.db.Pos()
if err != nil {
@@ -588,7 +597,7 @@ func (r *FileReplica) Sync(ctx context.Context) (err error) {
}
// Determine position, if necessary.
if r.LastPos().IsZero() {
if r.LastPos().Generation != generation {
pos, err := r.CalcPos(ctx, generation)
if err != nil {
return fmt.Errorf("cannot determine replica position: %s", err)

View File

@@ -678,6 +678,16 @@ func (r *Replica) findBucketRegion(ctx context.Context, bucket string) (string,
}
func (r *Replica) Sync(ctx context.Context) (err error) {
// Clear last position if if an error occurs during sync.
defer func() {
if err != nil {
r.mu.Lock()
r.pos = litestream.Pos{}
r.mu.Unlock()
}
}()
// Connect to S3, if necessary.
if err := r.Init(ctx); err != nil {
return err
}