fix: stalling on large file sets (#18)

When running against nixpkgs we were stalling. This was due to a long running read tx which was preventing any writes.

This breaks up reading the cache when walking the filesystem into many smaller read txs.

On my laptop I'm now getting the following with the echo sample:

```console
# fresh cache

❯ nix run .# -- -c --config-file ./test/echo.toml --tree-root ../../../github.com/nixos/nixpkgs
38825 files changed in 320.655826ms

# hot cache

❯ nix run .# -- --config-file ./test/echo.toml --tree-root ../../../github.com/nixos/nixpkgs
0 files changed in 252.920853ms%
```

Signed-off-by: Brian McGee <brian@bmcgee.ie>

Reviewed-on: #18
Reviewed-by: Jonas Chevalier <zimbatm@noreply.git.numtide.com>
Co-authored-by: Brian McGee <brian@bmcgee.ie>
Co-committed-by: Brian McGee <brian@bmcgee.ie>
This commit is contained in:
Brian McGee 2024-01-07 18:57:51 +00:00 committed by Brian McGee
parent a3ca7825a6
commit 55ca4468cc

View File

@ -21,6 +21,8 @@ import (
const (
pathsBucket = "paths"
formattersBucket = "formatters"
readBatchSize = 1024
)
// Entry represents a cache entry, indicating the last size and modified time for a file path.
@ -171,40 +173,64 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error {
// ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed paths using pathsCh.
// It determines if a path is new or has changed by comparing against cache entries.
func ChangeSet(ctx context.Context, root string, pathsCh chan<- string) error {
return db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(pathsBucket))
var tx *bolt.Tx
var bucket *bolt.Bucket
var processed int
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("%w: failed to walk path", err)
} else if ctx.Err() != nil {
return ctx.Err()
} else if info.IsDir() {
// todo what about symlinks?
return nil
}
defer func() {
// close any pending read tx
if tx != nil {
_ = tx.Rollback()
}
}()
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
// skip symlinks
return nil
}
cached, err := getEntry(bucket, path)
if err != nil {
return err
}
changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
if !changedOrNew {
// no change
return nil
}
// pass on the path
pathsCh <- path
return filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("%w: failed to walk path", err)
} else if ctx.Err() != nil {
return ctx.Err()
} else if info.IsDir() {
// todo what about symlinks?
return nil
})
}
// ignore symlinks
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
return nil
}
// open a new read tx if there isn't one in progress
// we have to periodically open a new read tx to prevent writes from being blocked
if tx == nil {
tx, err = db.Begin(false)
if err != nil {
return fmt.Errorf("%w: failed to open a new read tx", err)
}
bucket = tx.Bucket([]byte(pathsBucket))
}
cached, err := getEntry(bucket, path)
if err != nil {
return err
}
changedOrNew := cached == nil || !(cached.Modified == info.ModTime() && cached.Size == info.Size())
if !changedOrNew {
// no change
return nil
}
// pass on the path
pathsCh <- path
// close the current tx if we have reached the batch size
processed += 1
if processed == readBatchSize {
return tx.Rollback()
}
return nil
})
}