wip: better pipeline processing

Signed-off-by: Brian McGee <brian@bmcgee.ie>
This commit is contained in:
Brian McGee 2024-04-19 10:57:41 +01:00
parent 8333c99ebf
commit 832e9141d0
Signed by: brianmcgee
GPG Key ID: D49016E76AD1E8C0
2 changed files with 182 additions and 227 deletions

View File

@ -7,14 +7,12 @@ import (
"fmt" "fmt"
"io/fs" "io/fs"
"os" "os"
"os/signal"
"strings" "strings"
"syscall"
"time" "time"
"git.numtide.com/numtide/treefmt/cache" "git.numtide.com/numtide/treefmt/cache"
"git.numtide.com/numtide/treefmt/config" "git.numtide.com/numtide/treefmt/config"
format2 "git.numtide.com/numtide/treefmt/format" "git.numtide.com/numtide/treefmt/format"
"git.numtide.com/numtide/treefmt/walk" "git.numtide.com/numtide/treefmt/walk"
"github.com/charmbracelet/log" "github.com/charmbracelet/log"
@ -46,7 +44,7 @@ func (f *Format) Run() error {
return fmt.Errorf("%w: failed to read config file", err) return fmt.Errorf("%w: failed to read config file", err)
} }
globalExcludes, err := format2.CompileGlobs(cfg.Global.Excludes) globalExcludes, err := format.CompileGlobs(cfg.Global.Excludes)
// create optional formatter filter set // create optional formatter filter set
formatterSet := make(map[string]bool) formatterSet := make(map[string]bool)
@ -67,7 +65,7 @@ func (f *Format) Run() error {
} }
} }
formatters := make(map[string]*format2.Formatter) formatters := make(map[string]*format.Formatter)
// detect broken dependencies // detect broken dependencies
for name, formatterCfg := range cfg.Formatters { for name, formatterCfg := range cfg.Formatters {
@ -114,8 +112,8 @@ func (f *Format) Run() error {
continue continue
} }
formatter, err := format2.NewFormatter(name, formatterCfg, globalExcludes) formatter, err := format.NewFormatter(name, formatterCfg, globalExcludes)
if errors.Is(err, format2.ErrCommandNotFound) && Cli.AllowMissingFormatter { if errors.Is(err, format.ErrCommandNotFound) && Cli.AllowMissingFormatter {
l.Debugf("formatter not found: %v", name) l.Debugf("formatter not found: %v", name)
continue continue
} else if err != nil { } else if err != nil {
@ -128,14 +126,7 @@ func (f *Format) Run() error {
// iterate the initialised formatters configuring parent/child relationships // iterate the initialised formatters configuring parent/child relationships
for _, formatter := range formatters { for _, formatter := range formatters {
if formatter.Before() != "" { if formatter.Before() != "" {
child, ok := formatters[formatter.Before()] formatter.SetChild(formatters[formatter.Before()])
if !ok {
// formatter has been filtered out by the user
formatter.ResetBefore()
continue
}
formatter.SetChild(child)
child.SetParent(formatter)
} }
} }
@ -144,27 +135,18 @@ func (f *Format) Run() error {
} }
// //
completedCh := make(chan string, 1024) eg, mainCtx := errgroup.WithContext(ctx)
fg, formatterCtx := errgroup.WithContext(ctx)
ctx = format2.SetCompletedChannel(ctx, completedCh)
//
eg, ctx := errgroup.WithContext(ctx)
// start the formatters
for name := range formatters {
formatter := formatters[name]
eg.Go(func() error {
return formatter.Run(ctx)
})
}
// determine paths to be formatted // determine paths to be formatted
pathsCh := make(chan string, 1024) pathsCh := make(chan string, 1024*10)
processedCh := make(chan string, 1024*10)
batches := make(map[string][]string)
batchSize := 1024
// update cache as paths are completed // update cache as paths are completed
eg.Go(func() error { eg.Go(func() error {
batchSize := 1024
batch := make([]string, 0, batchSize) batch := make([]string, 0, batchSize)
var changes int var changes int
@ -186,9 +168,9 @@ func (f *Format) Run() error {
LOOP: LOOP:
for { for {
select { select {
case <-ctx.Done(): case <-mainCtx.Done():
return ctx.Err() return mainCtx.Err()
case path, ok := <-completedCh: case path, ok := <-processedCh:
if !ok { if !ok {
break LOOP break LOOP
} }
@ -214,33 +196,81 @@ func (f *Format) Run() error {
return nil return nil
}) })
tryApply := func(key string, path string) {
batch, ok := batches[key]
if !ok {
batch = make([]string, 0, batchSize)
}
batch = append(batch, path)
batches[key] = batch
if len(batch) == batchSize {
// todo optimize
first := strings.Split(key, ":")[0]
formatter := formatters[first]
// copy the batch
paths := make([]string, len(batch))
copy(paths, batch)
fg.Go(func() error {
if err := formatter.Apply(formatterCtx, paths); err != nil {
return err
}
for _, path := range paths {
processedCh <- path
}
return nil
})
batches[key] = batch[:0]
}
}
flushBatches := func() {
for _, formatter := range formatters {
key := formatter.BatchKey()
// todo optimize
first := strings.Split(key, ":")[0]
formatter := formatters[first]
batch := batches[key]
if len(batch) > 0 {
fg.Go(func() error {
if err = formatter.Apply(formatterCtx, batch); err != nil {
return fmt.Errorf("%w: formatter failure, %s", err, key)
}
for _, path := range batch {
processedCh <- path
}
return nil
})
}
}
}
eg.Go(func() error { eg.Go(func() error {
// pass paths to each formatter defer func() {
// close processed channel
close(processedCh)
}()
for path := range pathsCh { for path := range pathsCh {
for _, formatter := range formatters { for _, formatter := range formatters {
if formatter.Wants(path) { if !formatter.Wants(path, true) {
formatter.Put(path) continue
} }
tryApply(formatter.BatchKey(), path)
} }
} }
// indicate no more paths for each formatter // flush any partial batches which remain
for _, formatter := range formatters { flushBatches()
if formatter.Parent() != nil {
// this formatter is not a root, it will be closed by a parent // wait for all outstanding formatting tasks to complete
continue if err := fg.Wait(); err != nil {
} return fmt.Errorf("formatter processing failure: %w", err)
formatter.Close()
} }
// await completion
for _, formatter := range formatters {
formatter.AwaitCompletion()
}
// indicate no more completion events
close(completedCh)
return nil return nil
}) })
@ -257,16 +287,16 @@ func (f *Format) Run() error {
walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths) walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths)
if err != nil { if err != nil {
return fmt.Errorf("%w: failed to create walker", err) return fmt.Errorf("failed to create walker: %w", err)
} }
defer close(pathsCh) defer close(pathsCh)
if Cli.NoCache { if Cli.NoCache {
return walker.Walk(ctx, func(path string, info fs.FileInfo, err error) error { return walker.Walk(mainCtx, func(path string, info fs.FileInfo, err error) error {
select { select {
case <-ctx.Done(): case <-mainCtx.Done():
return ctx.Err() return mainCtx.Err()
default: default:
// ignore symlinks and directories // ignore symlinks and directories
if !(info.IsDir() || info.Mode()&os.ModeSymlink == os.ModeSymlink) { if !(info.IsDir() || info.Mode()&os.ModeSymlink == os.ModeSymlink) {
@ -277,16 +307,19 @@ func (f *Format) Run() error {
}) })
} }
return cache.ChangeSet(ctx, walker, pathsCh) if err = cache.ChangeSet(mainCtx, walker, pathsCh); err != nil {
return fmt.Errorf("failed to generate change set: %w", err)
}
return nil
}) })
// listen for shutdown and call cancel if required //// listen for shutdown and call cancel if required
go func() { //go func() {
exit := make(chan os.Signal, 1) // exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGTERM) // signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
<-exit // <-exit
cancel() // cancel()
}() //}()
return eg.Wait() return eg.Wait()
} }

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"os/exec" "os/exec"
"strings"
"time" "time"
"git.numtide.com/numtide/treefmt/config" "git.numtide.com/numtide/treefmt/config"
@ -24,32 +25,17 @@ type Formatter struct {
log *log.Logger log *log.Logger
executable string // path to the executable described by Command executable string // path to the executable described by Command
before string child *Formatter
child *Formatter
parent *Formatter
// internal compiled versions of Includes and Excludes. // internal compiled versions of Includes and Excludes.
includes []glob.Glob includes []glob.Glob
excludes []glob.Glob excludes []glob.Glob
// inboxCh is used to accept new paths for formatting. batch []string
inboxCh chan string
// completedCh is used to wait for this formatter to finish all processing.
completedCh chan interface{}
// Entries from inboxCh are batched according to batchSize and stored in batch for processing when the batchSize has
// been reached or Close is invoked.
batch []string
batchSize int
} }
func (f *Formatter) Before() string { func (f *Formatter) Before() string {
return f.before return f.config.Before
}
func (f *Formatter) ResetBefore() {
f.before = ""
} }
// Executable returns the path to the executable defined by Command // Executable returns the path to the executable defined by Command
@ -57,15 +43,92 @@ func (f *Formatter) Executable() string {
return f.executable return f.executable
} }
func (f *Formatter) SetChild(formatter *Formatter) {
f.child = formatter
}
func (f *Formatter) BatchKey() string {
var key []string
node := f
for node != nil {
key = append(key, node.name)
node = node.child
}
return strings.Join(key, ":")
}
func (f *Formatter) Apply(ctx context.Context, paths []string) error {
// filter paths
for _, path := range paths {
if f.Wants(path, false) {
f.batch = append(f.batch, path)
}
}
// only apply if the resultant batch is not empty
if len(f.batch) > 0 {
// construct args, starting with config
args := f.config.Options
// append each file path
for _, path := range f.batch {
args = append(args, path)
}
// execute
start := time.Now()
cmd := exec.CommandContext(ctx, f.config.Command, args...)
if out, err := cmd.CombinedOutput(); err != nil {
f.log.Debugf("\n%v", string(out))
// todo log output
return err
}
f.log.Infof("%v files processed in %v", len(f.batch), time.Now().Sub(start))
}
// if we have a downstream formatter pass the paths onwards
if f.child != nil {
return f.child.Apply(ctx, paths)
}
// reset batch
f.batch = f.batch[:0]
return nil
}
// Wants is used to test if a Formatter wants path based on it's configured Includes and Excludes patterns.
// Returns true if the Formatter should be applied to path, false otherwise.
func (f *Formatter) Wants(path string, recursive bool) bool {
match := !PathMatches(path, f.excludes) && PathMatches(path, f.includes)
if match {
f.log.Debugf("match: %v", path)
}
if recursive && !match && f.child != nil {
// check if the child wants the path
match = f.child.Wants(path, recursive)
}
return match
}
// NewFormatter is used to create a new Formatter. // NewFormatter is used to create a new Formatter.
func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.Glob) (*Formatter, error) { func NewFormatter(
name string,
config *config.Formatter,
globalExcludes []glob.Glob,
) (*Formatter, error) {
var err error var err error
f := Formatter{} f := Formatter{}
// capture the name from the config file
// capture config and the formatter's name
f.name = name f.name = name
f.config = config f.config = config
f.before = config.Before
// test if the formatter is available // test if the formatter is available
executable, err := exec.LookPath(config.Command) executable, err := exec.LookPath(config.Command)
@ -78,10 +141,6 @@ func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.G
// initialise internal state // initialise internal state
f.log = log.WithPrefix("format | " + name) f.log = log.WithPrefix("format | " + name)
f.batchSize = 1024
f.batch = make([]string, 0, f.batchSize)
f.inboxCh = make(chan string, f.batchSize)
f.completedCh = make(chan interface{}, 1)
f.includes, err = CompileGlobs(config.Includes) f.includes, err = CompileGlobs(config.Includes)
if err != nil { if err != nil {
@ -96,140 +155,3 @@ func NewFormatter(name string, config *config.Formatter, globalExcludes []glob.G
return &f, nil return &f, nil
} }
func (f *Formatter) SetParent(formatter *Formatter) {
f.parent = formatter
}
func (f *Formatter) Parent() *Formatter {
return f.parent
}
func (f *Formatter) SetChild(formatter *Formatter) {
f.child = formatter
}
// Wants is used to test if a Formatter wants path based on it's configured Includes and Excludes patterns.
// Returns true if the Formatter should be applied to path, false otherwise.
func (f *Formatter) Wants(path string) bool {
if f.parent != nil {
// we don't accept this path directly, our parent will forward it
return false
}
match := !PathMatches(path, f.excludes) && PathMatches(path, f.includes)
if match {
f.log.Debugf("match: %v", path)
}
return match
}
// Put add path into this Formatter's inboxCh for processing.
func (f *Formatter) Put(path string) {
f.inboxCh <- path
}
// Run is the main processing loop for this Formatter.
// It accepts a context which is used to lookup certain dependencies and for cancellation.
func (f *Formatter) Run(ctx context.Context) (err error) {
defer func() {
if f.child != nil {
// indicate no further processing for the child formatter
f.child.Close()
}
// indicate this formatter has finished processing
f.completedCh <- nil
}()
LOOP:
// keep processing until ctx has been cancelled or inboxCh has been closed
for {
select {
case <-ctx.Done():
// ctx has been cancelled
err = ctx.Err()
break LOOP
case path, ok := <-f.inboxCh:
// check if the inboxCh has been closed
if !ok {
break LOOP
}
// add path to the current batch
f.batch = append(f.batch, path)
if len(f.batch) == f.batchSize {
// drain immediately
if err := f.apply(ctx); err != nil {
break LOOP
}
}
}
}
// check if LOOP was exited due to an error
if err != nil {
return
}
// processing any lingering batch
return f.apply(ctx)
}
// apply executes Command against the latest batch of paths.
// It accepts a context which is used to lookup certain dependencies and for cancellation.
func (f *Formatter) apply(ctx context.Context) error {
// empty check
if len(f.batch) == 0 {
return nil
}
// construct args, starting with config
args := f.config.Options
// append each file path
for _, path := range f.batch {
args = append(args, path)
}
// execute
start := time.Now()
cmd := exec.CommandContext(ctx, f.config.Command, args...)
if out, err := cmd.CombinedOutput(); err != nil {
f.log.Debugf("\n%v", string(out))
// todo log output
return err
}
f.log.Infof("%v files processed in %v", len(f.batch), time.Now().Sub(start))
if f.child == nil {
// mark each path in this batch as completed
for _, path := range f.batch {
MarkPathComplete(ctx, path)
}
} else {
// otherwise forward each path onto the next formatter for processing
for _, path := range f.batch {
f.child.Put(path)
}
}
// reset batch
f.batch = f.batch[:0]
return nil
}
// Close is used to indicate that a Formatter should process any remaining paths and then stop it's processing loop.
func (f *Formatter) Close() {
close(f.inboxCh)
}
func (f *Formatter) AwaitCompletion() {
// todo support a timeout
<-f.completedCh
}