From c720e414ac22eaa526a01bfda0b0a209295dcad4 Mon Sep 17 00:00:00 2001 From: Brian McGee Date: Thu, 2 May 2024 11:28:22 +0100 Subject: [PATCH] chore: some cleanup and commenting Signed-off-by: Brian McGee --- cli/format.go | 147 +++++++++++++++++++++++++++++++------------------- 1 file changed, 91 insertions(+), 56 deletions(-) diff --git a/cli/format.go b/cli/format.go index ae32234..6d94687 100644 --- a/cli/format.go +++ b/cli/format.go @@ -64,7 +64,9 @@ func (f *Format) Run() (err error) { pipelines = make(map[string]*format.Pipeline) formatters = make(map[string]*format.Formatter) + // iterate the formatters in lexicographical order for _, name := range cfg.Names { + // init formatter formatterCfg := cfg.Formatters[name] formatter, err := format.NewFormatter(name, Cli.TreeRoot, formatterCfg, globalExcludes) if errors.Is(err, format.ErrCommandNotFound) && Cli.AllowMissingFormatter { @@ -74,8 +76,12 @@ func (f *Format) Run() (err error) { return fmt.Errorf("%w: failed to initialise formatter: %v", err, name) } + // store formatter by name formatters[name] = formatter + // If no pipeline is configured, we add the formatter to a nominal pipeline of size 1 with the key being the + // formatter's name. If a pipeline is configured, we add the formatter to a pipeline keyed by + // 'p:' in which it is sorted by priority. if formatterCfg.Pipeline == "" { pipeline := format.Pipeline{} pipeline.Add(formatter) @@ -110,17 +116,17 @@ func (f *Format) Run() (err error) { // initialise stats collection stats.Init() - // create some groups for concurrent processing and control flow + // create an overall error group for executing high level tasks concurrently eg, ctx := errgroup.WithContext(ctx) - // create a channel for paths to be processed - // we use a multiple of batch size here to allow for greater concurrency + // create a channel for files needing to be processed + // we use a multiple of batch size here as a rudimentary concurrency optimization based on the host machine filesCh = make(chan *walk.File, BatchSize*runtime.NumCPU()) - // create a channel for tracking paths that have been processed + // create a channel for files that have been processed processedCh = make(chan *walk.File, cap(filesCh)) - // start concurrent processing tasks + // start concurrent processing tasks in reverse order eg.Go(updateCache(ctx)) eg.Go(applyFormatters(ctx)) eg.Go(walkFilesystem(ctx)) @@ -129,15 +135,70 @@ func (f *Format) Run() (err error) { return eg.Wait() } +func updateCache(ctx context.Context) func() error { + return func() error { + // used to batch updates for more efficient txs + batch := make([]*walk.File, 0, BatchSize) + + // apply a batch + processBatch := func() error { + if err := cache.Update(batch); err != nil { + return err + } + batch = batch[:0] + return nil + } + + LOOP: + for { + select { + // detect ctx cancellation + case <-ctx.Done(): + return ctx.Err() + // respond to processed files + case file, ok := <-processedCh: + if !ok { + // channel has been closed, no further files to process + break LOOP + } + // append to batch and process if we have enough + batch = append(batch, file) + if len(batch) == BatchSize { + if err := processBatch(); err != nil { + return err + } + } + } + } + + // final flush + if err := processBatch(); err != nil { + return err + } + + // if fail on change has been enabled, check that no files were actually formatted, throwing an error if so + if Cli.FailOnChange && stats.Value(stats.Formatted) != 0 { + return ErrFailOnChange + } + + // print stats to stdout + stats.Print() + + return nil + } +} + func walkFilesystem(ctx context.Context) func() error { return func() error { paths := Cli.Paths + // we read paths from stdin if the cli flag has been set and no paths were provided as cli args if len(paths) == 0 && Cli.Stdin { + // determine the current working directory cwd, err := os.Getwd() if err != nil { - return fmt.Errorf("%w: failed to determine current working directory", err) + return fmt.Errorf("failed to determine current working directory: %w", err) } // read in all the paths @@ -149,17 +210,21 @@ func walkFilesystem(ctx context.Context) func() error { path = filepath.Join(cwd, path) } + // append the fully qualified path to our paths list paths = append(paths, path) } } + // create a filesystem walker walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths) if err != nil { return fmt.Errorf("failed to create walker: %w", err) } + // close the files channel when we're done walking the file system defer close(filesCh) + // if no cache has been configured, we invoke the walker directly if Cli.NoCache { return walker.Walk(ctx, func(file *walk.File, err error) error { select { @@ -177,6 +242,8 @@ func walkFilesystem(ctx context.Context) func() error { }) } + // otherwise we pass the walker to the cache and have it generate files for processing based on whether or not + // they have been added/changed since the last invocation if err = cache.ChangeSet(ctx, walker, filesCh); err != nil { return fmt.Errorf("failed to generate change set: %w", err) } @@ -184,69 +251,33 @@ func walkFilesystem(ctx context.Context) func() error { } } -func updateCache(ctx context.Context) func() error { - return func() error { - batch := make([]*walk.File, 0, BatchSize) - - processBatch := func() error { - if err := cache.Update(batch); err != nil { - return err - } - batch = batch[:0] - return nil - } - - LOOP: - for { - select { - case <-ctx.Done(): - return ctx.Err() - case path, ok := <-processedCh: - if !ok { - break LOOP - } - batch = append(batch, path) - if len(batch) == BatchSize { - if err := processBatch(); err != nil { - return err - } - } - } - } - - // final flush - if err := processBatch(); err != nil { - return err - } - - if Cli.FailOnChange && stats.Value(stats.Formatted) != 0 { - return ErrFailOnChange - } - - stats.Print() - return nil - } -} - func applyFormatters(ctx context.Context) func() error { + // create our own errgroup for concurrent formatting tasks fg, ctx := errgroup.WithContext(ctx) + + // pre-initialise batches keyed by pipeline batches := make(map[string][]*walk.File) + for key := range pipelines { + batches[key] = make([]*walk.File, 0, BatchSize) + } + // for a given pipeline key, add the provided file to the current batch and trigger a format if the batch size has + // been reached tryApply := func(key string, file *walk.File) { - batch, ok := batches[key] - if !ok { - batch = make([]*walk.File, 0, BatchSize) - } - batch = append(batch, file) - batches[key] = batch + // append to batch + batches[key] = append(batches[key], file) + // check if the batch is full + batch := batches[key] if len(batch) == BatchSize { + // get the pipeline pipeline := pipelines[key] // copy the batch files := make([]*walk.File, len(batch)) copy(files, batch) + // apply to the pipeline fg.Go(func() error { if err := pipeline.Apply(ctx, files); err != nil { return err @@ -257,10 +288,12 @@ func applyFormatters(ctx context.Context) func() error { return nil }) + // reset the batch batches[key] = batch[:0] } } + // format any partial batches flushBatches := func() { for key, pipeline := range pipelines { @@ -287,6 +320,7 @@ func applyFormatters(ctx context.Context) func() error { close(processedCh) }() + // iterate the files channel, checking if any pipeline wants it, and attempting to apply if so. for file := range filesCh { var matched bool for key, pipeline := range pipelines { @@ -299,6 +333,7 @@ func applyFormatters(ctx context.Context) func() error { if matched { stats.Add(stats.Matched, 1) } else { + // no match, so we send it direct to the processed channel processedCh <- file } }