进程
进程(Process)是系统资源分配的基本单位,操作系统为每个进程分配独立的地址空间。
- 一个进程无法访问另一个进程的变量和数据结构。
- 一个进程的崩溃不会影响另一个进程。
进程是系统中正在运行的一个程序,程序一旦运行就是进程。进程可以看成程序执行的一个实例。(程序是静态的,进程是动态的)
用户空间和内核空间
- 用户空间:运行用户程序,进程间独立。
- 内核空间:运行内核程序,具备更高的权限,资源共享。
由于内核空间是共享的,因此用户空间的进程交换需要通过内核。
由于设备会进行访问控制(Access Control),因此用户空间的程序无法直接访问设备资源,也需要通过内核。
CPU从用户空间切换到内核空间主要有两种方式:
- 中断(Interrupt):CPU在每个时钟周期末尾检测是否有中断信号到达,如果有,则根据优先级决定是否去处理中断指令。(可以看作是CPU级别的轮询)
- 系统调用(System Call):操作系统提供给应用程序的接口,通过System Call调用系统内核操作。(例如设备读写)
进程间的关系
多进程之间存在两种关系:
- 竞争关系(间接制约):进程之间没有直接关系,彼此不知道各自的存在。但是由于这些进程共用一套计算机资源(硬件设备、存储器、处理器、文件等),因此会出现资源竞争。
- 协作关系(直接制约):多个进程为了完成同一个任务需要分工协作,一方先执行完成后会阻塞,等到其他合作进程都执行完成之后,再通过协调信号或消息唤醒,再继续执行。协作关系又分为两种
- 松散协作:例如共同写文件,A进程先写完前半段,B进程等A写完之后再写后半段。
- 紧密协作:进程间需要互相通信。例如共同写文件,A进程把前半段数据发给B进程,B进程把两段内容合到一起,写入文件。
协作可能需要竞争,也可以不需要竞争
竞争关系可以看作是一种特殊的协作,对资源使用上的协调
竞争会产生两个问题:
- 死锁(deadlock):多个并发进程因争夺系统资源而产生相互等待的现象。(两个进程都获得了部分资源,并且还需要其他进程所占有的资源。)
- 饥饿(starvation):一个进程由于其他进程优先,而被无限期拖延。
死锁
死锁的必要条件:
- 互斥条件:一个资源每次只能被一个进程使用。
- 请求与保持条件:一个进程因请求资源而阻塞时,不释放已获得的资源。
- 不剥夺条件:进程已获得的资源,在未使用完成前,不能被强行剥夺。
- 循环等待条件:多个进程之间形成头尾相接的循环等待资源关系。A等待B,B等待C,C等待A
预防死锁:
- 打破互斥条件:允许进程同时访问某个资源
- 打破请求与保持:资源利用率低,降低并发性,不能动态分配资源
- 一次性申请全部资源
- 进程获得运行初期需要的资源,运行过程中逐步释放掉已经用完的资源,再请求新的资源
- 打破不可抢占:进程请求新的资源时,如果无法被满足,则释放所占有的资源,以后重新申请。
- 打破循环等待:实行资源有序分配策略。采用这种策略,即把资源事先分类编号,按号分配,使进程在申请,占用资源时不会形成环路。所有进程对资源的请求必须严格按资源序号递增的顺序提出。进程占用了小号资源,才能申请大号资源。
避免死锁:
- 如果一个进程的请求会导致死锁,则不启动该进程
- 如果一个进程增加资源的请求会导致死锁 ,则拒绝该申请。
临界资源:一个资源同时只能被一个进程访问
临界区:访问临界资源的代码,例如Java synchronized
中编写的代码就是临界区
互斥、同步、通信
网上的文章经常把进程/线程的同步和通信方式放一起,很混乱。这里做下区分,谈下自己的理解。以进程为例(线程同理)
为了解决进程间竞争的关系,引入了互斥,为了解决进程间的协作关系,引入了同步
- 进程的互斥(mutual exclusion):指若干个进程要使用同一共享资源时,任何时刻最多允许一个进程去使用,其他要使用该资源的进程必须等待,直到占有资源的进程释放该资源。
- 进程的同步(synchronization):指两个以上进程基于某个条件来协调它们的活动。一个进程的执行依赖于另一个协作进程的消息或信号,当一个进程没有得到来自于另一个进程的消息或信号时则需等待,直到消息或信号到达才被唤醒。
- 进程的通信(Inter-Process Communication):两个进程间建立紧密的联系,互相交换数据或信息。
跨进程调用本质也是进程间的通信,只不过发送的信息是要调用的方法名。双方定义一套协议,接收到该消息的时候调用指定的方法。
进程互斥是一种特殊的同步:互斥是B进程等A进程释放资源,同步可以是B进程等A进程发送消息,也可以是等A进程释放资源
进程同步是一种特殊的通信,无法交换数据,只是传递信号或状态。进程间通信更倾向于数据交换
从名词上解读,感觉同步更像是目的,为了确定进程的执行顺序,或者同步双方数据和状态。
通信更像是一种机制,可以利用这种机制实现同步。当然也可以用来做其他事情。
进程间通信
为了做区分,这里认为同步方式用于交换少量信息(低级通信机制),通信方式用于交换数据(高级通信机制)。
或者认为同步一般针对访问临界资源,控制进程执行顺序,通信则没有限制。
当然进程间通信方式也能用来做同步,同步方式则不一定能用来通信。
锁是一种同步方式
同步方式:
信号量(semophore) + 原语操作(PV):限制同一时刻访问资源的进程数目。(P申请资源,S减1,V释放资源,S加1)
- 信号量S:整型变量,需要初始化值大于0
- P原语:表示减少信号量,该操作必须是原子的。原子减少,然后如果
S < 0
,则阻塞当前线程 - V原语:表示增加信号量,该操作必须是原子的。原子增加,然后如果
S <= 0
,则唤醒一个阻塞的线程
互斥锁(mutex):是一种特殊的信号量,S=1。只允许一个进程访问。
- 信号(signal): 信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。
通信方式:
- 管道(Pipe):一种半双工的通信方式,数据只能单向流动,并且只有父子进程能通信
- 有名管道(named pipe)FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系的进程通信。
- 消息队列(MessageQueue):将消息链表存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
- 套接字(socket):可用于不同机器间的进程通信。
- 共享内存(shared memory):共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。
- 内存映射(mmap)
进程和线程
进程和线程都可用于并发。
一个进程至少拥有一个线程去执行。程序启动默认会开启一个主线程。
一个进程可以拥有多个线程,共享进程的地址空间和资源。每个线程拥有独立的栈空间。
区别:
- 进程之间地址空间和资源独立,线程间共享本进程的地址空间和资源(例如内存、I/O、CPU、文件等),一个线程修改了内存,另一个线程可以知道。
- CPU在进程间调度和切换开销较大,在线程间调度和切换开销较小。
- 进程通信往往需要内核参与,而线程通信是在同一地址空间内,无需内核参与,效率更高。(需要进行同步和互斥,保证数据的一致性)
- 进程是一个可执行程序的实体,线程不是,线程不能脱离进程单独执行。
类比:把计算机当作一个工厂
- 进程是车间,车间分为多块区域(内存)
- 线程是工人,线程可以共享车间中的区域:线程共享进程的地址空间和资源
- 一个车间(进程)可以有多个工人(线程):进程执行单个任务,线程将一个任务拆分,共同完成一个任务
- 每个区域的大小不同,有的房间只能容纳一个人,其他人想进去需要等它结束。为了防止其他人进去,可以上一把锁:互斥锁
- 有的区域可以容纳n个人,可以在门口放n把钥匙,每次进去则取走一把钥匙,出来时再放回原处,后来的人需要等多余的钥匙空出来:信号量,互斥锁是信号量的特殊情况。
- 每个车间互相独立,不同车间里的工人不能互相交流、访问:进程间资源独立
- 不同车间需要通信有几种方式:
- 在车间外有个公共厕所,所有人都可以访问,但是可能需要一把钥匙或者等其他人用完:共享内存
- 车间划分出一块区域,让其他人能够进去:内存映射
- 两个车间各安装一部电话,可以打电话交流:Socket通信
生产者-消费者模型
介绍:生产者和消费者不直接通信,而是通过一个阻塞队列进行通信,相当于缓冲区。
适用于不同线程、不同进程、不同主机
目的:平衡生产者和消费者的处理能力。
优点:
- 解耦:通过一个中间人(缓冲区)隔离生产者和消费者
- 支持并发:生产和消费可以同时进行
- 数据缓冲:来不及消费的数据先放在缓冲区
- 多生产者、多消费者
实例:
- 线程池:生产者提交任务到线程池,线程池根据条件决定是交给线程执行,还是放到任务队列中
- 消息队列
- 管道:单向传输,例如shell的
|
管道符和标准输入/输出关联两个进程
实现:
- 缓冲区满了,生产者等待
- 缓冲区为空,消费者等待
- 生产者和消费者同时只能有一方访问缓冲区,因此需要加锁
问题:
- 频繁push、pop,造成内存分配性能开销大:使用环形缓冲区,在固定空间中进行分配
- 同步和互斥性能开销:使用双缓冲区
环形缓冲区实现思路:
- 使用数组或者链表,头尾相连
- 通过两个索引记录生产者插入位置(W),消费者移除位置(R),W追上R表示缓冲区已满,R追上W表示缓冲区为空
- 判断缓冲区是否已满:通过一个额外变量记录大小,当R和W重叠时,判断该变量
代码实现
使用synchronized
这里把生产和消费放在同一个类中:
- 共用一个list,容量为1
- 生产和消费耗时操作其实不需要加锁,只有操作列表时才需要加锁
- 使用
synchronized
修饰方法,作用的是调用该方法的对象的锁。如果放在两个类中,则需要提供一个共享对象,用于加锁:synchronized(object)
import java.util.ArrayList;
public class Main {
public static void main(String[] args) {
MyService service = new MyService();
// 先创建哪个都一样
new ConsumerThread(service).start();
new ProducerThread(service).start();
}
static class MyService {
final ArrayList<Integer> list = new ArrayList<Integer>();
synchronized void produce() {
try{
while(!list.isEmpty()) {
System.out.println("生产者:队列不为空,等待");
this.wait();
}
// 生成100以内的随机数
int data = (int) (Math.random() * 100);
System.out.println("生产数据:" + data);
// 模拟耗时操作
Thread.sleep(2000);
list.add(data);
// 唤醒消费者
this.notifyAll();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
synchronized void consume() {
try{
while(list.isEmpty()) {
System.out.println("消费者:队列为空,等待");
this.wait();
}
int data = list.get(0);
System.out.println("消费数据:" + data);
// 模拟耗时操作
Thread.sleep(2000);
list.clear();
// 唤醒生产者
this.notifyAll();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
static class ProducerThread extends Thread {
MyService service;
ProducerThread(MyService service) {
this.service = service;
}
@Override
public void run() {
while(true) {
service.produce();
}
}
}
static class ConsumerThread extends Thread {
MyService service;
ConsumerThread(MyService service) {
this.service = service;
}
@Override
public void run() {
while(true) {
service.consume();
}
}
}
}
使用Lock+Condition
和synchronized
类似,只不过加锁方式换了而已
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
public class Main {
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) {
ArrayList<Integer> list = new ArrayList<Integer>();
// 先创建哪个都一样
new ConsumerThread(list).start();
new ProducerThread(list).start();
}
static class ProducerThread extends Thread {
ArrayList<Integer> list;
ProducerThread(ArrayList<Integer> list) {
this.list = list;
}
void produce() {
lock.lock();
try{
while(!list.isEmpty()) {
System.out.println("生产者:队列不为空,等待");
condition.await();
}
// 生成100以内的随机数
int data = (int) (Math.random() * 100);
System.out.println("生产数据:" + data);
// 模拟耗时操作
Thread.sleep(2000);
list.add(data);
// 唤醒消费者
condition.signalAll();
} catch(InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
@Override
public void run() {
while(true) {
produce();
}
}
}
static class ConsumerThread extends Thread {
ArrayList<Integer> list;
ConsumerThread(ArrayList<Integer> list) {
this.list = list;
}
void consume() {
lock.lock();
try{
while(list.isEmpty()) {
System.out.println("消费者:队列为空,等待");
condition.await();
}
int data = list.get(0);
System.out.println("消费数据:" + data);
// 模拟耗时操作
Thread.sleep(2000);
list.clear();
// 唤醒生产者
condition.signalAll();
} catch(InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
@Override
public void run() {
while(true) {
consume();
}
}
}
}