资讯专栏INFORMATION COLUMN

如何以并发方式在同一个流上执行多种操作?--复制流

王晗 / 3317人阅读

摘要:正常情况下,一个流在执行一次终端操作之后便结束了。本文通过复制流内数据的方式,曲折的实现了同一个流上执行多次操作。只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。但如果流涉及大量,也许性能会有提高。

正常情况下,一个流在执行一次终端操作之后便结束了。本文通过复制流内数据的方式,曲折的实现了同一个流上执行多次操作。
Demo只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。但如果流涉及大量I/O,也许性能会有提高。

public class StreamForker {
    private final Stream stream;
    private final Map, ?>> forks = new HashMap<>();

    public StreamForker(Stream stream) {
        this.stream = stream;
    }

    public StreamForker fork(Object key, Function, ?> f) {
        forks.put(key, f);
        return this;
    }

    public Results getResults() {
        ForkingStreamConsumer consumer = build();
        try {
            stream.sequential().forEach(consumer);
        } finally {
            consumer.finish();
        }
        return consumer;
    }

    private ForkingStreamConsumer build() {
        List> queues = new ArrayList<>();

        Map> actions = forks.entrySet().stream().reduce(new HashMap>(),
                (map, e) -> {
                    map.put(e.getKey(), getOperationResult(queues, e.getValue()));
                    return map;
                }, (m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                });

        return new ForkingStreamConsumer<>(queues, actions);
    }

    private Future getOperationResult(List> queues, Function, ?> f) {
        BlockingQueue queue = new LinkedBlockingQueue<>();
        queues.add(queue);
        Spliterator spliterator = new BlockingQueueSpliterator<>(queue);
        Stream source = StreamSupport.stream(spliterator, false);
        return CompletableFuture.supplyAsync(() -> f.apply(source));
    }
}

accept方法将原始流中所有的数据添加到各个BlockingQueue内,此处实现了复制

class ForkingStreamConsumer implements Consumer, Results {
    static final Object END_OF_STREAM = new Object();

    private final List> queues;
    private final Map> actions;

    public ForkingStreamConsumer(List> queues, Map> actions) {
        this.queues = queues;
        this.actions = actions;
    }

    @Override
    public void accept(T t) {
        queues.forEach(q -> q.add(t));
    }

    @SuppressWarnings("unchecked")
    void finish() {
        accept((T) END_OF_STREAM);
    }

    @SuppressWarnings("unchecked")
    @Override
    public  R get(Object key) {
        try {
            return ((Future) actions.get(key)).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

此处重写了tryAdvance接口,只是简单的从BlockingQueue中取出数据,执行action。业务逻辑中复制流是为了做什么事情,action就是这件事情。ForkingStreamConsumer.END_OF_STREAM是Queue中数据结束的标示

class BlockingQueueSpliterator implements Spliterator {
    private final BlockingQueue q;

    BlockingQueueSpliterator(BlockingQueue q) {
        this.q = q;
    }

    @Override
    public boolean tryAdvance(Consumer action) {
        T t;
        while (true) {
            try {
                t = q.take();
                break;
            } catch (InterruptedException e) {
            }
        }

        if (t != ForkingStreamConsumer.END_OF_STREAM) {
            action.accept(t);
            return true;
        }

        return false;
    }

    @Override
    public Spliterator trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return 0;
    }

    @Override
    public int characteristics() {
        return 0;
    }
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77289.html

相关文章

  • 从命令式到响应式(五)

    摘要:输出流只有在所有的输入流都完成以后才能完成,任何一条输入流上的错误都将立即推送到输出流上。如果没有转入输入流,输出流将会立即发出结束通知。返回值以数组形式获取到的每一个输入流的值,或者来自映射函数的值。返回值仅从最新的内部流上取值的流。 接着上一节的操作符继续,让我们直奔主题。 组合类操作符 组合类的操作符可以将不同流数据按一定的规则进行合并,从而获得所需要的完整数据。 combine...

    CoderBear 评论0 收藏0
  • 从命令式到响应式(四)

    摘要:使用的操作符这条从左到右的横线代表经过操作符转换后的输出流。返回值通过判定函数检测的值组成的流。返回值持续发出输入流上的值,直到通知流上发出值为止。 上期介绍过了rxjs中的三大件,Observable,subscription,subject,但是在开发过程我们最常接触到的东西非操作符莫属。比如上期代码中曾出现过的from就是一个操作符。rxjs中的操作符大致上可以分为几类,创建类,...

    jaysun 评论0 收藏0
  • 巧妙复制一个

    摘要:场景实际业务中可能出现重复消费一个可读流的情况,比如在前置过滤器解析请求体,拿到进行相关权限及身份认证认证通过后框架或者后置过滤器再次解析请求体传递给业务上下文。 场景 实际业务中可能出现重复消费一个可读流的情况,比如在前置过滤器解析请求体,拿到body进行相关权限及身份认证;认证通过后框架或者后置过滤器再次解析请求体传递给业务上下文。因此,重复消费同一个流的需求并不奇葩,这类似于js...

    wenzi 评论0 收藏0
  • 探索 RxJS - Core Concept

    摘要:但不同的是,在的遍历调用过程中,如果一个事件还没有触发完毕获取到返回值,就触发了下一个事件,则将忽略返回的值。这样,我们就可以避免异步的返回值因为返回较慢,反而覆盖了之后异步的返回值。 Steam in ReactiveX showImg(https://segmentfault.com/img/bVFReX?w=100&h=100); ReactiveX,又称 Reactive Ex...

    Neilyo 评论0 收藏0

发表评论

0条评论

王晗

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<