资讯专栏INFORMATION COLUMN

Java8流的复制

smartlion / 1962人阅读

摘要:只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。方法将原始流中所有的数据添加到各个内,此处实现了复制此处重写了接口,只是简单的从中取出数据,执行。是中数据结束的标示

正常情况下,一个流在执行一次终端操作之后便结束了。本文通过复制流内数据的方式,曲折的实现了同一个流上执行多次操作。

Demo只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。但如果流涉及大量I/O,也许性能会有提高。

</>复制代码

  1. public class StreamForker {
  2. private final Stream stream;
  3. private final Map, ?>> forks = new HashMap<>();
  4. public StreamForker(Stream stream) {
  5. this.stream = stream;
  6. }
  7. public StreamForker fork(Object key, Function, ?> f) {
  8. forks.put(key, f);
  9. return this;
  10. }
  11. public Results getResults() {
  12. ForkingStreamConsumer consumer = build();
  13. try {
  14. stream.sequential().forEach(consumer);
  15. } finally {
  16. consumer.finish();
  17. }
  18. return consumer;
  19. }
  20. private ForkingStreamConsumer build() {
  21. List> queues = new ArrayList<>();
  22. Map> actions = forks.entrySet().stream().reduce(new HashMap>(),
  23. (map, e) -> {
  24. map.put(e.getKey(), getOperationResult(queues, e.getValue()));
  25. return map;
  26. }, (m1, m2) -> {
  27. m1.putAll(m2);
  28. return m1;
  29. });
  30. return new ForkingStreamConsumer<>(queues, actions);
  31. }
  32. private Future getOperationResult(List> queues, Function, ?> f) {
  33. BlockingQueue queue = new LinkedBlockingQueue<>();
  34. queues.add(queue);
  35. Spliterator spliterator = new BlockingQueueSpliterator<>(queue);
  36. Stream source = StreamSupport.stream(spliterator, false);
  37. return CompletableFuture.supplyAsync(() -> f.apply(source));
  38. }
  39. }

accept方法将原始流中所有的数据添加到各个BlockingQueue内,此处实现了复制

</>复制代码

  1. class ForkingStreamConsumer implements Consumer, Results {
  2. static final Object END_OF_STREAM = new Object();
  3. private final List> queues;
  4. private final Map> actions;
  5. public ForkingStreamConsumer(List> queues, Map> actions) {
  6. this.queues = queues;
  7. this.actions = actions;
  8. }
  9. @Override
  10. public void accept(T t) {
  11. queues.forEach(q -> q.add(t));
  12. }
  13. @SuppressWarnings("unchecked")
  14. void finish() {
  15. accept((T) END_OF_STREAM);
  16. }
  17. @SuppressWarnings("unchecked")
  18. @Override
  19. public R get(Object key) {
  20. try {
  21. return ((Future) actions.get(key)).get();
  22. } catch (Exception e) {
  23. throw new RuntimeException(e);
  24. }
  25. }
  26. }

此处重写了tryAdvance接口,只是简单的从BlockingQueue中取出数据,执行action。业务逻辑中复制流是为了做什么事情,action就是这件事情。ForkingStreamConsumer.END_OF_STREAM是Queue中数据结束的标示

</>复制代码

  1. class BlockingQueueSpliterator implements Spliterator {
  2. private final BlockingQueue q;
  3. BlockingQueueSpliterator(BlockingQueue q) {
  4. this.q = q;
  5. }
  6. @Override
  7. public boolean tryAdvance(Consumer action) {
  8. T t;
  9. while (true) {
  10. try {
  11. t = q.take();
  12. break;
  13. } catch (InterruptedException e) {
  14. }
  15. }
  16. if (t != ForkingStreamConsumer.END_OF_STREAM) {
  17. action.accept(t);
  18. return true;
  19. }
  20. return false;
  21. }
  22. @Override
  23. public Spliterator trySplit() {
  24. return null;
  25. }
  26. @Override
  27. public long estimateSize() {
  28. return 0;
  29. }
  30. @Override
  31. public int characteristics() {
  32. return 0;
  33. }
  34. }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68084.html

相关文章

  • 乐字节-Java8新特性之Stream流(上)

    摘要:需要注意的是很多流操作本身就会返回一个流,所以多个操作可以直接连接起来,如下图这样,操作可以进行链式调用,并且并行流还可以实现数据流并行处理操作。为集合创建并行流。 上一篇文章,小乐给大家介绍了《Java8新特性之方法引用》,下面接下来小乐将会给大家介绍Java8新特性之Stream,称之为流,本篇文章为上半部分。 1、什么是流? Java Se中对于流的操作有输入输出IO流,而Jav...

    dingda 评论0 收藏0
  • Java8特性②Stream简介

    摘要:元素序列流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间空间复杂度存储和访问元素如与。请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。 流是什么 流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。可以把它们看成遍历数据集的高级迭代器。此外...

    EasonTyler 评论0 收藏0
  • Java8实战》-第四章读书笔记(引入流Stream)

    摘要:内部迭代与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。流只能遍历一次请注意,和迭代器类似,流只能遍历一次。 流(Stream) 流是什么 流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,你无需写任何多线程代码了!我会在后面的笔记中...

    _ivan 评论0 收藏0
  • 乐字节-Java8核心特性实战之Stream(流)

    摘要:大家好,我是乐字节的小乐。需要注意的是很多流操作本身就会返回一个流,所以多个操作可以直接连接起来,如下图这样,操作可以进行链式调用,并且并行流还可以实现数据流并行处理操作。为集合创建并行流。 大家好,我是乐字节的小乐。说起流,我们会联想到手机、电脑组装流水线,物流仓库商品包装流水线等等,如果把手机 ,电脑,包裹看做最终结果的话,那么加工商品前的各种零部件就可以看做数据源,而中间一系列的...

    wenshi11019 评论0 收藏0
  • 用Java 8 的 Stream 来写代码,干净优雅!

    摘要:补充一点使用数值流可以避免计算过程中拆箱装箱,提高性能。其目的主要是打开流,做出某种程度的数据映射过滤,然后返回一个新的流,交给下一个操作使用。终端操作的执行,才会真正开始流的遍历。 Java8的新特性主要是Lambda表达式和流,当流和Lambda表达式结合起来一起使用时,因为流申明式处理数据集合的特点,可以让代码变得简...

    wemall 评论0 收藏0

发表评论

0条评论

smartlion

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<