Go语言实战(六)并发
线程与进程
- 应用启动的时候就会有一个进程(Process),像linux里面的PID;一个进程可以起多个Thread(线程);所以一个进程至少包含一个线程,进程终止,那线程也就game over
并发与并行
- 并发(concurrency)是多个线程一起跑,而并行(Parallelism)是跑在不同的处理器上面,依赖于cpu的核心,若是单核的,那就不用谈什么并行了;而并发却可以跑在单核的CPU上
goroutine
- 如何启动一个goroutine? 使用关键字go就可以了
// channel stop then main routine util write data
go greet(c)
- channel自带阻塞功能,读是一个等待的过程
func greet(c chan string) {
fmt.Printf("hello +%s\n", <-c)
}
func TestChannel1(t *testing.T) {
c := make(chan string)
// channel stop then main routine util write data
greet(c)
fmt.Printf("after greet")
}
func TestChannel2(t *testing.T) {
c := make(chan string)
// channel stop then main routine util write data
go greet(c)
c <- "world"
fmt.Printf("after greet")
}
输出:
hello +world
after greet
- deadlock
package main
import "fmt"
func main() {
fmt.Println("main() started")
c := make(chan string)
c <- "John"
fmt.Println("main() stopped")
}
关于waitgroup的11个测试
竞争
/**
竞争, 多个线程争抢一个资源,无法保证资源的原子性
*/
func TestCompete(t *testing.T) {
var count int32
var wg sync.WaitGroup
threadNum := 2
wg.Add(threadNum)
for i := 0; i < threadNum; i++ {
go incCount(&wg, &count)
}
wg.Wait()
fmt.Println(count)
}
func incCount(wg *sync.WaitGroup, count *int32) {
defer wg.Done()
for i := 0; i < 2; i++ {
value := *count
runtime.Gosched()
value++
*count = value
}
}
由于多个线程一起跑,所以count处于一个竞争状态,即线程不安全,输出的结果也就不确定,可能是2、3、4
使用Atomic进行操作
/**
使用原子操作,进行处理,使得资源能够保证线程安全
*/
func TestCompeteAtomic(t *testing.T) {
t.Parallel()
t.Run("competeAtomicTest", func(t *testing.T) {
CompeteAtomicFunc()
})
}
func CompeteAtomicFunc() {
var (
count int64
wg sync.WaitGroup
)
threadNum := 2
wg.Add(threadNum)
for i := 0; i < threadNum; i++ {
go incAtomicCount(&wg, &count)
}
wg.Wait()
fmt.Println(count)
}
func incAtomicCount(wg *sync.WaitGroup, count *int64) {
defer wg.Done()
for i := 0; i < 2; i++ {
atomic.AddInt64(count, 1)
}
}
原子操作能够保证输出结果一直都是4
互斥锁
可以使用go自带的mutex进行操作;当修改资源的时候先进行锁住,修改完成再释放掉
/**
使用互斥锁操作,进行处理,使得资源能够保证线程安全
*/
func TestCompeteMutex(t *testing.T) {
t.Parallel()
t.Run("competeMutexTest", func(t *testing.T) {
CompeteMutexFunc()
})
}
func CompeteMutexFunc() {
var (
count int64
wg sync.WaitGroup
mu sync.Mutex
)
threadNum := 3
wg.Add(threadNum)
for i := 0; i < threadNum; i++ {
go incMutexCount(&wg, &mu, &count)
}
wg.Wait()
fmt.Println(count)
}
func incMutexCount(wg *sync.WaitGroup, mu *sync.Mutex, count *int64) {
defer wg.Done()
for i := 0; i < 2; i++ {
mu.Lock()
{
value := *count
value++
*count = value
}
mu.Unlock()
}
}
通道
多个goroutine之间建立起通道,就可以进行数据的传输;通道的数据类型可以是基础数据类型,结构体,引用类型或指针;通道分为有缓冲通道、无缓冲通道;
c := make(chan string) // 无缓冲通道
c := make(chan string 10) //有缓冲通道
close(c) //关闭通道
通道的长度和容量
有缓冲的通道的容量是初始的时候设定的;若超出容量,则会挂起,等待被消费
// TestResult
// === RUN TestChannel3
// c1 length: 0, capacity: 0
// c2 length: 0, capacity: 10
// c2 length: 10, capacity: 10
// c2 holding, not run
func TestChannel3(t *testing.T) {
c1 := make(chan int)
c2 := make(chan int, 10)
fmt.Printf("c1 length: %d, capacity: %d\n", len(c1), cap(c1))
fmt.Printf("c2 length: %d, capacity: %d\n", len(c2), cap(c2))
for i := 0; i < 10; i++ {
c2 <- i
}
fmt.Printf("c2 length: %d, capacity: %d\n", len(c2), cap(c2))
// 超出c2的capacity的时候,routine会等待被取走数据后再写入
fmt.Printf("c2 holding, not run\n")
c2 <- 11
}
通道是有序的,先进先出
// TestResult
// value of c2 is: 0
// value of c2 is: 1
// value of c2 is: 2
// value of c2 is: 3
// value of c2 is: 4
// value of c2 is: 5
// value of c2 is: 6
// value of c2 is: 7
// value of c2 is: 8
// value of c2 is: 9
func TestChannel4(t *testing.T) {
c2 := make(chan int, 10)
for i := 0; i < 10; i++ {
c2 <- i
}
for i := 0; i < 10; i++ {
fmt.Printf("value of c2 is: %d\n", <-c2)
}
}
同一goroutine不能操作无缓冲通道的channel
// TestDeadLock 在同一goroutine里面不能又读又取无缓冲通道的channel
// 程序hang住
func TestDeadLock(t *testing.T) {
c := make(chan string)
c <- "hello"
fmt.Printf("after channel: %s\n", <-c)
}