fix: stalling on large file sets

When running against nixpkgs we were stalling. This was due to a long running read tx which was preventing any writes.

This breaks up reading the cache when walking the filesystem into many smaller read txs.

Signed-off-by: Brian McGee <brian@bmcgee.ie>
This commit is contained in:
Brian McGee 2024-01-04 21:00:01 +00:00
parent a3ca7825a6
commit 5b91f74f89
Signed by: brianmcgee
GPG Key ID: D49016E76AD1E8C0

View File

@ -21,6 +21,8 @@ import (
const (
pathsBucket = "paths"
formattersBucket = "formatters"
readBatchSize = 1024
)
// Entry represents a cache entry, indicating the last size and modified time for a file path.
@ -171,40 +173,64 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error {
// ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed paths using pathsCh.
// It determines if a path is new or has changed by comparing against cache entries.
func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error {
return db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(pathsBucket))
var tx *bolt.Tx
var bucket *bolt.Bucket
var processed int
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("%w: failed to walk path", err)
} else if ctx.Err() != nil {
return ctx.Err()
} else if info.IsDir() {
// todo what about symlinks?
return nil
}
defer func() {
// close any pending read tx
if tx != nil {
_ = tx.Rollback()
}
}()
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
// skip symlinks
return nil
}
cached, err := getEntry(bucket, path)
if err != nil {
return err
}
changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
if !changedOrNew {
// no change
return nil
}
// pass on the path
pathsCh <- path
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("%w: failed to walk path", err)
} else if ctx.Err() != nil {
return ctx.Err()
} else if info.IsDir() {
// todo what about symlinks?
return nil
})
}
// ignore symlinks
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
return nil
}
// open a new read tx if there isn't one in progress
// we have to periodically open a new read tx to prevent writes from being blocked
if tx == nil {
tx, err = db.Begin(false)
if err != nil {
return fmt.Errorf("%w: failed to open a new read tx", err)
}
bucket = tx.Bucket([]byte(pathsBucket))
}
cached, err := getEntry(bucket, path)
if err != nil {
return err
}
changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
if !changedOrNew {
// no change
return nil
}
// pass on the path
pathsCh <- path
// close the current tx if we have reached the batch size
processed += 1
if processed == readBatchSize {
return tx.Rollback()
}
return nil
})
}