fix: stalling on large file sets #18

Merged
brianmcgee merged 1 commits from fix/stalling into main 2024-01-07 18:57:52 +00:00

View File

@ -21,6 +21,8 @@ import (
const (
pathsBucket = "paths"
formattersBucket = "formatters"
readBatchSize = 1024
)
// Entry represents a cache entry, indicating the last size and modified time for a file path.
@ -171,40 +173,64 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error {
// ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed paths using pathsCh.
// It determines if a path is new or has changed by comparing against cache entries.
func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error {
return db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(pathsBucket))
var tx *bolt.Tx
var bucket *bolt.Bucket
var processed int
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("%w: failed to walk path", err)
} else if ctx.Err() != nil {
return ctx.Err()
} else if info.IsDir() {
// todo what about symlinks?
return nil
}
defer func() {
// close any pending read tx
if tx != nil {
_ = tx.Rollback()
}
}()
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
// skip symlinks
return nil
}
cached, err := getEntry(bucket, path)
if err != nil {
return err
}
changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
if !changedOrNew {
// no change
return nil
}
// pass on the path
pathsCh <- path
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("%w: failed to walk path", err)
} else if ctx.Err() != nil {
return ctx.Err()
} else if info.IsDir() {
// todo what about symlinks?
return nil
})
}
// ignore symlinks
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
return nil
}
// open a new read tx if there isn't one in progress
// we have to periodically open a new read tx to prevent writes from being blocked
if tx == nil {
tx, err = db.Begin(false)
if err != nil {
return fmt.Errorf("%w: failed to open a new read tx", err)
}
bucket = tx.Bucket([]byte(pathsBucket))
}
cached, err := getEntry(bucket, path)
if err != nil {
return err
}
changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
if !changedOrNew {
// no change
return nil
}
// pass on the path
pathsCh <- path
// close the current tx if we have reached the batch size
processed += 1
if processed == readBatchSize {
return tx.Rollback()
}
return nil
})
}