From 55ca4468ccc5ad4f31f7fa8fe66b75c62cf5039f Mon Sep 17 00:00:00 2001 From: Brian McGee Date: Sun, 7 Jan 2024 18:57:51 +0000 Subject: [PATCH] fix: stalling on large file sets (#18) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When running against nixpkgs we were stalling. This was due to a long running read tx which was preventing any writes. This breaks up reading the cache when walking the filesystem into many smaller read txs. On my laptop I'm now getting the following with the echo sample: ```console # fresh cache ❯ nix run .# -- -c --config-file ./test/echo.toml --tree-root ../../../github.com/nixos/nixpkgs 38825 files changed in 320.655826ms # hot cache ❯ nix run .# -- --config-file ./test/echo.toml --tree-root ../../../github.com/nixos/nixpkgs 0 files changed in 252.920853ms% ``` Signed-off-by: Brian McGee Reviewed-on: https://git.numtide.com/numtide/treefmt/pulls/18 Reviewed-by: Jonas Chevalier Co-authored-by: Brian McGee Co-committed-by: Brian McGee --- internal/cache/cache.go | 88 ++++++++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 31 deletions(-) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 88169f7..120496f 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -21,6 +21,8 @@ import ( const ( pathsBucket = "paths" formattersBucket = "formatters" + + readBatchSize = 1024 ) // Entry represents a cache entry, indicating the last size and modified time for a file path. @@ -171,40 +173,64 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error { // ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed paths using pathsCh. // It determines if a path is new or has changed by comparing against cache entries. func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error { - return db.Update(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(pathsBucket)) + var tx *bolt.Tx + var bucket *bolt.Bucket + var processed int - return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error { - if err != nil { - return fmt.Errorf("%w: failed to walk path", err) - } else if ctx.Err() != nil { - return ctx.Err() - } else if info.IsDir() { - // todo what about symlinks? - return nil - } + defer func() { + // close any pending read tx + if tx != nil { + _ = tx.Rollback() + } + }() - if info.Mode()&os.ModeSymlink == os.ModeSymlink { - // skip symlinks - return nil - } - - cached, err := getEntry(bucket, path) - if err != nil { - return err - } - - changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size()) - - if !changedOrNew { - // no change - return nil - } - - // pass on the path - pathsCh <- path + return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("%w: failed to walk path", err) + } else if ctx.Err() != nil { + return ctx.Err() + } else if info.IsDir() { + // todo what about symlinks? return nil - }) + } + + // ignore symlinks + if info.Mode()&os.ModeSymlink == os.ModeSymlink { + return nil + } + + // open a new read tx if there isn't one in progress + // we have to periodically open a new read tx to prevent writes from being blocked + if tx == nil { + tx, err = db.Begin(false) + if err != nil { + return fmt.Errorf("%w: failed to open a new read tx", err) + } + bucket = tx.Bucket([]byte(pathsBucket)) + } + + cached, err := getEntry(bucket, path) + if err != nil { + return err + } + + changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size()) + + if !changedOrNew { + // no change + return nil + } + + // pass on the path + pathsCh <- path + + // close the current tx if we have reached the batch size + processed += 1 + if processed == readBatchSize { + return tx.Rollback() + } + + return nil }) }