diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 88169f7..120496f 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -21,6 +21,8 @@ import ( const ( pathsBucket = "paths" formattersBucket = "formatters" + + readBatchSize = 1024 ) // Entry represents a cache entry, indicating the last size and modified time for a file path. @@ -171,40 +173,64 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error { // ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed paths using pathsCh. // It determines if a path is new or has changed by comparing against cache entries. func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error { - return db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(pathsBucket)) + var tx *bolt.Tx + var bucket *bolt.Bucket + var processed int - return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error { - if err != nil { - return fmt.Errorf("%w: failed to walk path", err) - } else if ctx.Err() != nil { - return ctx.Err() - } else if info.IsDir() { - // todo what about symlinks? - return nil - } + defer func() { + // close any pending read tx + if tx != nil { + _ = tx.Rollback() + } + }() - if info.Mode()&os.ModeSymlink == os.ModeSymlink { - // skip symlinks - return nil - } - - cached, err := getEntry(bucket, path) - if err != nil { - return err - } - - changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size()) - - if !changedOrNew { - // no change - return nil - } - - // pass on the path - pathsCh <- path + return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("%w: failed to walk path", err) + } else if ctx.Err() != nil { + return ctx.Err() + } else if info.IsDir() { + // todo what about symlinks? return nil - }) + } + + // ignore symlinks + if info.Mode()&os.ModeSymlink == os.ModeSymlink { + return nil + } + + // open a new read tx if there isn't one in progress + // we have to periodically open a new read tx to prevent writes from being blocked + if tx == nil { + tx, err = db.Begin(false) + if err != nil { + return fmt.Errorf("%w: failed to open a new read tx", err) + } + bucket = tx.Bucket([]byte(pathsBucket)) + } + + cached, err := getEntry(bucket, path) + if err != nil { + return err + } + + changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size()) + + if !changedOrNew { + // no change + return nil + } + + // pass on the path + pathsCh <- path + + // close the current tx if we have reached the batch size + processed += 1 + if processed == readBatchSize { + return tx.Rollback() + } + + return nil }) }