资讯专栏INFORMATION COLUMN

高性能的生产者-消费者新选择:无锁的缓存框架 Disruptor

姘搁『 / 1824人阅读

    
        com.lmax
        disruptor
        3.3.4
    

public class PCData {

private long value;

public long getValue() {
    return value;
}

public void setValue(long value) {
    this.value = value;
}

}

public class PCDataFactory implements EventFactory {

@Override
public PCData newInstance() {
    return new PCData();
}

}

public class Producer {

private final RingBuffer ringBuffer;

public Producer(RingBuffer ringBuffer) {
    this.ringBuffer = ringBuffer;
}

public void pushData(ByteBuffer byteBuffer){
    long sequence = ringBuffer.next();
    try{
        PCData event = ringBuffer.get(sequence);
        event.setValue(byteBuffer.getLong(0));
    }finally {
        ringBuffer.publish(sequence);
    }
}

}

public class Consumer implements WorkHandler {

@Override
public void onEvent(PCData pcData) throws Exception {
    System.out.println(Thread.currentThread().getName()+"Event:--"+pcData.getValue()*pcData.getValue()+"--");
}

}

public class App {

public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    PCDataFactory factory = new PCDataFactory();

    int bufferSize = 1024;
    Disruptor dataDisruptor = new Disruptor(factory,bufferSize,executorService,
            ProducerType.MULTI,new BlockingWaitStrategy());
    dataDisruptor.handleEventsWithWorkerPool(
            new Consumer(),
            new Consumer(),
            new Consumer(),
            new Consumer()
    );

    dataDisruptor.start();

    RingBuffer ringBuffer = dataDisruptor.getRingBuffer();
    Producer producer = new Producer(ringBuffer);
    ByteBuffer byteBuffer = ByteBuffer.allocate(8);
    for(long l=0;true;l++){
        byteBuffer.putLong(0,l);
        producer.pushData(byteBuffer);
        Thread.sleep(100);
        System.out.println("add data"+l);
    }
}

}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74762.html

相关文章

  • Spring整合Disruptor3

    摘要:我们知道是一个队列,生产者往队列里发布一项事件或称之为消息也可以时,消费者能获得通知如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。实战本文先不具体去阐述的工作具体原理,只是简单地将与其整合。 什么是Disruptor 从功能上来看,Disruptor 是实现了队列的功能,而且是一个有界队列。那么它的应用场景自然就是生产者-消费者模型的应用场合了。可以拿 JDK 的 Block...

    khs1994 评论0 收藏0
  • 性能SPSC无锁队列设计之路

    摘要:当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。 序 本文整理了Single Producer/Consumer lock free Queue step by step这篇文章里头关于高性能的SPSC无锁队列使用遵循的几个原则: 单写原则 使用lazySet替代volatile set 使用位运算替代取模运算 避免伪共享 减少缓存...

    evin2016 评论0 收藏0
  • JAVA多线程使用场景和注意事项

    摘要:一个是线程退出条件,一个是异常处理情况。很方便,每个线程一份数据,也很安全,但要注意内存泄露。线程池参数包最常用的就是线程池,平常工作建议直接使用线程池,类就可以降低优先级了。在线程池的构造参数中,我们使用的队列,一定要注意其特性和边界。 我曾经对自己的小弟说,如果你实在搞不清楚什么时候用HashMap,什么时候用ConcurrentHashMap,那么就用后者,你的代码bug会很少。...

    Joyven 评论0 收藏0
  • 追踪解析 Disruptor 源码

    摘要:分段策略尝试自旋此,然后调用次,如果经过这两百次的操作还未获取到任务,就会尝试阶段性挂起自身线程。 零 前期准备 0 FBI WARNING 文章异常啰嗦且绕弯。 1 版本 Disruptor 版本 : Disruptor 3.4.2 IDE : idea 2018.3 JDK 版本 : OpenJDK 11.0.1 2 Disruptor 简介 高性能线程间消息队列框架 Disrup...

    LiangJ 评论0 收藏0
  • 一次生产 CPU 100% 排查优化实践

    摘要:发现这是的一个堆栈,前段时间正好解决过一个由于队列引起的一次强如也发生内存溢出没想到又来一出。因此初步判断为大量线程执行函数之后互相竞争导致使用率增高,而通过对堆栈发现是和使用有关。 showImg(https://segmentfault.com/img/remote/1460000017395756?w=1816&h=1080); 前言 到了年底果然都不太平,最近又收到了运维报警:...

    roundstones 评论0 收藏0

发表评论

0条评论

姘搁『

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<