fix: stalling on large file sets #18
88
internal/cache/cache.go
vendored
88
internal/cache/cache.go
vendored
|
@ -21,6 +21,8 @@ import (
|
||||||
const (
|
const (
|
||||||
pathsBucket = "paths"
|
pathsBucket = "paths"
|
||||||
formattersBucket = "formatters"
|
formattersBucket = "formatters"
|
||||||
|
|
||||||
|
readBatchSize = 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
// Entry represents a cache entry, indicating the last size and modified time for a file path.
|
// Entry represents a cache entry, indicating the last size and modified time for a file path.
|
||||||
|
@ -171,40 +173,64 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error {
|
||||||
// ChangeSet walks the filesystem starting at root and outputs any new or changed paths using pathsCh.
|
// ChangeSet walks the filesystem starting at root and outputs any new or changed paths using pathsCh.
|
||||||
// It determines if a path is new or has changed by comparing against cache entries.
|
// It determines if a path is new or has changed by comparing against cache entries.
|
||||||
func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error {
|
func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error {
|
||||||
return db.Update(func(tx *bolt.Tx) error {
|
var tx *bolt.Tx
|
||||||
bucket := tx.Bucket([]byte(pathsBucket))
|
var bucket *bolt.Bucket
|
||||||
|
var processed int
|
||||||
|
|
||||||
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
|
defer func() {
|
||||||
if err != nil {
|
// close any pending read tx
|
||||||
return fmt.Errorf("%w: failed to walk path", err)
|
if tx != nil {
|
||||||
} else if ctx.Err() != nil {
|
_ = tx.Rollback()
|
||||||
return ctx.Err()
|
}
|
||||||
} else if info.IsDir() {
|
}()
|
||||||
// todo what about symlinks?
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
|
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
|
||||||
// skip symlinks
|
if err != nil {
|
||||||
return nil
|
return fmt.Errorf("%w: failed to walk path", err)
|
||||||
}
|
} else if ctx.Err() != nil {
|
||||||
|
return ctx.Err()
|
||||||
cached, err := getEntry(bucket, path)
|
} else if info.IsDir() {
|
||||||
if err != nil {
|
// todo what about symlinks?
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
|
|
||||||
|
|
||||||
if !changedOrNew {
|
|
||||||
// no change
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// pass on the path
|
|
||||||
pathsCh <- path
|
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
|
|
||||||
|
// ignore symlinks
|
||||||
|
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// open a new read tx if there isn't one in progress
|
||||||
|
// we have to periodically open a new read tx to prevent writes from being blocked
|
||||||
|
if tx == nil {
|
||||||
|
tx, err = db.Begin(false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%w: failed to open a new read tx", err)
|
||||||
|
}
|
||||||
|
bucket = tx.Bucket([]byte(pathsBucket))
|
||||||
|
}
|
||||||
|
|
||||||
|
cached, err := getEntry(bucket, path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
|
||||||
|
|
||||||
|
if !changedOrNew {
|
||||||
|
// no change
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// pass on the path
|
||||||
|
pathsCh <- path
|
||||||
|
|
||||||
|
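// close the current tx if we have reached the batch size. NOTE(review): as shown here, tx is not set back to nil and processed is not reset after the rollback, so the `tx == nil` check above never opens a fresh read tx and later lookups reuse the bucket from the rolled-back tx.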
|
||||||
|
processed += 1
|
||||||
|
if processed == readBatchSize {
|
||||||
|
return tx.Rollback()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Reference in New Issue
Block a user