pipelinedb_on_kafka

整理文档很辛苦,赏杯茶钱您下走!

免费阅读已结束,点击下载阅读编辑剩下 ...

阅读已结束,您可以下载文档离线阅读编辑

资源描述

一、pipelinedb安装安装服务:rpm-ivh--prefix=/opt/pipelinedbpipelinedb-0.9.6-centos6-x86_64.rpm初始化一个数据库的路径:cd/opt/pipelinedb/bin./pipeline-init-D/data/pipelinedb/kafka启动数据库服务./pipeline-ctl-D/data/pipelinedb/kafka-llogfilestart停止./pipeline-ctl-D/data/pipelinedb/kafkastop连接到一个已经运行的服务./pipelinepipeline或者使用psql./psql-p5432-hlocalhostpipelinedebug模式pipeline-ctl-d-D/data/pipelinedb/kafkastartpipeline-ctl--debug-D/data/pipelinedb/kafkastart配置:在/data/pipelinedb/kafka目录下配置pipelinedb.conf,添加监听listen_addresses='*'配置pg_hba.confhostallall0.0.0.0/0md5二、pipeline集成kafka安装C++编译环境yuminstallgcc-c++安装librdkafkagitclone-b0.9.1~/librdkafkacd~/librdkafka./configure--prefix=/usrmakemakeinstall安装pipeline_kafka从git上下载源码包并解压到/usr/local路径下./configuremakemakeinstall可以看到pipeline_kafka.control被添加到了/opt/pipelinedb/share/pipelinedb/extension/中--------------------------------------------------------------------------在pipelineDB的配置文件pipelinedb.conf中添加shared_preload_libraries='pipeline_kafka'打开psql创建extensionCREATEEXTENSIONpipeline_kafka;添加broker,让pipeline知道SELECTpipeline_kafka.add_broker('localhost:9092');创建一个流CREATESTREAMlogs_stream(payloadjson);创建一个视图CREATECONTINUOUSVIEWmessage_countASSELECTCOUNT(*)FROMlogs_stream;创建一个kafka的topic./kafka-topics.sh--create--zookeepermaster:2181,node1:2181,node3:2181--replication-factor1--partitions3--topiclogs_topic从kafka中查询SELECTpipeline_kafka.consume_begin('logs_topic','logs_stream',format:='json');启动一个kafka生产者:./kafka-topics.sh--list--zookeeperlocalhost:2181放入json类型数据:{ts:2015-09-14T10:30:21-07:00,status:200,request_method:GET,user_agent:Mozilla/5.0(pc-x86_64-linux-gnu)Siege/3.0.5,url:/page38/path7?user=24746,latency:0.001,user:24746}{ts:2015-09-14T10:30:21-07:00,status:200,request_method:GET,user_agent:Mozilla/5.0(pc-x86_64-linux-gnu)Siege/3.0.5,url:/page66/path7?user=8846,latency:0.001,user:8846}{ts:2015-09-14T10:30:21-07:00,status:200,request_method:GET,user_agent:Mozilla/5.0(pc-x86_64-linux-gnu)Siege/3.0.5,url:/page33/path3?user=6006,latency:0.001,user:6006}{ts:2015-09-14T10:30:21-07:00,status:200,request_method:GET,user_agent:Mozilla/5.0(pc-x86_64-linux-gnu)Siege/3.0.5,url:/page85/path2?user=28043,latency:0.000,user:28043}查询SELECT*FROMmessage_count;消费接口:--指定文本格式和分隔符SELECTpipeline_kafka.consume_begin('kafka_topic','topic_stream',format:='text',delimiter:=E'\t');--指定并行度SELECTpipeline_kafka.consume_begin('kafka_topic','topic_stream',parallelism:=4);--停止SELECTpipeline_kafka.consume_end('kafka_topic','topic_stream');pipeline_kafka.consume_begin(topictext,streamtext,format:=‘text’,delimiter:=E’\t’,quote:=NULL,escape:=NULL,batchsize:=1000,maxbytes:=32000000,parallelism:=1,start_offset:=NULL)生产接口:SELECTpipeline_kafka.produce_message('kafka_topic','helloworld!');SELECTpipeline_kafka.produce_message('kafka_topic','helloworld!',partition:=2);创建一个触发器CREATETRIGGERtgAFTERUPDATEONtFOREACHROWWHEN(x=1)EXECUTEPROCEDUREpipeline_kafka.emit_tuple('kafka_topic');pipelineDB支持JDBC连接驱动URL:jdbc:postgresql://+HOST+:5432/+DATABASE;

1 / 4
下载文档,编辑使用

©2015-2020 m.777doc.com 三七文档.

备案号:鲁ICP备2024069028号-1 客服联系 QQ:2149211541

×
保存成功