java多线程编程中为了满足一些应用场景往往需要对线程进行调度,jdk提供了多种调度方法,接下来我们一一来举例说明。
- 信号量同步 主要是为了限流控制线程的并发数,这时我们可以采用这种方法,信号量简单理解为小区入口的闸门,刷卡一次获取一个信号,当然也可以一次占用多个信号,总之信号的总量不变,占用多了能通过的线程就少了,具体我们可以从代码中来进行分析
public class SemaphoreDemo {
private Semaphore semaphore = new Semaphore(10);
public void printNum(int i){
try {
semaphore.acquire(1);
System.out.println("queue = " + semaphore.getQueueLength());
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName()+" semaphore = " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release(1);
}
}
public static void main(String[] args){
final SemaphoreDemo demo = new SemaphoreDemo();
for(int i =0; i<100; i++){
final int finalI = i;
new Thread(new Runnable() {
@Override
public void run() {
demo.printNum(finalI);
}
}).start();
}
}
}
我们初始化了一个总量为10的信号量池,每一个线程在执行的时候都会获取一个信号 ,其中semaphore.acquire(1); 方法就是获取信号,当然也可以一次获取多个。拿到信号量后,线程开始执行自身的业务逻辑,当任务执行完成或者发生异常后我们需要释放信号量,不然会一直占用信号量,semaphore.acquire方法是阻塞执行,如果没有获取到信号量会一直阻塞代码的执行直到获取信号量成功,如果我们需要让代码异步执行可以尝试semaphore.tryAcquire,如果没有获取到信号量则会返回false告诉调用者,这样我们可以根据结果处理对应的逻辑,当然也可以设置一个超时时间,如果在指定的时间内没有获取到信号量则返回false。
- CountDownLatch 让指定的线程全部都执行完成后在接着做后续的逻辑,可以简单理解为,运动会的时候赛场上裁判必须让运动员就位以后才开始进行比赛,CountDownLatch就是那个场上的裁判。我们来看看代码实现
public class CountDownLatchDemo {
public void race(CountDownLatch latch){
try {
System.out.println(Thread.currentThread().getName()+ " is here");
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName()+ " is done");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
}
}
public static void main(String[] args){
final CountDownLatch latch = new CountDownLatch(4);
final CountDownLatchDemo demo = new CountDownLatchDemo();
for(int i=0;i<4;i++){
new Thread(new Runnable() {
@Override
public void run() {
demo.race(latch);
}
}).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("all is done");
}
}
Thread-0 is here
Thread-1 is here
Thread-2 is here
Thread-3 is here
Thread-1 is done
Thread-3 is done
Thread-2 is done
Thread-0 is done
all is done
通过运行结果我们可以看到,4个线程各自等待3s后在执行最后的代码,每个线程在启动的时候会进行计数,然后latch.await方法就一直处于等待状态,只有计数器为0时才会执行之后的的代码,latch.await方法还可以设置超时时间,超时后会自动执行后续的代码。
- CyclicBarrier 过程同步,简单理解就是每个线程阶段完成各自任务后等待其他线程执行完成,然后在执行下一阶段的任务,完成后在等待其他线程,如此循环往复,就好比赛场上四名运动员跑步,哪个人领先来就停下来等其他的运动员赶上来,最后一起冲刺终点。我们看看在代码中是如何实现的
public class CyclicBarrierDemo {
private Random random = new Random();
public void race(CyclicBarrier barrier){
try {
Thread.sleep(random.nextInt(5)*1000);
System.out.println(Thread.currentThread().getName()+"到达地点1");
barrier.await();
Thread.sleep(random.nextInt(5)*1000);
System.out.println(Thread.currentThread().getName()+"到达地点2");
barrier.await();
Thread.sleep(random.nextInt(5)*1000);
System.out.println(Thread.currentThread().getName()+"到达地点3");
barrier.await();
Thread.sleep(random.nextInt(5)*1000);
System.out.println(Thread.currentThread().getName()+"到达地点4");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args){
final CyclicBarrierDemo demo = new CyclicBarrierDemo();
final CyclicBarrier barrier = new CyclicBarrier(4);
for(int i=0;i<4;i++){
new Thread(new Runnable() {
@Override
public void run() {
demo.race(barrier);
}
}).start();
}
}
}
pool-1-thread-2到达地点1
pool-1-thread-1到达地点1
pool-1-thread-3到达地点1
pool-1-thread-4到达地点1
pool-1-thread-1到达地点2
pool-1-thread-4到达地点2
pool-1-thread-3到达地点2
pool-1-thread-2到达地点2
pool-1-thread-3到达地点3
pool-1-thread-1到达地点3
pool-1-thread-2到达地点3
pool-1-thread-4到达地点3
pool-1-thread-2到达地点4
pool-1-thread-3到达地点4
pool-1-thread-1到达地点4
pool-1-thread-4到达地点4
通过设置屏障点,我们可以看到四个线程都到达屏障点后才执行后续的逻辑,哪一个线程到达屏障点后就自动等待,一直到所有的线程都到达后方才开始执行。
- Phaser是一个更强大的、更复杂的同步辅助类,可以代替CyclicBarrier CountDownLatch的功能,但是比他们更强大。
Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才能进行下一步。
当我们有并发任务并且需要分解成几步执行的时候,这种机制就非常适合。
CyclicBarrier CountDownLatch 只能在构造时指定参与量,而phaser可以动态的增减参与量。
phaser 使用说明:
使用phaser.arriveAndAwaitAdvance(); //等待参与者达到指定数量,才开始运行下面的代码
使用phaser.arriveAndDeregister(); //注销当前线程,该线程就不会进入休眠状态,也会从phaser的数量中减少
模拟代替CountDownLatch功能,只需要当前线程arriveAndAwaitAdvance()之后运行需要的代码之后,就arriveAndDeregister()取消当前线程的注册。
phaser有一个重大特性,就是不必对它的方法进行异常处理。置于休眠的线程不会响应中断事件,不会抛出interruptedException异常, 只有一个方法会响应:AwaitAdvanceInterruptibly(int phaser).
其他api
arrive():这个方法通知phase对象一个参与者已经完成了当前阶段,但是它不应该等待其他参与者都完成当前阶段,必须小心使用这个方法,因为它不会与其他线程同步。
awaitAdvance(int phase):如果传入的阶段参数与当前阶段一致,这个方法会将当前线程至于休眠,直到这个阶段的所有参与者都运行完成。如果传入的阶段参数与当前阶段不一致,这个方法立即返回。
awaitAdvanceInterruptibly(int phaser):这个方法跟awaitAdvance(int phase)一样,不同处是:该访问将会响应线程中断。会抛出interruptedException异常
将参与者注册到phaser中:
register():将一个新的参与者注册到phase中,这个新的参与者将被当成没有执完本阶段的线程。
bulkRegister(int parties):将指定数目的参与者注册到phaser中,所有这些新的参与者都将被当成没有执行完本阶段的线程。
减少参与者
只提供了一个方法来减少参与者:arriveAndDeregister():告知phaser对应的线程已经完成了当前阶段,并它不会参与到下一阶段的操作中。
强制终止
当一个phaser么有参与者的时候,它就处于终止状态,使用forceTermination()方法来强制phaser进入终止状态,不管是否存在未注册的参与线程,当一个线程出现错误时,强制终止phaser是很有意义的。
当phaser处于终止状态的时候,arriveAndAwaitAdvance() 和 awaitAdvance() 立即返回一个负数,而不再是一个正值了,如果知道phaser可能会被终止,就需要验证这些方法的值,以确定phaser是不是被终止了。
被终止的phaser不会保证参与者的同步
public class PhaserDemo {
private Random random = new Random();
public void race(Phaser phaser){
try {
Thread.sleep(random.nextInt(5)* 1000);
System.out.println(Thread.currentThread().getName()+ "aaaaa");
phaser.arriveAndAwaitAdvance();
Thread.sleep(random.nextInt(5)* 1000);
System.out.println(Thread.currentThread().getName()+ "bbbbb");
phaser.arriveAndAwaitAdvance();
Thread.sleep(random.nextInt(5)* 1000);
System.out.println(Thread.currentThread().getName()+ "cccc");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args){
final PhaserDemo demo = new PhaserDemo();
ExecutorService pool = Executors.newCachedThreadPool();
final Phaser phaser = new Phaser(3);
for(int i=0;i<3;i++){
pool.submit(new Runnable() {
@Override
public void run() {
demo.race(phaser);
}
});
}
pool.shutdown();
}
}