improve caching and tracking #34
149
cli/format.go
149
cli/format.go
@ -64,7 +64,9 @@ func (f *Format) Run() (err error) {
|
|||||||
pipelines = make(map[string]*format.Pipeline)
|
pipelines = make(map[string]*format.Pipeline)
|
||||||
formatters = make(map[string]*format.Formatter)
|
formatters = make(map[string]*format.Formatter)
|
||||||
|
|
||||||
|
// iterate the formatters in lexicographical order
|
||||||
for _, name := range cfg.Names {
|
for _, name := range cfg.Names {
|
||||||
|
// init formatter
|
||||||
formatterCfg := cfg.Formatters[name]
|
formatterCfg := cfg.Formatters[name]
|
||||||
formatter, err := format.NewFormatter(name, Cli.TreeRoot, formatterCfg, globalExcludes)
|
formatter, err := format.NewFormatter(name, Cli.TreeRoot, formatterCfg, globalExcludes)
|
||||||
if errors.Is(err, format.ErrCommandNotFound) && Cli.AllowMissingFormatter {
|
if errors.Is(err, format.ErrCommandNotFound) && Cli.AllowMissingFormatter {
|
||||||
@ -74,8 +76,12 @@ func (f *Format) Run() (err error) {
|
|||||||
return fmt.Errorf("%w: failed to initialise formatter: %v", err, name)
|
return fmt.Errorf("%w: failed to initialise formatter: %v", err, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// store formatter by name
|
||||||
formatters[name] = formatter
|
formatters[name] = formatter
|
||||||
|
|
||||||
|
// If no pipeline is configured, we add the formatter to a nominal pipeline of size 1 with the key being the
|
||||||
|
// formatter's name. If a pipeline is configured, we add the formatter to a pipeline keyed by
|
||||||
|
// 'p:<pipeline_name>' in which it is sorted by priority.
|
||||||
if formatterCfg.Pipeline == "" {
|
if formatterCfg.Pipeline == "" {
|
||||||
pipeline := format.Pipeline{}
|
pipeline := format.Pipeline{}
|
||||||
pipeline.Add(formatter)
|
pipeline.Add(formatter)
|
||||||
@ -110,17 +116,17 @@ func (f *Format) Run() (err error) {
|
|||||||
// initialise stats collection
|
// initialise stats collection
|
||||||
stats.Init()
|
stats.Init()
|
||||||
|
|
||||||
// create some groups for concurrent processing and control flow
|
// create an overall error group for executing high level tasks concurrently
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
// create a channel for paths to be processed
|
// create a channel for files needing to be processed
|
||||||
// we use a multiple of batch size here to allow for greater concurrency
|
// we use a multiple of batch size here as a rudimentary concurrency optimization based on the host machine
|
||||||
filesCh = make(chan *walk.File, BatchSize*runtime.NumCPU())
|
filesCh = make(chan *walk.File, BatchSize*runtime.NumCPU())
|
||||||
|
|
||||||
// create a channel for tracking paths that have been processed
|
// create a channel for files that have been processed
|
||||||
processedCh = make(chan *walk.File, cap(filesCh))
|
processedCh = make(chan *walk.File, cap(filesCh))
|
||||||
|
|
||||||
// start concurrent processing tasks
|
// start concurrent processing tasks in reverse order
|
||||||
eg.Go(updateCache(ctx))
|
eg.Go(updateCache(ctx))
|
||||||
eg.Go(applyFormatters(ctx))
|
eg.Go(applyFormatters(ctx))
|
||||||
eg.Go(walkFilesystem(ctx))
|
eg.Go(walkFilesystem(ctx))
|
||||||
@ -129,15 +135,70 @@ func (f *Format) Run() (err error) {
|
|||||||
return eg.Wait()
|
return eg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateCache(ctx context.Context) func() error {
|
||||||
|
return func() error {
|
||||||
|
// used to batch updates for more efficient txs
|
||||||
|
batch := make([]*walk.File, 0, BatchSize)
|
||||||
|
|
||||||
|
// apply a batch
|
||||||
|
processBatch := func() error {
|
||||||
|
if err := cache.Update(batch); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
batch = batch[:0]
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
LOOP:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// detect ctx cancellation
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
// respond to processed files
|
||||||
|
case file, ok := <-processedCh:
|
||||||
|
if !ok {
|
||||||
|
// channel has been closed, no further files to process
|
||||||
|
break LOOP
|
||||||
|
}
|
||||||
|
// append to batch and process if we have enough
|
||||||
|
batch = append(batch, file)
|
||||||
|
if len(batch) == BatchSize {
|
||||||
|
if err := processBatch(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// final flush
|
||||||
|
if err := processBatch(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// if fail on change has been enabled, check that no files were actually formatted, throwing an error if so
|
||||||
|
if Cli.FailOnChange && stats.Value(stats.Formatted) != 0 {
|
||||||
|
return ErrFailOnChange
|
||||||
|
}
|
||||||
|
|
||||||
|
// print stats to stdout
|
||||||
|
stats.Print()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func walkFilesystem(ctx context.Context) func() error {
|
func walkFilesystem(ctx context.Context) func() error {
|
||||||
return func() error {
|
return func() error {
|
||||||
paths := Cli.Paths
|
paths := Cli.Paths
|
||||||
|
|
||||||
|
// we read paths from stdin if the cli flag has been set and no paths were provided as cli args
|
||||||
if len(paths) == 0 && Cli.Stdin {
|
if len(paths) == 0 && Cli.Stdin {
|
||||||
|
|
||||||
|
// determine the current working directory
|
||||||
cwd, err := os.Getwd()
|
cwd, err := os.Getwd()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%w: failed to determine current working directory", err)
|
return fmt.Errorf("failed to determine current working directory: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// read in all the paths
|
// read in all the paths
|
||||||
@ -149,17 +210,21 @@ func walkFilesystem(ctx context.Context) func() error {
|
|||||||
path = filepath.Join(cwd, path)
|
path = filepath.Join(cwd, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// append the fully qualified path to our paths list
|
||||||
paths = append(paths, path)
|
paths = append(paths, path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create a filesystem walker
|
||||||
walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths)
|
walker, err := walk.New(Cli.Walk, Cli.TreeRoot, paths)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create walker: %w", err)
|
return fmt.Errorf("failed to create walker: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// close the files channel when we're done walking the file system
|
||||||
defer close(filesCh)
|
defer close(filesCh)
|
||||||
|
|
||||||
|
// if no cache has been configured, we invoke the walker directly
|
||||||
if Cli.NoCache {
|
if Cli.NoCache {
|
||||||
return walker.Walk(ctx, func(file *walk.File, err error) error {
|
return walker.Walk(ctx, func(file *walk.File, err error) error {
|
||||||
select {
|
select {
|
||||||
@ -177,6 +242,8 @@ func walkFilesystem(ctx context.Context) func() error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// otherwise we pass the walker to the cache and have it generate files for processing based on whether or not
|
||||||
|
// they have been added/changed since the last invocation
|
||||||
if err = cache.ChangeSet(ctx, walker, filesCh); err != nil {
|
if err = cache.ChangeSet(ctx, walker, filesCh); err != nil {
|
||||||
return fmt.Errorf("failed to generate change set: %w", err)
|
return fmt.Errorf("failed to generate change set: %w", err)
|
||||||
}
|
}
|
||||||
@ -184,69 +251,33 @@ func walkFilesystem(ctx context.Context) func() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateCache(ctx context.Context) func() error {
|
|
||||||
return func() error {
|
|
||||||
batch := make([]*walk.File, 0, BatchSize)
|
|
||||||
|
|
||||||
processBatch := func() error {
|
|
||||||
if err := cache.Update(batch); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
batch = batch[:0]
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
LOOP:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case path, ok := <-processedCh:
|
|
||||||
if !ok {
|
|
||||||
break LOOP
|
|
||||||
}
|
|
||||||
batch = append(batch, path)
|
|
||||||
if len(batch) == BatchSize {
|
|
||||||
if err := processBatch(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// final flush
|
|
||||||
if err := processBatch(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if Cli.FailOnChange && stats.Value(stats.Formatted) != 0 {
|
|
||||||
return ErrFailOnChange
|
|
||||||
}
|
|
||||||
|
|
||||||
stats.Print()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func applyFormatters(ctx context.Context) func() error {
|
func applyFormatters(ctx context.Context) func() error {
|
||||||
|
// create our own errgroup for concurrent formatting tasks
|
||||||
fg, ctx := errgroup.WithContext(ctx)
|
fg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
|
// pre-initialise batches keyed by pipeline
|
||||||
batches := make(map[string][]*walk.File)
|
batches := make(map[string][]*walk.File)
|
||||||
|
for key := range pipelines {
|
||||||
tryApply := func(key string, file *walk.File) {
|
batches[key] = make([]*walk.File, 0, BatchSize)
|
||||||
batch, ok := batches[key]
|
|
||||||
if !ok {
|
|
||||||
batch = make([]*walk.File, 0, BatchSize)
|
|
||||||
}
|
}
|
||||||
batch = append(batch, file)
|
|
||||||
batches[key] = batch
|
|
||||||
|
|
||||||
|
// for a given pipeline key, add the provided file to the current batch and trigger a format if the batch size has
|
||||||
|
// been reached
|
||||||
|
tryApply := func(key string, file *walk.File) {
|
||||||
|
// append to batch
|
||||||
|
batches[key] = append(batches[key], file)
|
||||||
|
|
||||||
|
// check if the batch is full
|
||||||
|
batch := batches[key]
|
||||||
if len(batch) == BatchSize {
|
if len(batch) == BatchSize {
|
||||||
|
// get the pipeline
|
||||||
pipeline := pipelines[key]
|
pipeline := pipelines[key]
|
||||||
|
|
||||||
// copy the batch
|
// copy the batch
|
||||||
files := make([]*walk.File, len(batch))
|
files := make([]*walk.File, len(batch))
|
||||||
copy(files, batch)
|
copy(files, batch)
|
||||||
|
|
||||||
|
// apply to the pipeline
|
||||||
fg.Go(func() error {
|
fg.Go(func() error {
|
||||||
if err := pipeline.Apply(ctx, files); err != nil {
|
if err := pipeline.Apply(ctx, files); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -257,10 +288,12 @@ func applyFormatters(ctx context.Context) func() error {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// reset the batch
|
||||||
batches[key] = batch[:0]
|
batches[key] = batch[:0]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// format any partial batches
|
||||||
flushBatches := func() {
|
flushBatches := func() {
|
||||||
for key, pipeline := range pipelines {
|
for key, pipeline := range pipelines {
|
||||||
|
|
||||||
@ -287,6 +320,7 @@ func applyFormatters(ctx context.Context) func() error {
|
|||||||
close(processedCh)
|
close(processedCh)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// iterate the files channel, checking if any pipeline wants it, and attempting to apply if so.
|
||||||
for file := range filesCh {
|
for file := range filesCh {
|
||||||
var matched bool
|
var matched bool
|
||||||
for key, pipeline := range pipelines {
|
for key, pipeline := range pipelines {
|
||||||
@ -299,6 +333,7 @@ func applyFormatters(ctx context.Context) func() error {
|
|||||||
if matched {
|
if matched {
|
||||||
stats.Add(stats.Matched, 1)
|
stats.Add(stats.Matched, 1)
|
||||||
} else {
|
} else {
|
||||||
|
// no match, so we send it direct to the processed channel
|
||||||
processedCh <- file
|
processedCh <- file
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user