Concurrency with Goroutines and Channels
Introduction to Goroutines and Channels for achieving concurrency in Go programs
Concurrency Patterns in Go
Synchronization and Communication Patterns
Go's concurrency model is built around goroutines and channels. Goroutines are lightweight, concurrently executing functions, and channels are typed conduits that allow goroutines to communicate and synchronize. Understanding synchronization and communication patterns is crucial for writing robust and efficient concurrent Go programs.
Synchronization ensures that access to shared resources is coordinated to prevent race conditions and data corruption. Common synchronization techniques in Go include:
- Mutexes (`sync.Mutex`): Provide exclusive access to a shared resource. A goroutine must acquire the lock before accessing the resource and release it afterwards.
- Read/Write Mutexes (`sync.RWMutex`): Allow multiple readers to access a resource concurrently, but only one writer at a time. This can improve performance when reads are much more frequent than writes.
- Atomic Operations (`sync/atomic`): Provide low-level, lock-free mechanisms for manipulating primitive data types. These are generally faster than mutexes for simple operations like incrementing counters.
- WaitGroups (`sync.WaitGroup`): Wait for a collection of goroutines to finish. The main goroutine adds to the counter before starting each worker goroutine, and each worker calls `Done()` when it finishes. The main goroutine blocks on `Wait()` until the counter reaches zero.
- Condition Variables (`sync.Cond`): Allow goroutines to wait until a certain condition becomes true. They are often used in conjunction with mutexes that protect the shared state the condition depends on.
Communication enables goroutines to exchange data and signal events. The primary mechanism for communication in Go is through channels:
- Channels (`chan`): Typed conduits for sending and receiving data between goroutines. Channels can be buffered or unbuffered. Unbuffered channels require a sender and receiver to be ready at the same time (synchronous communication), while buffered channels can hold a limited number of values before blocking.
- Select Statement: Allows a goroutine to wait on multiple channel operations. The `select` statement blocks until one of its cases is ready to proceed. If multiple cases are ready, one is chosen at random. This provides a powerful mechanism for handling multiple concurrent events.
Exploring Common Concurrency Patterns
Worker Pools
A worker pool is a concurrency pattern that limits the number of concurrent goroutines to prevent resource exhaustion. A central dispatcher distributes tasks to a fixed number of worker goroutines.
package main

import (
	"fmt"
	"sync"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("worker %d processing job %d\n", id, j)
		// Simulate work
		var sum int
		for i := 0; i < j*1000000; i++ {
			sum += i
		}
		results <- j * 2 // Simulate calculation and send result
	}
}

func main() {
	numJobs := 10
	numWorkers := 3
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	var wg sync.WaitGroup

	// Create worker goroutines
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1) // Increment WaitGroup counter
		go func(w int) {
			defer wg.Done() // Decrement WaitGroup counter when worker finishes
			worker(w, jobs, results)
		}(w)
	}

	// Send jobs to the jobs channel
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // Close jobs channel to signal no more jobs

	// Collect results from the results channel
	go func() {
		wg.Wait()      // Wait for all workers to complete
		close(results) // Close results channel to signal no more results
	}()

	for a := range results {
		fmt.Println("Result:", a)
	}
}
This example demonstrates a worker pool. The `main` function creates a channel for jobs (`jobs`) and a channel for results (`results`). It then launches a fixed number of worker goroutines that listen on the `jobs` channel. The `main` function sends jobs to the `jobs` channel and then closes it to signal that there are no more jobs. The worker goroutines process the jobs and send the results to the `results` channel. Finally, the `main` function collects the results from the `results` channel. The `sync.WaitGroup` ensures all workers finish before the `results` channel is closed.
Fan-Out
Fan-out is a concurrency pattern where a single input channel is distributed to multiple worker goroutines for parallel processing.
package main

import (
	"fmt"
	"sync"
	"time"
)

func process(id int, input int) int {
	fmt.Printf("Worker %d processing input %d\n", id, input)
	time.Sleep(time.Millisecond * 100) // Simulate work
	return input * 2
}

func worker(id int, inputCh <-chan int, outputCh chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for input := range inputCh {
		result := process(id, input)
		outputCh <- result
	}
}

func main() {
	numWorkers := 3
	inputs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	inputCh := make(chan int, len(inputs))
	outputCh := make(chan int, len(inputs))

	var wg sync.WaitGroup

	// Start worker goroutines
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, inputCh, outputCh, &wg)
	}

	// Send inputs to the input channel
	for _, input := range inputs {
		inputCh <- input
	}
	close(inputCh)

	// Close the output channel after all workers are done
	go func() {
		wg.Wait()
		close(outputCh)
	}()

	// Collect and print the results
	for result := range outputCh {
		fmt.Println("Result:", result)
	}
}
In this example, the `main` function creates an input channel (`inputCh`) and an output channel (`outputCh`). It then starts multiple worker goroutines that read from `inputCh`, process the data, and send the results to `outputCh`. The `main` function sends the input data to `inputCh` and closes it. A separate goroutine waits for all the worker goroutines to finish using a `sync.WaitGroup` and then closes `outputCh`. Finally, the `main` function iterates over `outputCh` to print the results.
Fan-In
Fan-in is a concurrency pattern where multiple input channels are multiplexed into a single output channel. This allows a single goroutine to consume data from multiple sources concurrently.
package main

import (
	"fmt"
	"sync"
	"time"
)

// source produces five values and closes its channel when done, so that
// downstream range loops can terminate.
func source(id int, outputCh chan<- int) {
	defer close(outputCh)
	for i := 1; i <= 5; i++ {
		time.Sleep(time.Millisecond * 50) // Simulate producing data
		outputCh <- id*10 + i
	}
}

func fanIn(inputChs ...<-chan int) <-chan int {
	outputCh := make(chan int)
	var wg sync.WaitGroup

	// Launch a goroutine for each input channel
	for _, inputCh := range inputChs {
		wg.Add(1)
		go func(ch <-chan int) {
			defer wg.Done()
			for val := range ch {
				outputCh <- val
			}
		}(inputCh)
	}

	// Close the output channel after all input channels are drained
	go func() {
		wg.Wait()
		close(outputCh)
	}()
	return outputCh
}

func main() {
	numSources := 3
	inputChs := make([]<-chan int, numSources)

	// Create source channels and start source goroutines
	for i := 0; i < numSources; i++ {
		ch := make(chan int)
		inputChs[i] = ch
		go source(i+1, ch)
	}

	// Fan-in the input channels into a single output channel
	outputCh := fanIn(inputChs...)

	// Collect and print the results
	for result := range outputCh {
		fmt.Println("Result:", result)
	}
}
In this example, the `main` function creates multiple input channels (`inputChs`) and starts source goroutines that send data to these channels. The `fanIn` function takes a variable number of input channels as arguments and returns a single output channel; for each input channel it launches a goroutine that forwards values to the output channel. A `sync.WaitGroup` waits for all of these forwarding goroutines to finish before closing the output channel, and the `main` function then consumes data from the output channel. Note two details: `fanIn` is variadic over receive-only channels, so the bidirectional channels created in `main` must be passed as a `[]<-chan int`; and each input channel must be closed once its source is done producing, otherwise the range loops inside `fanIn` never terminate and the program deadlocks.
Fan-Out/Fan-In Combined
You can combine Fan-Out and Fan-In patterns to create more complex concurrent pipelines. This allows you to distribute tasks across multiple workers and then aggregate the results from those workers into a single channel.