nanomsg通信库的pubsub及survey

整理文档很辛苦,赏杯茶钱您下走!

免费阅读已结束,点击下载阅读编辑剩下 ...

阅读已结束,您可以下载文档离线阅读编辑

资源描述

nanomsg通信库的pubsub及surveynanomsg实验——pubsub发布订阅模式是很多消息中间件提供的常见功能。通过消息机制,能够将消息发布者和消息接收(消费)者进行解耦。pubsub模式也是nanomsg直接支持的一直消息模型之一,因此通过pubsub模式实验,同时也大致了解了下nanomsg的基础用法。服务端代码如下复制代码#include<stdio.h>#include<stdlib.h>#include<time.h>#include<string.h>#include<unistd.h>#include<nanomsg/nn.h>#include<nanomsg/pubsub.h>voidusage(constchar*name){fprintf(stderr,%s[bindurl]n,name);}intmain(intargc,char**argv){if(argc!=2){usage(argv[0]);exit(-1);}constchar*url=argv[1];intsock=nn_socket(AF_SP,NN_PUB);if(sock<0){fprintf(stderr,nn_socketfailed:%sn,nn_strerror(errno));exit(-1);}if(nn_bind(sock,url)<0){fprintf(stderr,nn_bindfailed:%sn,nn_strerror(errno));exit(-1);}while(1){time_trawtime;structtm*timeinfo;time(&rawtime);timeinfo=localtime(&rawtime);char*text=asctime(timeinfo);inttextLen=strlen(text);text[textLen-1]='';printf(SERVER:PUBLISHINGDATE%sn,text);nn_send(sock,text,textLen,0);sleep(1);}return0;}nanomsg使用非常简单,只要直接includenanomsg/nn.h,即可使用基本API。使用内置的通信模式,需要引入对应的头文件,如pubsub模式,引入nonomsg/pubsub.h即可。pubsubserver,需要首先通过nn_socket调用创建socket,这里模仿了POSIX接口,函数返回一个文件描述符。因此直接通过判断返回值是否大于0,判断是否创建成功。注意第二个参数为协议,在协议相关头文件中会定义对应的宏。然后所有操作都将基于这个文件描述符。和berkeleysockets一样,server需要bind一个端口,nanomsg需要bind一个url。目前nanomsg支持的格式有:*进程内通信(inproc):url格式为inproc://test*进程间同in想(ipc):url格式为ipc:///tmp/test.ipc*tcp通信:url格式为tcp://*:5555github上源码貌似已经支持websocket了。nanomsg的错误和UNIX相同,失败之后会设置errno,可以通过nn_strerror获取对应的错误文本。bind完了之后,就可以通过nn_send函数向socket发送消息了。这个函数参数和berkeleysocketsapi接口类似。这里直接获取当前时间,然后发出给所有订阅者。客户端代码如下复制代码#include<stdio.h>#include<stdlib.h>#include<nanomsg/nn.h>#include<nanomsg/pubsub.h>intmain(intargc,char**argv){if(argc!=3){fprintf(stderr,usage:%sNAMEBIND_URLn,argv[0]);exit(-1);}constchar*name=argv[1];constchar*url=argv[2];intsock=nn_socket(AF_SP,NN_SUB);if(sock<0){fprintf(stderr,failtocreatesocket:%sn,nn_strerror(errno));exit(-1);}if(nn_setsockopt(sock,NN_SUB,NN_SUB_SUBSCRIBE,,0)<0){fprintf(stderr,failtosetsorketopts:%sn,nn_strerror(errno));exit(-1);}if(nn_connect(sock,url)<0){fprintf(stderr,failtoconnectto%s:%sn,url,nn_strerror(errno));exit(-1);}while(1){char*buf=NULL;intbytes=nn_recv(sock,&buf,NN_MSG,0);printf(CLIENT(%s):RECEIVED%sn,name,buf);nn_freemsg(buf);}nn_shutdown(sock,0);return0;}客户端初始化和服务端差不多,在连接服务端之前,需要通过nn_setsockopt将当前socket设置成消息订阅者。然后通过nn_connect连接发布者,参数和服务端bind的差不多,也是一个socket、一个url。这里的url要和服务端bind的url相同。之后就是一个死循环不停的接收发布者的消息。测试首先是编译,和普通c程序相同,只是增加链接nanomsg。gcc-opubserverpubserver.c-lnanomsggcc-opubclientpubclient.c-lnanomsg为了方便测试,写了一个简单的shell脚本:代码如下复制代码#!/bin/bashBASE=$(cd$(dirname$0)&&pwd)PUB=$BASE/pubserverSUB=$BASE/pubclientURL=tcp://127.0.0.1:1234echostartpubservertobindtcp:$URL$PUBtcp://127.0.0.1:1234&echostarttostartpubclientfor((i=0;i<10;i++))doechostartclient$i$SUBclient$i$URL&sleep1donesleep20echokillallprocessandexitforpidin`jobs-p`doechokill$pidkill$piddonewait脚本很简单,首先启动一个消息发布者,然后每秒启动一个消息接受者。等待20s之后,kill掉所有子进程。脚本的输出:代码如下复制代码startpubservertobindtcp:tcp://127.0.0.1:1234starttostartpubclientstartclient0SERVER:PUBLISHINGDATETueFeb1715:12:112015startclient1SERVER:PUBLISHINGDATETueFeb1715:12:122015CLIENT(client0):RECEIVEDTueFeb1715:12:122015CLIENT(client1):RECEIVEDTueFeb1715:12:122015startclient2SERVER:PUBLISHINGDATETueFeb1715:12:132015CLIENT(client0):RECEIVEDTueFeb1715:12:132015CLIENT(client1):RECEIVEDTueFeb1715:12:132015CLIENT(client2):RECEIVEDTueFeb1715:12:132015startclient3SERVER:PUBLISHINGDATETueFeb1715:12:142015CLIENT(client0):RECEIVEDTueFeb1715:12:142015CLIENT(client1):RECEIVEDTueFeb1715:12:142015CLIENT(client2):RECEIVEDTueFeb1715:12:142015...SERVER:PUBLISHINGDATETueFeb1715:12:412015CLIENT(client0):RECEIVEDTueFeb1715:12:412015CLIENT(client1):RECEIVEDTueFeb1715:12:412015CLIENT(client2):RECEIVEDTueFeb1715:12:412015CLIENT(client3):RECEIVEDTueFeb1715:12:412015CLIENT(client4):RECEIVEDTueFeb1715:12:412015CLIENT(client5):RECEIVEDTueFeb1715:12:412015CLIENT(client6):RECEIVEDTueFeb1715:12:412015CLIENT(client7):RECEIVEDTueFeb1715:12:412015CLIENT(client8):RECEIVEDTueFeb1715:12:412015CLIENT(client9):RECEIVEDTueFeb1715:12:412015killallprocessandexit可以看见每次启动一个新的订阅者,每个订阅者都能够收到发布者发布的当前时间。nanomsg实验——surveysurvey模式是由server发出询问,client针对请求回复响应的一种模式。这种模式在分布式系统中非常有用,可以用来做服务发现、分布式事物等分布式询问。客户端客户端实现比较方便,除了基础调用(创建socket、连接url)之外,就是先接收服务端询问(例子中比较简单,服务端询问是固定的,所以没有对内容进行检查)针对询问发送响应(例子中是发送服务端当前时间)代码如下复制代码#include<cstdio>#include<cstdlib>#include<cstring>#include<ctime>#include<nanomsg/nn.h>#include<nanomsg/survey.h>usingnamespacestd;intmain(intargc,constchar**argv){if(argc!=3){fprintf(stderr,usage:%sNAMEURLn,argv[0]);exit(-1);}constchar*name=argv[1];constchar*url=argv[2];intsock=nn_socket(AF_SP,NN_RESPONDENT);if(sock<0){fprintf(stderr,nn_socketfail:%sn,nn_strerror(errno));exit(-1);}if(nn_connect(sock,url)<0){fprintf(stderr,nn_connectfail:%sn,nn_strerror(errno));exit(-1);}while(1){char*buf=NULL;intbytes=nn_recv(sock,&buf,NN_MSG,0);if(bytes>0){printf(CLIENT(%s):RECEIVED%sSURVEYREQUESTn,name,buf);nn_freemsg

1 / 15
下载文档,编辑使用

©2015-2020 m.777doc.com 三七文档.

备案号:鲁ICP备2024069028号-1 客服联系 QQ:2149211541

×
保存成功