Learning GO: Limit #concurrency in #golang

By | August 26, 2018

Concurrency is easy to implement and fun in Go, but as we know, things that are easy and fun can do a lot of damage if we are not careful.
The way concurrency was implemented in my previous post is prone to abuse.
What if the for loop iterates over a huge or unbounded sequence of inputs?
If we iterate over millions of items, or we read the data from a channel fed by a previous processing stage through a messaging system (RabbitMQ, ActiveMQ, etc.), our code may end up spawning and allocating resources for millions of goroutines until those resources are exhausted.
In general, concurrency helps as long as there are spare resources (CPU threads and memory), but it becomes harmful as soon as we approach our resource limit.
As with everything in life, moderation is the answer, so we need to limit the concurrency where possible.

This can be done very easily in Go by changing the function defined in the previous post as follows:

func BusinessValidate(inputData <-chan Item, limit int) error {
	businessErrors := make([]string, 0)
	mux := &sync.Mutex{}
	var wg sync.WaitGroup
	wg.Add(limit)
	for x := 0; x < limit; x++ {
		go func() {
			defer wg.Done()
			for item := range inputData {
				err := ValidateItem(item)
				if err != nil {
					mux.Lock()
					businessErrors = append(businessErrors, err.Error())
					mux.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	if len(businessErrors) > 0 {
		return errors.New(strings.Join(businessErrors, "\n"))
	}
	return nil
}

The major change is that we start the wait group with an initial delta of “limit”, one for each worker goroutine. Because exactly “limit” goroutines are started and no more are ever spawned, at most “limit” items are processed at any one time.

Another difference is that the wg.Wait() call is moved outside both for loops. We will see that the wait is used differently this time.

The way the function works is like this:

1. The first loop at line 6 (for x := 0; x < limit; x++) spawns “limit” goroutines.
2. The range loop inside each goroutine runs until the input channel inputData is closed and drained. Each of the “limit” goroutines reads messages from that channel and processes them.
3. Errors are accumulated in the businessErrors slice, with the mutex guarding each append.
4. When the inputData channel is closed, the wg.Wait() call at line 19 makes sure that all “limit” goroutines finish their current processing before we return from the function.
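
The steps above depend on someone closing inputData, otherwise the workers never leave their range loops and wg.Wait() blocks forever. Here is a minimal, self-contained sketch of how the function might be called. The Item fields, the ValidateItem rule and the feed producer are assumptions made up for illustration; they are not the definitions from the previous post.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// Item is a hypothetical stand-in for the type validated in the post.
type Item struct {
	ID     int
	Amount float64
}

// ValidateItem is a hypothetical rule: amounts must not be negative.
func ValidateItem(item Item) error {
	if item.Amount < 0 {
		return fmt.Errorf("item %d: negative amount", item.ID)
	}
	return nil
}

// BusinessValidate starts exactly limit workers that drain inputData.
func BusinessValidate(inputData <-chan Item, limit int) error {
	var (
		mu             sync.Mutex
		businessErrors []string
		wg             sync.WaitGroup
	)
	wg.Add(limit)
	for x := 0; x < limit; x++ {
		go func() {
			defer wg.Done()
			for item := range inputData {
				if err := ValidateItem(item); err != nil {
					mu.Lock()
					businessErrors = append(businessErrors, err.Error())
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	if len(businessErrors) > 0 {
		return errors.New(strings.Join(businessErrors, "\n"))
	}
	return nil
}

// feed is a hypothetical producer. It closes the channel when it is done,
// which is what lets the workers' range loops end.
func feed(items []Item) <-chan Item {
	ch := make(chan Item)
	go func() {
		defer close(ch)
		for _, it := range items {
			ch <- it
		}
	}()
	return ch
}

func main() {
	items := []Item{{1, 10}, {2, -5}, {3, 7}, {4, -1}}
	if err := BusinessValidate(feed(items), 2); err != nil {
		fmt.Println(err)
	}
}
```

The order of the reported errors can change between runs, because the two workers pick up items in whatever order the scheduler allows.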

This is an efficient pattern for building functions where we want parallel execution but also need to control resource usage and avoid resource starvation.
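
One way to convince yourself that the limit really holds is to count how many items are being processed at the same moment. The sketch below is only an illustration: peakWorkers is a hypothetical helper, the jobs are plain integers and the work is simulated with a short sleep.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakWorkers runs a pool of limit goroutines over n jobs and returns the
// highest number of jobs that were in flight at the same moment.
func peakWorkers(n, limit int) int64 {
	jobs := make(chan int)
	go func() {
		defer close(jobs)
		for i := 0; i < n; i++ {
			jobs <- i
		}
	}()

	var inFlight, peak int64
	var wg sync.WaitGroup
	wg.Add(limit)
	for w := 0; w < limit; w++ {
		go func() {
			defer wg.Done()
			for range jobs {
				cur := atomic.AddInt64(&inFlight, 1)
				// Raise the recorded peak if this is a new maximum.
				for {
					old := atomic.LoadInt64(&peak)
					if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
						break
					}
				}
				time.Sleep(time.Millisecond) // simulated work
				atomic.AddInt64(&inFlight, -1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak in flight:", peakWorkers(200, 4))
}
```

Since only the “limit” workers ever increment the counter, and each one handles a single item at a time, the reported peak cannot exceed “limit” no matter how many jobs are fed in.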

2 thoughts on “Learning GO: Limit #concurrency in #golang”

  1. Andrei

    I didn’t test it, but I think you could add some more control to the flow by using a buffered channel. As such, you could start the validation routines faster if they are too slow, but only if resources usage allows you to start more of them.

    1. George Valentin Voina Post author

      True, I have also seen that approach, and you are right that it can be made more dynamic and adaptive with a buffered channel. But most of the time you need a simple way to limit concurrency, and there are not many cases where you actually need a dynamic number of goroutines.
      When I have time I will also try the buffered channel approach.
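
For reference, the buffered channel approach mentioned in this thread is often written as a counting semaphore. This is only a rough sketch of that idea with a fixed capacity, not the adaptive variant Andrei describes; forEachLimited and its parameters are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// forEachLimited calls work once per input and runs at most limit calls
// at the same time. The buffered channel acts as a counting semaphore.
func forEachLimited(inputs []int, limit int, work func(int)) {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for _, in := range inputs {
		sem <- struct{}{} // blocks while limit goroutines are already running
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			defer func() { <-sem }() // give the slot back
			work(v)
		}(in)
	}
	wg.Wait()
}

func main() {
	var mu sync.Mutex
	sum := 0
	forEachLimited([]int{1, 2, 3, 4, 5}, 2, func(v int) {
		mu.Lock()
		sum += v * v
		mu.Unlock()
	})
	fmt.Println(sum) // prints 55
}
```

Unlike the worker pool in the post, this version starts one goroutine per item and only limits how many of them exist at once, so it trades a little more goroutine churn for not needing a shared input channel.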
