线程协作

线程通信:生产者消费者模式

应用场景:生产者和消费之问题

  • 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中的产品取走
  • 如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走
  • 如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止

Producer(生产者)——–>数据缓冲区———->Consumer( 消费者 )

线程通信–分析:

这是一个线程同步的问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件。

  • 对于生产者,没有生产产品之前,要通知消费者等待,而且生产了产品之后,又需要马上通知消费者消费。
  • 对于消费者,在消费之后,要通知生产者已经结束消费,需要生产新的产品以供消费。
  • 在生产者和消费者问题中,仅有synchronized是不够的
    • synchronized可阻止并发更新同一个共享资源,实现了同步
    • synchronized不能用来实现不同的线程之间的消息传递(通信)

Java提供了几个方法解决线程之间的通信问题:

方法名 作用
wait() 表示线程一直等待,直到其他线程通知,与sleep不同,会释放锁(sleep的执行不会释放锁)
wait(long time) 指定等待的毫秒数
notify() 唤醒一个处于等待状态的线程
notifyAll() 唤醒同一个对象上所有调用wait()方法的线程,优先级高的线程优先调度

注意:这些方法均是Object类的方法,都只能在同步方法同步代码块中使用,否则会抛出异常IllegalMonitorStateException

解决方法

并发协作模型“生产者/消费者模式”—–>管程法

  • 生产者:负责生产数据的模块(可能是方法,对象,线程,进程)
  • 消费者:负责处理数据的模块(可能是方法,对象,线程,进程)
  • 缓冲区:消费者不能直接使用生产者的数据,他们之间有个“缓冲区”

生产者将生产好的数据放入缓冲区,消费者从缓冲区拿走数据

Producer(生产者)——–>数据缓冲区———->Consumer( 消费者 )

//测试:生产者消费者模型--->管程法(利用缓冲区解决)
//生产者负责生成鸡肉,消费者负责消费鸡肉,缓冲区容器一次最多装十个鸡肉
public class TestPC {
    public static void main(String[] args) {
        Container container=new Container();
        new Consumer(container).start();
        new Productor(container).start();
    }
}
//生产者
class Productor extends Thread{
    Container container;
    public Productor(Container container){
        this.container=container;
    }
    //生产
    @Override
    public void run() {
        //生产15个鸡肉
        for (int i = 1; i <=15; i++) {
            container.push(new Chicken(i));
            System.out.println("生产了"+i+"只鸡");
        }
    }
}
//消费者
class Consumer extends Thread{
    Container container;
    public Consumer(Container container){
        this.container=container;
    }
    //消费
    @Override
    public void run() {
        for (int i = 1; i <=15; i++) {
            System.out.println("消费了第"+container.pop().id+"只鸡");
        }
    }
}
//产品
class Chicken{
    int id; //产品编号
    public Chicken(int id) {
        this.id = id;
    }
}

//缓冲区
class Container{
  //容器
  Chicken[] chickens=new Chicken[10];
  //容器计数器
    int count=0;
  //生产者放入产品
  public  synchronized void push(Chicken chicken){
      //如果容器满了,就等待消费者消费
      if (count==chickens.length){
         //通知消费者消费,生产等待
          System.out.println("通知消费者消费,生产等待");
          try {
              this.wait();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
      //如果容器没有满,就丢入产品
      chickens[count]=chicken;
      count++;
      //唤醒消费者,通知消费者可以消费了
      this.notifyAll();
  }
    //消费者消费产品
    public synchronized Chicken pop(){
      //判断能否消费
        if (count==0){
           //消费者等待生产者生产
            System.out.println("消费者等待生产者生产");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //如果可以消费
        count--;
        Chicken chicken=chickens[count];
        //消费完,唤醒生产者,通知生产者生产
        this.notifyAll();
        return chicken;
    }
}

结果:

消费者等待生产者生产
生产了1只鸡
生产了2只鸡
生产了3只鸡
消费了第1只鸡
消费了第3只鸡
消费了第2只鸡
消费者等待生产者生产
生产了4只鸡
生产了5只鸡
生产了6只鸡
生产了7只鸡
生产了8只鸡
生产了9只鸡
生产了10只鸡
生产了11只鸡
生产了12只鸡
生产了13只鸡
通知消费者消费,生产等待
消费了第13只鸡
消费了第12只鸡
消费了第11只鸡
消费了第10只鸡
消费了第9只鸡
消费了第8只鸡
消费了第7只鸡
生产了14只鸡
生产了15只鸡
消费了第14只鸡
消费了第15只鸡
消费了第6只鸡
消费了第5只鸡
消费了第4只鸡

Process finished with exit code 0

过程分析:从执行结果可以看出,消费者线程先执行,判断到没有产品不能消费时,执行了wait()方法,线程进入等待状态,并释放了Container(该锁由同步方法synchronized独占,如果wait方法不能释放锁的话,那么生产者线程在消费者线程等待期间便无法操作Container)的锁,此时生产者线程开始执行,在生产了产品后,生产者通过this.notifyAll()唤醒消费者,消费者开始消费。。。。。。。。

并发协作模型“生产者/消费者模式”—–>信号灯法

略略略略略略略略略略略略略略略略略略略略略略略略略略略略略略

线程池

背景

经常创建和销毁,使用量特别大的资源,比如并发情况下的线程,对性能影响很大

思路

提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。 可以避免频繁创建销毁,实现重复利用 。类似生活中的公共交通工具。

好处:

  • 提高响应速度(减少了创建新线程的时间)
  • 降低资源消耗(重复利用线程池中的线程,不需要每次都创建)
  • 便于线程管理(…….)
    • corePoolSize:核心池的大小
    • maximunPoolSize:最大线程数
    • keepAliveTime:线程没有任务时最多保持多长时间后会终止

JDK5.0起提供了线程池的相关API:ExecutorServiceExecutors

使用线程池

ExecutorService:真正的线程池接口。常见的子类ThreadPoolExecutor

  • void execute(Runnable command):执行任务/命令,没有返回值,一般来执行Runnable
  • Futuresubmit(Callabletask):执行任务,有返回值,一般用来执行Callable
  • void shutdown():关闭连接池

Executors:工具类,线程池的工厂类,用于创建并返回不同类型的线程池

测试:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//测试线程池
public class TestPool {
    public static void main(String[] args) {
        //1.创建服务,创建线程池
        //newFixedThreadPool  参数为池的大小
        ExecutorService service= Executors.newFixedThreadPool(10);
        //执行
        service.execute(new MyThread());
        service.execute(new MyThread());
        service.execute(new MyThread());
        service.execute(new MyThread());
        //关闭连接
        service.shutdown();
    }
}

class MyThread implements Runnable{
    @Override
    public void run() {
        for (int i = 1; i < 5; i++) {
            System.out.println(Thread.currentThread().getName()+"  i="+i);
        }
    }
}
pool-1-thread-1  i=1
pool-1-thread-4  i=1
pool-1-thread-3  i=1
pool-1-thread-2  i=1
pool-1-thread-4  i=2
pool-1-thread-1  i=2
pool-1-thread-4  i=3
pool-1-thread-2  i=2
pool-1-thread-3  i=2
pool-1-thread-2  i=3
pool-1-thread-4  i=4
pool-1-thread-1  i=3
pool-1-thread-1  i=4
pool-1-thread-2  i=4
pool-1-thread-3  i=3
pool-1-thread-3  i=4

Process finished with exit code 0