fix: stalling on large file sets #18
34
internal/cache/cache.go
vendored
34
internal/cache/cache.go
vendored
@ -21,6 +21,8 @@ import (
|
||||
const (
|
||||
pathsBucket = "paths"
|
||||
formattersBucket = "formatters"
|
||||
|
||||
readBatchSize = 1024
|
||||
)
|
||||
|
||||
// Entry represents a cache entry, indicating the last size and modified time for a file path.
|
||||
@ -171,8 +173,16 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error {
|
||||
// ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed paths using pathsCh.
|
||||
// It determines if a path is new or has changed by comparing against cache entries.
|
||||
func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error {
|
||||
return db.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(pathsBucket))
|
||||
var tx *bolt.Tx
|
||||
var bucket *bolt.Bucket
|
||||
var processed int
|
||||
|
||||
defer func() {
|
||||
// close any pending read tx
|
||||
if tx != nil {
|
||||
_ = tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
@ -184,11 +194,21 @@ func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ignore symlinks
|
||||
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
|
||||
// skip symlinks
|
||||
return nil
|
||||
}
|
||||
|
||||
// open a new read tx if there isn't one in progress
|
||||
// we have to periodically open a new read tx to prevent writes from being blocked
|
||||
if tx == nil {
|
||||
tx, err = db.Begin(false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: failed to open a new read tx", err)
|
||||
}
|
||||
bucket = tx.Bucket([]byte(pathsBucket))
|
||||
}
|
||||
|
||||
cached, err := getEntry(bucket, path)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -203,9 +223,15 @@ func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error {
|
||||
|
||||
// pass on the path
|
||||
pathsCh <- path
|
||||
|
||||
// close the current tx if we have reached the batch size
|
||||
processed += 1
|
||||
if processed == readBatchSize {
|
||||
return tx.Rollback()
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Update is used to record updated cache information for the specified list of paths.
|
||||
|
Reference in New Issue
Block a user