一、线程通信

1、wait()notify()notifyAll()

wait()notify()notifyAll()方法是由Object类提供的,这三个方法必须由同步监视器对象来调用:如果使用的是同步方法,则可以直接在同步方法中调用这些方法;如果使用的是同步代码块,则必须用synchronized关键字后面括号中的对象来调用。

  • wait()

让当前线程等待,直到其他线程调用该同步监视器的notify()notifyAll()方法;调用此方法的线程会释放对该同步监视器的锁定。

  • notify()

唤醒在此同步监视器上等待的单个线程(如果有多个线程等待,则唤醒其中任意一个)。

  • notifyAll()

唤醒在此同步监视器上等待的所有线程。

2、样例(未协调线程的运行)

下面的例子中,生产者线程调用工厂的生产方法生产商品,消费者线程调用工厂的消费方法消费商品:

  • 工厂
public class Factory {

	private String commodity;
	
	public synchronized void produce(String commodity){
		if(this.commodity != null){
			System.out.println(String.format("%s 生产的 %s 还未被消费,暂停生产!", Thread.currentThread().getName(), this.commodity));
		}else{
			this.commodity = commodity;
			System.out.println(String.format("%s 生产了 %s", Thread.currentThread().getName(), commodity));
		}
	}
	
	public synchronized void consume(){
		if(this.commodity != null){
			System.out.println(String.format("%s 消费了 %s", Thread.currentThread().getName(), this.commodity));
			this.commodity = null;
		}else{
			System.out.println(String.format("%s 没有商品可消费!", Thread.currentThread().getName()));
		}
	}
}
  • 生产者
public class ProducerThread implements Runnable{

	private Factory factory;
	
	public ProducerThread(Factory factory) {
		this.factory = factory;
	}
	
	@Override
	public void run() {
		String[] commodities = new String[]{
			"香蕉", "苹果", "荔枝", "西瓜", "草莓"
		};
		for(int i = 0; i < 5; i++){
			factory.produce(commodities[i]);
		}
	}
}
  • 消费者
public class ConsumerThread implements Runnable{

	private Factory factory;
	
	public ConsumerThread(Factory factory) {
		this.factory = factory;
	}
	
	@Override
	public void run() {
		for(int i = 0; i < 5; i++){
			factory.consume();
		}
	}
}

程序输出:

生产者 生产了 香蕉
生产者 生产的 香蕉 还未被消费,暂停生产!
生产者 生产的 香蕉 还未被消费,暂停生产!
生产者 生产的 香蕉 还未被消费,暂停生产!
生产者 生产的 香蕉 还未被消费,暂停生产!
消费者 消费了 香蕉
消费者 没有商品可消费!
消费者 没有商品可消费!
消费者 没有商品可消费!
消费者 没有商品可消费!

从上面的结果可以看出,在生产者线程获得执行的时候会一直生产,而消费者线程获得执行的时候会一直消费,两者只是在线程的调度下运行。而线程的调度具有一定的透明性,程序通常无法准确控制线程的轮换执行,但可以通过一些方法来让线程之间协调运行。

3、使用waitnotify协调线程运行

修改Factory类如下:

public class Factory {

	private String commodity;
	
	public synchronized void produce(String commodity){
		try {
			if(this.commodity != null){
				System.out.println(String.format("%s 生产的 %s 还未被消费,暂停生产!", Thread.currentThread().getName(), this.commodity));
				wait();
			}else{
				this.commodity = commodity;
				System.out.println(String.format("%s 生产了 %s", Thread.currentThread().getName(), commodity));
				notifyAll();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public synchronized void consume(){
		try {
			if(this.commodity != null){
				System.out.println(String.format("%s 消费了 %s", Thread.currentThread().getName(), this.commodity));
				this.commodity = null;
				notifyAll();
			}else{
				System.out.println(String.format("%s 没有商品可消费!", Thread.currentThread().getName()));
				wait();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

程序输出:

生产者 生产了 香蕉
生产者 生产的 香蕉 还未被消费,暂停生产!
消费者 消费了 香蕉
消费者 没有商品可消费!
生产者 生产了 荔枝
生产者 生产的 荔枝 还未被消费,暂停生产!
消费者 消费了 荔枝
消费者 没有商品可消费!
生产者 生产了 草莓
消费者 消费了 草莓

由于生产者(消费者)notifyAll()时除了唤醒消费者(生产者)外也会唤醒本身,到最后一次时,生产者线程生产了草莓后结束,此时唤醒的只有消费者线程,因此会有上面的输出结果。

4、使用条件变量协调线程运行

如果程序中是使用Lock对象来保证同步的,则需要使用Lock对象的Condition对象(lock.newCondition())的await()signal()signalAll()方法来协调线程的运行。其中,await()方法类似前面的wait()方法,让当前线程等待;signal()signalAll()方法分别类似前面的notify()notifyAll()方法,可以唤醒在此Lock对象上等待的单个线程或所有线程。

对应的Factory类如下:

public class Factory {
	
	private String commodity;
	
	private final Lock lock = new ReentrantLock();
	
	private final Condition condition = lock.newCondition();
	
	public void produce(String commodity){
		lock.lock();
		try {
			if(this.commodity != null){
				System.out.println(String.format("%s 生产的 %s 还未被消费,暂停生产!", Thread.currentThread().getName(), this.commodity));
				condition.await();
			}else{
				this.commodity = commodity;
				System.out.println(String.format("%s 生产了 %s", Thread.currentThread().getName(), commodity));
				condition.signalAll();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally{
			lock.unlock();
		}
	}
	
	public void consume(){
		lock.lock();
		try {
			if(this.commodity != null){
				System.out.println(String.format("%s 消费了 %s", Thread.currentThread().getName(), this.commodity));
				this.commodity = null;
				condition.signalAll();
			}else{
				System.out.println(String.format("%s 没有商品可消费!", Thread.currentThread().getName()));
				condition.await();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally{
			lock.unlock();
		}
	}
}

二、线程组

ThreadGroup类是Java中的线程组,使用线程组可以对多个线程分类管理。

在创建线程时可以指定线程所属的线程组,如果没有指定,则属于默认线程组。通常情况下,子线程和创建它的父线程属于同一个线程组。

当线程加入了指定的线程组后,该线程将一直属于该线程组,直到线程死亡,在线程运行时不能改变它所属的线程组。可以通过java.lang.Thread.getThreadGroup()方法获取线程所属的线程组。

  • 样例
public class TestThread implements Runnable{
	@Override
	public void run() {
		//do Something
	}
}

主线程所属的线程组,也是所有线程默认的线程组:

ThreadGroup group = Thread.currentThread().getThreadGroup();
System.out.println(group.getName());//main
System.out.println(group.isDaemon());//false

Runnable target = new TestThread();
Thread thread = new Thread(target);
System.out.println(thread.getThreadGroup().getName());//main

创建线程组:

ThreadGroup threadGroup = new ThreadGroup("自定义线程组");
//也可以在创建时指定父线程组
ThreadGroup group = Thread.currentThread().getThreadGroup();
threadGroup = new ThreadGroup(group, "指定父线程组的线程组");

创建线程时指定线程组:

ThreadGroup threadGroup = new ThreadGroup("自定义线程组");
threadGroup.setDaemon(true);
Thread thread = new Thread(threadGroup, target);
System.out.println(thread.getThreadGroup().getName());//自定义线程组
System.out.println(thread.getThreadGroup().isDaemon());//true
  • 其他常用方法

    • void setDaemon(boolean daemon):设置线程组为后台线程组。当后台线程组的最后一个线程死亡时,后台线程组自动销毁。

    • boolean isDaemon():是否是后台线程组。

    • void interrupt():中断此线程组的所有线程。

    • int activeCount():返回此线程组中活动线程的个数。

三、未处理异常处理器

如果线程在执行过程中抛出了一个未处理异常,JVM在结束该线程之前会检查是否有对应的java.lang.Thread.UncaughtExceptionHandler对象;如果有该对象,则会调用该对象的uncaughtException(Thread t, Throwable e)方法来处理异常。

1、设置异常处理器

Thread类中提供了两种方式设置UncaughtExceptionHandler

  • public static void setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh)

为所有线程实例设置默认的UncaughtExceptionHandler

  • public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh)

设置某个线程实例的UncaughtExceptionHandler

2、异常处理过程

线程所属的线程组是默认的异常处理器(ThreadGroup实现了Thread.UncaughtExceptionHandler接口):

ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
System.out.println(threadGroup == Thread.currentThread().getUncaughtExceptionHandler());//true

当一个线程抛出未处理异常时,JVM首先会查找该异常对应的异常处理器(使用setUncaughtExceptionHandler()方法设置的处理器),如果找到,则调用该异常处理器的uncaughtException()方法;否则,调用线程所属线程组的uncaughtException()方法。

线程组处理异常的过程如下:

  • 如果该线程组有父线程组,则调用父线程组的uncaughtException()方法。

  • 如果有通过Thread.setDefaultUncaughtExceptionHandler()方法设置的处理器,则调用该处理器的uncaughtException()方法。

public class MyExceptionHandler implements Thread.UncaughtExceptionHandler{
	@Override
	public void uncaughtException(Thread t, Throwable e) {
		System.out.println(String.format("%s 线程出现了异常: %s", t.getName(), e));
	}
}
public static void main(String[] args) {
	System.out.println(Thread.getDefaultUncaughtExceptionHandler());//null

	Thread.setDefaultUncaughtExceptionHandler(new MyExceptionHandler());
	//执行下面的代码后输出:main 线程出现了异常: java.lang.ArithmeticException: / by zero
	System.err.println(5/0);
}

四、CallableFuture

1、Callable

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable接口提供了一个call()方法来作为线程执行体,call()Runnalbe接口的run()方法相比,有以下优势:

  • 可以有返回值

  • 可以声明抛出异常

2、Future

public interface Future<V> {
	boolean cancel(boolean mayInterruptIfRunning);

	//Return true if this task was cancelled before it completed
	boolean isCancelled();

	//Return true if this task completed.
	boolean isDone();

	V get() throws InterruptedException, ExecutionException;
	V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口代表Callable接口中call()方法的返回值;它有一个实现类FutureTask,该实现类实现了RunnableFuture两个接口,因此,此类可以作为Thread类的target。

Future接口中部分方法解释如下:

  • boolean cancel(boolean mayInterruptIfRunning)

    试图取消执行此任务。

  • V get()

    返回Callable对象call()方法的返回值。调用此方法会导致程序阻塞,必须等到子线程结束才会得到返回值。

  • V get(long timeout, TimeUnit unit)

    返回Callable对象call()方法的返回值。此方法让程序最多阻塞timeout和unit指定的时间。如果在指定时间内仍然没有得到返回值,将抛出TimeoutException

3、创建并启动线程

  • 创建Callable接口的实现类,此类的call()方法为线程执行体

  • 创建实现类的实例,使用FutureTask类来包装Callable对象

  • 使用包装后的FutureTask类作为Thread的target创建并启动线程

  • 调用FutureTask对象的方法获取线程执行的返回值

public class CallableThread implements Callable<Integer>{

	private int number;

	public CallableThread(int number) {
		this.number = number;
	}
	
	@Override
	public Integer call() throws Exception {
		int sum = 0;
		for(int i = 0; i < number; i++){
			System.out.println(String.format("%s : 第 %s 次累加", Thread.currentThread().getName(), i));
			sum += i;
		}
		return sum;
	}
	
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
	CallableThread callable = new CallableThread(3);
	FutureTask<Integer> task = new FutureTask<>(callable);
	for(int i = 0; i < 5; i++){
		System.out.println(String.format("%s : %s", Thread.currentThread().getName(), i));
		if(i == 2){
			new Thread(task).start();
			System.out.println(String.format("子线程执行结果: %s", task.get()));
		}
	}
}

程序输出:

main : 0
main : 1
main : 2
Thread-0 : 第 0 次累加
Thread-0 : 第 1 次累加
Thread-0 : 第 2 次累加
子线程执行结果: 3
main : 3
main : 4

可以看到,由于在启动子线程后就调用get()方法获取返回值,因此程序阻塞,等获取到返回值后才继续执行。

参考资料: