feat: introduce concept of pipelines for better concurrency

Replaces the `Before` config option with an optional `Pipeline` key.

This is used to group formatters together in the order in which they are specified within the config file.

Signed-off-by: Brian McGee <brian@bmcgee.ie>
This commit is contained in:
Brian McGee 2024-04-19 10:57:41 +01:00
parent 8333c99ebf
commit 8af5b3c076
Signed by: brianmcgee
GPG Key ID: D49016E76AD1E8C0
6 changed files with 267 additions and 410 deletions

View File

@ -8,23 +8,38 @@ import (
"io/fs"
"os"
"os/signal"
"strings"
"slices"
"syscall"
"time"
"git.numtide.com/numtide/treefmt/format"
"github.com/gobwas/glob"
"git.numtide.com/numtide/treefmt/cache"
"git.numtide.com/numtide/treefmt/config"
format2 "git.numtide.com/numtide/treefmt/format"
"git.numtide.com/numtide/treefmt/walk"
"github.com/charmbracelet/log"
"golang.org/x/sync/errgroup"
)
var ErrFailOnChange = errors.New("unexpected changes detected, --fail-on-change is enabled")
const (
BatchSize = 1024
)
func (f *Format) Run() error {
start := time.Now()
var (
start time.Time
globalExcludes []glob.Glob
formatters map[string]*format.Formatter
pipelines map[string]*format.Pipeline
pathsCh chan string
processedCh chan string
ErrFailOnChange = errors.New("unexpected changes detected, --fail-on-change is enabled")
)
func (f *Format) Run() (err error) {
start = time.Now()
Cli.Configure()
@ -36,86 +51,40 @@ func (f *Format) Run() error {
}
}()
// create an overall context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// read config
cfg, err := config.ReadFile(Cli.ConfigFile)
if err != nil {
return fmt.Errorf("%w: failed to read config file", err)
}
globalExcludes, err := format2.CompileGlobs(cfg.Global.Excludes)
// create optional formatter filter set
formatterSet := make(map[string]bool)
for _, name := range Cli.Formatters {
_, ok := cfg.Formatters[name]
if !ok {
return fmt.Errorf("%w: formatter not found in config: %v", err, name)
}
formatterSet[name] = true
if globalExcludes, err = format.CompileGlobs(cfg.Global.Excludes); err != nil {
return fmt.Errorf("%w: failed to compile global globs", err)
}
includeFormatter := func(name string) bool {
if len(formatterSet) == 0 {
return true
} else {
_, include := formatterSet[name]
return include
}
}
pipelines = make(map[string]*format.Pipeline)
formatters = make(map[string]*format.Formatter)
formatters := make(map[string]*format2.Formatter)
// detect broken dependencies
for name, formatterCfg := range cfg.Formatters {
before := formatterCfg.Before
if before != "" {
// check child formatter exists
_, ok := cfg.Formatters[before]
// filter formatters
if len(Cli.Formatters) > 0 {
// first check the cli formatter list is valid
for _, name := range Cli.Formatters {
_, ok := cfg.Formatters[name]
if !ok {
return fmt.Errorf("formatter %v is before %v but config for %v was not found", name, before, before)
return fmt.Errorf("formatter not found in config: %v", name)
}
}
}
// dependency cycle detection
for name, formatterCfg := range cfg.Formatters {
var ok bool
var history []string
childName := name
for {
// add to history
history = append(history, childName)
if formatterCfg.Before == "" {
break
} else if formatterCfg.Before == name {
return fmt.Errorf("formatter cycle detected %v", strings.Join(history, " -> "))
}
// load child config
childName = formatterCfg.Before
formatterCfg, ok = cfg.Formatters[formatterCfg.Before]
if !ok {
return fmt.Errorf("formatter not found: %v", formatterCfg.Before)
// next we remove any formatter configs that were not specified
for name := range cfg.Formatters {
if !slices.Contains(Cli.Formatters, name) {
delete(cfg.Formatters, name)
}
}
}
// init formatters
for name, formatterCfg := range cfg.Formatters {
if !includeFormatter(name) {
// remove this formatter
delete(cfg.Formatters, name)
l.Debugf("formatter %v is not in formatter list %v, skipping", name, Cli.Formatters)
continue
}
formatter, err := format2.NewFormatter(name, formatterCfg, globalExcludes)
if errors.Is(err, format2.ErrCommandNotFound) && Cli.AllowMissingFormatter {
formatter, err := format.NewFormatter(name, formatterCfg, globalExcludes)
if errors.Is(err, format.ErrCommandNotFound) && Cli.AllowMissingFormatter {
l.Debugf("formatter not found: %v", name)
continue
} else if err != nil {
@ -123,49 +92,101 @@ func (f *Format) Run() error {
}
formatters[name] = formatter
}
// iterate the initialised formatters configuring parent/child relationships
for _, formatter := range formatters {
if formatter.Before() != "" {
child, ok := formatters[formatter.Before()]
if formatterCfg.Pipeline == "" {
pipeline := format.Pipeline{}
pipeline.Add(formatter)
pipelines[name] = &pipeline
} else {
key := fmt.Sprintf("p:%s", formatterCfg.Pipeline)
pipeline, ok := pipelines[key]
if !ok {
// formatter has been filtered out by the user
formatter.ResetBefore()
continue
pipeline = &format.Pipeline{}
pipelines[key] = pipeline
}
formatter.SetChild(child)
child.SetParent(formatter)
pipeline.Add(formatter)
}
}
// open the cache
if err = cache.Open(Cli.TreeRoot, Cli.ClearCache, formatters); err != nil {
return err
}
//
completedCh := make(chan string, 1024)
// create an app context and listen for shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = format2.SetCompletedChannel(ctx, completedCh)
go func() {
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
<-exit
cancel()
}()
//
// create some groups for concurrent processing and control flow
eg, ctx := errgroup.WithContext(ctx)
// start the formatters
for name := range formatters {
formatter := formatters[name]
eg.Go(func() error {
return formatter.Run(ctx)
})
// create a channel for paths to be processed
// we use a multiple of batch size here to allow for greater concurrency
pathsCh = make(chan string, 10*BatchSize)
// create a channel for tracking paths that have been processed
processedCh = make(chan string, cap(pathsCh))
// start concurrent processing tasks
eg.Go(updateCache(ctx))
eg.Go(applyFormatters(ctx))
eg.Go(walkFilesystem(ctx))
// wait for everything to complete
return eg.Wait()
}
func walkFilesystem(ctx context.Context) func() error {
return func() error {
paths := Cli.Paths
if len(paths) == 0 && Cli.Stdin {
// read in all the paths
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
paths = append(paths, scanner.Text())
}
}
walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths)
if err != nil {
return fmt.Errorf("failed to create walker: %w", err)
}
defer close(pathsCh)
if Cli.NoCache {
return walker.Walk(ctx, func(path string, info fs.FileInfo, err error) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// ignore symlinks and directories
if !(info.IsDir() || info.Mode()&os.ModeSymlink == os.ModeSymlink) {
pathsCh <- path
}
return nil
}
})
}
if err = cache.ChangeSet(ctx, walker, pathsCh); err != nil {
return fmt.Errorf("failed to generate change set: %w", err)
}
return nil
}
}
// determine paths to be formatted
pathsCh := make(chan string, 1024)
// update cache as paths are completed
eg.Go(func() error {
batchSize := 1024
batch := make([]string, 0, batchSize)
func updateCache(ctx context.Context) func() error {
return func() error {
batch := make([]string, 0, BatchSize)
var changes int
@ -188,13 +209,13 @@ func (f *Format) Run() error {
select {
case <-ctx.Done():
return ctx.Err()
case path, ok := <-completedCh:
case path, ok := <-processedCh:
if !ok {
break LOOP
}
batch = append(batch, path)
if len(batch) == batchSize {
if err = processBatch(); err != nil {
if len(batch) == BatchSize {
if err := processBatch(); err != nil {
return err
}
}
@ -202,7 +223,7 @@ func (f *Format) Run() error {
}
// final flush
if err = processBatch(); err != nil {
if err := processBatch(); err != nil {
return err
}
@ -212,81 +233,84 @@ func (f *Format) Run() error {
fmt.Printf("%v files changed in %v\n", changes, time.Now().Sub(start))
return nil
})
}
}
eg.Go(func() error {
// pass paths to each formatter
for path := range pathsCh {
for _, formatter := range formatters {
if formatter.Wants(path) {
formatter.Put(path)
func applyFormatters(ctx context.Context) func() error {
fg, ctx := errgroup.WithContext(ctx)
batches := make(map[string][]string)
tryApply := func(key string, path string) {
batch, ok := batches[key]
if !ok {
batch = make([]string, 0, BatchSize)
}
batch = append(batch, path)
batches[key] = batch
if len(batch) == BatchSize {
pipeline := pipelines[key]
// copy the batch
paths := make([]string, len(batch))
copy(paths, batch)
fg.Go(func() error {
if err := pipeline.Apply(ctx, paths); err != nil {
return err
}
}
for _, path := range paths {
processedCh <- path
}
return nil
})
batches[key] = batch[:0]
}
}
// indicate no more paths for each formatter
for _, formatter := range formatters {
if formatter.Parent() != nil {
// this formatter is not a root, it will be closed by a parent
continue
}
formatter.Close()
}
flushBatches := func() {
for key, pipeline := range pipelines {
// await completion
for _, formatter := range formatters {
formatter.AwaitCompletion()
}
batch := batches[key]
pipeline := pipeline // capture for closure
// indicate no more completion events
close(completedCh)
return nil
})
eg.Go(func() (err error) {
paths := Cli.Paths
if len(paths) == 0 && Cli.Stdin {
// read in all the paths
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
paths = append(paths, scanner.Text())
}
}
walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths)
if err != nil {
return fmt.Errorf("%w: failed to create walker", err)
}
defer close(pathsCh)
if Cli.NoCache {
return walker.Walk(ctx, func(path string, info fs.FileInfo, err error) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// ignore symlinks and directories
if !(info.IsDir() || info.Mode()&os.ModeSymlink == os.ModeSymlink) {
pathsCh <- path
if len(batch) > 0 {
fg.Go(func() error {
if err := pipeline.Apply(ctx, batch); err != nil {
return fmt.Errorf("%w: pipeline failure, %s", err, key)
}
for _, path := range batch {
processedCh <- path
}
return nil
})
}
}
}
return func() error {
defer func() {
// close processed channel
close(processedCh)
}()
for path := range pathsCh {
for key, pipeline := range pipelines {
if !pipeline.Wants(path) {
continue
}
})
tryApply(key, path)
}
}
return cache.ChangeSet(ctx, walker, pathsCh)
})
// flush any partial batches which remain
flushBatches()
// listen for shutdown and call cancel if required
go func() {
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
<-exit
cancel()
}()
return eg.Wait()
// wait for all outstanding formatting tasks to complete
if err := fg.Wait(); err != nil {
return fmt.Errorf("pipeline processing failure: %w", err)
}
return nil
}
}

View File

@ -41,27 +41,6 @@ func TestAllowMissingFormatter(t *testing.T) {
as.NoError(err)
}
func TestDependencyCycle(t *testing.T) {
as := require.New(t)
tempDir := t.TempDir()
configPath := tempDir + "/treefmt.toml"
test.WriteConfig(t, configPath, config2.Config{
Formatters: map[string]*config2.Formatter{
"a": {Command: "echo", Before: "b"},
"b": {Command: "echo", Before: "c"},
"c": {Command: "echo", Before: "a"},
"d": {Command: "echo", Before: "e"},
"e": {Command: "echo", Before: "f"},
"f": {Command: "echo"},
},
})
_, err := cmd(t, "--config-file", configPath, "--tree-root", tempDir)
as.ErrorContains(err, "formatter cycle detected")
}
func TestSpecifyingFormatters(t *testing.T) {
as := require.New(t)
@ -451,60 +430,6 @@ func TestGitWorktree(t *testing.T) {
as.Contains(string(out), fmt.Sprintf("%d files changed", 57))
}
func TestOrderingFormatters(t *testing.T) {
as := require.New(t)
tempDir := test.TempExamples(t)
configPath := path.Join(tempDir, "treefmt.toml")
// missing child
test.WriteConfig(t, configPath, config2.Config{
Formatters: map[string]*config2.Formatter{
"hs-a": {
Command: "echo",
Includes: []string{"*.hs"},
Before: "hs-b",
},
},
})
out, err := cmd(t, "--config-file", configPath, "--tree-root", tempDir)
as.ErrorContains(err, "formatter hs-a is before hs-b but config for hs-b was not found")
// multiple roots
test.WriteConfig(t, configPath, config2.Config{
Formatters: map[string]*config2.Formatter{
"hs-a": {
Command: "echo",
Includes: []string{"*.hs"},
Before: "hs-b",
},
"hs-b": {
Command: "echo",
Includes: []string{"*.hs"},
Before: "hs-c",
},
"hs-c": {
Command: "echo",
Includes: []string{"*.hs"},
},
"py-a": {
Command: "echo",
Includes: []string{"*.py"},
Before: "py-b",
},
"py-b": {
Command: "echo",
Includes: []string{"*.py"},
},
},
})
out, err = cmd(t, "--config-file", configPath, "--tree-root", tempDir)
as.NoError(err)
as.Contains(string(out), "8 files changed")
}
func TestPathsArg(t *testing.T) {
as := require.New(t)

View File

@ -9,6 +9,6 @@ type Formatter struct {
Includes []string
// Excludes is an optional list of glob patterns used to exclude certain files from this Formatter.
Excludes []string
// Before is the name of another formatter which must process a path after this one
Before string
//
Pipeline string
}

View File

@ -24,32 +24,9 @@ type Formatter struct {
log *log.Logger
executable string // path to the executable described by Command
before string
child *Formatter
parent *Formatter
// internal compiled versions of Includes and Excludes.
includes []glob.Glob
excludes []glob.Glob
// inboxCh is used to accept new paths for formatting.
inboxCh chan string
// completedCh is used to wait for this formatter to finish all processing.
completedCh chan interface{}
// Entries from inboxCh are batched according to batchSize and stored in batch for processing when the batchSize has
// been reached or Close is invoked.
batch []string
batchSize int
}
func (f *Formatter) Before() string {
return f.before
}
func (f *Formatter) ResetBefore() {
f.before = ""
}
// Executable returns the path to the executable defined by Command
@ -57,15 +34,56 @@ func (f *Formatter) Executable() string {
return f.executable
}
func (f *Formatter) Apply(ctx context.Context, paths []string) error {
// only apply if the resultant batch is not empty
if len(paths) > 0 {
// construct args, starting with config
args := f.config.Options
// append each file path
for _, path := range paths {
args = append(args, path)
}
// execute
start := time.Now()
cmd := exec.CommandContext(ctx, f.config.Command, args...)
if out, err := cmd.CombinedOutput(); err != nil {
f.log.Debugf("\n%v", string(out))
// todo log output
return err
}
f.log.Infof("%v files processed in %v", len(paths), time.Now().Sub(start))
}
return nil
}
// Wants is used to test if a Formatter wants path based on it's configured Includes and Excludes patterns.
// Returns true if the Formatter should be applied to path, false otherwise.
func (f *Formatter) Wants(path string) bool {
match := !PathMatches(path, f.excludes) && PathMatches(path, f.includes)
if match {
f.log.Debugf("match: %v", path)
}
return match
}
// NewFormatter is used to create a new Formatter.
func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.Glob) (*Formatter, error) {
func NewFormatter(
name string,
config *config.Formatter,
globalExcludes []glob.Glob,
) (*Formatter, error) {
var err error
f := Formatter{}
// capture the name from the config file
// capture config and the formatter's name
f.name = name
f.config = config
f.before = config.Before
// test if the formatter is available
executable, err := exec.LookPath(config.Command)
@ -78,10 +96,6 @@ func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.G
// initialise internal state
f.log = log.WithPrefix("format | " + name)
f.batchSize = 1024
f.batch = make([]string, 0, f.batchSize)
f.inboxCh = make(chan string, f.batchSize)
f.completedCh = make(chan interface{}, 1)
f.includes, err = CompileGlobs(config.Includes)
if err != nil {
@ -96,140 +110,3 @@ func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.G
return &f, nil
}
func (f *Formatter) SetParent(formatter *Formatter) {
f.parent = formatter
}
func (f *Formatter) Parent() *Formatter {
return f.parent
}
func (f *Formatter) SetChild(formatter *Formatter) {
f.child = formatter
}
// Wants is used to test if a Formatter wants path based on it's configured Includes and Excludes patterns.
// Returns true if the Formatter should be applied to path, false otherwise.
func (f *Formatter) Wants(path string) bool {
if f.parent != nil {
// we don't accept this path directly, our parent will forward it
return false
}
match := !PathMatches(path, f.excludes) && PathMatches(path, f.includes)
if match {
f.log.Debugf("match: %v", path)
}
return match
}
// Put add path into this Formatter's inboxCh for processing.
func (f *Formatter) Put(path string) {
f.inboxCh <- path
}
// Run is the main processing loop for this Formatter.
// It accepts a context which is used to lookup certain dependencies and for cancellation.
func (f *Formatter) Run(ctx context.Context) (err error) {
defer func() {
if f.child != nil {
// indicate no further processing for the child formatter
f.child.Close()
}
// indicate this formatter has finished processing
f.completedCh <- nil
}()
LOOP:
// keep processing until ctx has been cancelled or inboxCh has been closed
for {
select {
case <-ctx.Done():
// ctx has been cancelled
err = ctx.Err()
break LOOP
case path, ok := <-f.inboxCh:
// check if the inboxCh has been closed
if !ok {
break LOOP
}
// add path to the current batch
f.batch = append(f.batch, path)
if len(f.batch) == f.batchSize {
// drain immediately
if err := f.apply(ctx); err != nil {
break LOOP
}
}
}
}
// check if LOOP was exited due to an error
if err != nil {
return
}
// processing any lingering batch
return f.apply(ctx)
}
// apply executes Command against the latest batch of paths.
// It accepts a context which is used to lookup certain dependencies and for cancellation.
func (f *Formatter) apply(ctx context.Context) error {
// empty check
if len(f.batch) == 0 {
return nil
}
// construct args, starting with config
args := f.config.Options
// append each file path
for _, path := range f.batch {
args = append(args, path)
}
// execute
start := time.Now()
cmd := exec.CommandContext(ctx, f.config.Command, args...)
if out, err := cmd.CombinedOutput(); err != nil {
f.log.Debugf("\n%v", string(out))
// todo log output
return err
}
f.log.Infof("%v files processed in %v", len(f.batch), time.Now().Sub(start))
if f.child == nil {
// mark each path in this batch as completed
for _, path := range f.batch {
MarkPathComplete(ctx, path)
}
} else {
// otherwise forward each path onto the next formatter for processing
for _, path := range f.batch {
f.child.Put(path)
}
}
// reset batch
f.batch = f.batch[:0]
return nil
}
// Close is used to indicate that a Formatter should process any remaining paths and then stop it's processing loop.
func (f *Formatter) Close() {
close(f.inboxCh)
}
func (f *Formatter) AwaitCompletion() {
// todo support a timeout
<-f.completedCh
}

31
format/pipeline.go Normal file
View File

@ -0,0 +1,31 @@
package format
import "context"
type Pipeline struct {
sequence []*Formatter
}
func (p *Pipeline) Add(f *Formatter) {
p.sequence = append(p.sequence, f)
}
func (p *Pipeline) Wants(path string) bool {
var match bool
for _, f := range p.sequence {
match = f.Wants(path)
if match {
break
}
}
return match
}
func (p *Pipeline) Apply(ctx context.Context, paths []string) error {
for _, f := range p.sequence {
if err := f.Apply(ctx, paths); err != nil {
return err
}
}
return nil
}

View File

@ -31,12 +31,12 @@ command = "alejandra"
includes = ["*.nix"]
# Act as an example on how to exclude specific files
excludes = ["examples/nix/sources.nix"]
# Make this run before deadnix
# Note this formatter determines the file set for any 'downstream' formatters
before = "deadnix"
pipeline = "nix"
[formatter.deadnix]
command = "deadnix"
includes = ["*.nix"]
pipeline = "nix"
[formatter.ruby]
command = "rufo"