java8并发学习

in Java with 0 comment

这里继续学习记录java8的并发知识。关于什么是并发,什么是并行,什么是进程,什么是线程,有什么关系区别等等就不贴出来啦。

并发在Java5中首次被引入并在后续的版本中不断得到增强。Java从JDK1.0开始执行线程。在开始一个新的线程之前,你必须指定由这个线程执行的代码,通常称为task。

线程与执行器

Runnable

我们可以通过实现Runnable--一个定义了一个无返回值无参数的run()方法的函数接口,来实现task。

/**
     * Runnable
     * 
     * @author wenqy
     * @date 2020年1月17日 下午3:19:58
     */
    private void testRunnable() {
        Runnable task = () -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(“Hello “ + threadName);
        };
        task.run(); // 非线程方式调用,还在主线程里
        Thread thread = new Thread(task);
        thread.start();
        System.out.println(“Done!”);  // runnable是在打印’done’前执行还是在之后执行,顺序是不确定的
    }

我们可以将线程休眠确定的时间。通过这种方法来模拟长时间运行的任务。

/**
     * 设置线程休眠时间,模拟长任务
     * 
     * @author wenqy
     * @date 2020年1月17日 下午3:25:01
     */
    private void testRunnableWithSleep() {
        Runnable runnable = () -> {
            try {
                String name = Thread.currentThread().getName();
                System.out.println(“Foo “ + name);
                TimeUnit.SECONDS.sleep(1); // 休眠1s
                System.out.println(“Bar “ + name);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Thread thread = new Thread(runnable);
        thread.start();
    }

Executor

并发API引入了ExecutorService作为一个在程序中直接使用Thread的高层次的替换方案。Executors支持运行异步任务,通常管理一个线程池,这样一来我们就不需要手动去创建新的线程。在不断地处理任务的过程中,线程池内部线程将会得到复用,Java进程从没有停止!Executors必须显式的停止-否则它们将持续监听新的任务。

ExecutorService提供了两个方法来达到这个目的--shutdwon()会等待正在执行的任务执行完而shutdownNow()会终止所有正在执行的任务并立即关闭executor。

ExecutorService executor = Executors.newSingleThreadExecutor(); // 单线程线程池
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(“Hello “ + threadName);
            try {
                TimeUnit.SECONDS.sleep(6);
            } catch (InterruptedException e) {
                System.err.println(“my is interrupted”);
            } // 休眠1s
        });
        // => Hello pool-1-thread-1
        // Executors必须显式的停止-否则它们将持续监听新的任务
        try {
            System.out.println(“attempt to shutdown executor”);
            executor.shutdown(); // 等待正在执行的任务执行完
            executor.awaitTermination(5, TimeUnit.SECONDS); // 等待指定时间优雅关闭executor。在等待最长5s的时间后,executor最终会通过中断所有的正在执行的任务关闭
            System.out.println(“wait for 5s to shutdown”);
        } catch (InterruptedException e) {
            System.err.println(“tasks interrupted”);
        } finally {
            if (!executor.isTerminated()) {
                System.err.println(“cancel non-finished tasks”);
            }
            executor.shutdownNow(); // 终止所有正在执行的任务并立即关闭executor
            System.out.println(“shutdown finished”);
        }

Callable

Callables也是类似于runnables的函数接口,不同之处在于,Callable返回一个值。一样提交给 executor services。在调用get()方法时,当前线程会阻塞等待,直到callable在返回实际的结果 123 之前执行完成。

Callable<Integer> task = () -> {
            try {
                TimeUnit.SECONDS.sleep(5); // 休眠5s后返回整数
                return 123;
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(“task interrupted”, e);
            }
        };
        ExecutorService executor = Executors.newFixedThreadPool(1); // 固定线程池
        Future<Integer> future = executor.submit(task);
        System.out.println(“future done? “ + future.isDone());
//      executor.shutdownNow(); // 如果关闭executor,所有的未中止的future都会抛出异常。
        Integer result = future.get(); // 在调用get()方法时,当前线程会阻塞等待,直到callable在返回实际的结果123之前执行完成
        System.out.println(“future done? “ + future.isDone());
        System.out.println(“result: “ + result);
        executor.shutdownNow(); // 需要显式关闭
        System.out.println(“result: “ + future.get());
    }

任何future.get()调用都会阻塞,然后等待直到callable中止。在最糟糕的情况下,一个callable持续运行--因此使你的程序将没有响应。我们可以简单的传入一个时长来避免这种情况。

ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<Integer> future = executor.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                return 123;
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(“task interrupted”, e);
            }
        });
        // 任何future.get()调用都会阻塞,然后等待直到callable中止,传入超时时长终止
        future.get(1, TimeUnit.SECONDS);  // 抛出 java.util.concurrent.TimeoutException

invokeAll

Executors支持通过invokeAll()一次批量提交多个callable。这个方法结果一个callable的集合,然后返回一个future的列表。

ExecutorService executor = Executors.newWorkStealingPool(); // ForkJoinPool 一个并行因子数来创建,默认值为主机CPU的可用核心数
        List<Callable<String>> callables = Arrays.asList(
                () -> “task1”,
                () -> “task2”,
                () -> “task3”);
        executor.invokeAll(callables)
            .stream()
            .map(future -> { // 返回的所有future,并每一个future映射到它的返回值
                try {
                    return future.get();
                }
                catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            })
            .forEach(System.out::println);

invokeAny

批量提交callable的另一种方式就是invokeAny(),它的工作方式与invokeAll()稍有不同。在等待future对象的过程中,这个方法将会阻塞直到第一个callable中止然后返回这一个callable的结果。

private static Callable<String> callable(String result, long sleepSeconds) {
        return () -> {
            TimeUnit.SECONDS.sleep(sleepSeconds);
            return result;
        };
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newWorkStealingPool();
        List<Callable<String>> callables = Arrays.asList(
        callable(“task1”, 2),
        callable(“task2”, 1),
        callable(“task3”, 3));
        String result = executor.invokeAny(callables);
        System.out.println(result); // task2
    }

这个例子又使用了另一种方式来创建executor--调用newWorkStealingPool()。这个工厂方法是Java8引入的,返回一个ForkJoinPool类型的 executor,它的工作方法与其他常见的execuotr稍有不同。与使用一个固定大小的线程池不同,ForkJoinPools使用一个并行因子数来创建,默认值为主机CPU的可用核心数。

ScheduledExecutor

为了持续的多次执行常见的任务,我们可以利用调度线程池。ScheduledExecutorService支持任务调度,持续执行或者延迟一段时间后执行。调度一个任务将会产生一个专门的future类型--ScheduleFuture,它除了提供了Future的所有方法之外,他还提供了getDelay()方法来获得剩余的延迟。在延迟消逝后,任务将会并发执行。

为了调度任务持续的执行,executors 提供了两个方法scheduleAtFixedRate()scheduleWithFixedDelay()。第一个方法用来以固定频率来执行一个任务,另一个方法等待时间是在一次任务的结束和下一个任务的开始之间

/**
     * 获取剩余延迟
     * @throws InterruptedException
     * @author wenqy
     * @date 2020年1月17日 下午4:56:58
     */
    private static void scheduleDelay() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        Runnable task = () -> System.out.println(“Scheduling: “ + System.nanoTime());
        ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS); // 3s后执行
        TimeUnit.MILLISECONDS.sleep(1337);
        long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
        System.out.printf(“Remaining Delay: %sms\n”, remainingDelay); // 剩余的延迟
        executor.shutdown();
    }
    /**
     * 以固定频率来执行一个任务
     * 
     * @throws InterruptedException
     * @author wenqy
     * @date 2020年1月17日 下午4:57:45
     */
    private static void scheduleAtFixedRate() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        Runnable task = () -> {
            System.out.println(“at fixed rate Scheduling: “ + System.nanoTime());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        int initialDelay = 0; // 初始化延迟,用来指定这个任务首次被执行等待的时长
        int period = 1;
        executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS); // 不考虑任务的实际用时
    }
    /**
     * 以固定延迟来执行一个任务
     *  等待时间 period 是在一次任务的结束和下一个任务的开始之间
     * @throws InterruptedException
     * @author wenqy
     * @date 2020年1月17日 下午5:03:28
     */
    private static void scheduleWithFixedDelay() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        Runnable task = () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(“WithFixedDelay Scheduling: “ + System.nanoTime());
            }
            catch (InterruptedException e) {
                System.err.println(“task interrupted”);
            }
        };
        executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
    }

同步与锁

我们学到了如何通过执行器服务同时执行代码。当我们编写这种多线程代码时,我们需要特别注意共享可变变量的并发访问。

int count = 0;
    void increment() {
        count = count + 1;
    }

我们在不同的线程上共享可变变量,并且变量访问没有同步机制,这会产生竞争条件。上面例子被多个线程同时访问,就会出现未知的错误。

我们可以用synchronized关键字支持线程同步

synchronized void incrementSync() {
      count = count + 1;
}

synchronized关键字也可用于语句块

void incrementSync2() {
        synchronized (this) {
            count = count + 1;
        }
    }

java在内部使用所谓的**"监视器"**(monitor),也称为监视器锁(monitor lock)或内在锁( intrinsic lock)来管理同步。监视器绑定在对象上,例如,当使用同步方法时,每个方法都共享相应对象的相同监视器。

所有隐式的监视器都实现了重入(reentrant)特性。重入的意思是锁绑定在当前线程上。线程可以安全地多次获取相同的锁,而不会产生死锁(例如,同步方法调用相同对象的另一个同步方法)

并发API支持多种显式的锁,它们由Lock接口规定,用于代替synchronized的隐式锁。锁对细粒度的控制支持多种方法,因此它们比隐式的监视器具有更大的开销。

ReentrantLock

ReentrantLock类是互斥锁,与通过synchronized访问的隐式监视器具有相同行为,但是具有扩展功能。就像它的名称一样,这个锁实现了重入特性,就像隐式监视器一样。

锁可以通过lock()来获取,通过unlock()来释放。把你的代码包装在try-finally代码块中来确保异常情况下的解锁非常重要。

/**
 * 可重入锁
 * 
 * @author wenqy
 * @date 2020年1月18日 下午3:41:50
 */
private void safeIncreByLock() {
    count = 0;
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.submit(() -> {
        lock.lock();
        try {
            ConcurrentUtils.sleep(1);
        } finally {
            lock.unlock();
        }
    });
    executor.submit(() -> {
        System.out.println(“Locked: “ + lock.isLocked());
        System.out.println(“Held by me: “ + lock.isHeldByCurrentThread());
        boolean locked = lock.tryLock(); // 尝试拿锁而不阻塞当前线程
        // 在访问任何共享可变变量之前,必须使用布尔值结果来检查锁是否已经被获取
        System.out.println(“Lock acquired: “ + locked);
    });
    ConcurrentUtils.stop(executor);
}

ReadWriteLock

ReadWriteLock接口规定了锁的另一种类型,包含用于读写访问的一对锁。读写锁的理念是,只要没有任何线程写入变量,并发读取可变变量通常是安全的。所以读锁可以同时被多个线程持有,只要没有线程持有写锁。这样可以提升性能和吞吐量,因为读取比写入更加频繁。

/**
 * 读写锁
 * 
 * @author wenqy
 * @date 2020年1月18日 下午3:41:21
 */
private void readWriteLock() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Map<String, String> map = new HashMap<>();
    ReadWriteLock lock = new ReentrantReadWriteLock();
    executor.submit(() -> {
        lock.writeLock().lock();
        try {
            ConcurrentUtils.sleep(1);
            map.put(“foo”, “bar”);
        } finally {
            lock.writeLock().unlock();
        }
    });
    Runnable readTask = () -> {
        lock.readLock().lock();
        try {
            System.out.println(map.get(“foo”));
            ConcurrentUtils.sleep(1);
        } finally {
            lock.readLock().unlock();
        }
    };
    executor.submit(readTask);
    executor.submit(readTask);
    ConcurrentUtils.stop(executor);
    // 两个读任务需要等待写任务完成。在释放了写锁之后,两个读任务会同时执行,并同时打印结果。
    // 它们不需要相互等待完成,因为读锁可以安全同步获取
}

StampedLock

Java 8 自带了一种新的锁,叫做StampedLock,它同样支持读写锁,就像上面的例子那样。与ReadWriteLock不同的是,StampedLock的锁方法会返回表示为long的标记。你可以使用这些标记来释放锁,或者检查锁是否有效。

/**
     * java8 StampedLock
     *      tampedLock并没有实现重入特性,相同线程也要注意死锁
     * @author wenqy
     * @date 2020年1月18日 下午3:40:46
     */
    private void stampedLock() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Map<String, String> map = new HashMap<>();
        StampedLock lock = new StampedLock();
        executor.submit(() -> {
            long stamp = lock.writeLock(); // 读锁或写锁会返回一个标记
            try {
                ConcurrentUtils.sleep(1);
                map.put(“foo”, “bar”);
            } finally {
                lock.unlockWrite(stamp);
            }
        });
        Runnable readTask = () -> {
            long stamp = lock.readLock();
            try {
                System.out.println(map.get(“foo”));
                ConcurrentUtils.sleep(1);
            } finally {
                lock.unlockRead(stamp);
            }
        };
        executor.submit(readTask);
        executor.submit(readTask);
        ConcurrentUtils.stop(executor);
    }

此外,StampedLock支持另一种叫做乐观锁(optimistic locking)的模式。乐观的读锁通过调用tryOptimisticRead()获取,它总是返回一个标记而不阻塞当前线程,无论锁是否真正可用。如果已经有写锁被拿到,返回的标记等于0。你需要总是通过lock.validate(stamp)检查标记是否有效。

/**
 * 乐观锁
 *  乐观锁在刚刚拿到锁之后是有效的。和普通的读锁不同的是,乐观锁不阻止其他线程同时获取写锁。
 *  在第一个线程暂停一秒之后,第二个线程拿到写锁而无需等待乐观的读锁被释放。
 *  此时,乐观的读锁就不再有效了。甚至当写锁释放时,乐观的读锁还处于无效状态。
 *  所以在使用乐观锁时,你需要每次在访问任何共享可变变量之后都要检查锁,来确保读锁仍然有效。
 * 
 * @author wenqy
 * @date 2020年1月18日 下午3:49:31
 */
private void optimisticLock() {
    System.out.println(“—–>optimisticLock—->”);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    StampedLock lock = new StampedLock();
    executor.submit(() -> {
        long stamp = lock.tryOptimisticRead();
        try {
            System.out.println(“Optimistic Lock Valid: “ + lock.validate(stamp));
            ConcurrentUtils.sleep(1);
            System.out.println(“Optimistic Lock Valid: “ + lock.validate(stamp));
            ConcurrentUtils.sleep(2);
            System.out.println(“Optimistic Lock Valid: “ + lock.validate(stamp));
        } finally {
            lock.unlock(stamp);
        }
    });
    executor.submit(() -> {
        long stamp = lock.writeLock();
        try {
            System.out.println(“Write Lock acquired”);
            ConcurrentUtils.sleep(2);
        } finally {
            lock.unlock(stamp);
            System.out.println(“Write done”);
        }
    });
    ConcurrentUtils.stop(executor);
}
/**
 * 读锁转换为写锁
 * 
 * @author wenqy
 * @date 2020年1月18日 下午4:00:10
 */
private void convertToWriteLock() {
    count = 0;
    System.out.println(“—–>convertToWriteLock—->”);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    StampedLock lock = new StampedLock();
    executor.submit(() -> {
        long stamp = lock.readLock();
        try {
            if (count == 0) {
                stamp = lock.tryConvertToWriteLock(stamp); // 读锁转换为写锁而不用再次解锁和加锁
                if (stamp == 0L) { // 调用不会阻塞,但是可能会返回为零的标记,表示当前没有可用的写锁
                    System.out.println(“Could not convert to write lock”);
                    stamp = lock.writeLock(); // 阻塞当前线程,直到有可用的写锁
                }
                count = 23;
            }
            System.out.println(count);
        } finally {
            lock.unlock(stamp);
        }
    });
    ConcurrentUtils.stop(executor);
}

信号量

除了锁之外,并发API也支持计数的信号量。不过锁通常用于变量或资源的互斥访问,信号量可以维护整体的准入许可。

/**
     * 信号量
     * 
     * @author wenqy
     * @date 2020年1月18日 下午4:13:11
     */
    private void doSemaphore() {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        Semaphore semaphore = new Semaphore(5); // 并发访问总数
        Runnable longRunningTask = () -> {
            boolean permit = false;
            try {
                permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
                if (permit) {
                    System.out.println(“Semaphore acquired”);
                    ConcurrentUtils.sleep(5);
                } else { // 等待超时之后,会向控制台打印不能获取信号量的结果
                    System.out.println(“Could not acquire semaphore”);
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            } finally {
                if (permit) {
                    semaphore.release();
                }
            }
        };
        IntStream.range(0, 10)
            .forEach(i -> executor.submit(longRunningTask));
        ConcurrentUtils.stop(executor);
    }

原子变量

java.concurrent.atomic包包含了许多实用的类,用于执行原子操作。如果你能够在多线程中同时且安全地执行某个操作,而不需要synchronized关键字或锁,那么这个操作就是原子的。

本质上,原子操作严重依赖于比较与交换(CAS),它是由多数现代CPU直接支持的原子指令。这些指令通常比同步块要快。所以在只需要并发修改单个可变变量的情况下,我建议你优先使用原子类,而不是锁。可以看下java包提供的一些原子变量例子AtomicInteger、LongAdder 、LongAccumulator、concurrentHashMap等等。

/**
     * AtomicInteger
     *      incrementAndGet
     * @author wenqy
     * @date 2020年1月18日 下午4:27:10
     */
    private void atomicIntegerIncre() {
        AtomicInteger atomicInt = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        IntStream.range(0, 1000)
            .forEach(i -> executor.submit(atomicInt::incrementAndGet)); // ++
        ConcurrentUtils.stop(executor);
        System.out.println(atomicInt.get());    // => 1000
    }
    /**
     * AtomicInteger
     *  updateAndGet
     * @author wenqy
     * @date 2020年1月18日 下午4:29:26
     */
    private void atomicIntegerUpdateAndGet() {
        AtomicInteger atomicInt = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        IntStream.range(0, 1000)
            .forEach(i -> {
                Runnable task = () ->
                    atomicInt.updateAndGet(n -> n + 2); // 结果累加2
                executor.submit(task);
            });
        ConcurrentUtils.stop(executor);
        System.out.println(atomicInt.get());    // => 2000
    }
    /**
     * LongAdder
     *      AtomicLong的替代,用于向某个数值连续添加值
     *      内部维护一系列变量来减少线程之间的争用,而不是求和计算单一结果
     *      当多线程的更新比读取更频繁时,这个类通常比原子数值类性能更好。
     *      这种情况在抓取统计数据时经常出现,例如,你希望统计Web服务器上请求的数量。
     *      LongAdder缺点是较高的内存开销,因为它在内存中储存了一系列变量。
     * @author wenqy
     * @date 2020年1月18日 下午4:33:29
     */
    private void longAdder() {
        LongAdder adder = new LongAdder();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        IntStream.range(0, 1000)
            .forEach(i -> executor.submit(adder::increment));
        ConcurrentUtils.stop(executor);
        System.out.println(adder.sumThenReset());   // => 1000
    }
    /**
     * LongAccumulator
     *      LongAccumulator是LongAdder的更通用的版本
     *      内部维护一系列变量来减少线程之间的争用
     * @author wenqy
     * @date 2020年1月18日 下午4:35:11
     */
    private void longAccumulator() {
        LongBinaryOperator op = (x, y) -> 2 * x + y;
        LongAccumulator accumulator = new LongAccumulator(op, 1L);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // i=0  2 * 1 + 0 = 2;
        // i=2  2 * 2 + 2 = 6;
        // i=3  2 * 6 + 3 = 15;
        // i=4  2 * 15 + 4 = 34;
        IntStream.range(0, 10)
            .forEach(i -> executor.submit(() -> {
                        accumulator.accumulate(i);
                        System.out.println(“i:” + i + ” result:” + accumulator.get());
                    })
                );
        // 初始值为1。每次调用accumulate(i)的时候,当前结果和值i都会作为参数传入lambda表达式。
        ConcurrentUtils.stop(executor);
        System.out.println(accumulator.getThenReset());     // => 2539
    }
    /**
     * concurrentMap
     * 
     * @author wenqy
     * @date 2020年1月18日 下午4:38:09
     */
    private void concurrentMap() {
        System.out.println(“—–>concurrentMap—–>”);
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put(“foo”, “bar”);
        map.put(“han”, “solo”);
        map.put(“r2”, “d2”);
        map.put(“c3”, “p0”);
        map.forEach((key, value) -> System.out.printf(“%s = %s\n”, key, value));
        String value = map.putIfAbsent(“c3”, “p1”);
        System.out.println(value);    // p0  提供的键不存在时,将新的值添加到映射
        System.out.println(map.getOrDefault(“hi”, “there”));    // there 传入的键不存在时,会返回默认值
        map.replaceAll((key, val) -> “r2”.equals(key) ? “d3” : val);
        System.out.println(map.get(“r2”));    // d3
        map.compute(“foo”, (key, val) -> val + val);
        System.out.println(map.get(“foo”));   // barbar 转换单个元素,而不是替换映射中的所有值
        map.merge(“foo”, “boo”, (oldVal, newVal) -> newVal + ” was “ + oldVal);
        System.out.println(map.get(“foo”));   // boo was foo
    }
    /**
     * concurrentHashMap
     * 
     * @author wenqy
     * @date 2020年1月18日 下午4:38:42
     */
    private void concurrentHashMap() {
        System.out.println(“—–>concurrentHashMap—–>”);
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put(“foo”, “bar”);
        map.put(“han”, “solo”);
        map.put(“r2”, “d2”);
        map.put(“c3”, “p0”);
        map.forEach(1, (key, value) ->
        System.out.printf(“key: %s; value: %s; thread: %s\n”,
            key, value, Thread.currentThread().getName())); // 可以并行迭代映射中的键值对
        String result = map.search(1, (key, value) -> {
            System.out.println(Thread.currentThread().getName());
            if (“foo”.equals(key)) { // 当前的键值对返回一个非空的搜索结果
                return value; // 只要返回了非空的结果,就不会往下搜索了
            }
            return null;
        }); // ConcurrentHashMap是无序的。搜索函数应该不依赖于映射实际的处理顺序
        System.out.println(“Result: “ + result);
        String searchResult = map.searchValues(1, value -> {
            System.out.println(Thread.currentThread().getName());
            if (value.length() > 3) {
                return value;
            }
            return null;
        }); // 搜索映射中的值
        System.out.println(“Result: “ + searchResult);
        String reduceResult = map.reduce(1,
            (key, value) -> {
                System.out.println(“Transform: “ + Thread.currentThread().getName());
                return key + “=” + value;
            },
            (s1, s2) -> {
                System.out.println(“Reduce: “ + Thread.currentThread().getName());
                return s1 + “, “ + s2;
            });
        // 第一个函数将每个键值对转换为任意类型的单一值。
        // 第二个函数将所有这些转换后的值组合为单一结果,并忽略所有可能的null值
        System.out.println(“Result: “ + reduceResult);
    }

我们学了java8的一些新特性和并发编程例子,暂且告一段落了,demo已上传至github:https://github.com/wenqy/java-study

参考

https://github.com/winterbe/java8-tutorial java8教程

https://wizardforcel.gitbooks.io/modern-java/content/ 中文译站

https://github.com/wenqy/java-study 学习例子