摘要:正常情况下,一个流在执行一次终端操作之后便结束了。本文通过复制流内数据的方式,曲折的实现了同一个流上执行多次操作。只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。但如果流涉及大量,也许性能会有提高。
正常情况下,一个流在执行一次终端操作之后便结束了。本文通过复制流内数据的方式,曲折的实现了同一个流上执行多次操作。
Demo只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。但如果流涉及大量I/O,也许性能会有提高。
</>复制代码
public class StreamForker {
private final Stream stream;
private final Map
public StreamForker(Stream stream) {
this.stream = stream;
}
public StreamForker fork(Object key, Function, ?> f) {
forks.put(key, f);
return this;
}
public Results getResults() {
ForkingStreamConsumer consumer = build();
try {
stream.sequential().forEach(consumer);
} finally {
consumer.finish();
}
return consumer;
}
private ForkingStreamConsumer build() {
List> queues = new ArrayList<>();
Map
(map, e) -> {
map.put(e.getKey(), getOperationResult(queues, e.getValue()));
return map;
}, (m1, m2) -> {
m1.putAll(m2);
return m1;
});
return new ForkingStreamConsumer<>(queues, actions);
}
private Future getOperationResult(List> queues, Function, ?> f) {
BlockingQueue queue = new LinkedBlockingQueue<>();
queues.add(queue);
Spliterator spliterator = new BlockingQueueSpliterator<>(queue);
Stream source = StreamSupport.stream(spliterator, false);
return CompletableFuture.supplyAsync(() -> f.apply(source));
}
}
accept方法将原始流中所有的数据添加到各个BlockingQueue内,此处实现了复制
</>复制代码
class ForkingStreamConsumer implements Consumer, Results {
static final Object END_OF_STREAM = new Object();
private final List> queues;
private final Map> actions;
public ForkingStreamConsumer(List> queues, Map> actions) {
this.queues = queues;
this.actions = actions;
}
@Override
public void accept(T t) {
queues.forEach(q -> q.add(t));
}
@SuppressWarnings("unchecked")
void finish() {
accept((T) END_OF_STREAM);
}
@SuppressWarnings("unchecked")
@Override
public R get(Object key) {
try {
return ((Future) actions.get(key)).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
此处重写了tryAdvance接口,只是简单的从BlockingQueue中取出数据,执行action。业务逻辑中复制流是为了做什么事情,action就是这件事情。ForkingStreamConsumer.END_OF_STREAM是Queue中数据结束的标示
</>复制代码
class BlockingQueueSpliterator implements Spliterator {
private final BlockingQueue q;
BlockingQueueSpliterator(BlockingQueue q) {
this.q = q;
}
@Override
public boolean tryAdvance(Consumer action) {
T t;
while (true) {
try {
t = q.take();
break;
} catch (InterruptedException e) {
}
}
if (t != ForkingStreamConsumer.END_OF_STREAM) {
action.accept(t);
return true;
}
return false;
}
@Override
public Spliterator trySplit() {
return null;
}
@Override
public long estimateSize() {
return 0;
}
@Override
public int characteristics() {
return 0;
}
}
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/77289.html
摘要:输出流只有在所有的输入流都完成以后才能完成,任何一条输入流上的错误都将立即推送到输出流上。如果没有转入输入流,输出流将会立即发出结束通知。返回值以数组形式获取到的每一个输入流的值,或者来自映射函数的值。返回值仅从最新的内部流上取值的流。 接着上一节的操作符继续,让我们直奔主题。 组合类操作符 组合类的操作符可以将不同流数据按一定的规则进行合并,从而获得所需要的完整数据。 combine...
摘要:使用的操作符这条从左到右的横线代表经过操作符转换后的输出流。返回值通过判定函数检测的值组成的流。返回值持续发出输入流上的值,直到通知流上发出值为止。 上期介绍过了rxjs中的三大件,Observable,subscription,subject,但是在开发过程我们最常接触到的东西非操作符莫属。比如上期代码中曾出现过的from就是一个操作符。rxjs中的操作符大致上可以分为几类,创建类,...
摘要:但不同的是,在的遍历调用过程中,如果一个事件还没有触发完毕获取到返回值,就触发了下一个事件,则将忽略返回的值。这样,我们就可以避免异步的返回值因为返回较慢,反而覆盖了之后异步的返回值。 Steam in ReactiveX showImg(https://segmentfault.com/img/bVFReX?w=100&h=100); ReactiveX,又称 Reactive Ex...
阅读 2163·2021-09-29 09:35
阅读 2051·2019-08-30 14:15
阅读 3045·2019-08-30 10:56
阅读 1031·2019-08-29 16:59
阅读 650·2019-08-29 14:04
阅读 1403·2019-08-29 12:30
阅读 1103·2019-08-28 18:19
阅读 574·2019-08-26 11:51