[TOC]
并发
桌子上有很多桃子,每个人看到桃子去抓取。这就是并发,他存在的问题是 不同的人会尝试去抓同一个桃子。这么导致后面的人抓不到。
可能的解决办法是大家排队一次拿桃子,这样又会降低效率。
如何又快又不出错的拿桃子,这就是问题所在。实际上,并发比这个更复杂。
线程
进程只有一个指令执行路径,也就是只有一个pc计数器。
线程的引入 增加了指令执行路径。每个线程有自己的指令执行路径,即PC计数器,也有自己独立的寄存器。
但是这些线程又共享地址空间,从而共享代码和数据。
类似于进程间的切换有上下文切换,线程间切换也有上下文切换。
进程是把状态保存到进程控制块 (Process Control Block, PCB)。
线程是把状态保存到线程控制块(Thread Control Block, TCB)。需要注意的是,同一个进程的线程间 本身是共享地址空间的,所以不需要切换当前使用的页表。
因为每个线程都有独立的栈,多线程的引入还影响了地址空间布局。有的应用程序会利用这个特性,譬如Java有ThtreadLocal, 在线程栈里存变量。
线程和进程一样,并不是创建了就马上运行,而是会由操作系统调度。
多线程带来的问题
共享内存
简短地说a = a +1
其实不止一个指令
线程1
register b= a #a1 加载变量a到寄存器
b = b+1 #a2 增加1
a = b #a3 寄存器写回a
线程2
register b= a #b1 加载变量a到寄存器
b = b+1 #b2 增加1
a = b #b3 寄存器写回a
当有2个线程都执行这个指令序列时,由于调度问题。并不是按a1,a2,a3 运行的。
有一种更可能是 a1, a2, b1,b2,b3(此时a已经更新), a3(此时写回a, 覆盖了b3的更新)。 不同的指令序列,会产生不同的结果。这也称为竞态条件。由于执行这段代码的多个线程可能导致竞争状态,因此我们将此段代码称为临界区。临界区是访问共享资源的代码片段,一定不能由多个线程共同执行。
要确保不出现这种情况,必须确保 指令是原子性的,才不会互相影响。
加锁 是一种方法。但是加锁又会带来急剧的性能降低。现在更倡导的 通过通信来共享内存,而不是共享内存来通信。这点Erlang是最经典的。
objdump, gdb, valgrind, purify 这些工具有助于分析底层细节。
线程协作
譬如A 线程 等待B 线程的运行结果。B线程等待C线程的结果, C又等待A的结果。这样形成一个环,导致死锁。
解决方案
锁
琐是最基础的机制,它的实现依赖于硬件和操作系统。
锁的价值: 使临界区的代码 能够像原子指令一样执行。
所以使用的锁的时候,一定得注意哪个是共享变量,哪个是临界区。
如果一个共享变量,在A函数加了锁,在B函数没有锁。那是没有意义的。锁保护的就是某个共享变量,使得只有一个线程可以读写他。
最简单的机制就是一个大锁,锁住数据结构。但是这样会导致性能低,并发度低。引入多个锁(细粒度的锁,只锁数据结构中的某个成员),可以提高并发度。不过提高并发读不一定会提高性能。
评价锁
- 是否提供互斥?
- 公平性,当锁可用时,是否每一个竞争线程都有公平的机会抢到锁,更极端的是否有线程会饿死。
- 性能。3种场景下的考量:
- 没有竞争。
- 单CPU, 多个线程竞争
- 多CPU, 多个线程竞争
锁的实现
控制中断
最容易实现。因为他关闭了线程切换,当然不会有并发问题。
缺点很明显:
- 需要特权权限才可以关闭中断/恢复中断。这个权限显然不能开放给普通线程。
- 不支持多处理器
- 关闭中断 会导致中断丢失,可能会导致严重的系统问题
- 效率低。与正常的指令比,关/开中断的代码执行的比较慢。
只有有限的情况下,内核会使用控制中断的方式。
自旋锁的几种实现
测试并设置指令
不用的cpu有不同的指令。
指令伪代码如下:
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr;
*old_ptr = new;
return old;
}
可以理解为这个函数由硬件实现,是原子性的。
自旋锁代码:
typedef struct lock_t {
int flag;
} lock_t;
void init(lock_t *lock) {
lock->flag = 0;
}
void lock(lock_t *lock) {
while(TestAndSet(lock->flag, 1)==1)
;// 这里体现了自旋。 就是一直设置flag为1, 如果之前的值是0,表明站有锁,成功,否则就是已经有线程占有锁,只能重试。
// 这里也体现了: 在单处理器上,需要抢占式的调度器,即有时钟中断,时间片用完了,切换线程。否则就会死循环了。
}
这种实现低效且没有公平性,有饿死的可能。
比较并交换
从实现自旋锁的角度看,和 测试并设置 没差别。
指令伪代码如下:
int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected) {
*ptr = new;
}
return actual;
}
链接的加载和条件式存储指令
关键是条件式存储指令。
在存储时,只有变量的值 还是 加载时的值时,存储才会成功。这其实是给了我们一个信息:加载和设置期间是否发生过线程切换。
可能的代码实现如下:
void lock(lock_t *lock) {
while(LoadLinked(&lock->flag) || !StoreConditional(&lock->flag,1))
; //如果flag=1, 表明已有线程持有锁,重试
// 如果存储失败,说明flag被其他线程修改过,需要重视
}
获取并增加
这个有点巧妙,并能保证所有线程都能抢到锁。
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
typedef struct lock_t {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}
void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while(lock->turn != myturn)
; // 如果没有其他线程抢锁,turn 和 更新前的tikcet 是相等的。
}
void unlock(lock_t *lcok) {
FetchAndAdd(&lock->turn);// 释放锁。ticket-turn 可以表明同时有多少个线程在竞争锁。
}
自旋锁的问题是空耗CPU, 性能低下。如何优化呢?
优化自旋锁
释放CPU
既然发现自己没有运行的机会,就把机会让出来吧。
void lock() {
while(TestAndSet(&flag,1)==1) {
yield();// 放弃自己的运行机会,给操作系统重新调度
}
}
有点用处,但是CPU切换本身性能低下,不是很好使。
使用队列:休眠替代自旋
这个需要操作系统支持,譬如Solaris 提供park()和unpark(threadId) 用来休眠和唤醒。
typedef struct * lock_t{
int flag; // 这个也是锁,实际线程抢占的
int guard;// 这个是锁。用于保护flag和q
queue_t *q; // 等待flag的队列
} lock_t;
void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}
void lock(lock_t *m) {
while(TestAndSet(&m->guard,1)==1)
;// 自旋锁。不过由于是用于内部的guard, 自旋的时间很短
if (m->flag==0) { // 锁可以,持有它
m->flag = 1;
m->guard=0;
} else {
// 进入等待队列
queue_add(m->q, gettid());
m->guard = 0; // 这个代码不能放到park()下面,那样一来,其他线程没法抢到guard锁了
park(); // 这行代码其实也有问题。关键就是 m->guard = 0 和 park()之间可以中断。假设发生了中断,由持有锁的线程运行,该线程释放锁之后,调用unpark()。 然后调度回来运行park()。(由于配对的unpark已经无效运行了,后面可能一直处理睡眠状态,无法被唤醒。
// 为了解决这个问题 Solaris 提供了 setpark() 指令。
// setpark(); unpark(); park(), 这样的序列是可以工作的。park会直接返回,而不是睡眠。
// 在m->guard=0 之前 加入 setpark()即可。
// 从这里也可以看出并发程序很难写,因为可能指令序列实在是太多了。简单起见就是加个大锁,先保证正确,遇到性能问题时再优化。
}
}
void unlock(lock_t *m) {
while(TestAndSet(&m->guard,1)==1)
;// 自旋锁。
if (queue_empty(m->q)) {
m->flag = 0;
} else {
unpark(queue_remove(m->q));
}
m->guard = 0;
}
futex
类似于park()和unpark(threadId) , futex做的更多
futex_wait(address, expected)
如果内存addredd的值==expected, 线程休眠,等待唤醒。futex_wake(address)
唤醒在address 出等待的某个线程
// mutex 是个32位的整数。最高位表示锁。其他位表示等待锁的线程的个数
void mutex_lock(int *mutex) {
int v;
// 如果原来是0,表示没有竞争,直接占有锁并返回
if (atomic_bit_test_set(mutex, 31)==0) {
return;
}
atomic_increment(mutex);// 等待线程数+1
while(1) {
if (atomic_bit_test_set(mutex, 31)==0) {
atomic_decrement(mutex); // 抢占成功,等待线程数-1
return;
}
v = *mutex;
if (v>=0) {
continue;//锁可用了,接着尝试
}
futex_wait(mutex,v);// 如果mutex的值没有变过,休眠,否则说明有线程操作过mutex, 说不定锁可用了,可继续尝试
}
}
void mutex_unlock(int *mutex) {
if(atomic_add_zero(mutex, 0x80000000)) {
// 如果释放锁后,等待的线程数为0, 直接返回
return;
}
// 还有线程在等待,需要唤醒他们
futex_wake(mutex);
}
两阶段锁
这个应用中比较常见。
第一阶段:先自旋一定的时间。
第二阶段:再尝试获取锁。如果没有获取到就休眠等待唤醒。
适合自旋时间很短,很快就能释放锁的场景。上文的mutex就是自旋一次的案例。
条件变量
除了锁,我们还需要条件变量。
因为有场景需要: 条件满足时,线程才运行,不满足时就休眠。如果通过自旋锁性能太低了。
条件变量是一个显示队列,当某些执行状态不满足时,线程可以把自己加入队列,等待该条件。
另外某个线程,当它改变了上述状态时,就可以唤醒一个或者多个等待线程,让他们继续执行。
著名的场景就是 生产者/消费者 模型。
使用条件变量的小提示:
- 调用signal 和 wait时需要持有锁。
- 对条件变量使用while而不是if
除了wait, signal, 还需要broadcast, 因为有些场景你无法确保 唤醒的线程是否真的满足运行要求。
经典代码:
int buffer[MAX];// 缓冲区
int fill = 0;// 指示生产者的下一个填充位置
int use = 0; // 指示消费者下一个消费位置
int count = 0; // 指示可消费个数
// 需要注意 buffer, fill, count 都是共享变量
void put(int value) {
buffer[fill] = value;
fill = (fill+1)%MAX;
count++;
}
// 需要注意 buffer, use, count 都是共享变量
int get() {
int tmp = buffer[use];
use = (use+1)%MAX;
count--;
return tmp;
}
cond_t empty, fill; // 两个条件。empty 唤醒生产者,fill唤醒消费者
mutex_t mutex;
void *producer(void *arg) {
int i;
for (i=0;i<loops;i++) {
Pthread_mutex_lock(&mutex);
while ( count == MAX) {
// 如果满了,需要等待 消费者来消费。 这里之所以用while 而不是if, 是因为 线程唤醒后, put调用前,是可能有其他线程运行的(譬如其他生产者),这样本线程又不满足 运行需求了。
Pthread_cond_wait(&empty, &mutex);
}
put(i);
Pthread_cond_signal(&fill);// 唤醒消费者, 你有东西可以消费了。
Pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg) {
int i;
for (i=0;i<loops;i++) {
Pthread_mutex_lock(&mutex);
while ( count == 0) {
// 如果空了,需要等待生产者生产。
Pthread_cond_wait(&fill, &mutex);
}
int tmp = get();
Pthread_cond_signal(&empy);// 唤醒生产者, 你有空间后可以放产品了。
Pthread_mutex_unlock(&mutex);
printf("%d\n",tmp);
}
}
理解条件变量 还有一点。
譬如wait
的职责是释放锁,并让调用者线程休眠。被唤醒时,重新获得锁,并返回调用者。
也就是这里面本身一对 (unlock, lock)的逻辑,不过wait
调用本身是原子的。这也是条件和变量必须搭配锁才能工作的原因。
信号量
信号量可以当成锁和条件变量使用。
定义
信号量是个整数值,通过sem_wait
和sem_post
来操作。
#include <semaphore.h>
sem_t s;
sem_init(&s, 0, 1); //第二个参数 0 表示同一进程里的多个线程共享。
sem_wait
信号量的值-1, 当值<0时,线程进入睡眠。其绝对值为等待的线程数。
sem_post
信号量的值+1, 当有线程在等待时,唤醒等待的线程之一。
二值信号量(锁)
sem_t m;
sem_init(&m,0,1);// 关键初值是1
sem_wait(&m);
// critical secion here
sem_post(&m);
信号量用作条件变量
sem_t s;
void *child(void *arg) {
printf("child\n");
sem_post(&s);// 这里就是改变条件,会唤醒父线程。
return NULL;
}
int main(int argc, char *argv[]) {
sem_init(&s, 0, 0);// 关键初值0
printf("parent: begin\n");
pthread_t c;
Pthread_create(c, NULL, child, NULL);
sem_wait(&s);
printf("parent: end\n");
return 0;
}
解决生产者/消费者问题
int buffer[MAX];
int fill = 0;
int use = 0;
void put(int value) {
buffer[fill] = value;
fill = (fill+1) % MAX;
}
void get() {
int tmp = buffer[use];
use = (use+1) % MAX;
return tmp;
}
//////////
sem_t empty;
sem_t full;
sem_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&empty);// 等待可写入
sem_wait(&mutex); // 并发读写数组,需要枷锁
put(i);
sem_post(&mutex);
sem_post(&full);// 通知已有数据
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&full);//
sem_wait(&mutex); // 并发读写数组,需要枷锁
int tmp = get();
sem_post(&mutex);
sem_post(&empty);//
printf("%d\n", tmp);
}
}
int main(int argc, char *argv[]) {
// ...
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);
// ...
}
读写锁
typedef struct _rwlock_t {
sem_t lock;
sem_t writelock;
int readers;
} rwlock_t;
void rwlock_init(rwlock_t *rw) {
rw->readers = 0;
sem_init(&rw->lock,0,1);
sem_init(&rw->writelock, 0, 1);
}
void rwlock_acquire_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);// 就是普通的加锁
rw->readers++;
if (rw->readers == 1) {
sem_wait(&rw->writelock); // 第一个读者加写锁。。有读者时,不能写入数据
}
sem_post(&rw->lock);
}
void rwlock_release_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);// 就是普通的加锁
rw->readers--;
if (rw->readers == 0) {
sem_post(&rw->writelock); // 最后一个读者释放写锁。。
}
sem_post(&rw->lock);
}
void rwlock_acquire_writelock(rwlock_t *rw) {
sem_wait(&rw->writelock);
}
void rwlock_release_writelock(rwlock_t *rw) {
sem_post(&rw->writelock);
}
这个实现有个问题,就是写锁有饥饿问题。一直有读,导致没法写入数据。优化方式有写排队时,不让新的读者加入。
提示:简单的笨办法可能更好
哲学家就餐问题
略,就是改变某个哲学家的就餐习惯。
具体参考哲学家就餐问题
如何实现信号量
使用锁+条件变量实现信号量。
typedef struct _Zem_t {
int value;
pthread_cond_t cond;
pthread_mutex_t lock;
} Zem_t;
void Zem_init(Zem_t *s, int value) {
s->value = value;
Cond_init(&s->cond);
Mutex_init(&s->lock);
}
void Zem_wait(Zem_t *s) {
Mutex_lock(&s->lock);
while(s->value <=0) { // 不要忘了,条件变量需要用while。虽然这里没有必要。因为lock已经保护了。
Cond_wait(&s->cond, &s->lock);
}
s->value --;
Mutex_unlock(&s->lock);
}
void Zem_post(Zem_t *s) {
Mutex_lock(&s->lock);
s->value ++;
Cond_signal(&s->cond);
Mutex_unlock(&s->lock);
}
常见并发问题
非死锁缺陷
违反原子性缺陷: 违反了多次内存访问中预期的可串行性(即代码段本意是原子性的,但在执行中并没有强制实现原子性)
代码示例:
Thread1::
if (thd->proc_info) {
...
fputs(thd->proc_info,...);
...
}
Thread2::
thd->proc_info = NULL;
解决方法很简单,加个锁变成原子性就可以了。
违反顺序缺陷:两个内存访问的预期顺序被打破了(即A应该在B之前执行,但是实际运行中却不是这个顺序)
代码示例:
Thread1::
void init() {
...
mThread = PR_CreateThread(mMain, ...)
...
}
Thread2::
void mMain(...) {
...
mState = mThread->State;//这里假设了 Thread1已经运行。如果Thread2先运行,会空指针异常
...
}
解决办法是加个条件变量。必须Thread1先运行了。 mState那行代码才能运行。
死锁缺陷
死锁容易发生有2个原因:
- 大型的代码库中,组件之间会有复杂的依赖。
- 封装。软件开发者一直倾向于隐藏实现细节,以模块化的方式让软件开发更容易。模块化和锁不是很契合。
产生死锁的4个条件:
互斥
:线程对于需要的资源进行互斥的访问(例如一个线程抢到锁)持有并等待
: 线程持有了资源(例如已将持有的锁),同时又在等待其他资源(例如,需要获得的锁)非抢占
: 线程获得的资源(例如锁),不能被抢占。循环等待
: 线程之间存在一个环路,环路上每个线程都额外持有一个资源,而这个资源又是下一个线程要申请的。
如果这4个条件的任何一个没有满足,死锁就不会发生。
预防
循环等待
按一致的顺序加锁,可以避免循环等待。
全序和偏序都需要细致的锁策略的设计和实现。另外,顺序只是一个约定,粗心的程序员很容易忽略,导致死锁。
有种经验是: 通过锁的地址来强制锁的顺序。
持有并等待
增加一个全局锁,把业务锁都包起来。可以避免持有并等待的问题。
他的问题是不适用于封装,同时降低了并发度。
非抢占
增加tryLock。不能获取锁就释放已经持有的锁。
2个问题:1 是会产生活锁。2是不利于封装。
互斥
直接使用硬件指令,如CompareAndSwap, 而不使用锁。
问题是工作量很大,并不容易实现。
通过调度避免死锁
使用场景太少,得事先知道所有线程对锁的需求。然后分析冲突,避免有冲突的线程同时运行。
检查和恢复
死锁毕竟是低概率的事情,所以允许它发生,并在检查到死锁时再采取行动。
基于事件的并发
多线程并发有两个问题:
1 是正确处理并发很有难度,如忘加锁、死锁等问题。
2 是开发者无法控制多线程在某一刻的调度。
基于事件的并发恰好可以处理上述2个问题。1是单线程,不存在资源竞争等问题,不需要加锁。2是开发者调度。
基本模型如下:
while(1) {
events = getEvents();
for (e in events) {
processEvent(e);//处理事件
}
}
接收事件有几个api: select
,poll
、epll
等。可以参考I/O多路复用
由于是单线程,所以在基于事件的系统中必须遵守一条规则: 不允许阻塞调用。
一旦阻塞调用了,整个程序都需要等待。为此,在基于时间的系统中一般是用select
等异步网络API, aio异步磁盘I/O。
基于事件的并发有几个挑战:
- 不允许阻塞调用。
- 状态管理。多线程中,操作系统帮你管理好了。这里你得手工栈管理。一般是设计结构。
- 在多CPU中,要充分利用CPU, 又得并发处理,就有资源竞争需要加锁了。
- 不能很好地与某些类型的系统活动集成。譬如缺页中断,这种隐式的阻塞调用无法处理,频繁发生时可能导致较大的性能问题。
- 随着时间的推移,代码可能难以管理。你必须时刻注意API语义的变化,是阻塞调用还是非阻塞调用。
参考
《操作系统导论》