计划:技术栈完善2-实战JAVA高并发程序设计-第二章




Java并行程序基础

进程(Process)

进程:

1.是计算机中的程序关于某数据集合上的一次运行活动.
2.是系统进行资源分配和调度的基本单位
3.是操作系统结构的基础
4.早期,进程是程序的基本执行实体
5.当代,进程是线程的容器
6.程序是指令、数据及其组织形式的描述,进程是程序的实体

我们使用多线程而非使用多进程去进行并发程序的设计,是因为线程间的切换和调度的成本远小于进程.

线程的生命周期

Java中的线程就是继承Runnable,故生命周期如上图所示.

以下是Java.lang.Thread中关于State的枚举定义源码:

    public enum State {
        /**
         * Thread state for a thread which has not yet started.
         */
        NEW,

        /**
         * Thread state for a runnable thread.  A thread in the runnable
         * state is executing in the Java virtual machine but it may
         * be waiting for other resources from the operating system
         * such as processor.
         */
        RUNNABLE,

        /**
         * Thread state for a thread blocked waiting for a monitor lock.
         * A thread in the blocked state is waiting for a monitor lock
         * to enter a synchronized block/method or
         * reenter a synchronized block/method after calling
         * {@link Object#wait() Object.wait}.
         */
        BLOCKED,

        /**
         * Thread state for a waiting thread.
         * A thread is in the waiting state due to calling one of the
         * following methods:
         * <ul>
         *   <li>{@link Object#wait() Object.wait} with no timeout</li>
         *   <li>{@link #join() Thread.join} with no timeout</li>
         *   <li>{@link LockSupport#park() LockSupport.park}</li>
         * </ul>
         *
         * <p>A thread in the waiting state is waiting for another thread to
         * perform a particular action.
         *
         * For example, a thread that has called <tt>Object.wait()</tt>
         * on an object is waiting for another thread to call
         * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
         * that object. A thread that has called <tt>Thread.join()</tt>
         * is waiting for a specified thread to terminate.
         */
        WAITING,

        /**
         * Thread state for a waiting thread with a specified waiting time.
         * A thread is in the timed waiting state due to calling one of
         * the following methods with a specified positive waiting time:
         * <ul>
         *   <li>{@link #sleep Thread.sleep}</li>
         *   <li>{@link Object#wait(long) Object.wait} with timeout</li>
         *   <li>{@link #join(long) Thread.join} with timeout</li>
         *   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
         *   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
         * </ul>
         */
        TIMED_WAITING,

        /**
         * Thread state for a terminated thread.
         * The thread has completed execution.
         */
        TERMINATED;
    }

初始线程: 线程的基本操作

新建线程

线程启动时的调用顺序
start()->run()
所以,当我们使用start启动线程时是真正启动了一个线程,而在这个线程中调用run方法.
而如果使用了run(),则代表只是调用了一次run函数.

使用匿名内部类修改run方法,然后启动线程


public class p21 { public static void main(String[] args){ Thread t1=new Thread(){ @Override public void run(){ System.out.println("Hello, I'm t1"); } }; t1.start(); } }

使用Runnable接口来作为Thread的构造参数传入

以下是Runnable接口的源码:

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

可以发现,我们只需要实现run方法即可.
当我们调用Thread的run方法时,他会先判断下是否有Rannable,如果有,则调用Runnable的run方法.

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }

我们来使用Runnable接口实现线程


public class p21 implements Runnable { public static void main(String[] args){ Thread t1=new Thread(){ @Override public void run(){ System.out.println("Hello, I'm t1"); } }; t1.start(); Thread t2=new Thread(new p21()); t2.start(); } @Override public void run(){ System.out.println("Hello,I'm t2."); } }

这样就避免了重载Thread的run()方法,也是最常用的做法

终止线程

当我们在实现某些功能时,可能会让一些线程常驻在内存中.
那么我们该如何停止这些线程呢?
Thread内部有一个Stop()的方法,但他已被标注为将废弃,因为该方法太过暴力,很有可能造成数据不一致的问题.

因为stop方法会在结束线程时,直接终止线程,并且释放掉这个线程的所有锁.而这些锁则是为了保证对象的一致性.如果此时,写线程写到一半,被强行终止,那么对象的完整性就可能会被破坏.
而因为锁被释放了,所以另一个线程就顺理成章的读到了这个不完整的对象…

测试

我们用一个程序来模拟下上面说的情况:
具体思路为->开启读取线程,如果User名字和id不一样,输出->不停地创建修改线程,修改的id和name一样->修改完成后stop()->观察结果


public class p22 { public static User u=new User(); public static class User{ private int id; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } private String name; public User(){ id=0; name="0"; } @Override public String toString(){ return "User [id="+id+",name="+name+"]"; } } public static class ChangeObjectThread extends Thread{ @Override public void run(){ while(true){ synchronized(u){ int v=(int)(System.currentTimeMillis()/1000); u.setId(v); //Oh,do sth.else try{ Thread.sleep(100); }catch(InterruptedException e){ e.printStackTrace(); } u.setName(String.valueOf(v)); } Thread.yield(); } } } public static class ReadObjectThread extends Thread{ @Override public void run(){ while(true){ synchronized(u){ if(u.getId()!=Integer.parseInt(u.getName())){ System.out.println(u.toString()); } } Thread.yield(); } } } public static void main(String[] args) throws InterruptedException { new ReadObjectThread().start(); while(true){ Thread t=new ChangeObjectThread(); t.start(); Thread.sleep(100); t.stop(); } } }

输出:

User [id=1529221875,name=1529221874]
User [id=1529221875,name=1529221874]
User [id=1529221875,name=1529221874]
User [id=1529221875,name=1529221874]
User [id=1529221875,name=1529221874]
User [id=1529221875,name=1529221874]
User [id=1529221875,name=1529221874]

我们发现会出现很多如此的错误,为什么呢?和之前一样的原因,指令的顺序在优化中可能被更改,而线程的执行顺序也和调度算法有关,所以就造成了有可能某线程对User数据修改时sleep了一段时间,而这段时间内突然被stop了,其他的线程就拿到了不完整的数据.

当然,如果你将两个sleep都设置为0就不会出现这种错误了.

自定义线程停止

如何解决这种问题呢?

我们自行决定线程何时退出就可以了。

  • volatile关键字
    > 添加该关键字的变量是:
    >> 不同线程访问和修改的变量

即该指令不会因为编译器的优化而忽略,且要求每次直接读值.

我们只需要为ChangeObjectThread添加一个方法stopMe(),
当stopme为true的时候才可以读取,为false的时候就禁止读取.并且直接退出run方法.
这也就保证了不会导致修改中途被撤销.


public class p22 { public static User u=new User(); public static class User{ private int id; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } private String name; public User(){ id=0; name="0"; } @Override public String toString(){ return "User [id="+id+",name="+name+"]"; } } public static class ChangeObjectThread extends Thread{ volatile boolean stopme=false; public void stopMe(){ stopme=true; } @Override public void run(){ while(true){ synchronized(u){ if(stopme){ System.out.println("exit by stop me"); break; } int v=(int)(System.currentTimeMillis()/1000); u.setId(v); //Oh,do sth.else try{ Thread.sleep(100); }catch(InterruptedException e){ e.printStackTrace(); } u.setName(String.valueOf(v)); } Thread.yield(); } } } public static class ReadObjectThread extends Thread{ @Override public void run(){ while(true){ synchronized(u){ if(u.getId()!=Integer.parseInt(u.getName())){ System.out.println(u.toString()); } } Thread.yield(); } } } public static void main(String[] args) throws InterruptedException { new ReadObjectThread().start(); while(true){ ChangeObjectThread t=new ChangeObjectThread(); t.start(); Thread.sleep(100); t.stopMe(); } } }

线程中断

为了解决stop可能会导致数据冲突的问题,JDK中提供了三个方法来实现线程中断.

线程中断

即线程中断不会使线程立即退出,而是给线程发一个通知,告知目标线程要中断了,至于如何处理,何时中断,由目标决定.


public class p23 { public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub Thread t1=new Thread(){ @Override public void run(){ while(true){ if(Thread.currentThread().isInterrupted()){ System.out.println("Interruted!"); break; } Thread.yield(); } } }; t1.start(); Thread.sleep(100); t1.interrupt(); } }

Thread.sleep函数

他的签名如下:

Thread.sleep()会让当前线程休眠若干时间,他会抛出一个InterruptedException中断异常,这个异常不是运行时异常,也就是说程序必须捕获并处理它,当线程休眠时,如果被中断,该异常就会产生.


public class p23 { public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub Thread t1=new Thread(){ @Override public void run(){ while(true){ if(Thread.currentThread().isInterrupted()){ System.out.println("Interruted!"); break; } try{ Thread.sleep(2000); }catch(InterruptedException e){ System.out.println("Interrpted When Sleep"); Thread.currentThread().interrupt(); } Thread.yield(); } } }; t1.start(); Thread.sleep(100); t1.interrupt(); } }

所以我们必须在捕捉该异常的同时再次放出中断异常,这样才能保证该线程被正常中断.

wait()与notify()

为了支持多线程间协作,JDK提供了等待wait()和通知notify()两个方法.
但这两个方法不存在Thread类中,而是输出Object类.这也就意味着任意的对象都可以调用该方法.

两个方法签名如下:

public final void wait() throws InterruptedException
public final native void notify()





简单的例子:


public class p24 { final static Object object = new Object(); public static class T1 extends Thread{ public void run(){ synchronized(object){ System.out.println(System.currentTimeMillis()+":T1 start!"); try{ System.out.println(System.currentTimeMillis()+":T1 wait for object"); object.wait(); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println(System.currentTimeMillis()+":T1 end!"); } } } public static class T2 extends Thread{ public void run(){ synchronized(object){ System.out.println(System.currentTimeMillis()+":T2 start! notify one thread"); object.notify(); System.out.println(System.currentTimeMillis()+":T2 end!"); try{ Thread.sleep(2000); }catch(InterruptedException e){ e.printStackTrace(); } } } } public static void main(String[] args) { Thread t1=new T1(); Thread t2=new T2(); t1.start(); t2.start(); } }

结果:

1529292002347:T1 start!
1529292002347:T1 wait for object
1529292002348:T2 start! notify one thread
1529292002348:T2 end!
1529292004348:T1 end!

Tip: wait会释放所有的锁

挂起(suspend)和继续执行(resume)线程

suspend乍看起来和stop或者wait相似简单的用法,但是,值得注意的是,suspend并不会释放任何资源和锁.所以就会导致其他想要索取资源的线程也被牵连.
而且resume也是存在问题,有可能在suspend前执行,这就会导致当前线程的状态被误判.

等待线程结束(join)和谦让(yield)

一个线程需要等待另一个线程的结束才能继续执行(依赖输出)则用join.

public final void join() throws InterruptedExcption

public final synchronized void join(long millis) throws InterruptedException

第一个表示无限等待,他会一直阻塞线程,直到目标线程执行完毕.
第二个表示如果超过一段时间还没等到,则不等待,继续执行.

例:


public class p25 { public volatile static int i=0; public static class AddThread extends Thread{ @Override public void run(){ for(i=0;i<10000000;++i); } } public static void main(String[] args) throws InterruptedException { // TODO Auto-generated method stub AddThread at=new AddThread(); at.start(); at.join(); System.out.println(i); } }

上述主函数中,如果不执行join来等待线程结束,则更多的可能是只会出现在线程结束前输出i的值(比如0)的情况.
如果用join来等待的话,则最终一定会输出1e7.

join的本质是让调用线程wait()在当前线程对象实例上.

另一个: yield()方法是让当前线程让出CPU,然后重新加入到资源的争抢当中.如果觉得一个线程不是很重要,又害怕它占用过多的CPU,可以调用yield方法.

volatile与Java内存模型(JMM)

使用volatitle就表示告诉了虚拟机这个变量很可能被某线程修改.
虚拟机会特别小心的处理这个变量,尤其是当发现修改的顺序是反的时候.
volatile可以很大程度上保证变量的完整性,但不保证操作的原子性,比如i++的原子操作完整性(i为32位下的长整型)

此外,volatile也能保证数据的可见性和有序性.

例:


public class p26 { private volatile static boolean ready; private static int number; private static class ReaderThread extends Thread{ public void run(){ while(!ready); System.out.println(number); } } public static void main(String[] args) throws InterruptedException{ new ReaderThread().start(); Thread.sleep(1000); number=42; ready=true; Thread.sleep(10000); } }

因为指令的优化,在Server下线程无法”看到”ready”被修改.所以会无限的执行下去,这就是典型的可见性问题.

而加了volatiel修饰后的ready就不会出现这种情况了.

线程组

一个系统中如果存在过多的线程,而且分工比较明确,就可以将相同功能的线程放置在一个线程组内.这样会使效率更高些.


public class p27 implements Runnable{ public static void main(String[] args) { // TODO Auto-generated method stub ThreadGroup tg=new ThreadGroup("PrintGroup"); Thread t1=new Thread(tg,new p27(),"T1"); Thread t2=new Thread(tg,new p27(),"T2"); t1.start(); t2.start(); System.out.println(tg.activeCount()); tg.list(); //tg.stop();-慎用 } @Override public void run(){ String groupAndName=Thread.currentThread().getThreadGroup().getName() +"-"+Thread.currentThread().getName(); while(true){ System.out.println("I am "+groupAndName); try{ Thread.sleep(3000); }catch(InterruptedException e){ e.printStackTrace(); } } } }

驻守后台-守护线程(Daemon)

JVM内部的实现是如果运行的程序只剩下守护线程的话,程序将终止运行,直接结束。所以守护线程是作为辅助线程存在的

线程优先级

Java可以自定义线程的优先级.

设置优先级用 (Thread).setPriority(优先级(1~10))

线程安全概念与synchronized

线程安全是并行程序开发的一大重点.
线程安全例子:


public class p28 implements Runnable{ static p28 instance=new p28(); static volatile int i=0; public static void increase(){ i++; } @Override public void run(){ for(int j=0;j<10000000;++j){ increase(); } } public static void main(String[] args) throws InterruptedException{ // TODO Auto-generated method stub Thread t1=new Thread(instance); Thread t2=new Thread(instance); t1.start(); t2.start(); t1.join();t2.join(); System.out.println(i); } }

如果把t1.join()放到t1.start()后面的话,输出就为正常结果.但如上这样子放的话,也就代表了两个线程实际上是一起执行的.但在某一时刻t1.join开启以后t2就暂停执行等待t1执行完再继续.

我们可以使用synchronized关键字来对同步的代码加锁.使得每一次只能有一个线程进入同步块.

代码在书上的-P58,之前写过很多次了.

并发下的ArrayList

ArrayList是线程不安全的,用Vector代替线程不安全的ArrayList即可.

并发下的HashMap

并发下的HashMap可能会出现死循环的现象,为什么?下面来看一段代码

这段代码证明HashMap的插入是按照链表的方法插入的.而当死循环时就代表当前的HashMap链表被破坏成了环.也就导致了死循环.
(但JDK8已经避免了这种情况的产生)

jps和jstack工具

jps是查看当前所有大线程
jstack是定位到对应的线程以及代码

Integer其实使用工厂方法进行赋值的

如果我们想要给Integer加锁时,我们不能直接在Integer(int)变量上加锁,因为Integer是用工厂方法进行赋值,每次给int赋值时都会重新生成一个Integer对象.
所以我们需要在改变量所在的实例化对象上加锁.

比如:
public class k implements Runnable{
int a;
public void run(){
Code here.
}
}
我们就需要在实例化后的k对象上加锁,而不是在a上加锁.