资讯专栏INFORMATION COLUMN

2018年第18周-Java语言思想-并发

JouyPub / 878人阅读

摘要:某些编程语言被设计为可以将并发任务彼此隔离,这些语言通常被称为函数性语言。通过使用多线程机制,这些独立任务也被称为子任务中的每一个都将由执行线程来驱动。

并发

之前学的都是顺序编程的知识,学习并发编程就好像进入了一个全新的领域,有点类似于学习了一门新的编程语言,或者至少是学习了一整套新的语言概念。要理解并发编程,其难度与理解面向对象编程差不多。如果花点儿功夫,就能明白其基本机制,但想要抓住其本质,就需要深入的学习和理解。所以看完《Java编程思想》或许会变得过分自信,但写复杂的多线程时,应该多看其他多线程的书籍,关键还是多动手。

“并发是一种具有可论证的确定性,但实际上具有不可确定性。”

使用并发时,你的自食其力,并且只有变得多疑而自信,才能用Java编写出可靠的多线程代码。

用并发解决的问题大体可以分为“速度”和“设计可管理性”两种。 速度

并发解决“速度”问题不仅仅是利用多个CPU去解决分片的问题,也就是说并发不仅仅是多个CPU的事情,也是单个CPU的事情。如果提高程序在单个CPU的性能,就得考虑具体情况,正常情况单个CPU运行多任务(task)是有上下文切换的性能损耗。但在阻塞(Blocking)的情况下就不同了。
我们先看看阻塞的定义:如果程序中的某个任务因为该程序控制范围之外的某些条件(通常是I/O),那我们就说这个任务或线程阻塞了。
如果使用并发来写这个阻塞程序,在一个任务阻塞时,程序中的其他任务还可以继续执行。这样性能会有很大的提升。所以如果没有阻塞的情况,在单CPU使用并发,就没必要了。

在单个CPU的系统中性能提高的常见示例:事件驱动编程(event-driven programing)。

实现并发最直接的方式是在操作系统级别使用进程(process)。多任务操作系统可以通过周期性地将CPU从一个进程切换到另一个进程,来实现同时运行多个进程(程序)。

某些编程语言被设计为可以将并发任务彼此隔离,这些语言通常被称为函数性语言。Erlang就是这样的语言,它包含针对任务之间彼此通信的安全机制。如果你发现程序中某个部分必须大量使用并发,并且你在试图构建这个部分时遇到过多的问题。那么你可以考虑使用像Erlang这类专门的并发语言来创建这个部分。

Java语言采用更加传统的方式,在顺序语言的基础上提供对线程的支持。 Java的目的是“编写一次,到处运行”,所以在OSX之前的Macintosh操作系统版本是不支持多任务,因此Java支持多线程机制,让并发Java程序能够移植到Macintosh和类似的平台上。

设计可管理性

设计可管理性,我更愿意说是一个解决问题的方法模型(程序设计)。线程使你能够创建更加松散耦合的设计。
在单CPU上使用多任务的程序(代码)在任意时刻仍然只能执行一项任务,因此理论上讲,肯定可以不用任何任务就可以编写相同的程序。但是,这样写来的代码可能会很混乱,不方便维护。因此并发提供一种重要的组织结构上的好处:你的程序设计可以极大地简化。某些类似的问题,例如仿真,没有并发的支持是很难解决的。

一般线程调度模式分为:抢占式(preemtive)调度和协同式调度(cooperative).

抢占式调度指的是每条线程执行的时间、线程的切换都是由系统控制,每条线程可能都分同样的的执行时间片(CPU切片),也可能是在某些线程执行的时间片较长,甚至某些线程得不到执行时间片。这种机制下,优点是一个线程阻塞不会导致整个进程堵塞,缺点就是上下文切换开销大
协同式调度指的是某一条线程执行完后主动通知系统切到另一条线程上执行。线程的执行时间由线程本身控制,线程切换可以预知。优点是不存在多线程同步问题,上下文切换开销小,缺点是如果一个线程阻塞了,那么可能造成整个系统崩溃

Java线程机制是抢占式.
线程让出cpu的情况:
1.当前运行线程主动放弃CPU,JVM暂时放弃CPU操作(基于时间片轮转调度的JVM操作系统不会让线程永久放弃CPU,或者说放弃本次时间片的执行权),例如调用yield()方法。
2.当前运行线程因为某些原因进入阻塞状态,例如阻塞在I/O上。
3.当前运行线程结束,即运行完run()方法里面的任务

并发需要付出代价,包含复杂性代价。但这些代价与优化程序设计、资源负载均衡以及用户体验上的改进相比,这些代价就显得微不足道。

线程带来设计上的演变

为了获取线程的结果,于是产生轮询,然后再后来为了解决轮询,引进了静态方法的回调,再后来带来实例方法的回调,最后引出设计模式:策略模式 和Java5引进多线程编程的新方法,通过隐藏细节可以更容易地处理回调——ExecutorService和Futrue

轮询例子:

package com.jc.thread;

import com.jc.thinkinjava.io.util.Directory; 

import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
/**
 * 回调例子-前序
 *
 * 计算文件的256位的SHA-2消息摘要
 * 由于瓶颈在IO上,所以采用多线程
 *
 * 尝试去获取线程返回的值,但发现需要另外个线程不停的轮询,这是很耗cpu资源
 */
@SuppressWarnings("Duplicates")
public class ReturnDigest extends Thread {

    private String fileName;

    private byte[] digest;

    public ReturnDigest(String fileName) {
        this.fileName = fileName;
    }


    @Override
    public void run() {
        try {

//            System.out.println(fileName);
            FileInputStream in = new FileInputStream(fileName);
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            DigestInputStream digestInputStream = new DigestInputStream(in, sha);
            while (digestInputStream.read() != -1) ;
            digestInputStream.close();
            digest = sha.digest(); //注意,不是DigestInputStream的方法哦

            StringBuilder sb = new StringBuilder(fileName);
            sb.append(":").append(DatatypeConverter.printHexBinary(digest));

            System.out.println(sb.toString());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }


    }


    public byte[] getDigest() {
        return this.digest;
    }


    public static void main(String[] args) {
        File[] files = Directory.local(".", ".*");

        List fileList = new ArrayList();

        for (int i = 0; i < files.length; i++) {
            File file = files[i];
            if (!file.isDirectory()) {
                fileList.add(file);
            }
        }

        ReturnDigest[] digests = new ReturnDigest[fileList.size()];
        for (int i = 0; i < fileList.size(); i++) {
            File file = fileList.get(0);
            digests[i] = new ReturnDigest(file.getAbsolutePath());
            digests[i].start();
        }

        for(int i=0;i

然后为了解决轮询,产生了静态方法的回调:

package com.jc.thread.callback;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * 回调例子
 * 静态方法的回调
 */
@SuppressWarnings("Duplicates")
public class CallbackDigest  implements  Runnable{
    private String fileName;

    public CallbackDigest(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public void run() {
        try {

//            System.out.println(fileName);
            FileInputStream in = new FileInputStream(fileName);
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            DigestInputStream digestInputStream = new DigestInputStream(in, sha);
            while (digestInputStream.read() != -1) ;
            digestInputStream.close();
            byte[] digest = sha.digest(); //注意,不是DigestInputStream的方法哦

            CallbackDigestUserInterface.receiveDigest(digest,fileName);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
package com.jc.thread.callback;

import com.jc.thinkinjava.io.util.Directory;
import com.jc.thread.DigestRunnable;

import javax.xml.bind.DatatypeConverter;
import java.io.File;

/**
 * 回调例子
 * 静态方法的回调
 */
public class CallbackDigestUserInterface {

    public static void receiveDigest(byte[] digest,String fileName){
        StringBuilder sb = new StringBuilder(fileName);
        sb.append(":").append(DatatypeConverter.printHexBinary(digest));

        System.out.println(sb.toString());
    }


    public static void main(String[] args) {
        File[] files = Directory.local(".", ".*");
        for (File file : files) {
            if (!file.isDirectory())
                new Thread(new DigestRunnable(file.getAbsolutePath())).start();
        }
    }


}

实例方法的回调:

package com.jc.thread.callback;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class InstanceCallbackDigest   implements  Runnable{
    private String fileName;
    private InstanceCallbackDigestUserInterface callback;

    public InstanceCallbackDigest(String fileName, InstanceCallbackDigestUserInterface instanceCallbackDigestUserInterface) {
        this.fileName = fileName;
        this.callback = instanceCallbackDigestUserInterface;
    }

    @Override
    public void run() {
        try {

//            System.out.println(fileName);
            FileInputStream in = new FileInputStream(fileName);
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            DigestInputStream digestInputStream = new DigestInputStream(in, sha);
            while (digestInputStream.read() != -1) ;
            digestInputStream.close();
            byte[] digest = sha.digest(); //注意,不是DigestInputStream的方法哦

            callback.receiveDigest(digest);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
package com.jc.thread.callback;

import com.jc.thinkinjava.io.util.Directory;
import com.jc.thread.ReturnDigest;

import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


/**
 * 回调例子
 * 

* 使用实例方法代替静态方法进行回调 *

* 虽然复杂点,但优点很多。如: * 1. 主类(InstanceCallbackDigestUserInterface)的各个实例映射为一个文件,可以很自然地记录跟踪这个文件的信息,而不需要额外的数据结构 * 2. 这个实例在有必要时可以容易地重新计算某个特定文件的摘要 *

* 实际上,经证明,这种机制有更大的灵活性。 *

* 这种机制,也称为:观察者模式,如Swing、AWT */ public class InstanceCallbackDigestUserInterface { private String fileName; private byte[] digest; public InstanceCallbackDigestUserInterface(String fileName) { this.fileName = fileName; } public void calculateDigest() { InstanceCallbackDigest instanceCallbackDigest = new InstanceCallbackDigest(fileName, this); new Thread(instanceCallbackDigest).start(); } public void receiveDigest(byte[] digest) { this.digest = digest; System.out.println(this); } @Override public String toString() { String result = fileName + ": "; if (digest != null) { result += DatatypeConverter.printHexBinary(digest); } else { result += "digest not available"; } return result; } public static void main(String[] args) { File[] files = Directory.local(".", ".*"); List fileList = new ArrayList(); for (int i = 0; i < files.length; i++) { File file = files[i]; if (!file.isDirectory()) { fileList.add(file); } } for (int i = 0; i < fileList.size(); i++) { File file = fileList.get(0); InstanceCallbackDigestUserInterface instanceCallbackDigestUserInterface = new InstanceCallbackDigestUserInterface(file.getAbsolutePath()); instanceCallbackDigestUserInterface.calculateDigest(); } } }

Java5引进的新方法,ExecutorService和Future:

package com.jc.thread.callback;

import java.util.concurrent.Callable;

public class FindMaxTask implements Callable {


    private int[] data;
    private int start;
    private int end;

    public FindMaxTask(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    public Integer call() throws Exception {
        int max = Integer.MAX_VALUE;
        for (int i = start; i < end; i++) {
            if (data[i] > max) max = data[i];
        }
        return max;
    }
}
package com.jc.thread.callback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 *
 * Java5引入了多线程编程的一个新方法,通过隐藏细节可以更容易地处理回调
 * 使用回调实现的Futrue
 */
public class MultithreadedMaxFinder {

    public static int max(int[] data) throws ExecutionException, InterruptedException {
        if (data.length == 1) {
            return data[0];
        } else if (data.length == 0) {
            throw new IllegalArgumentException();
        }

        FindMaxTask task1 = new FindMaxTask(data,0,data.length/2);
        FindMaxTask task2 = new FindMaxTask(data,data.length/2,data.length);


        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future future1 = executorService.submit(task1);
        Future future2 = executorService.submit(task2);
        //调用future1.get()时,这个方法会进行阻塞,等待第一个FindMaxTask完成。只有当第一个FindMaxTask完成,才会调用future2.get()
        return Math.max(future1.get(),future2.get());
    }
}
基本线程机制

并发编程使我们可以将程序划分为多个分离的、独立运行的任务。通过使用多线程机制,这些独立任务(也被称为子任务)中的每一个都将由执行线程来驱动。
线程模型:一个线程就是进程中的一个单一顺序控制流,因此单个进程可以拥有多个并发执行的任务,感觉每个任务都好像有其CPU一样,其底层机制是切分CPU时间,但通常不用考虑CPU的切片。
线程模型为编程带来便利,它简化了在单一程序中同时交织在一起的多个操作的处理。在使用线程时,CPU将轮流给每个任务分配其占用时间。线程的一大好处是可以使你从这一个层次抽身出来,即代码不必知道它是运行在具有一个还是多个CPU的机子上。
所以,使用线程机制是一种建立透明的、可扩展的程序的方法,如果程序运行得太慢,为机器增添一个CPU就能容易地加快程序的运行速度。多任务和多线程往往是使用多处理器系统的最合理方式。

//此方法调用是对 线程调度器 的一种建议:我已经执行完生命周期中最重要的部分了,此刻正是切换给其他任务执行一段时间的大好时机。
Thread.yield();

Thread.yield();这个方法叫“让步”,不过没有任何机制保证它将会被采纳。

术语

在Java中学习并发编程,总是会让人困惑。让人困惑是那些概念,特别是涉及到线程。
要执行的任务和驱动它的线程,这里的任务和线程是不同的,在Java中会更明细,因为你对Thread类实际没有任何控制权(特别是使用Executor时候)。通过某种方式,将任务附着到线程,以使这个线程可以驱动任务。
在Java中,Thread类自身不执行任何操作,它只是驱动赋予它的任务,但是线程的一些研究中,总是使用这样的话语“线程执行这项或那项动作”,仿佛“线程就是任务”。这一点是让新人是十分困惑的。因为会让人觉得任务和线程是一种“是一个”的关系。觉得应该从Thread继承出一个任务。但实际不是,所以用Task名字会更好。
那为什么Java设计者不用Task而用Thread或Runnable呢? 之所以有上述的困惑(概念混淆),那是因为,虽然从概念上讲,我们应该只关注任务,而不需要关注线程的细节,我们只需要定义任务,然后说“开始”就好。但实际情况是,在物理上,创建线程可能会代价很高,因此需要人工去保存和管理它们。而且Java的线程机制是基于C的低级的P线程(pthread)方式。所以才导致任务和线程这两个概念总是混在一起。站在实现和更抽象的角度,这两者应该分开,所以编写代码时,你必须遵守规则。

为了描述更清楚,因为定义为要执行的工作则为“任务”,引用到驱动任务的具体机制时,用“线程”。 如果只是概念级别上讨论系统,则只用“任务”就行。

加入一个线程

一个线程可以调用其他线程的join()方法,其效果是等待一段时间直到第二个线程结束才继续执行。

package com.jc.concurrency;
/**
 * 一个线程可以等待一个线程完成,那就是用join
 * @author 
 *
 */
class Sleeper extends Thread {
    private int duration;

    public Sleeper(String name, int sleepTime) {
        super(name);
        duration = sleepTime;
        start();
    }

    public void run() {
        try {
            sleep(duration);
        } catch (InterruptedException e) { //异常捕获时会将Interrupted这个标志位重置为false,所以在这里输出false
            System.out.println(getName() + " was interrupted. " + "isInterrupted(): " + isInterrupted());
            return;
        }
        System.out.println(getName() + " has awakened");
    }
}

class Joiner extends Thread {
    private Sleeper sleeper;

    public Joiner(String name, Sleeper sleeper) {
        super(name);
        this.sleeper = sleeper;
        start();
    }

    public void run() {
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        System.out.println(getName() + " join completed");
    }
}

public class Joining {
    public static void main(String[] args) {
        Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper("Grumpy", 1500);
        Joiner dopey = new Joiner("Dopey", sleepy), doc = new Joiner("Doc", grumpy);
        grumpy.interrupt();
    }
}
捕获异常

在main方法是无法捕获到线程里的异常。为解决这个问题,我们修改Executor产生线程的方式。Java SE5中的新接口:Thread.UncaughtExceptionHandler

package com.jc.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * 使用Thread.UncaughtExceptionHandler处理线程抛出的异常
 * 
 * MyUncaughtExceptionHandler会新建线程去处理其他线程跑出来的异常
 * 
 * @author 
 *
 */
class ExceptionThread2 implements Runnable {
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("run() by " + t);
        System.out.println("eh = " + t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}

class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("caught " + t + ""s " + e);
    }
}

class HandlerThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        System.out.println(this + " creating new Thread");
        Thread t = new Thread(r);
        System.out.println("created " + t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        System.out.println("eh = " + t.getUncaughtExceptionHandler());
        return t;
    }
}

public class CaptureUncaughtException {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
        exec.execute(new ExceptionThread2());
    }
}/*
     * output:
     * 
     * com.jc.concurrency.HandlerThreadFactory@4e25154f creating new Thread
     * created Thread[Thread-0,5,main] eh =
     * com.jc.concurrency.MyUncaughtExceptionHandler@70dea4e run() by
     * Thread[Thread-0,5,main] eh =
     * com.jc.concurrency.MyUncaughtExceptionHandler@70dea4e
     * com.jc.concurrency.HandlerThreadFactory@4e25154f creating new Thread
     * created Thread[Thread-1,5,main] eh =
     * com.jc.concurrency.MyUncaughtExceptionHandler@5490c2f5 caught
     * Thread[Thread-0,5,main]"s java.lang.RuntimeException
     * 
     * 
     * 
     */

还可以设置默认异常处理器:

package com.jc.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * 设置默认的线程异常处理类
 * @author 
 *
 */
public class SettingDefaultHandler {
    public static void main(String[] args) {
        Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new ExceptionThread());
    }
}
线程状态(Thread state)1.新建(new):

一个线程可以处于四种状态之一:新建(new),就绪(Runnable),阻塞(Blocked),死亡(Dead)。
1.新建(new):这是个短暂状态,当线程被创建时,它只会短暂地处于这种状态。此时它已经分配了必须的系统资源,并执行了初始化。此刻线程已经有资格获取CPU时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。
2.就绪(Runnable):在这种状态下,只要调度器把时间片分配给线程,线程就可以运行。也就是说,在任意时刻,此状态的线程可以运行也可以不运行。不同于死亡和阻塞状态。
3.阻塞(Blocked):线程能够运行,但有某个条件阻止它的运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间。直到线程重新进入了就绪状态,它才有可能执行操作。
4.死亡(Dead):处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已结束,或不再是可运行的。任务死亡的通常方式是从run()方法返回,但是任务的线程还可以被中断,中断也是属于死亡。

进入阻塞状态

一个任务进入阻塞状态,可能要有如下原因:

通过调用sleep(milliseconds)使任务进入休眠状态,在这种情况下,任务在指定的时间内不会运行。

通过调用wait()使线程挂起。直到线程得到了notify()或notifyAll()消息(或者在Java SE5的java.util.concurrent类库中等价的signal()或signalAll()消息),线程才会进入就绪状态。

任务在等待某个输入/输出完成。

任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。

在较早的代码中,会有suspend()和resume()来阻塞和唤醒线程,因为容易导致死锁,所以被废止了。

中断

在阻塞状态的线程,可以通过中断来终止该阻塞的任务。Thread类包含interrupt()方法来中断。如果使用Executor,则使用Future的cancel()来中断任务。其实Executor的shutdownNow()方法,就是将发送一个interrupt()调用给它所启动的所有线程。

package com.jc.concurrency;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 中断处于阻塞状态的线程例子  
 * 发现只有sleep()操作的才能中断,其余的io和同步都不能被中断
 * @author 
 *
 */
class SleepBlocked implements Runnable {
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException");
        }
        System.out.println("Exiting SleepBlocked.run()");
    }
}

class IOBlocked implements Runnable {
    private InputStream in;

    public IOBlocked(InputStream is) {
        in = is;
    }

    public void run() {
        try {
            System.out.println("Waiting for read():");
            in.read();
        } catch (IOException e) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("Interrupted from blocked I/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        System.out.println("Exiting IOBlocked.run()");
    }
}

class SynchronizedBlocked implements Runnable {
    public synchronized void f() {
        while (true) // Never releases lock
            Thread.yield();
    }

    public SynchronizedBlocked() {
        new Thread() {
            public void run() {
                f(); // Lock acquired by this thread
            }
        }.start();
    }

    public void run() {
        System.out.println("Trying to call f()");
        f();
        System.out.println("Exiting SynchronizedBlocked.run()");
    }
}

public class Interrupting {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    static void test(Runnable r) throws InterruptedException {
        Future f = exec.submit(r);
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Interrupting " + r.getClass().getName());
        f.cancel(true); // Interrupts if running
        System.out.println("Interrupt sent to " + r.getClass().getName());
    }

    public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Aborting with System.exit(0)");
        System.exit(0); // ... since last 2 interrupts failed
    }
}

发现只有sleep()操作的才能中断,其余的io和同步都不能被中断。所以有个比较不优雅,但有效的关闭方式:

package com.jc.concurrency;

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 中断IO阻塞的线程的方式:关闭资源
 * @author 
 *
 */
public class CloseResource {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InputStream socketInput = new Socket("localhost", 8080).getInputStream();
        exec.execute(new IOBlocked(socketInput));
        exec.execute(new IOBlocked(System.in));
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Shutting down all threads");
        exec.shutdownNow();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Closing " + socketInput.getClass().getName());
        socketInput.close(); // Releases blocked thread
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Closing " + System.in.getClass().getName());
        System.in.close(); // Releases blocked thread
    }
}

之所以要sleep,是想要interrupt都传到各个线程里面。以达到中断的效果。

NIO提供了优雅的I/O中断。

/**
 * NIO提供了优雅的I/O中断
 * @author 
 *
 */
class NIOBlocked implements Runnable {
    private final SocketChannel sc;

    public NIOBlocked(SocketChannel sc) {
        this.sc = sc;
    }

    public void run() {
        try {
            System.out.println("Waiting for read() in " + this);
            sc.read(ByteBuffer.allocate(1));
        } catch (ClosedByInterruptException e) {
            System.out.println("ClosedByInterruptException" + this);
        } catch (AsynchronousCloseException e) {
            System.out.println("AsynchronousCloseException" + this);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Exiting NIOBlocked.run() " + this);
    }
}

public class NIOInterruption {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InetSocketAddress isa = new InetSocketAddress("localhost", 8080);
        SocketChannel sc1 = SocketChannel.open(isa);
        SocketChannel sc2 = SocketChannel.open(isa);
        System.out.println(sc1);
        System.out.println(sc2);
        Future f = exec.submit(new NIOBlocked(sc1));
        exec.execute(new NIOBlocked(sc2));
        exec.shutdown();
        TimeUnit.SECONDS.sleep(1);
        // Produce an interrupt via cancel:
        f.cancel(true);
        TimeUnit.SECONDS.sleep(1);
        // Release the block by closing the channel:
        sc2.close();
    }
}

SleepBlocked例子展示了synchronized的锁是不可以中断,这是很危险的。所以ReentrantLock提供了可中断的能力

package com.jc.concurrency;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * SleepBlocked例子展示了synchronized的锁是不可以中断,这是很危险的。
 * 所以ReentrantLock提供了可中断的能力
 * @author 
 *
 */
class BlockedMutex {
    private Lock lock = new ReentrantLock();

    public BlockedMutex() {
        // Acquire it right away, to demonstrate interruption
        // of a task blocked on a ReentrantLock:
        lock.lock();
    }

    public void f() {
        try {
            // This will never be available to a second task
            lock.lockInterruptibly(); // Special call
            System.out.println("lock acquired in f()");
        } catch (InterruptedException e) {
            System.out.println("Interrupted from lock acquisition in f()");
        }
    }
}

class Blocked2 implements Runnable {
    BlockedMutex blocked = new BlockedMutex();

    public void run() {
        System.out.println("Waiting for f() in BlockedMutex");
        blocked.f();
        System.out.println("Broken out of blocked call");
    }
}

public class Interrupting2 {
    public static void main(String[] args) throws Exception {
        Thread t = new Thread(new Blocked2());
        t.start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Issuing t.interrupt()");
        t.interrupt();
    }
}/**output:
Waiting for f() in BlockedMutex
Issuing t.interrupt()
Interrupted from lock acquisition in f()
Broken out of blocked call
**/

在没有阻塞的语句时,通过Thread.interrupted()判断线程被中断:

package com.jc.concurrency;

import java.util.concurrent.TimeUnit;

/**
 * 在没有阻塞的语句时,通过Thread.interrupted()判断线程被中断
 * @author 
 *
 */
class NeedsCleanup {
    private final int id;

    public NeedsCleanup(int ident) {
        id = ident;
        System.out.println("NeedsCleanup " + id);
    }

    public void cleanup() {
        System.out.println("Cleaning up " + id);
    }
}

class Blocked3 implements Runnable {
    private volatile double d = 0.0;

    public void run() {
//        try {
            while (!Thread.interrupted()) {
                // point1
                NeedsCleanup n1 = new NeedsCleanup(1);
                // Start try-finally immediately after definition
                // of n1, to guarantee proper cleanup of n1:
                try {
                    System.out.println("Sleeping");
//                    TimeUnit.SECONDS.sleep(1);
                    // point2
                    NeedsCleanup n2 = new NeedsCleanup(2);
                    // Guarantee proper cleanup of n2:
                    try {
                        System.out.println("Calculating");
                        // A time-consuming, non-blocking operation:
                        for (int i = 1; i < 2500000; i++)
                            d = d + (Math.PI + Math.E) / d;
                        System.out.println("Finished time-consuming operation");
                    } finally {
                        n2.cleanup();
                    }
                } finally {
                    n1.cleanup();
                }
            }
            System.out.println("Exiting via while() test");
//        } catch (InterruptedException e) {
//            System.out.println("Exiting via InterruptedException");
//        }
    }
}

public class InterruptingIdiom {
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("usage: java InterruptingIdiom delay-in-mS");
            System.exit(1);
        }
        Thread t = new Thread(new Blocked3());
        t.start();
        TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
        t.interrupt();
    }
}
线程协作wait()和notify()

wait()、notify()以及nofityAll()有一个比较特殊的方面,那就是这些方法都是基类Object的方法,而不是Thread的一部分。一开始或许有这种困惑,觉得很奇怪。明明是线程的功能,为啥要放在Object里。那时因为这些方法需要操作锁,当一个任务在方法里遇到wait()的调用时,线程的执行被挂起(阻塞状态),对象上的锁会被是否。因此wait()方法需放在同步控制块里(与之相对比是sleep()因为不用操作锁,所以可以放在非同步控制块里,而且还是Thread的方法)。如果在非同步控制调用这些方法,程序能通过编译,但运行时会抛IllegalMonitorStateException差异。例子:

package com.jc.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * wait()和notifyAll()例子,notifyAll会将该对象的wait()方法所阻塞的线程
 * @author 
 *
 */
class Car {
    private boolean waxOn = false;

    public synchronized void waxed() {
        waxOn = true; // Ready to buff
        notifyAll();
    }

    public synchronized void buffed() {
        waxOn = false; // Ready for another coat of wax
        notifyAll();
    }

    public synchronized void waitForWaxing() throws InterruptedException {
        while (waxOn == false)
            wait();
    }

    public synchronized void waitForBuffing() throws InterruptedException {
        while (waxOn == true)
            wait();
    }
}

class WaxOn implements Runnable {
    private Car car;

    public WaxOn(Car c) {
        car = c;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.print("Wax On! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax On task");
    }
}

class WaxOff implements Runnable {
    private Car car;

    public WaxOff(Car c) {
        car = c;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                car.waitForWaxing();
                System.out.print("Wax Off! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.buffed();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax Off task");
    }
}

public class WaxOMatic {
    public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        exec.shutdownNow(); // Interrupt all tasks
    }
}
notify()和nofityAll()

因为可能有多个任务在单个Car对象上处于wait()状态,因此调用nofityAll()比只调用notify()要更安全。所以上面那个程序,只有一个任务,因此可以使用notify()来代替notifyAll()。
使用 notify()而不是notifyAll()是一种优化。除非知道notify()会唤醒具体哪个任务,不如还是notifyAll()保守点
在有关Java的线程机制的讨论中,有一个令人困惑的描述:notifyAll()将唤醒“所有正在等待的任务”。其实更准确是:当notifyAll()因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒:

package com.jc.concurrency;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * 当notifyAll()因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒
 * @author 
 *
 */
class Blocker {
    synchronized void waitingCall() {
        try {
            while (!Thread.interrupted()) {
                wait();
                System.out.print(Thread.currentThread() + " ");
            }
        } catch (InterruptedException e) {
            // OK to exit this way
        }
    }

    synchronized void prod() {
        notify();
    }

    synchronized void prodAll() {
        notifyAll();
    }
}

class Task implements Runnable {
    static Blocker blocker = new Blocker();

    public void run() {
        blocker.waitingCall();
    }
}

class Task2 implements Runnable {
    // A separate Blocker object:
    static Blocker blocker = new Blocker();

    public void run() {
        blocker.waitingCall();
    }
}

public class NotifyVsNotifyAll {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++)
            exec.execute(new Task());
        exec.execute(new Task2());
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            boolean prod = true;

            public void run() {
                if (prod) {
                    System.out.print("
notify() ");
                    Task.blocker.prod();
                    prod = false;
                } else {
                    System.out.print("
notifyAll() ");
                    Task.blocker.prodAll();
                    prod = true;
                }
            }
        }, 400, 400); // Run every .4 second
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        timer.cancel();
        System.out.println("
Timer canceled");
        TimeUnit.MILLISECONDS.sleep(500);
        System.out.print("Task2.blocker.prodAll() ");
        Task2.blocker.prodAll();
        TimeUnit.MILLISECONDS.sleep(500);
        System.out.println("
Shutting down");
        exec.shutdownNow(); // Interrupt all tasks
    }
}

使用wait()和notifyAll()实现生产者和消费者:一个饭店,有一个厨师和一个服务员,这个服务员必须等待厨师准备好食物,当厨师准备好后就会通知服务员,之后服务员上菜,然后服务员继续等待。

package com.jc.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 简单的生产者消费者例子
 * 此例子有点局限因为不能有多线程的生产者、多线程的消费者。
 * 这例子仅仅展示如果使用wait()和notify()保证有序
 * @author 
 *
 */
class Meal {
    private final int orderNum;

    public Meal(int orderNum) {
        this.orderNum = orderNum;
    }

    public String toString() {
        return "Meal " + orderNum;
    }
}

class WaitPerson implements Runnable {
    private Restaurant restaurant;

    public WaitPerson(Restaurant r) {
        restaurant = r;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (restaurant.meal == null)
                        wait(); // ... for the chef to produce a meal
                }
                System.out.println("Waitperson got " + restaurant.meal);
                synchronized (restaurant.chef) {
                    restaurant.meal = null;
                    restaurant.chef.notifyAll(); // Ready for another
                }
            }
        } catch (InterruptedException e) {
            System.out.println("WaitPerson interrupted");
        }
    }
}

class Chef implements Runnable {
    private Restaurant restaurant;
    private int count = 0;

    public Chef(Restaurant r) {
        restaurant = r;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (restaurant.meal != null)
                        wait(); // ... for the meal to be taken
                }
                if (++count == 10) {
                    System.out.println("Out of food, closing");
                    restaurant.exec.shutdownNow();
                }
                System.out.println("Order up! ");
                synchronized (restaurant.waitPerson) {
                    restaurant.meal = new Meal(count);
                    restaurant.waitPerson.notifyAll();
                }
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch (InterruptedException e) {
            System.out.println("Chef interrupted");
        }
    }
}

public class Restaurant {
    Meal meal;
    ExecutorService exec = Executors.newCachedThreadPool();
    WaitPerson waitPerson = new WaitPerson(this);
    Chef chef = new Chef(this);

    public Restaurant() {
        exec.execute(chef);
        exec.execute(waitPerson);
    }

    public static void main(String[] args) {
        new Restaurant();
    }
}

使用显式锁Lock和Condition对象:

package com.jc.concurrency.waxomatic2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 使用显式的Lock和Condition对象来修改WaxOMatic例子
 * @author 
 *
 */
class Car {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private boolean waxOn = false;

    public void waxed() {
        lock.lock();
        try {
            waxOn = true; // Ready to buff
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void buffed() {
        lock.lock();
        try {
            waxOn = false; // Ready for another coat of wax
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void waitForWaxing() throws InterruptedException {
        lock.lock();
        try {
            while (waxOn == false)
                condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void waitForBuffing() throws InterruptedException {
        lock.lock();
        try {
            while (waxOn == true)
                condition.await();
        } finally {
            lock.unlock();
        }
    }
}

class WaxOn implements Runnable {
    private Car car;

    public WaxOn(Car c) {
        car = c;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.print("Wax On! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax On task");
    }
}

class WaxOff implements Runnable {
    private Car car;

    public WaxOff(Car c) {
        car = c;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                car.waitForWaxing();
                System.out.print("Wax Off! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.buffed();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax Off task");
    }
}

public class WaxOMatic2 {
    public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}

基于Lock和链表存储结构写的一个消息队列:

package com.jc.framework.queue;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class JcBlockingQueue {

    private JcQueueData head;
    private JcQueueData tail;
    private int size = 0;
    private int maxSize = Integer.MAX_VALUE;
    private final Lock lock;
    private final Condition full;
    private final Condition empty;

    public JcBlockingQueue() {
        lock = new ReentrantLock();
        full = lock.newCondition();     //角度是生产者
        empty = lock.newCondition();    //角度是消费者
    }


    public void enQueue(T t) throws InterruptedException {
        lock.lock();
        if (size == maxSize) {
            full.await();
        }
        if (head == null) {
            head = new JcQueueData<>(t, null);
            tail = head;
            size++;
            empty.signalAll();
            lock.unlock();
            return;
        }


        JcQueueData jcQueueData = new JcQueueData<>(t, null);
        tail.setNext(jcQueueData);
        tail = jcQueueData;
        size++;
        if (size == 1)
            empty.signalAll();
        lock.unlock();

    }

    public T deQueue() throws InterruptedException {
        lock.lock();
        while (head == null) {
            empty.await();
        }

        T t = head.getData();
        if (head.next != null) {
            JcQueueData next = head.next;
            head.next = null;
            head = next;
        } else {
            head = null;
            tail = null;
        }
        size--;
        if(size==maxSize-1)
            full.signalAll();
        lock.unlock();
        return t;
    }

    public int size() {
        return size;
    }


    private class JcQueueData {

        private T data;
        private JcQueueData next;

        public JcQueueData(T data, JcQueueData next) {
            this.data = data;
            this.next = next;
        }

        public T getData() {
            return data;
        }

        public void setData(T data) {
            this.data = data;
        }

        public JcQueueData getNext() {
            return next;
        }

        public void setNext(JcQueueData next) {
            this.next = next;
        }
    }

}
ExecutorService的shutdown

ExecutorService的shutdown方法,这有可能还有工作正在执行或准备执行,这情况下,它只是通知线程池再没有更多任务需要增加到它的内部队列,而且一旦完成所有等待的工作,就应当关闭。

对应的还有shutdownNow(),此方法中止当前处理中的任务,并忽略所有等待的任务。

参考:《Java编程思想》

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69285.html

相关文章

  • 2018年第33-javeer对nodejs体会

    摘要:流程源处理源代码,例如过滤任何值。工艺类从编译后处理生成的文件,例如对类进行字节码增强。整合后的测试执行集成测试后执行所需的操作。校验运行任何检查以验证包装是否有效并符合质量标准。 nodejs和es6 nodejs的语法和es6不一样,如模块系统,一个是CommonJS的require、一个是es6的import,写模块也不一样。 nodejs的npm 我来理解,nodejs类似与j...

    xiongzenghui 评论0 收藏0
  • 2018年第23-大数据的HDFS数据流及操作例子

    摘要:以下是客户端和之间的读数据流图客户端通过对象的方法方法通过对象通过调用,获取文件起始块的位置及其副本的地址,并返回对象。 上周已经把Hadoop的HDFS的架构和设计大概说了下,也有部署过程。在这周讲的HDFS的数据流及操作例子 HDFS数据流 HDFS系统依赖于以下服务1.NameNode2.DataNode3.JournalNode4.zkfc 其中JournalNode和zkfc...

    whlong 评论0 收藏0
  • 2018年第32-获取hive进度功能

    摘要:过程编写简单的工具类编写接口获取相关信息由于涉及到敏感数据处理,就简要说一下其中是关键,会以形式将进度信息输出到这个文件中,文件名大概如下此文件是每次执行的时都会生成一个文件。配置,就是配置,重启和 原理 大概原理时,自己写个hook,配置在hive里,然后hive每次运行sql时会执行hook,而我们写的这个hook会以http请求,发送这个hql相关信息,所以在这里我们还得写一个接...

    shmily 评论0 收藏0
  • 2018年第20-Flume概念(简单例子)

    摘要:两个发布版本和。用于暂存接受过来的事件,直到被消费。使用事务方法保证事件传递的可靠性。一个简单的例子以下是单节点的配置信息这配置可以让用户通过工具连接端口,发送文本,然后在和日志中输出。配置文件可以定一个多个,启动进程时可以指定启动。 Flume 1.8.0 简介 概要 Flume是一个分布式,可靠性和可用性的系统,此系统用于收集、聚合和移动大量数据日志数据,从各种各样的数据源将数据移...

    Nekron 评论0 收藏0
  • 2018年第21-以我角度看大数据

    摘要:吞吐量是对单位时间内完成的工作量的量度。所以在处理全量数据的情况下,目标就是高吞吐量,所以其响应速度可能就无法和传统的关系型数据库媲美了。 以我角度看大数据 最近公司开启了大数据项目,有幸的,我能够有个平台学习和实践大数据。研究和实验了两个月,虽然都是属于个人研究,但还是有所体会。在此分享给大家。既然要从我的角度说大数据,那我得说下我的背景,我写Java已有很多很多年了,工作也快有6年...

    eechen 评论0 收藏0

发表评论

0条评论

JouyPub

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<