计划:技术栈完善2-实战JAVA高并发程序设计-第三章

JDK并发包

多线程间的团队协作: 同步控制

比如之前的synchronized关键字就是一种最简单的控制方法.它决定了一个线程是否可以访问临界资源区.
还有wait和notify.

synchronized的功能扩展: 重入锁

重入锁可以完全替代synchronized关键字.

重入锁使用 java.util.concurrent.locks.ReentrantLock
类来实现.

例:

import java.util.concurrent.locks.ReentrantLock;
public class p31 implements Runnable{

    public static ReentrantLock lock=new ReentrantLock();
    public static int i=0;
    @Override
    public void run(){
        for(int j=0;j<10000000;++j){
            lock.lock();
            try{
                i++;
            }finally{
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException{
        // TODO Auto-generated method stub
        p31 tl=new p31();
        Thread t1=new Thread(tl);
        Thread t2=new Thread(tl);
        t1.start();t2.start();
        t1.join();t2.join();
        System.out.println(i);
    }
}

可以看出这段代码是手动加锁的.故重入锁在逻辑控制的灵活性上远高于某关键字.
但一定注意推出临界区要释放锁
为什么叫重入锁呢?因为允许一个线程获得N个锁,所以叫重入锁.
一个线程获取多个锁后,也必须释放相同次数的锁

重入锁的中断响应

如果你一个线程一直等待锁,而拿锁的那个线程始终不放开锁,那不就死锁了么.
它提供了一种机制,即通知等待者无须再等待.即时停止工作.
isHeldByCurrentThread()方法是返回当前线程是否拥有该锁.
lockInterruptibly()方法是获取一个允许中断响应的锁.
lock()方法获取的锁不允许中断.
例:

import java.util.concurrent.locks.ReentrantLock;

public class p32 implements Runnable {
    public static ReentrantLock lock1=new ReentrantLock();
    public static ReentrantLock lock2=new ReentrantLock();

    int lock;
    /*
     * 控制加锁顺序,防止死锁
     */
    public p32(int lock){
        this.lock=lock;
    }

    @Override
    public void run(){
        try{
            if(lock==1){
                lock1.lockInterruptibly();
                try{
                    Thread.sleep(500);
                }catch(InterruptedException e){}
                System.out.println("Lock1 Here IN");
                lock2.lockInterruptibly();
                System.out.println("Lock1 Here OUT");
            }else{
                lock2.lockInterruptibly();
                try{
                    Thread.sleep(500);
                }catch(InterruptedException e){}
                System.out.println("Lock2 Here IN");
                lock1.lockInterruptibly();
                System.out.println("Lock2 Here OUT");
            }
        }catch(InterruptedException e){
            e.printStackTrace();
        }finally{
            if(lock1.isHeldByCurrentThread())
                lock1.unlock();
            if(lock2.isHeldByCurrentThread())
                lock2.unlock();
            System.out.println(Thread.currentThread().getId()+"线程退出");
        }
    }

    public static void main(String[] args) throws InterruptedException{
        p32 r1=new p32(1);
        p32 r2=new p32(2);
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();t2.start();
        Thread.sleep(1000);
        t2.interrupt();
    }

}

线程启动后,r1先占用lock1,再请求lock2
r2相反,这也就导致了t1和t2互相等待,形成死锁.
而当我们将r2中断以后,r2释放了所有的锁,r1检测到了,故只有r1完全执行完毕,r2则会抛出一个中断异常.

第二种中断方法

lock.tryLock(5,TimeUnit.SECONDS)

县城在这个锁请求中等待5秒,如果五秒内无法得到锁,则False

公平锁

公平锁的效率不高,所以一般不用,因为公平锁需要维护一个优先队列.

公平锁是通过对谁先获得当前资源进行合理的调度来防止死锁的产生.

使用方法: ReentrantLock

性质:

1.原子状态
2.等待队列(没有请求到锁就进入等待队列)
3.阻塞原语pair()与unpair()

重入锁好搭档: Condition条件

Condition和wait与notify的用法大致相同

package s;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class p31 implements Runnable {

    public static ReentrantLock lock=new ReentrantLock();
    public static Condition condition=lock.newCondition();

    @Override
    public void run(){
        try{
            lock.lock();
            System.out.println("SD");
            condition.await();
            System.out.println("This is going on.");
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException{
        p31 tl=new p31();
        Thread t1=new Thread(tl);
        t1.start();
        Thread.sleep(2000);
        ///通知线程t1继续执行
        lock.lock();
        condition.signal();
        System.out.println("AA");
        lock.unlock();
    }

}

注: Condition只能在lock和unlock保护下才可以解锁.
wait 是等待,notify是返回通知开始执行

允许多个线程同时访问: 信号量(Semaphore)

构造函数:

public Semaphore(int permits)
public Semaphore(int permits,boolean fair)
第二个参数是是否公平

信号量主要逻辑方法

public void acquire() -准入许可,等待
public void acquireUninterruptibly() -不接收中断
public boolean tryAcquire() – 获得许可,不等待
public boolean tryAcquire(long timeout,TimeUnit unit)
public void release() – 释放许可

例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class p33 implements Runnable{
    final Semaphore semp=new Semaphore(5);
    public void run(){
        try{
            semp.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId()+":done!");
            semp.release();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        ExecutorService exec =Executors.newFixedThreadPool(20);
        final p33 t1= new p33();
        for(int i=0;i<20;++i){
            exec.submit(t1);
        }
    }

}

为信号量传入的5代表线程队列中课同时存在的线程数量的最大值.
开启程序后,你会发现每一瞬间都会有5个线程执行并打印出数据,但在这5个释放占用的信号量后才会继续向下执行.

ReadWriteLock 读写锁

读操作不会破坏数据完整性,所以当读-读-…操作产生时,不需要加锁即可.这样使得大量读操作的系统会有很明显的效率上的提升.

但写会阻塞读,所以效率由写操作的次数来决定.

锁的创建:

private static ReentrantReadWriteLock readWriteLock=new RenntrantReadWriteLock();
private static Lock readLock=readWriteLock.readLock();
private static Lock WriteLock=readWriteLock.WriteLock();

倒计时器: CountDownLatch

它可以让一个线程在倒计时结束后再执行.
执行方式如下:

必须所有的线程都完成任务后,等待在CountDownLatch上的线程才能继续执行.

例:

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class p34 implements Runnable{
    static final CountDownLatch end=new CountDownLatch(10);

    static final p34 demo=new p34();

    public void run(){
        try{
            Thread.sleep(new Random().nextInt(10)*1000);
            System.out.println("check complete");
            end.countDown();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    public static class now{
        public static void print() throws InterruptedException{
            end.await();
            System.out.println("我来了~~~~");
        }
    }

    public static void main(String[] args) throws InterruptedException{
        // TODO Auto-generated method stub
        final now t1=new now();
        ExecutorService exec=Executors.newFixedThreadPool(10);
        for(int i=0;i<10;++i){
            exec.submit(demo);
        }
        t1.print();
        exec.shutdown();
    }
}

循环栅栏: CyclicBarrier

它比上面那个更加复杂和强大

可以将它理解为一种障碍物.它是用来阻止线程继续执行,并且这个计数器可以反复使用,比如,10个执行完以后,再来一遍.

例:

import java.util.Random;
import java.util.concurrent.CyclicBarrier;

public class p35 {
    public static class Soldier implements Runnable{
        private String soldier;
        private final CyclicBarrier cyclic;
        Soldier(CyclicBarrier cyclic,String s){
            this.cyclic=cyclic;
            this.soldier=s;
        }

        public void run(){
            try{
                //等待所有士兵到齐
                cyclic.await();
                doWork();
                //等待所有士兵完成工作
                cyclic.await();
            }catch(Exception e){
                e.printStackTrace();
            }
        }

        void doWork(){
            try{
                Thread.sleep(Math.abs(new Random().nextInt()%10000));
            }catch(Exception e){
                e.printStackTrace();
            }
            System.out.println(soldier+"任务完成!");
        }

    }

    public static class BarrierRun implements Runnable{
        boolean flag;
        int N;
        public BarrierRun(boolean a,int b){
            this.flag=a;
            this.N=b;
        }
        public void run(){
            if(flag){
                System.out.println("司令:[士兵"+N+"个,任务完成!]");
            }else{
                System.out.println("司令:[士兵"+N+"个,集合完毕!]");
                flag=true;
            }
        }
    }

    public static void main(String[] args) {
        final int N=5;
        Thread[] allSoldier=new Thread[10];
        boolean flag=false;
        CyclicBarrier cyclic=new CyclicBarrier(N,new BarrierRun(flag,N));
        //设置屏障点
        System.out.println("集合队伍!");
        for(int i=0;i<10;++i){
            System.out.println("士兵"+i+"报道!");
            allSoldier[i]=new Thread(new Soldier(cyclic,"士兵 "+i));
            allSoldier[i].start();
        }
    }

}

输出:
集合队伍!
士兵0报道!
士兵1报道!
士兵2报道!
士兵3报道!
士兵4报道!
士兵5报道!
司令:[士兵5个,集合完毕!]
士兵6报道!
士兵7报道!
士兵8报道!
士兵9报道!
司令:[士兵5个,任务完成!]
士兵 8任务完成!
士兵 7任务完成!
士兵 1任务完成!
士兵 0任务完成!
士兵 4任务完成!
司令:[士兵5个,任务完成!]
士兵 5任务完成!
士兵 3任务完成!
士兵 9任务完成!
士兵 2任务完成!
士兵 6任务完成!
司令:[士兵5个,任务完成!]

你会发现是每5个释放一次锁.

LockSupport

它可以在线程内任意位置让其阻塞
之前suspend和resume时,如果resume在suspend前执行,则一定会出现线程被无限挂起,导致无法正常退出.
我们可以用LockSupport解决这一问题.

因为LockSupport是用信号量来实现的.它为每一个线程准备了一个许可,如果许可可用,则park()函数会立即返回,并且消费许可(变为不可用).如果许可不可用,就会被阻塞.
但和信号量不同的是,许可永远只有一个.

LockSupport.park()
LockSupport.unpark(Runnable)
LockSupport.parkNanos()
LockSupport.parkUtil()

此外,如果是用park(Object),则这个阻塞对象会出现在线程Dump中(报错),分析问题就更方便了.

线程复用: 线程池

多线程的软件设计方法确实可以最大限度的发挥现代多核处理器的计算能力,提高生产系统的吞吐量和性能。但是,若不加控制和管理的随意使用线程,对系统的性能反而会产生不利影响.

一种极简的处理方法:

new Thread(new Runnable(){
    public void run(){
        //do sth
    }
}).start();

这样的线程在run完后就会自动回收,但线程量过大时,则会耗尽CPU和内存资源.

而且如果为每一个小程序都创建一个线程,就可能出现销毁时间远大于该线程实际工作所消耗的时间.

其次,可能因线程过多而爆栈/堆.

大量的线程回收也会给GC造成很大压力,延长GC的停顿时间.

什么是线程池

为了避免系统频繁的创建和销毁线程,我们会尽量的让线程复用.
数据库连接池:
为了避免每次数据库查询都重新建立和销毁数据库连接,我们可以使用数据库连接池保护一些数据库连接,让他们长期在激活状态.当系统需要数据库时,并不是真正创建一个新的连接,而是从连接池中获得一个可用的连接即可.反之,当需要关闭连接时,并不是真的把链接关闭,而是将这个链接还给连接池即可.

线程池:
线程池中,总有那么几个活跃线程,当你需要时,可以从池子中随便拿一个空闲线程,当完成工作时,并不着急关闭线程,而是将这个线程退回到池子,方便别人使用.

换言之,创建线程变成了从池子中获得线程,销毁变成了归还.

JDK内置线程池框架: Executor

框架结构图

关于Executor的设计模式: 生产者-消费者模式和工厂方法

例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class p36 {
    public static class MyTask implements Runnable{
        public void run(){
            System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId());
            try{
                Thread.sleep(1000);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args){
        // TODO Auto-generated method stub
        MyTask task=new MyTask();
        ExecutorService es=Executors.newFixedThreadPool(5);
        for(int i=0;i<10;++i){
            es.submit(task);
        }
        es.shutdown();
    }

}

可能我们之前一直对为什么我们传入submit的是一个对象,但他们得ID却不同呢?
这是因为线程的ID与对象并无直接关系,线程的ID是直接分配好的.
我们可以尝试打印出this.toString()来查看是否是同一个对象,结果表明确实是同一个对象,如果不想使用用同一个对象来做测试,那就用new MyTask()作为参数就可以了.
但直接new的话,会出现一个很严重的问题,就是new出来的对象的执行顺序可能产生混乱.因为不是同一个对象,所以就不会按照顺序来执行了.
例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class p36 { 
    public static ReentrantLock lock=new ReentrantLock();
    public static class MyTask implements Runnable{
        private int kt=0;
        public void cal(int t){
            kt=kt+t;
        }
        public void run(){
            //lock.lock();
            cal(1);
            //lock.unlock();
            //System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId());
            if(kt>9990)
                System.out.println(this.toString()+" "+kt);
            try{
                Thread.sleep(1);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException{
        // TODO Auto-generated method stub
        MyTask tk=new MyTask(); 
        Thread pk=new Thread(tk);
        ExecutorService es=Executors.newFixedThreadPool(10);

        for(int i=0;i<10000;++i){
            es.submit(tk);
        }
        es.shutdown();
        System.out.println("AAAAAA"+tk.kt);
    }

}

打印后会发现,结果并不是正确的,甚至10000的数据前提下9990也不能保证.
当然,加上锁以后就正常了.

刨根问底: 核心线程池的内部实现

拒接策略

P108

扩展线程池

ThreadPoolExecutor是一个可扩展的线程池
它为我们提供了三个接口

beforeExecute()
afterExecute()
terminated()

字面意思

我们重写一下试试:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class p37 {
    public static class MyTask implements Runnable{
        public String name;

        public MyTask(String name){
            this.name=name;
        }

        public void run(){
            System.out.println("正在执行"+":Thread ID:"+Thread.currentThread().getId()+":Task Name:"+name);
            try{
                Thread.sleep(1000);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub
        ExecutorService es=new ThreadPoolExecutor(5,5,0L,TimeUnit.MILLISECONDS,new LinkedBlockingDeque<Runnable>()){
            @Override
            protected void beforeExecute(Thread t,Runnable r){
                System.out.println("准备执行: "+((MyTask)r).name);
            }

            @Override
            protected void afterExecute(Runnable r,Throwable t){
                System.out.println("执行完成: "+((MyTask)r).name);
            }

            @Override
            protected void terminated(){
                System.out.println("线程池退出");
            }
        };
        for(int i=0;i<5;++i){
            MyTask task=new MyTask("TASK-GEYM-"+i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

注: shutdown方法会等所有的线程执行结束后才关闭线程池.

合理的选择: 优化线程池线程数量

线程池的大小对系统的性能也有影响.过大或过小都不可以.但也不需要特别精确.
一般来说确定线程池的大小需要考虑CPU的数量,内存大小等因素.

注:线程池可能会吃掉异常

而 execute方法会打印出部分异常,
或者修改submit的使用:
Future re=pools.submit(new DivTask(100,0));
re.get();
这样也可以.

扩展ThreadPoolExecutor以显示异常



分而治之: Fork/Join框架

著名的MapReduce也是采用了分而治之的思想,简单来说,如果你要处理1000个数据,但是你并不具备处理1000个数据的能力,那么你可以只处理10个,然后,分阶段处理100个,将100个结果进行合成.就是1000个结果.
JDK为我们提供了ForkJoinPool线程池.

Fork/join执行逻辑

互相帮助的线程

其中ForkJoinTask有两个重要的子类.关系如下:

RecursiveTask<>
是实现一个compute函数(返回值要与泛型一致)即可.

例:

import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class p38 extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public p38(long start,long end){
        this.start=start;
        this.end=end;
    }

    public Long compute(){
        long sum=0;
        boolean canCompute=(end-start)<THRESHOLD;
        if(canCompute){
            ///如果大于THRESHOLD的话才进行分解,否则直接进行即可
            for(long i=start;i<=end;++i){
                sum+=i;
            }
        }else{
            //分成100个小任务(整块)
            long step=(start+end)/100;
            ArrayList<p38> subTasks=new ArrayList<p38>();
            long pos=start;
            for(int i=0;i<100;++i){
                long lastOne=pos+step;
                if(lastOne>end)lastOne=end;
                p38 subTask=new p38(pos,lastOne);
                pos+=step+1;
                subTasks.add(subTask);
                //使用fork提交子任务
                subTask.fork();
            }
            //所有子任务结束后,再次求和
            for(p38 t:subTasks){
                sum+=t.join();
            }
        }
        return sum;
    }


    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ForkJoinPool forkjoinpool=new ForkJoinPool();
        p38 task=new p38(0,200000L);
        ForkJoinTask<Long> result=forkjoinpool.submit(task);
        try{
            long res=result.get();
            System.out.println("sum="+res);
        }catch(Exception e){
            e.printStackTrace();
        }
    }

}

什么时候要加锁?

如果只是读操作,没有写操作,则可以不用加锁,此种情形下,变量加上final关键字;
如果有写操作,但是变量的写操作跟当前的值无关联,且与其他的变量也无关联,则可考虑变量加上volatile关键字,同时写操作方法通过synchronized加锁;
如果有写操作,且写操作依赖变量的当前值(如:i++),则getXXX和写操作方法都要通过synchronized加锁。

线程池是自带锁的.

JDK 并发容器


Tip:

这点有点迷:P128

其中CopyOnWrite是高效的读取,在这个容器中,写入不会阻塞读取.

跳表

跳表是一种可以快速查找的数据结构,它有点类似于平衡树,它只需要部分锁即可,而跳表的时间复杂度也是O(log n)

更多的数据结构可以见线程那个包

Done