Merge pull request 'feat: introduce concept of pipelines for better concurrency' (#30) from feat/concurrency-refactor into main

Reviewed-on: #30
This commit is contained in:
Brian McGee 2024-04-26 09:36:36 +00:00
commit 5d341f929f
12 changed files with 512 additions and 478 deletions

43
cache/cache.go vendored
View File

@ -7,6 +7,8 @@ import (
"fmt"
"io/fs"
"os"
"path/filepath"
"runtime"
"time"
"git.numtide.com/numtide/treefmt/format"
@ -22,8 +24,6 @@ import (
const (
pathsBucket = "paths"
formattersBucket = "formatters"
readBatchSize = 1024
)
// Entry represents a cache entry, indicating the last size and modified time for a file path.
@ -32,7 +32,11 @@ type Entry struct {
Modified time.Time
}
var db *bolt.DB
var (
db *bolt.DB
ReadBatchSize = 1024 * runtime.NumCPU()
logger *log.Logger
)
// Open creates an instance of bolt.DB for a given treeRoot path.
// If clean is true, Open will delete any existing data in the cache.
@ -40,7 +44,7 @@ var db *bolt.DB
// The database will be located in `XDG_CACHE_DIR/treefmt/eval-cache/<id>.db`, where <id> is determined by hashing
// the treeRoot path. This associates a given treeRoot with a given instance of the cache.
func Open(treeRoot string, clean bool, formatters map[string]*format.Formatter) (err error) {
l := log.WithPrefix("cache")
logger = log.WithPrefix("cache")
// determine a unique and consistent db name for the tree root
h := sha1.New()
@ -85,7 +89,7 @@ func Open(treeRoot string, clean bool, formatters map[string]*format.Formatter)
}
clean = clean || entry == nil || !(entry.Size == stat.Size() && entry.Modified == stat.ModTime())
l.Debug(
logger.Debug(
"checking if formatter has changed",
"name", name,
"clean", clean,
@ -174,6 +178,12 @@ func putEntry(bucket *bolt.Bucket, path string, entry *Entry) error {
// ChangeSet is used to walk a filesystem, starting at root, and outputting any new or changed paths using pathsCh.
// It determines if a path is new or has changed by comparing against cache entries.
func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) error {
start := time.Now()
defer func() {
logger.Infof("finished generating change set in %v", time.Since(start))
}()
var tx *bolt.Tx
var bucket *bolt.Bucket
var processed int
@ -185,6 +195,9 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
}
}()
// for quick removal of tree root from paths
relPathOffset := len(walker.Root()) + 1
return walker.Walk(ctx, func(path string, info fs.FileInfo, err error) error {
select {
case <-ctx.Done():
@ -213,7 +226,8 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
bucket = tx.Bucket([]byte(pathsBucket))
}
cached, err := getEntry(bucket, path)
relPath := path[relPathOffset:]
cached, err := getEntry(bucket, relPath)
if err != nil {
return err
}
@ -230,13 +244,15 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
case <-ctx.Done():
return ctx.Err()
default:
pathsCh <- path
pathsCh <- relPath
}
// close the current tx if we have reached the batch size
processed += 1
if processed == readBatchSize {
return tx.Rollback()
if processed == ReadBatchSize {
err = tx.Rollback()
tx = nil
return err
}
return nil
@ -244,7 +260,12 @@ func ChangeSet(ctx context.Context, walker walk.Walker, pathsCh chan<- string) e
}
// Update is used to record updated cache information for the specified list of paths.
func Update(paths []string) (int, error) {
func Update(treeRoot string, paths []string) (int, error) {
start := time.Now()
defer func() {
logger.Infof("finished updating %v paths in %v", len(paths), time.Since(start))
}()
if len(paths) == 0 {
return 0, nil
}
@ -260,7 +281,7 @@ func Update(paths []string) (int, error) {
return err
}
pathInfo, err := os.Stat(path)
pathInfo, err := os.Stat(filepath.Join(treeRoot, path))
if err != nil {
return err
}

View File

@ -8,23 +8,42 @@ import (
"io/fs"
"os"
"os/signal"
"path/filepath"
"runtime"
"slices"
"sort"
"strings"
"syscall"
"time"
"git.numtide.com/numtide/treefmt/format"
"github.com/gobwas/glob"
"git.numtide.com/numtide/treefmt/cache"
"git.numtide.com/numtide/treefmt/config"
format2 "git.numtide.com/numtide/treefmt/format"
"git.numtide.com/numtide/treefmt/walk"
"github.com/charmbracelet/log"
"golang.org/x/sync/errgroup"
)
var ErrFailOnChange = errors.New("unexpected changes detected, --fail-on-change is enabled")
const (
BatchSize = 1024
)
func (f *Format) Run() error {
start := time.Now()
var (
start time.Time
globalExcludes []glob.Glob
formatters map[string]*format.Formatter
pipelines map[string]*format.Pipeline
pathsCh chan string
processedCh chan string
ErrFailOnChange = errors.New("unexpected changes detected, --fail-on-change is enabled")
)
func (f *Format) Run() (err error) {
start = time.Now()
Cli.Configure()
@ -36,86 +55,50 @@ func (f *Format) Run() error {
}
}()
// create an overall context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// read config
cfg, err := config.ReadFile(Cli.ConfigFile)
if err != nil {
return fmt.Errorf("%w: failed to read config file", err)
}
globalExcludes, err := format2.CompileGlobs(cfg.Global.Excludes)
// create optional formatter filter set
formatterSet := make(map[string]bool)
for _, name := range Cli.Formatters {
_, ok := cfg.Formatters[name]
if !ok {
return fmt.Errorf("%w: formatter not found in config: %v", err, name)
}
formatterSet[name] = true
if globalExcludes, err = format.CompileGlobs(cfg.Global.Excludes); err != nil {
return fmt.Errorf("%w: failed to compile global globs", err)
}
includeFormatter := func(name string) bool {
if len(formatterSet) == 0 {
return true
} else {
_, include := formatterSet[name]
return include
}
}
pipelines = make(map[string]*format.Pipeline)
formatters = make(map[string]*format.Formatter)
formatters := make(map[string]*format2.Formatter)
// detect broken dependencies
for name, formatterCfg := range cfg.Formatters {
before := formatterCfg.Before
if before != "" {
// check child formatter exists
_, ok := cfg.Formatters[before]
// filter formatters
if len(Cli.Formatters) > 0 {
// first check the cli formatter list is valid
for _, name := range Cli.Formatters {
_, ok := cfg.Formatters[name]
if !ok {
return fmt.Errorf("formatter %v is before %v but config for %v was not found", name, before, before)
return fmt.Errorf("formatter not found in config: %v", name)
}
}
// next we remove any formatter configs that were not specified
for name := range cfg.Formatters {
if !slices.Contains(Cli.Formatters, name) {
delete(cfg.Formatters, name)
}
}
}
// dependency cycle detection
for name, formatterCfg := range cfg.Formatters {
var ok bool
var history []string
childName := name
for {
// add to history
history = append(history, childName)
// sort the formatter names so that, as we construct pipelines, we add formatters in a deterministic fashion. This
// ensures a deterministic order even when all priority values are the same, e.g. 0
if formatterCfg.Before == "" {
break
} else if formatterCfg.Before == name {
return fmt.Errorf("formatter cycle detected %v", strings.Join(history, " -> "))
}
// load child config
childName = formatterCfg.Before
formatterCfg, ok = cfg.Formatters[formatterCfg.Before]
if !ok {
return fmt.Errorf("formatter not found: %v", formatterCfg.Before)
}
}
names := make([]string, 0, len(cfg.Formatters))
for name := range cfg.Formatters {
names = append(names, name)
}
sort.Strings(names)
// init formatters
for name, formatterCfg := range cfg.Formatters {
if !includeFormatter(name) {
// remove this formatter
delete(cfg.Formatters, name)
l.Debugf("formatter %v is not in formatter list %v, skipping", name, Cli.Formatters)
continue
}
formatter, err := format2.NewFormatter(name, formatterCfg, globalExcludes)
if errors.Is(err, format2.ErrCommandNotFound) && Cli.AllowMissingFormatter {
for _, name := range names {
formatterCfg := cfg.Formatters[name]
formatter, err := format.NewFormatter(name, Cli.TreeRoot, formatterCfg, globalExcludes)
if errors.Is(err, format.ErrCommandNotFound) && Cli.AllowMissingFormatter {
l.Debugf("formatter not found: %v", name)
continue
} else if err != nil {
@ -123,141 +106,84 @@ func (f *Format) Run() error {
}
formatters[name] = formatter
}
// iterate the initialised formatters configuring parent/child relationships
for _, formatter := range formatters {
if formatter.Before() != "" {
child, ok := formatters[formatter.Before()]
if formatterCfg.Pipeline == "" {
pipeline := format.Pipeline{}
pipeline.Add(formatter)
pipelines[name] = &pipeline
} else {
key := fmt.Sprintf("p:%s", formatterCfg.Pipeline)
pipeline, ok := pipelines[key]
if !ok {
// formatter has been filtered out by the user
formatter.ResetBefore()
continue
pipeline = &format.Pipeline{}
pipelines[key] = pipeline
}
formatter.SetChild(child)
child.SetParent(formatter)
pipeline.Add(formatter)
}
}
// open the cache
if err = cache.Open(Cli.TreeRoot, Cli.ClearCache, formatters); err != nil {
return err
}
//
completedCh := make(chan string, 1024)
// create an app context and listen for shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = format2.SetCompletedChannel(ctx, completedCh)
go func() {
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
<-exit
cancel()
}()
//
// create some groups for concurrent processing and control flow
eg, ctx := errgroup.WithContext(ctx)
// start the formatters
for name := range formatters {
formatter := formatters[name]
eg.Go(func() error {
return formatter.Run(ctx)
})
}
// create a channel for paths to be processed
// we use a multiple of batch size here to allow for greater concurrency
pathsCh = make(chan string, BatchSize*runtime.NumCPU())
// determine paths to be formatted
pathsCh := make(chan string, 1024)
// create a channel for tracking paths that have been processed
processedCh = make(chan string, cap(pathsCh))
// update cache as paths are completed
eg.Go(func() error {
batchSize := 1024
batch := make([]string, 0, batchSize)
// start concurrent processing tasks
eg.Go(updateCache(ctx))
eg.Go(applyFormatters(ctx))
eg.Go(walkFilesystem(ctx))
var changes int
// wait for everything to complete
return eg.Wait()
}
processBatch := func() error {
if Cli.NoCache {
changes += len(batch)
} else {
count, err := cache.Update(batch)
if err != nil {
return err
}
changes += count
}
batch = batch[:0]
return nil
}
LOOP:
for {
select {
case <-ctx.Done():
return ctx.Err()
case path, ok := <-completedCh:
if !ok {
break LOOP
}
batch = append(batch, path)
if len(batch) == batchSize {
if err = processBatch(); err != nil {
return err
}
}
}
}
// final flush
if err = processBatch(); err != nil {
return err
}
if Cli.FailOnChange && changes != 0 {
return ErrFailOnChange
}
fmt.Printf("%v files changed in %v\n", changes, time.Now().Sub(start))
return nil
})
eg.Go(func() error {
// pass paths to each formatter
for path := range pathsCh {
for _, formatter := range formatters {
if formatter.Wants(path) {
formatter.Put(path)
}
}
}
// indicate no more paths for each formatter
for _, formatter := range formatters {
if formatter.Parent() != nil {
// this formatter is not a root, it will be closed by a parent
continue
}
formatter.Close()
}
// await completion
for _, formatter := range formatters {
formatter.AwaitCompletion()
}
// indicate no more completion events
close(completedCh)
return nil
})
eg.Go(func() (err error) {
func walkFilesystem(ctx context.Context) func() error {
return func() error {
paths := Cli.Paths
if len(paths) == 0 && Cli.Stdin {
cwd, err := os.Getwd()
if err != nil {
return fmt.Errorf("%w: failed to determine current working directory", err)
}
// read in all the paths
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
paths = append(paths, scanner.Text())
path := scanner.Text()
if !strings.HasPrefix(path, "/") {
// append the cwd
path = filepath.Join(cwd, path)
}
paths = append(paths, path)
}
}
walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths)
if err != nil {
return fmt.Errorf("%w: failed to create walker", err)
return fmt.Errorf("failed to create walker: %w", err)
}
defer close(pathsCh)
@ -277,16 +203,140 @@ func (f *Format) Run() error {
})
}
return cache.ChangeSet(ctx, walker, pathsCh)
})
// listen for shutdown and call cancel if required
go func() {
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
<-exit
cancel()
}()
return eg.Wait()
if err = cache.ChangeSet(ctx, walker, pathsCh); err != nil {
return fmt.Errorf("failed to generate change set: %w", err)
}
return nil
}
}
// updateCache returns a task which drains processedCh, collecting paths into batches of BatchSize.
// Each full batch, and the final partial one, is recorded with cache.Update unless Cli.NoCache is set, in which case
// the batch is only counted. Once processedCh is closed the task prints a summary of the number of changes, or
// returns ErrFailOnChange if Cli.FailOnChange is set and at least one change was counted.
// The task returns ctx.Err() as soon as ctx is cancelled.
func updateCache(ctx context.Context) func() error {
	return func() error {
		var (
			pending = make([]string, 0, BatchSize)
			total   int
		)

		// flush accounts for the pending paths and then empties the batch for re-use
		flush := func() error {
			if !Cli.NoCache {
				updated, err := cache.Update(Cli.TreeRoot, pending)
				if err != nil {
					return err
				}
				total += updated
			} else {
				total += len(pending)
			}
			pending = pending[:0]
			return nil
		}

		for draining := true; draining; {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case path, ok := <-processedCh:
				if !ok {
					// channel closed, nothing more will arrive
					draining = false
					continue
				}
				pending = append(pending, path)
				if len(pending) != BatchSize {
					continue
				}
				if err := flush(); err != nil {
					return err
				}
			}
		}

		// account for whatever is left in the last, partial batch
		if err := flush(); err != nil {
			return err
		}

		if Cli.FailOnChange && total != 0 {
			return ErrFailOnChange
		}

		fmt.Printf("%v files changed in %v\n", total, time.Since(start))
		return nil
	}
}
// applyFormatters returns a task which reads paths from pathsCh, groups them into per-pipeline batches of BatchSize
// and applies each batch concurrently using the matching entry in pipelines.
// Every path of a successfully applied batch is forwarded to processedCh. When pathsCh is closed any partial batches
// are flushed, the task waits for all outstanding batches to finish and finally closes processedCh.
func applyFormatters(ctx context.Context) func() error {
	// fg tracks the in-flight batches; its context is cancelled as soon as one of them fails
	fg, ctx := errgroup.WithContext(ctx)
	batches := make(map[string][]string)

	// submit schedules the pipeline registered under key to process paths.
	// key and paths are parameters rather than loop variables, so the goroutine never observes a later iteration's
	// values. The caller must not modify paths after the call.
	submit := func(key string, paths []string) {
		pipeline := pipelines[key]

		fg.Go(func() error {
			if err := pipeline.Apply(ctx, paths); err != nil {
				return fmt.Errorf("%s failure: %w", key, err)
			}

			for _, path := range paths {
				// the consumer of processedCh may already have stopped, so never block past cancellation
				select {
				case processedCh <- path:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}

	// tryApply adds path to the batch for key and submits the batch once it reaches BatchSize
	tryApply := func(key string, path string) {
		batch, ok := batches[key]
		if !ok {
			batch = make([]string, 0, BatchSize)
		}
		batch = append(batch, path)

		if len(batch) < BatchSize {
			batches[key] = batch
			return
		}

		// hand a copy to the goroutine because the backing array is re-used for the next batch
		paths := make([]string, len(batch))
		copy(paths, batch)
		submit(key, paths)

		batches[key] = batch[:0]
	}

	// flushBatches submits every non-empty partial batch; batches is not written to afterwards
	flushBatches := func() {
		for key, batch := range batches {
			if len(batch) > 0 {
				submit(key, batch)
			}
		}
	}

	return func() error {
		// all senders have finished by the time this runs because fg.Wait has returned
		defer close(processedCh)

		for path := range pathsCh {
			for key, pipeline := range pipelines {
				if !pipeline.Wants(path) {
					continue
				}
				tryApply(key, path)
			}
		}

		// flush any partial batches which remain
		flushBatches()

		// wait for all outstanding formatting tasks to complete
		if err := fg.Wait(); err != nil {
			return fmt.Errorf("pipeline processing failure: %w", err)
		}

		return nil
	}
}

View File

@ -1,11 +1,13 @@
package cli
import (
"bufio"
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"testing"
config2 "git.numtide.com/numtide/treefmt/config"
@ -23,7 +25,7 @@ import (
func TestAllowMissingFormatter(t *testing.T) {
as := require.New(t)
tempDir := t.TempDir()
tempDir := test.TempExamples(t)
configPath := tempDir + "/treefmt.toml"
test.WriteConfig(t, configPath, config2.Config{
@ -41,27 +43,6 @@ func TestAllowMissingFormatter(t *testing.T) {
as.NoError(err)
}
// TestDependencyCycle verifies that a configuration whose Before references form a loop is rejected.
func TestDependencyCycle(t *testing.T) {
	as := require.New(t)

	root := t.TempDir()
	cfgFile := root + "/treefmt.toml"

	// a -> b -> c -> a loops back on itself, d -> e -> f does not
	formatters := map[string]*config2.Formatter{
		"a": {Command: "echo", Before: "b"},
		"b": {Command: "echo", Before: "c"},
		"c": {Command: "echo", Before: "a"},
		"d": {Command: "echo", Before: "e"},
		"e": {Command: "echo", Before: "f"},
		"f": {Command: "echo"},
	}
	test.WriteConfig(t, cfgFile, config2.Config{Formatters: formatters})

	_, runErr := cmd(t, "--config-file", cfgFile, "--tree-root", root)
	as.ErrorContains(runErr, "formatter cycle detected")
}
func TestSpecifyingFormatters(t *testing.T) {
as := require.New(t)
@ -129,7 +110,7 @@ func TestIncludesAndExcludes(t *testing.T) {
test.WriteConfig(t, configPath, cfg)
out, err := cmd(t, "-c", "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 30))
as.Contains(string(out), fmt.Sprintf("%d files changed", 31))
// globally exclude nix files
cfg.Global.Excludes = []string{"*.nix"}
@ -137,7 +118,7 @@ func TestIncludesAndExcludes(t *testing.T) {
test.WriteConfig(t, configPath, cfg)
out, err = cmd(t, "-c", "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 29))
as.Contains(string(out), fmt.Sprintf("%d files changed", 30))
// add haskell files to the global exclude
cfg.Global.Excludes = []string{"*.nix", "*.hs"}
@ -145,7 +126,7 @@ func TestIncludesAndExcludes(t *testing.T) {
test.WriteConfig(t, configPath, cfg)
out, err = cmd(t, "-c", "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 23))
as.Contains(string(out), fmt.Sprintf("%d files changed", 24))
echo := cfg.Formatters["echo"]
@ -155,7 +136,7 @@ func TestIncludesAndExcludes(t *testing.T) {
test.WriteConfig(t, configPath, cfg)
out, err = cmd(t, "-c", "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 21))
as.Contains(string(out), fmt.Sprintf("%d files changed", 22))
// remove go files from the echo formatter
echo.Excludes = []string{"*.py", "*.go"}
@ -163,7 +144,7 @@ func TestIncludesAndExcludes(t *testing.T) {
test.WriteConfig(t, configPath, cfg)
out, err = cmd(t, "-c", "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 20))
as.Contains(string(out), fmt.Sprintf("%d files changed", 21))
// adjust the includes for echo to only include elm files
echo.Includes = []string{"*.elm"}
@ -201,7 +182,7 @@ func TestCache(t *testing.T) {
test.WriteConfig(t, configPath, cfg)
out, err := cmd(t, "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 30))
as.Contains(string(out), fmt.Sprintf("%d files changed", 31))
out, err = cmd(t, "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
@ -210,7 +191,7 @@ func TestCache(t *testing.T) {
// clear cache
out, err = cmd(t, "--config-file", configPath, "--tree-root", tempDir, "-c")
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 30))
as.Contains(string(out), fmt.Sprintf("%d files changed", 31))
out, err = cmd(t, "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
@ -219,7 +200,7 @@ func TestCache(t *testing.T) {
// clear cache
out, err = cmd(t, "--config-file", configPath, "--tree-root", tempDir, "-c")
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 30))
as.Contains(string(out), fmt.Sprintf("%d files changed", 31))
out, err = cmd(t, "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
@ -228,7 +209,7 @@ func TestCache(t *testing.T) {
// no cache
out, err = cmd(t, "--config-file", configPath, "--tree-root", tempDir, "--no-cache")
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 30))
as.Contains(string(out), fmt.Sprintf("%d files changed", 31))
}
func TestChangeWorkingDirectory(t *testing.T) {
@ -262,7 +243,7 @@ func TestChangeWorkingDirectory(t *testing.T) {
// this should fail if the working directory hasn't been changed first
out, err := cmd(t, "-C", tempDir)
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 30))
as.Contains(string(out), fmt.Sprintf("%d files changed", 31))
}
func TestFailOnChange(t *testing.T) {
@ -439,70 +420,16 @@ func TestGitWorktree(t *testing.T) {
// add everything to the worktree
as.NoError(wt.AddGlob("."))
as.NoError(err)
run(30)
run(31)
// remove python directory
as.NoError(wt.RemoveGlob("python/*"))
run(27)
run(28)
// walk with filesystem instead of git
out, err := cmd(t, "-c", "--config-file", configPath, "--tree-root", tempDir, "--walk", "filesystem")
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 57))
}
func TestOrderingFormatters(t *testing.T) {
as := require.New(t)
tempDir := test.TempExamples(t)
configPath := path.Join(tempDir, "treefmt.toml")
// missing child
test.WriteConfig(t, configPath, config2.Config{
Formatters: map[string]*config2.Formatter{
"hs-a": {
Command: "echo",
Includes: []string{"*.hs"},
Before: "hs-b",
},
},
})
out, err := cmd(t, "--config-file", configPath, "--tree-root", tempDir)
as.ErrorContains(err, "formatter hs-a is before hs-b but config for hs-b was not found")
// multiple roots
test.WriteConfig(t, configPath, config2.Config{
Formatters: map[string]*config2.Formatter{
"hs-a": {
Command: "echo",
Includes: []string{"*.hs"},
Before: "hs-b",
},
"hs-b": {
Command: "echo",
Includes: []string{"*.hs"},
Before: "hs-c",
},
"hs-c": {
Command: "echo",
Includes: []string{"*.hs"},
},
"py-a": {
Command: "echo",
Includes: []string{"*.py"},
Before: "py-b",
},
"py-b": {
Command: "echo",
Includes: []string{"*.py"},
},
},
})
out, err = cmd(t, "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
as.Contains(string(out), "8 files changed")
as.Contains(string(out), fmt.Sprintf("%d files changed", 59))
}
func TestPathsArg(t *testing.T) {
@ -537,7 +464,7 @@ func TestPathsArg(t *testing.T) {
// without any path args
out, err := cmd(t, "-C", tempDir)
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 30))
as.Contains(string(out), fmt.Sprintf("%d files changed", 31))
// specify some explicit paths
out, err = cmd(t, "-C", tempDir, "-c", "elm/elm.json", "haskell/Nested/Foo.hs")
@ -604,3 +531,63 @@ go/main.go
as.NoError(err)
as.Contains(string(out), fmt.Sprintf("%d files changed", 3))
}
// TestDeterministicOrderingInPipeline verifies that formatters sharing a pipeline are applied in a stable order:
// ascending priority first, with formatter name as the tie-breaker.
func TestDeterministicOrderingInPipeline(t *testing.T) {
	as := require.New(t)

	tempDir := test.TempExamples(t)
	configPath := tempDir + "/treefmt.toml"

	test.WriteConfig(t, configPath, config2.Config{
		Formatters: map[string]*config2.Formatter{
			// a and b should execute in lexicographical order as they have default priority 0, with c last since it has
			// priority 1
			"fmt-a": {
				Command:  "test-fmt",
				Options:  []string{"fmt-a"},
				Includes: []string{"*.py"},
				Pipeline: "foo",
			},
			"fmt-b": {
				Command:  "test-fmt",
				Options:  []string{"fmt-b"},
				Includes: []string{"*.py"},
				Pipeline: "foo",
			},
			"fmt-c": {
				Command:  "test-fmt",
				Options:  []string{"fmt-c"},
				Includes: []string{"*.py"},
				Pipeline: "foo",
				Priority: 1,
			},
		},
	})

	_, err := cmd(t, "-C", tempDir)
	as.NoError(err)

	matcher := regexp.MustCompile("^fmt-(.*)")

	// check each affected file for the sequence of test statements which should have been appended to the end
	sequence := []string{"fmt-a", "fmt-b", "fmt-c"}
	paths := []string{"python/main.py", "python/virtualenv_proxy.py"}

	for _, p := range paths {
		// use a closure so that each file is closed before the next one is opened
		func() {
			file, err := os.Open(filepath.Join(tempDir, p))
			as.NoError(err)
			// the file is only read, so an error from Close carries no useful information
			defer func() { _ = file.Close() }()

			scanner := bufio.NewScanner(file)

			idx := 0
			for scanner.Scan() {
				// the pattern is anchored, so a line yields at most one match
				match := matcher.FindString(scanner.Text())
				if match == "" {
					continue
				}

				// fail cleanly rather than panic if a formatter ran more often than expected
				as.Less(idx, len(sequence), "unexpected extra formatter output %q in %s", match, p)
				as.Equal(sequence[idx], match)
				idx += 1
			}
			as.NoError(scanner.Err())

			// without this check the test would pass even if no formatter had touched the file
			as.Equal(len(sequence), idx, "unexpected number of formatter outputs in %s", p)
		}()
	}
}

View File

@ -59,6 +59,18 @@ func TestReadConfigFile(t *testing.T) {
as.Nil(alejandra.Options)
as.Equal([]string{"*.nix"}, alejandra.Includes)
as.Equal([]string{"examples/nix/sources.nix"}, alejandra.Excludes)
as.Equal("nix", alejandra.Pipeline)
as.Equal(1, alejandra.Priority)
// deadnix
deadnix, ok := cfg.Formatters["deadnix"]
as.True(ok, "deadnix formatter not found")
as.Equal("deadnix", deadnix.Command)
as.Nil(deadnix.Options)
as.Equal([]string{"*.nix"}, deadnix.Includes)
as.Nil(deadnix.Excludes)
as.Equal("nix", deadnix.Pipeline)
as.Equal(2, deadnix.Priority)
// ruby
ruby, ok := cfg.Formatters["ruby"]

View File

@ -9,6 +9,8 @@ type Formatter struct {
Includes []string
// Excludes is an optional list of glob patterns used to exclude certain files from this Formatter.
Excludes []string
// Before is the name of another formatter which must process a path after this one
Before string
// Indicates this formatter should be executed as part of a group of formatters all sharing the same pipeline key.
Pipeline string
// Indicates the order of precedence when executing as part of a pipeline.
Priority int
}

View File

@ -1,21 +0,0 @@
package format
import (
"context"
)
// contextKey is the type of the keys this package stores in a context.Context.
// A dedicated unexported type, rather than a plain string, guarantees that a key can never collide with a key set
// by another package.
type contextKey string

const (
	// completedChKey is the context key under which the completion channel is stored.
	completedChKey contextKey = "completedCh"
)

// SetCompletedChannel is used to set a channel for indicating processing completion in the provided context.
// It returns a context derived from ctx which carries completedCh.
func SetCompletedChannel(ctx context.Context, completedCh chan string) context.Context {
	return context.WithValue(ctx, completedChKey, completedCh)
}

// MarkPathComplete is used to indicate that all processing has finished for the provided path.
// This is done by adding the path to the completion channel which should have already been set using
// SetCompletedChannel. It panics if ctx does not carry a completion channel.
func MarkPathComplete(ctx context.Context, path string) {
	ctx.Value(completedChKey).(chan string) <- path
}

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"time"
@ -23,33 +24,13 @@ type Formatter struct {
log *log.Logger
executable string // path to the executable described by Command
before string
child *Formatter
parent *Formatter
workingDir string
// internal compiled versions of Includes and Excludes.
includes []glob.Glob
excludes []glob.Glob
// inboxCh is used to accept new paths for formatting.
inboxCh chan string
// completedCh is used to wait for this formatter to finish all processing.
completedCh chan interface{}
// Entries from inboxCh are batched according to batchSize and stored in batch for processing when the batchSize has
// been reached or Close is invoked.
batch []string
batchSize int
}
func (f *Formatter) Before() string {
return f.before
}
func (f *Formatter) ResetBefore() {
f.before = ""
batch []string
}
// Executable returns the path to the executable defined by Command
@ -57,15 +38,86 @@ func (f *Formatter) Executable() string {
return f.executable
}
// Apply executes the formatter's Command, with its configured Options followed by the selected paths as arguments,
// using workingDir as the working directory.
//
// If filter is true it indicates we are executing as part of a pipeline. In such a scenario each formatter must sub
// filter the paths provided, as different formatters might want different files in a pipeline. If filter is false
// paths is used as is.
//
// Apply returns nil without running anything when there are no paths to process. If the command fails, its combined
// output is written to stderr and an error wrapping the underlying failure is returned.
//
// Apply keeps all of its working state local, so it does not modify the Formatter or its configured Options.
func (f *Formatter) Apply(ctx context.Context, paths []string, filter bool) error {
	start := time.Now()

	// determine which paths this formatter will process
	targets := paths
	if filter {
		targets = make([]string, 0, len(paths))
		for _, path := range paths {
			if f.Wants(path) {
				targets = append(targets, path)
			}
		}
	}

	// exit early if nothing to process
	if len(targets) == 0 {
		return nil
	}

	// build the args in a fresh slice: appending to Options directly could write into its backing array
	args := make([]string, 0, len(f.config.Options)+len(targets))
	args = append(args, f.config.Options...)
	args = append(args, targets...)

	// execute the command
	cmd := exec.CommandContext(ctx, f.config.Command, args...)
	cmd.Dir = f.workingDir

	if out, err := cmd.CombinedOutput(); err != nil {
		if len(out) > 0 {
			_, _ = fmt.Fprintf(os.Stderr, "%s error:\n%s\n", f.name, out)
		}
		return fmt.Errorf("%w: formatter %s failed to apply", err, f.name)
	}

	// report the number of paths actually passed to the command
	f.log.Infof("%v files processed in %v", len(targets), time.Since(start))

	return nil
}
// Wants reports whether this Formatter should be applied to path.
// A path is wanted when PathMatches rejects it against the compiled excludes and accepts it against the compiled
// includes. Each wanted path is logged at debug level.
func (f *Formatter) Wants(path string) bool {
	if PathMatches(path, f.excludes) {
		return false
	}
	if !PathMatches(path, f.includes) {
		return false
	}
	f.log.Debugf("match: %v", path)
	return true
}
// NewFormatter is used to create a new Formatter.
func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.Glob) (*Formatter, error) {
func NewFormatter(
name string,
treeRoot string,
config *config.Formatter,
globalExcludes []glob.Glob,
) (*Formatter, error) {
var err error
f := Formatter{}
// capture the name from the config file
// capture config and the formatter's name
f.name = name
f.config = config
f.before = config.Before
f.workingDir = treeRoot
// test if the formatter is available
executable, err := exec.LookPath(config.Command)
@ -77,11 +129,11 @@ func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.G
f.executable = executable
// initialise internal state
f.log = log.WithPrefix("format | " + name)
f.batchSize = 1024
f.batch = make([]string, 0, f.batchSize)
f.inboxCh = make(chan string, f.batchSize)
f.completedCh = make(chan interface{}, 1)
if config.Pipeline == "" {
f.log = log.WithPrefix(fmt.Sprintf("format | %s", name))
} else {
f.log = log.WithPrefix(fmt.Sprintf("format | %s[%s]", config.Pipeline, name))
}
f.includes, err = CompileGlobs(config.Includes)
if err != nil {
@ -96,140 +148,3 @@ func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.G
return &f, nil
}
// SetParent records formatter as the parent of f.
func (f *Formatter) SetParent(formatter *Formatter) {
f.parent = formatter
}
// Parent returns the parent of f, or nil if none has been set.
func (f *Formatter) Parent() *Formatter {
return f.parent
}
// SetChild records formatter as the child of f.
func (f *Formatter) SetChild(formatter *Formatter) {
f.child = formatter
}
// Wants reports whether this Formatter should be applied directly to path.
// A Formatter with a parent never accepts a path directly, as its parent will forward paths to it. Otherwise a path
// is wanted when PathMatches rejects it against the compiled excludes and accepts it against the compiled includes.
// Each wanted path is logged at debug level.
func (f *Formatter) Wants(path string) bool {
	if f.parent != nil {
		return false
	}
	if PathMatches(path, f.excludes) || !PathMatches(path, f.includes) {
		return false
	}
	f.log.Debugf("match: %v", path)
	return true
}
// Put adds path to this Formatter's inboxCh for processing.
func (f *Formatter) Put(path string) {
f.inboxCh <- path
}
// Run is the main processing loop for this Formatter.
// It accepts a context which is used to lookup certain dependencies and for cancellation.
//
// Paths received from inboxCh are collected into the current batch, which is applied whenever it reaches batchSize.
// When inboxCh is closed the remaining partial batch is applied and Run returns the result. Run returns ctx.Err()
// if ctx is cancelled, and the error from apply as soon as a batch fails; a failed batch is not applied again.
//
// On return the child formatter, if any, is closed and completion is signalled on completedCh.
func (f *Formatter) Run(ctx context.Context) (err error) {
	defer func() {
		if f.child != nil {
			// indicate no further processing for the child formatter
			f.child.Close()
		}
		// indicate this formatter has finished processing
		f.completedCh <- nil
	}()

	// keep processing until ctx has been cancelled or inboxCh has been closed
	for {
		select {
		case <-ctx.Done():
			// ctx has been cancelled
			return ctx.Err()

		case path, ok := <-f.inboxCh:
			if !ok {
				// inboxCh has been closed, process any lingering batch
				return f.apply(ctx)
			}

			// add path to the current batch
			f.batch = append(f.batch, path)

			if len(f.batch) == f.batchSize {
				// drain immediately; assign to the named result rather than shadowing it so the failure is reported
				if err = f.apply(ctx); err != nil {
					return err
				}
			}
		}
	}
}
// apply runs Command against the paths collected in the current batch, passing the configured Options followed by
// each path as arguments. The context is used for cancellation of the command and is handed to MarkPathComplete.
//
// Nothing is executed for an empty batch. If the command fails its combined output is logged at debug level and the
// error is returned. On success every path is either marked as complete, when this formatter has no child, or
// forwarded to the child formatter, after which the batch is emptied.
func (f *Formatter) apply(ctx context.Context) error {
	if len(f.batch) == 0 {
		return nil
	}

	// config options first, followed by every path in the batch
	args := f.config.Options
	args = append(args, f.batch...)

	started := time.Now()
	out, err := exec.CommandContext(ctx, f.config.Command, args...).CombinedOutput()
	if err != nil {
		f.log.Debugf("\n%v", string(out))
		// todo log output
		return err
	}

	f.log.Infof("%v files processed in %v", len(f.batch), time.Since(started))

	// without a child this is the last step for a path, otherwise the child takes over
	next := func(path string) { MarkPathComplete(ctx, path) }
	if f.child != nil {
		next = f.child.Put
	}
	for _, path := range f.batch {
		next(path)
	}

	// keep the backing array for the next batch
	f.batch = f.batch[:0]

	return nil
}
// Close is used to indicate that a Formatter should process any remaining paths and then stop its processing loop.
// It does so by closing inboxCh.
func (f *Formatter) Close() {
close(f.inboxCh)
}
// AwaitCompletion blocks until a value has been received from completedCh.
func (f *Formatter) AwaitCompletion() {
// todo support a timeout
<-f.completedCh
}

38
format/pipeline.go Normal file
View File

@ -0,0 +1,38 @@
package format
import (
"context"
"slices"
)
// Pipeline is a group of formatters which are applied one after another to the same set of paths.
type Pipeline struct {
// sequence holds the formatters in the order in which Apply runs them.
sequence []*Formatter
}
// Add appends f to the pipeline and re-orders the sequence by priority in ascending order.
// The sort is stable, so formatters with the same priority keep the order in which they were added.
func (p *Pipeline) Add(f *Formatter) {
	p.sequence = append(p.sequence, f)

	slices.SortStableFunc(p.sequence, func(a, b *Formatter) int {
		// compare explicitly rather than subtracting, which overflows for priorities far apart
		switch {
		case a.config.Priority < b.config.Priority:
			return -1
		case a.config.Priority > b.config.Priority:
			return 1
		default:
			return 0
		}
	})
}
// Wants reports whether at least one formatter in the pipeline wants path.
// Formatters are consulted in sequence order and the search stops at the first one which wants the path.
func (p *Pipeline) Wants(path string) bool {
	for i := range p.sequence {
		if p.sequence[i].Wants(path) {
			return true
		}
	}
	return false
}
// Apply runs every formatter in the pipeline, in sequence order, against paths.
// When the pipeline holds more than one formatter each of them is asked to filter paths down to the ones it wants.
// Processing stops at the first formatter which fails and its error is returned.
func (p *Pipeline) Apply(ctx context.Context, paths []string) error {
	// with a single formatter the paths were already selected by that formatter alone
	filter := len(p.sequence) > 1

	for i := 0; i < len(p.sequence); i++ {
		err := p.sequence[i].Apply(ctx, paths, filter)
		if err != nil {
			return err
		}
	}
	return nil
}

View File

@ -25,6 +25,7 @@
# golang
go
delve
graphviz
]
++
# include formatters for development and testing

View File

@ -6,6 +6,7 @@ with pkgs; [
haskellPackages.cabal-fmt
haskellPackages.ormolu
mdsh
nixpkgs-fmt
nodePackages.prettier
python3.pkgs.black
rufo
@ -15,4 +16,17 @@ with pkgs; [
statix
deadnix
terraform
# util for unit testing
(pkgs.writeShellApplication {
name = "test-fmt";
text = ''
VALUE="$1"
shift
# append value to each file
for FILE in "$@"; do
echo "$VALUE" >> "$FILE"
done
'';
})
]

View File

@ -0,0 +1,13 @@
# One CLI to format the code tree - https://git.numtide.com/numtide/treefmt
[formatter.deadnix]
command = "deadnix"
includes = ["*.nix"]
pipeline = "nix"
priority = 1
[formatter.nixpkgs-fmt]
command = "nixpkgs-fmt"
includes = ["*.nix"]
pipeline = "nix"
priority = 2

View File

@ -31,12 +31,14 @@ command = "alejandra"
includes = ["*.nix"]
# Act as an example on how to exclude specific files
excludes = ["examples/nix/sources.nix"]
# Make this run before deadnix
# Note this formatter determines the file set for any 'downstream' formatters
before = "deadnix"
pipeline = "nix"
priority = 1
[formatter.deadnix]
command = "deadnix"
includes = ["*.nix"]
pipeline = "nix"
priority = 2
[formatter.ruby]
command = "rufo"