Site icon Voina Blog (a tech warrior's blog)

Learning GO: Limit #concurrency in #golang

Advertisements

Concurrency is easy to implement and fun in go, but as we know all things that are easy and fun can end up doing a lot of dammage if we are not careful.
The way the concurrency was implemented in my previous post is prone to abuse.
What if the for loop iterates over a huge or unlimited sequence of inputs ?
In case we iterate over million of items or we get the data from a channel that is fed by a previous processing stage through a messaging system for ex. (rabbitmq, activemq etc.) we may end up with our code spawning and allocating resources for millions of go-routines until our resources are exhausted.
In general concurrency is good until we end up using all the available resources (CPU threads and memory) but becomes very bad as soon as we approach our resource limit.
As everything in life moderation is the answer, so we need to limit the concurrency if possible.

This can be done very easy in golang by changing the method defined in the previous post as follows:

func BusinessValidate(inputData <-chan Item, limit int) error {
	businessErrors := make([]string, 0)
	mux := &sync.Mutex{}
	var wg sync.WaitGroup
	wg.Add(limit)
	for x := 0; x < limit; x++ {
		go func() {
			defer wg.Done()
			for item := range inputData {
				err = ValidateItem(closureInvoice)
				if err != nil {
					mux.Lock()
					businessErrors = append(businessErrors, err.Error())
					mux.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	if len(businessErrors) > 0 {
		return errors.New(strings.Join(businessErrors, "\n"))
	}
	return nil
}

The major changes are that we start with an initial delta on the wait group set to “limit”. This will in fact allocate “limit” tokens that will allow us to control how many go-routines will run at one time.

Another difference is that wg.Wait() call is moved outside the both for loops. We will see that the usage of the wait is different this time.

The way the function works is like this:

1. The first loop at line 6 will spawn a “limit” of go-routines.
2. The loop inside the go-routines will run forever until the input channel inputData is closed. Each of the “limit” go-routines will read messages from that channel and process them.
3. Errors are accumulated in the businessErrors container
4. When the inputData channel is closed the wait at line 19 makes sure that we wait for the current processing of all the “limit” routines to finish until we return from the main function.

This is a very efficient pattern of constructing functions where we want parallel execution but we also want to be able to control the resource usage and not run into resource starving issues.

Exit mobile version