In this article I show a solution to the problem of closing a Go channel that is being written by several goroutines. We see the process and reasoning to get to a solution and during this process we use several standard Go concurrency tools like closing channels for synchronization, atomic types, select and wait groups.

Introduction

I found an interesting problem in Go development while building the open source project Conveyor (https://github.com/leolara/conveyor, https://github.com/leolara/conveyor-impl). How do you close a channel that is being written by several goroutines? To bound more the problem, in my case, you control the writers but not the readers.

Researching about this I found this article https://go101.org/article/channel-closing.html, but it actually says that you shouldn’t try:

don’t close a channel if the channel has multiple concurrent senders.

The referred article suggest several workarounds, which expect you to write the readers in a particular matter. I want to close the channel while the reader can be reading in a idiomatic manner like with for range so they exited the loop normally when the channel is closed.

In this article I want not only to provide a solution has good code quality and it is robust, but as well I would like to show the journey and reasoning getting to it. That way I think it can be helpful for people starting on concurrent programming.

The core of the problem

The main difficulty comes from the fact that writing on a closed channel always panics, even when you wrote on it before it was closed. You read right, when you write on a channel the goroutine will usually block, if before the data is read the channel is closed the blocked goroutine will panic.

This does not happen when you read from a channel, in Go you can read it and find out whether it was closed (either before reading or while blocked reading). Unfortunately, the same behaviour is not implemented in Golang when writing to channels. I do not know the reasons, a possible guess is that this design decision might make Go channels much more efficient.

A starting example

package gochannels

type Publisher struct {
	ch chan string
}

func (p *Publisher) Read() <-chan string {
	return p.ch
}

func (p *Publisher) write(data string) {
	go func() {
		p.ch <- data
	}()
}

func (p *Publisher) Close() {
	close(p.ch)
}

// there are other Publisher methods

In this example other internal code in Publisher will call the private method write to send data to the reader or readers, for example as a reaction to a network message. Of course, if write is called after the channel is closed the actual write in the channel will panic. This is easy solvable, for example using a atomic flag:

type Publisher struct {
	ch chan string
	closing uint32
}

func (p *Publisher) write(data string) {
	if atomic.LoadUint32(&p.closing) != 0 {
		return nil
	}
	go func(data string) {
		p.ch <- data
	}(data)
}

func (p *Publisher) Close() {
	atomic.StoreUint32(&p.closing, 1)

	close(p.ch)
}

But this does not solve our main problem, as stated above, which is that the goroutines inside write will panic if the channel is closed after they are blocked writing on the channel, and before they have been unblocked by a channel reader.

Some experiments

A first straight forward workaround could be to let the write panic when the channel is closed and catch the panic using recover. This can be a good solution if you do not want to think much of the problem. With this you lose some control of what is happening and also you have to account for other types of panics. However, I was sure that there is a solution using the main tools of concurrency in Go, and getting to it, is at the very least a very interesting exercise.

The first idea that came to mind is to read from the channel until all the writers are unblocked, and hence the panic can be avoided. It is very easy to read everything from a channel using a for range loop:

func (p *Publisher) Close() {
	atomic.StoreUint32(&p.closing, 1)

	for range p.ch {
	}

	close(p.ch)
}

This code first sets a flag (atomically to avoid race conditions) which prevents new goroutines entering a blocked state. After that it enters in a loop reading from the channel and discarding the data, and hence unlocks all the goroutines writing on the channel. At last, it closes the channel.

This obvoiusly does not work, as the code will get stuck in the loop, it only exits the loop if the channel is closed, and that is something we want to do after we have read from all the writing goroutines and blocked them. Following this way of reasoning we could just launch the reading loop in a goroutine and then wait for it to have read from all the blocked writers.

func (p *Publisher) Close() {
	atomic.StoreUint32(&p.closing, 1)

	go func() {
		for range p.ch {
		}
	}()

	<-time.After(1 * time.Second)

	close(p.ch)
}

Interestingly one of the biggest problems is that it will work. It will work most of time, until in perhaps in two years time it will start to crash your server continously, and more importantly you will never think that the code crashing your server today was introduced two years ago.

This is why I find concurrent programming so interesting, bugs can be very well hidden, you are required to be rigorous and think things through. The fact is that our code will panic when after one second, there are still writing subroutines blocked. This can be caused by a overload of requests, or simply because other goroutines are being added that decrease the availability of threads and hence being scheduled, or simply our virtual machine is being migrated.

It creates a bug, that can be triggered by things that are distant from the actual code causing the bug, and hence will might be very difficult to detect. Increasing the time will make it more unlikely but also will make your code slower. Up to where would you increase the wait time, knowing that it just makes the problem less unlikely?

Another try with select

It seems that our main problem is that the writers are blocked and the only way to unblock them is to read from the channel. But there is another tool at our disposal, select. This would allow us to put the write on the channel together with something else, and if select would take that other branch before the channel is closed then we would have avoided the problem all together.

Another tool we see, also present in the article of go101 mentioned in the intro, is channels that are just closed as a signal. As reading from a closed channel does not panic, you could have a channel that you only closed as a signal for the readers of the channel.

type Publisher struct {
	ch chan string
	closingCh chan interface{}
}

func (p *Publisher) write(data string) {
	select {
	case <-p.closingCh:
		return
	default:
	}

	go func(data string) {
		select {
			case <-p.closingCh:
			case p.ch <- data:
		}
	}(data)
}

func (p *Publisher) Close() {
	close(p.closingCh)

	go func() {
		for range p.ch {
		}
	}()

	<-time.After(1 * time.Second)

	close(p.ch)
}

This seems more robust, it is using channels and select and it might look like all the blocked writers will exit when closingCh.

There is a major caveat though. At some point both closingCh and ch will be closed. The natural thing would be to think that if one of the select branches is ok to proceed and the other causes a panic, it would take the one that is ok. Actually it can take any of the branches.

Reading the documentation https://golang.org/ref/spec#Select_statements

If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection.

And actually causing a panic is not blocking, and therefore that branch can be chosen, causing the code to panic.

We are again, with a bug that might lie dormant for years, until it can become fatal.

The solution involves wait groups

Going back to the previous code where we read from the channel to unlock the writers, our problem is that we cannot definitely know when all writers have been unlocked, so the only solution we thought was to wait for some time. The solution is to use a wait group.

The reason why I didn’t think about wait groups right away id that in Go they are usually seen in a very specific scenario. You launch a bunch of goroutines and then you wait for all of them to finish. The wait operation is usually executed right after all the add operations have been executed.

However, a wait group should be much more general tool than that. A wait group should really be a thread safe counter, with a wait operation that blocks until the counter reaches zero. We could be incrementing and decrementing the wait group counter, reaching perhaps several times zero and increasing again, and then when we need it using wait to block until the counter reaches zero.

From that perspective, we can use it as a method for orderly tear down of a subsystem that spawns many short lived goroutines. All goroutines increment and decrement the same wait group as they start and finish. When we want to stop the subsystem we need first a mechanism to prevent the system from launching new goroutines. From that point we know that the wait group is only going to decrease, and then we just need to wait on the wait group to reach zero.

As we will see below some of these assumptions are not correct.

Anyway, lets see in detail this solution, going back to our previous attempt trying to read from the channel until all the writers are unblocked:

package gochannels

type Publisher struct {
	ch chan string
	closing uint32
	writersWG sync.WaitGroup
}

func (p *Publisher) Read() <-chan string {
	return p.ch
}

func (p *Publisher) write(data string) {
	p.writersWG.Add(1)
	go func(data string) {
		defer p.writersWG.Done()
		if atomic.LoadUint32(&p.closing) != 0 {
			return nil
		}

		p.ch <- data
	}(data)
}

func (p *Publisher) Close() {
	atomic.StoreUint32(&p.closing, 1)

	go func() {
		for range p.ch {
		}
	}()

	p.writersWG.Wait()

	close(p.ch)
}

// there are other Publisher methods

And this seemed to solved it.

There are other options using wait groups.

Having this solution a new thing comes up. Using this wait group approach, it is not really necessary to drain the writers ourselves. With this approach we could be ok to leave the reader to read all the pending writes, and then close the channel.

func (p *Publisher) CloseWithoutDraining() {
	atomic.StoreUint32(&p.closing, 1)

	p.writersWG.Wait()

	close(p.ch)
}

Another thing that comes up is that now, the approach suing channels to close might work as well.

type Publisher struct {
	ch chan string
	closingCh chan interface{}
	writersWG sync.WaitGroup
}

func (p *Publisher) write(data string) {
	p.writersWG.Add(1)

	go func(data string) {
		defer p.writersWG.Done()

		select {
		case <-p.closingCh:
			return
		default:
		}

		select {
			case <-p.closingCh:
			case p.ch <- data:
		}
	}(data)
}

func (p *Publisher) Close() {
	close(p.closingCh)

	go func() {
		for range p.ch {
		}
	}()

	p.writersWG.Wait()

	close(p.ch)
}

func (p *Publisher) CloseWithoutDraining() {
	close(p.closingCh)

	p.writersWG.Wait()

	close(p.ch)
}

// there are other Publisher methods

Something to note in this last implementation is that the first select in method write is non-bloking as it contains a default case.

Testing for race conditions

From wikipedia https://en.wikipedia.org/wiki/Race_condition

A race condition or race hazard is the condition of an electronics, software, or other system where the system’s substantive behavior is dependent on the sequence or timing of other uncontrollable events. It becomes a bug when one or more of the possible behaviors is undesirable.

Testing and finding race conditions bugs is very complicated. The main factor being that for a race condition to happen in needs the right timing between instructions in the same program, and hence it is dependent on things like computer load at the moment, how the OS does context switching or the actual CPU version and model.

One important factor to trigger race conditions the environment variable GOMAXPROCS that determines the number of OS threads that are use to run the Go program. A value of 1, will hide many possible race conditions, and this might be the default in your system. Also, changing its value we can change the timing of the program and then trigger a race conditions bug. The go test option -cpu allows us to run the tests with several different values for GOMAXPROCS and will help us to discover race conditions bugs.

Another very useful tool is the flag -race when executing go tests, it will show a message if it detects a race condition.

What I did was to run go tests with a list of -cpu values, the flag -race activated, and -count=1000 to run it many times and cover different timings, we also use -failfast to stop the test once we get an error. Something like:

go test -cpu=1,9,55,99 -race -count=1000 -failfast

Unfortunately running this several times triggers the integrated race condition detector. The important part of the error is this one:

==================
WARNING: DATA RACE
Read at 0x00c000590078 by goroutine 138:
  sync.(*WaitGroup).Add()
      /usr/local/Cellar/go/1.12.6/libexec/src/internal/race/race.go:37 +0x169
  [...]/gochannels/solution.(*Publisher).write()
      /Users/leo/src/hugo/code/gochannels/solution/publisher.go:34 +0x4b
  [...]/gochannels/solution.(*Publisher).Run()
      /Users/leo/src/hugo/code/gochannels/solution/publisher.go:25 +0xb7
  [...]/gochannels/solution.TestIntermediate.func2()
      /Users/leo/src/hugo/code/gochannels/solution/publisher_test.go:75 +0x42

Previous write at 0x00c000590078 by goroutine 142:
  sync.(*WaitGroup).Wait()
      /usr/local/Cellar/go/1.12.6/libexec/src/internal/race/race.go:41 +0xef
  [...]/gochannels/solution.(*Publisher).Close()
      /Users/leo/src/hugo/code/gochannels/solution/publisher.go:61 +0x88
  [...]/gochannels/solution.TestIntermediate()
      /Users/leo/src/hugo/code/gochannels/solution/publisher_test.go:89 +0x2bd
  testing.tRunner()
      /usr/local/Cellar/go/1.12.6/libexec/src/testing/testing.go:865 +0x163

The race detector, warn us about a race condition that happens when Wait and Add are called simultaneously.It seems odd that there is a race condition between two methods that are from a package that is for synchronizing goroutines. But this component has a caveat, from the documentation https://golang.org/pkg/sync/#WaitGroup.Add

Note that calls with a positive delta that occur when the counter is zero must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. If a WaitGroup is reused to wait for several independent sets of events, new Add calls must happen after all previous Wait calls have returned. See the WaitGroup example.

So, there can be some race condition bugs if Wait and Add are called concurrently with a positive value parameter for Add, I guess the developers of the standard library had to make a compromise perhaps for performance and/or complexity. The call to Done can be done concurrently without problems.

There are several instances of people talking about this problem online, for example https://groups.google.com/forum/#!topic/golang-nuts/Qq_h0_M51YM and https://stackoverflow.com/questions/53769216/is-it-safe-to-add-to-a-waitgroup-from-multiple-goroutines

After trying several things, I found a solution, to surround all calls to Wait and Add(1) with a sync.Mutex lock.

Using a mutex lock around a waitgroup

type Publisher struct {
	ch chan big.Int

	closingCh      chan interface{}
	writersWG      sync.WaitGroup
	writersWGMutex sync.Mutex
}

func (p *Publisher) write(data string) {
	go func(data string) {
		p.writersWGMutex.Lock()
		p.writersWG.Add(1)
		p.writersWGMutex.Unlock()

		defer p.writersWG.Done()

		select {
		case <-p.closingCh:
			return
		default:
		}

		select {
			case <-p.closingCh:
			case p.ch <- data:
		}
	}(data)
}

func (p *Publisher) Close() {
	close(p.closingCh)

	go func() {
		for range p.ch {
		}
	}()

	p.writersWGMutex.Lock()
	p.writersWG.Wait()
	p.writersWGMutex.Unlock()

	close(p.ch)
}

At first it seems like a bad idea to to put a call to something that blocks (Wait) inside a mutex lock, it seems that it will hold the mutex lock indefinitely and prevent others to enter into it.

One way of looking at a mutex is that it creates a “critical section” between the lock and unlock. This critical section cannot be entered by more than one goroutine at the same time, or equivalently, all “critical sections” executions are done one after another never in parallel.

That way, we can reason about our problem in the following matter: there are two kinds of Add(1) some are executed before Wait and some others are executed after Add(1). The mutex prevents any other possibility.

For the Add(1) that are executed before Wait we can see that either they call Done when a reader has unblock their channel write, or because before calling Wait, closingCh was closed. Hence, for all Add(1) called before Wait, one way or the other Done will be called, reducing the counter to zero and hence unblocking Wait at some point. The important thing here, is that the mutex lock does not prevent from calling Done and hence at some point unlock Wait.

We have seen then, that Wait will always unblock.

Now, we should deal with all Add(1) executed after Wait, we can see that in this case closingCh will be closed before the call to Add(1), hence when it arrives to the first select it will exit and call Done without trying to write on ch, which would panic.

From this reasoning we can see that this does not cause a deadlock, and the tests confirms it.

There might be some performance implications but it seems that they are not important. All calls to Add will be sequenced, but this is a fast and constant time operation, so it should not produce much contention. Anyway, it would be a good idea to benchmark this.

Another interesting research that comes from this is about why the standard wait group have this race condition and if it could be fixed without affecting performance in a simple matter.

The solution

You have here a full example of the solution with tests: https://gist.github.com/leolara/f6fb5dfc04d64947487f16764d6b37b6

You could try to comment out the mutex and see that the tests return an error due to a race condition.

We have seen how complicated can be reasoning about concurrency can be. This is not a complex scenario, in more real life complex scenarios concurrency can be one of the most challenging things in computer science.

Thanks to https://github.com/TopherGopher for his review.

I hope you all have enjoyed reading this article, do not hesitate to comment.

Cheers, Leo