资讯专栏INFORMATION COLUMN

初探ZeroMQ

Harriet666 / 1183人阅读

摘要:关闭套接字和上下文备注说明如何利用使用首先下载所需的包,解压以后将和文件放到自己电脑中的安装路径中的文件夹下,最后需要将之前解压后的包放在项目的中或者资源下载链接密码项目源码下载链接链接密码

在讲ZeroMQ前先给大家讲一下什么是消息队列。

消息队列简介:

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。其实简单点说,消息队列就是如何使各分载器如何实现负载均衡使得完成分布式目标。

ZeroMQ简介:

ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。ZeroMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。ZeroMQ几乎所有的I/O操作都是异步的,主线程不会被阻塞。ZeroMQ会根据用户调用zmq_init函数时传入的接口参数,创建对应数量的I/O Thread。每个I/O Thread都有与之绑定的Poller,Poller采用经典的Reactor模式实现,Poller根据不同操作系统平台使用不同的网络I/O模型(select、poll、epoll、devpoll、kequeue等)。主线程与I/O线程通过Mail Box传递消息来进行通信。Server开始监听或者Client发起连接时,在主线程中创建zmq_connecter或zmq_listener,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程会把zmq_connecter或zmq_listener添加到Poller中用以侦听读/写事件。Server与Client在第一次通信时,会创建zmq_init来发送identity,用以进行认证。认证结束后,双方会为此次连接创建Session,以后双方就通过Session进行通信。每个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过plugin到Session中的Engine来与kernel交换I/O数据。

ZeroMQ三种模型讲解及实例

【1】Request-Response

由请求端发起请求,然后等待回应端应答。一个请求必须对应一个回应,从请求端的角度来看是发-收配对,从回应端的角度是收-发对。跟一对一结对模型的区别在于请求端可以是1~N个。该模型主要用于远程调用及任务分配等。Echo服务就是这种经典模型的应用。

下面通过Java实现这一模型:

server port

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class Server {

   public static void main(String[] args) throws InterruptedException {
       
       //实现服务器端的上下文及套接字
       Context context = ZMQ.context(1);
       Socket responder = context.socket(ZMQ.REP);
       
       //使服务器端通过tcp协议通信,监听5555端口
       responder.bind("tcp://*:5555");
       while (!Thread.currentThread().isInterrupted()) {
           byte[] request = responder.recv(0);
           System.out.println("Received Hello");
           Thread.sleep(1000);
           String reply = "World";
           responder.send(reply.getBytes(), 0);
       }
       
       //关闭服务器端的上下文及套接字
       responder.close();
       context.close();
   }

}

client port

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class Client {

   public static void main(String[] args) {    
       //创立客户端的上下文捷套接字
       Context context = ZMQ.context(1);
       System.out.println("Connecting to hello world server…");
       Socket requester = context.socket(ZMQ.REQ);
       
       //讲客户端绑定在5555端口
       requester.connect("tcp://localhost:5555");
          for (int requestNbr = 0; requestNbr != 100; requestNbr++) {
              String request = "Hello";
              System.out.println("Sending Hello " + requestNbr);
              requester.send(request.getBytes(), 0);
              byte[] reply = requester.recv(0);
              System.out.println("Received " + new String(reply) + " " + requestNbr);
          }      
          //关闭客户端的上下文套接字
          requester.close();
          context.term();
   }

}

【2】Publisher/Subscriber model

发布端单向分发数据,且不关心是否把全部信息发送给订阅端。如果发布端开始发布信息时,订阅端尚未连接上来,则这些信息会被直接丢弃。订阅端未连接导致信息丢失的问题,可以通过与请求回应模型组合来解决。订阅端只负责接收,而不能反馈,且在订阅端消费速度慢于发布端的情况下,会在订阅端堆积数据。该模型主要用于数据分发。天气预报、微博明星粉丝可以应用这种经典模型。

Server Port

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class ZMQ_PUB {

   public static void main(String[] args) throws InterruptedException {
       Context context = ZMQ.context(1);
       Socket publisher = context.socket(ZMQ.PUB);
       publisher.bind("tcp://*:5555");
       Thread.sleep(3000);
       for(int i=0;i<100;i++){
           publisher.send(("admin " + i).getBytes(), ZMQ.NOBLOCK);
           System.out.println("pub msg " + i);  
           Thread.sleep(1000);  
       }
       context.close();
       publisher.close();
   }

}

Client Port

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class ZMQ_SUB {

    public static void main(String[] args) {
        Context context = ZMQ.context(1);
        Socket subscriber = context.socket(ZMQ.SUB);
        subscriber.connect("tcp://localhost:5555");
         subscriber.subscribe("".getBytes());  
         for (int i=0;i<100;i++) {  
             //Receive a message.       
             String string = new String(subscriber.recv(0));           
             System.out.println("recv 1" + string);  
         }       
         //关闭套接字和上下文  
         subscriber.close();  
         context.term();  
    }
}

【3】push/pull

push port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class Push {

    public static void main(String[] args) {
        Context context = ZMQ.context(1);
        Socket push = context.socket(ZMQ.PUSH);
        push.bind("ipc://fjs");
        for (int i = 0; i < 10000000; i++) {  
            push.send("hello".getBytes(), i);  
        }  
        push.close();  
        context.term();
    }
}

pull port
import java.util.concurrent.atomic.AtomicInteger;

import org.zeromq.ZMQ;

public class Pull {

public static void main(String args[]) {  
    final AtomicInteger number = new AtomicInteger(0);  
    for (int i = 0; i < 5; i++) {  
        new Thread(new Runnable(){  
            private int here = 0;  
            public void run() {  
                // TODO Auto-generated method stub  
                ZMQ.Context context = ZMQ.context(1);  
                ZMQ.Socket pull = context.socket(ZMQ.PULL);  
                pull.connect("ipc://fjs");  
                //pull.connect("ipc://fjs");  
                while (true) {  
                    String message = new String(pull.recv());  
                    int now = number.incrementAndGet();  
                    here++;  
                    if (now % 1000000 == 0) {  
                        System.out.println(now + "  here is : " + here);  
                    }  
                }  
            }                
        }).start();               
    }  
}  

}

备注说明:

【1】如何利用Java使用ZeroMQ
首先下载zmq所需的zip包,解压以后将libzmq.dll和jzmq.dll文件放到自己电脑中的jdk安装路径中的bin文件夹下,最后需要将之前解压后的zmq.jar包放在项目的lib中或者

zeromq资源下载:
链接:http://pan.baidu.com/s/1miuvSfQ 密码:ttss

项目源码下载链接:
链接:http://pan.baidu.com/s/1dE5Plr7 密码:vqze

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68009.html

相关文章

  • 一起来学SpringBoot | 第十二篇:初探RabbitMQ消息队列

    摘要:用于控制活动人数,将超过此一定阀值的订单直接丢弃。缓解短时间的高流量压垮应用。目前比较推荐的就是我们手动然后将消费错误的消息转移到其它的消息队列中,做补偿处理消费者该方案是默认的方式不太推荐。 SpringBoot 是为了简化 Spring 应用的创建、运行、调试、部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相...

    Baoyuan 评论0 收藏0
  • 设备改造——上传结果数据的技术实现

    摘要:设备改造上传结果数据的技术实现一项目需求及分析按照领导的要求,要改造一台仪器,添加点功能,将测量数据上传到服务器。所以选择用提交,的通信可以多线程调度。考虑到新增的上传功能不能影响之前的测量节拍,所以要多线程实现。 **设备改造——上传结果数据的技术实现 一、项目需求及分析 按照领导的要求,要改造一台仪器,添加点功能,将测量数据上传到服务器。仪器测量节拍大概是20s,数据量目前不大,...

    Mr_houzi 评论0 收藏0
  • zeromq的三种简单模式(python实现)

    摘要:的明确目标是成为标准网络协议栈的一部分,之后进入内核。实现端测试消息已发送端正在转发端输出结果已发送已发送已发送正在转发正在转发正在转发测试消息测试消息测试消息 简介 ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个 socket library,他使得 Socket 编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间...

    lufficc 评论0 收藏0
  • 关于k8s集群容器日志收集的总结

    摘要:我推荐你使用进行日志收集,将作为的出口。集群目前暂时没有提供日志查看机制。以如下的形式启动容器,容器日志将发往配置的。 【作者barnett】本文介绍了k8s官方提供的日志收集方法,并介绍了Fluentd日志收集器并与其他产品做了比较。最后介绍了好雨云帮如何对k8s进行改造并使用ZeroMQ以消息的形式将日志传输到统一的日志处理中心。 容器日志存在形式 目前容器日志有两种输出形式: ...

    jeffrey_up 评论0 收藏0

发表评论

0条评论

Harriet666

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<