当前位置: 首页 >服务端 > kafka restful api功能介绍与使用

kafka restful api功能介绍与使用

  • 前述

采用confluent kafka-rest proxy实现kafka restful service时候(具体参考上一篇笔记),通过http协议数据传输,需要注意的是采用了base64编码(或者称之为加密),如果消息再post之前不采用base64处理将会出现:服务端消息乱码、程序报错等,因此正常的处理流程是:
1.先对待post的消息做UTF-8统一处理
2.采用base64编码包处理消息

s='Kafka,hi'ad="hi,kafka,i'm xnchall"aa=ad.encode()#UTF-8统一处理print(aa)b64=base64.b64encode(ad.encode())#base64编码包统一处理
  • 利用kafka-rest生产消息
POST /topics/(string:topic_name)

kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记

kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记
data={"records":[{"key":"a2V5","value":"Y29uZmx1ZW50"},{"value":"a2Fma2E=","partition":1},{"value":"bG9ncw=="}]}data1={"records":[{"value":"5bCK5pWs55qE5a6i5oi35oKo5aW977yMaGkga2Fma2EsIGknbSB4bmNoYWxs"}]}header={"Content-Type":"application/vnd.kafka.v1+json"}r=requests.post(url=url,json=data,headers=header)r=requests.post(url=url,json=data1,headers=header)
View Code
  • 向指定分区生产消息:Produce messages to one partition of the topic
POST /topics/(string:topic_name)/partitions/(int:partition_id)
kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记
ad="hi kafka,i'm xnchall"url11="http://192.168.160.101:8082/topics/test_kfk_lk/partitions/1"data2={"records":[{"value":(base64.b64encode(ad.encode())).decode()}]}print(data2)r2=requests.post(url=url11,json=data2,headers=header)print(r2)print(r2.content)
View Code
  • 创建或者注册消费实例:Create a new consumer instance in the consumer group
POST /consumers/(string:group_name)

kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记

kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记
url3="http://192.168.160.101:8082/consumers/my_group"data3={"id":"my_consumer1","format":"binary","auto.offset.reset":"smallest","auto.commit.enable":"false"}r3=requests.post(url=url3,json=data3,headers=header)
View Code
  • 提交偏移  Commit offsets for the consumer
POST /consumers/(string:group_name)/instances/(string:instance)/offsets

kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记

kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记
url4="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/offsets"r4=requests.post(url=url4,headers=header)
View Code
  • 消费消息
GET /consumers/(string:group_name)/instances/(string:instance)/topics/(string:topic_name)

kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记

kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记
url_get2="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/topics/test_kfk_lk"rr2=requests.get(url=url_get2,headers=header)#,params={"timeout":3000000}print(rr2)print(rr2.content)print(rr2.text)
View Code
  • 删除消费者实例 Destroy the consumer instance
DELETE /consumers/(string:group_name)/instances/(string:instance)
kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记
#url_del="http://192.168.160.101:8082/consumers/test_kfk_lk/instances/my_consumer"#d1=requests.delete(url_del)#删除消费者实例#print(d1)
View Code
  • 获取指定分区、偏移消息: Consume messages from one partition of the topic.(api V2)
GET /topics/(string:topic_name)/partitions/(int:partition_id)/messages?offset=(int)[&count=(int)]

kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记

Fetch Response v1 only contains message format v0.Fetch Response v2 might either contain message format v0 or message format v1.Possible Error Codes* OFFSET_OUT_OF_RANGE (1)* UNKNOWN_TOPIC_OR_PARTITION (3)* NOT_LEADER_FOR_PARTITION (6)* REPLICA_NOT_AVAILABLE (9)* UNKNOWN (-1)
kafka restful api功能介绍与使用 _ JavaClub全栈架构师技术笔记
url_p="http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages"rst=requests.get(url_p,headers=header,params={"offset":3,"count":2})#,"count":2})print(rst)print(len(rst.json()))if(rst.status_code!=500):For itr in rst.json():print(base64.b64decode(itr['value']).decode())print(rst.url)#http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages?offset=3&count=2
View Code
  • 获取当前订阅的topic列表.(api V2)
POST /consumers/(string:group_name)/instances/(string:instance)/subscription
  • 获取手工指定的消费者的分区(api V2)
GET /consumers/(string:group_name)/instances/(string:instance)/assignments
GET /consumers/testgroup/instances/my_consumer/assignments HTTP/1.1Host: proxy-instance.kafkaproxy.example.comAccept: application/vnd.kafka.v2+jsonHTTP/1.1 200 OKContent-Type: application/vnd.kafka.v2+json{  "partitions": [{  "topic": "test",  "partition": 0},{  "topic": "test",  "partition": 1}]}
  • 覆盖消费者即将消费的消息的偏移量(api V2)
POST /consumers/(string:group_name)/instances/(string:instance)/positions
POST /consumers/testgroup/instances/my_consumer/positions HTTP/1.1Host: proxy-instance.kafkaproxy.example.comContent-Type: application/vnd.kafka.v2+json{  "offsets": [{  "topic": "test",  "partition": 0,  "offset": 20},{  "topic": "test",  "partition": 1,  "offset": 30}  ]}
  • 获取给定topic的分区的最后偏移
POST /consumers/(string:group_name)/instances/(string:instance)/positions/end
POST /consumers/testgroup/instances/my_consumer/positions/end HTTP/1.1Host: proxy-instance.kafkaproxy.example.comContent-Type: application/vnd.kafka.v2+json{  "partitions": [{  "topic": "test",  "partition": 0},{  "topic": "test",  "partition": 1}]}
  • 使用分配和订阅api消费topic或者分区数据
GET /consumers/(string:group_name)/instances/(string:instance)/records
GET /consumers/testgroup/instances/my_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1Host: proxy-instance.kafkaproxy.example.comAccept: application/vnd.kafka.binary.v2+jsonExample binary response:HTTP/1.1 200 OKContent-Type: application/vnd.kafka.binary.v2+json[  {"topic": "test","key": "a2V5","value": "Y29uZmx1ZW50","partition": 1,"offset": 100,  },  {"topic": "test","key": "a2V5","value": "a2Fma2E=","partition": 2,"offset": 101,  }]

  

 

作者:xnchall
来源链接:https://www.cnblogs.com/xnchll/p/9618432.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/server/112595.html

标签:RESTful
分享给朋友:

“kafka restful api功能介绍与使用” 的相关文章