当前位置:首页 > 服务端 > Kafka的使用

Kafka的使用

2022年11月09日 22:42:16服务端8


参考:

Kafka的使用

一、kafka基本原理

1.1 消息队列的作用

  • 解耦:解除消息生产者和消息消费者的依赖关系
  • 异步:消息生产者发送消息后,可以做其他的事
  • 削峰:缓解流量高峰

1.2 点对点模式

  • 点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。
  • 生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。
  • 点对点模型的的优点:是消费者拉取消息的频率可以由自己控制。
  • 缺点:消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。

1.3 发布订阅模式

  • 生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者。由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息
  • 消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题。

1.4 kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力
Kafka的使用 _ JavaClub全栈架构师技术笔记
Producer:Producer即生产者,消息的产生者,是消息的入口。
kafka cluster
    Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
    Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
    Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
    Replication: 每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
  Message:每一条发送的消息主体。
  Consumer:消费者,即消息的消费方,是消息的出口。
  Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据, 这也是为了提高kafka的吞吐量!
  Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

1.5 工作流程分析

1.5.1 发送数据

producer产生消息,将消息写到leader当中,不会直接将数据写入到follower。写入的流程如下:
Kafka的使用 _ JavaClub全栈架构师技术笔记
注:
消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:
Kafka的使用 _ JavaClub全栈架构师技术笔记

  • 分区的目的:
    (1)方便扩展。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
    (2)提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
  • kafka分发的几个原则:
    (1)partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
    (2) 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
    (3)如果既没指定partition,又没有设置key,则会轮询选出一个partition。
  • ACK应答机制:
    (1)0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
    (2)1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
    (3)all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
    注:如果topic不存在,kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

1.5.2 保存数据

Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

  • Partition结构
    每个topic分为一个或多个Partition,Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
    Kafka的使用 _ JavaClub全栈架构师技术笔记
    如上图,这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。

  • Message结构
    log文件就实际是存储message的地方,消息主要包含消息体、消息大小、offset、压缩类型……等等。
    (1)offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
    (2)消息大小:消息大小占用4byte,用于描述消息的大小。
    (3)消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。

  • 存储策略
    无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
    (1)基于时间,默认配置是168小时(7天)。
    (2)基于大小,默认配置是1073741824。
    需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

1.5.3 消费数据

Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader去拉取。
Kafka的使用 _ JavaClub全栈架构师技术笔记
图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议消费者组的consumer的数量与partition的数量一致!

  • 数据查找:  
      假如现在需要查找一个offset为368801的message是什么样的过程呢?下图:
    Kafka的使用 _ JavaClub全栈架构师技术笔记
    (1)先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。
    (2)打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
    (3)根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。
    segment+有序offset+稀疏索引+二分查找+顺序查找

在早期的版本中,消费者将消费到的offset维护zookeeper中,consumer每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的offset已经直接维护在kafk集群的__consumer_offsets这个topic中!

二、Kafka的c++使用

  • 生产者
    kafkaproducer.h
#ifndef KAFKAPRODUCER_H
#define KAFKAPRODUCER_H
 
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
 
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
  
class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
     
public:
     void dr_cb(RdKafka::Message &message) {
     
         std::cout << "Message delivery for (" << message.len() << " bytes): " <<
             message.errstr() << std::endl;
         if (message.key())
             std::cout << "Key: " << *(message.key()) << ";" << std::endl;
     }
 };
class KafkaProducerEventCallBack : public RdKafka::EventCb {
     
public:
     void event_cb(RdKafka::Event &event) {
     
        switch (event.type())
         {
     
         case RdKafka::Event::EVENT_ERROR:
             std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                 event.str() << std::endl;
             if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
             break;
         case RdKafka::Event::EVENT_STATS:
             std::cerr << "\"STATS\": " << event.str() << std::endl;
             break;
         case RdKafka::Event::EVENT_LOG:
             fprintf(stderr, "LOG-%i-%s: %s\n",
                 event.severity(), event.fac().c_str(), event.str().c_str());
             break;
         default:
             std::cerr << "EVENT " << event.type() <<
                 " (" << RdKafka::err2str(event.err()) << "): " <<
                 event.str() << std::endl;
             break;
         }
     }
 };
 class KafkaProducer
 {
     
 public:
     KafkaProducer(const string &brokers, const string &topics, int nPpartition = 0);
     virtual ~KafkaProducer();
     bool Init();
     void Send(const string &msg);
     void Stop();
 private:
     RdKafka::Producer *m_pProducer = NULL;
     RdKafka::Topic *m_pTopic = NULL;
     KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
     KafkaProducerEventCallBack m_producerEventCallBack;
     std::string m_strTopics;
     std::string m_strBroker;
     bool m_bRun = false;
     int m_nPpartition = 0;
 };
#endif // KAFKAPRODUCER_H

kafkaproducer.cpp

#include <iostream>
#include "kafkaproducer.h"  
KafkaProducer::KafkaProducer(const string &brokers, const string &topics, int nPpartition /*= 1*/)
     : m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
{
     
}

KafkaProducer::~KafkaProducer()
{
     
    Stop();
}
  
bool KafkaProducer::Init()
{
     
    string errstr = "";
 
    /*
     * Create configuration objects
     */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
 
    /*Set configuration properties,设置broker list*/
    if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){
     
        std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
    }
    /* Set delivery report callback */
    conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
    conf->set("event_cb", &m_producerEventCallBack, errstr);
 
    /*
     * Create producer using accumulated global configuration.
    */
    m_pProducer = RdKafka::Producer::create(conf, errstr);
    if (!m_pProducer) {
     
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return false;
    }
    std::cout << "% Created producer " << m_pProducer->name() << std::endl;
    /*
     * Create topic handle.
    */
    m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics,
                                      tconf, errstr);
    if (!m_pTopic) {
     
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        return false;
    }
    return true;
}
void KafkaProducer::Send(const string &msg)
{
     
    if (!m_bRun)
        return;
    /*
     * Produce message
    */
    RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,

RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                   const_cast<char *>(msg.c_str()), msg.size(),
                                                   NULL, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
    else
        std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;
 
    m_pProducer->poll(0);
  
    /* Wait for messages to be delivered */  //firecat add
    while (m_bRun && m_pProducer->outq_len() > 0) {
     
        std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
        m_pProducer->poll(1000);
     }
}
 
void KafkaProducer::Stop()
{
     
    delete m_pTopic;
    delete m_pProducer;
}


  
 int main()
{
     
    //KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("localhost:9092", "test", 0);
 
    KafkaProducer* Kafkapr_ = new KafkaProducer("localhost:9092", "test", 0);
    Kafkapr_->Init();
    Kafkapr_->Send("hello world!");
 
    char str_msg[] = "Hello Kafka!";
  
    while (fgets(str_msg, sizeof(str_msg), stdin))
    {
     
        size_t len = strlen(str_msg);
        if (str_msg[len - 1] == '\n')
        {
     
            str_msg[--len] = '\0';
        }
 
        if (strcmp(str_msg, "end") == 0)
        {
     
            break;
        }
 
        Kafkapr_->Send(str_msg);
    }
 
    return 0;
}
  • 消费者
    kafka_comsumer.h
#include <vector>
#include <string>
#include <memory>
#include <getopt.h>
#include <csignal>
#include <iostream>
#include "librdkafka/rdkafkacpp.h"
 
class kafka_consumer_client{
     
public:
    kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset=-1);
    //kafka_consumer_client();
    virtual ~kafka_consumer_client();
 
    bool initClient();
    bool consume(int timeout_ms);    //消费消息
    void finalize();
private:
    void consumer(RdKafka::Message *msg, void *opt);
 
    std::string brokers_;
    std::string topics_;
    std::string groupid_;
 
    int64_t last_offset_ = 0;
    RdKafka::Consumer *kafka_consumer_ = nullptr;   
    RdKafka::Topic    *topic_          = nullptr;
    int64_t           offset_          = RdKafka::Topic::OFFSET_BEGINNING;
    int32_t           partition_       = 0;
};

kafka_comsumer.cpp

#include "kafka_comsumer.h"
 
 
bool run_ = true;
 
static void sigterm (int sig) {
     
  run_ = false;
}
 
kafka_consumer_client::kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset):brokers_(brokers), topics_(topics),groupid_(groupid),
offset_(offset){
     
}
 
//kafka_consumer_client::kafka_consumer_client(){}
 
kafka_consumer_client::~kafka_consumer_client(){
     }
 
bool kafka_consumer_client::initClient(){
     
    RdKafka::Conf *conf = nullptr;
    conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if(!conf){
     
        fprintf(stderr, "RdKafka create global conf failed\n");
        return false;
    }
 
    std::string errstr;
    /*设置broker list*/
    if (conf->set("bootstrap.servers", brokers_, errstr) != RdKafka::Conf::CONF_OK){
     
        fprintf(stderr, "RdKafka conf set brokerlist failed : %s\n", errstr.c_str());
    }
 
    /*设置consumer group*/
    if (conf->set("group.id", groupid_, errstr) != RdKafka::Conf::CONF_OK){
     
        fprintf(stderr, "RdKafka conf set group.id failed : %s\n", errstr.c_str());
    }
 
    std::string strfetch_num = "10240000";
    /*每次从单个分区中拉取消息的最大尺寸*/
    if(conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
     
         fprintf(stderr, "RdKafka conf set max.partition failed : %s\n", errstr.c_str());
    }
 
    /*创建kafka consumer实例*/
    kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);
    if(!kafka_consumer_){
     
        fprintf(stderr, "failed to ceate consumer\n");
    }
    delete conf;
 
    RdKafka::Conf *tconf = nullptr;
    /*创建kafka topic的配置*/
    tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if(!tconf){
     
        fprintf(stderr, "RdKafka create topic conf failed\n");
        return false;
    }
 
    /*kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息,
    当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是
    ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
    有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,
    在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始
    消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的
    开始位置消费所有消息.*/
    if(tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK){
     
        fprintf(stderr, "RdKafka conf set auto.offset.reset failed : %s\n", errstr.c_str());
    }
 
    topic_ = RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);
    if(!topic_){
     
        fprintf(stderr, "RdKafka create topic failed : %s\n", errstr.c_str());
    }
    delete tconf;
 
    RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
     if (resp != RdKafka::ERR_NO_ERROR){
     
        fprintf(stderr, "failed to start consumer : %s\n", RdKafka::err2str(resp).c_str());
    }
 
    return true;
}
  
void kafka_consumer_client::consumer(RdKafka::Message *message, void *opt){
     
    switch(message->err()){
     
        case RdKafka::ERR__TIMED_OUT:
            break;
        case RdKafka::ERR_NO_ERROR:
            printf("%.*s\n", 
                static_cast<int>(message->len()),
             static_cast <const char*>(message->payload()));
            last_offset_ = message->offset();
          
             break;
        case RdKafka::ERR__PARTITION_EOF:
            std::cerr << "%% Reached the end of the queue, offset: " << last_offset_ << std::endl;
            break;
        case RdKafka::ERR__UNKNOWN_TOPIC:
        case RdKafka::ERR__UNKNOWN_PARTITION:
            std::cerr << "Consume failed: " << message->errstr() << std::endl;
            run_ = false;
            break;
        default:
            std::cerr << "Consume failed: " << message->errstr() << std::endl;
            run_ = false;
            break;
    }
}
 
bool kafka_consumer_client::consume(int timeout_ms){
     
    RdKafka::Message *msg = nullptr;
    //消费消息
    while(run_){
     
        msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
        consumer(msg, nullptr);
        kafka_consumer_->poll(0);
        delete msg;
    }
 
    kafka_consumer_->stop(topic_, partition_);
    if(topic_){
     
        delete topic_;
        topic_ = nullptr;
    }
    if(kafka_consumer_){
     
        delete kafka_consumer_;
        kafka_consumer_ = nullptr;
    }
 
    /*销毁kafka实例*/
    RdKafka::wait_destroyed(5000);
    return true;
}  

三、Kafka的python使用

  • 安装kafka
pip install kafka-python
  • 简单使用
# test.py

import sys
import time
import json

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError


KAFAKA_HOST = "127.0.0.1"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "test123"


class Kafka_producer():
    '''''
    生产模块:根据不同的key,区分消息
    '''

    def __init__(self, kafkahost,kafkaport, kafkatopic, key):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.key = key
        print("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key)
        bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort
                )
        print("boot svr:",bootstrap_servers)
        self.producer = KafkaProducer(bootstrap_servers = bootstrap_servers
                )

    def sendjsondata(self, params):
        try:
            parmas_message = json.dumps(params,ensure_ascii=False)
            producer = self.producer
            print(parmas_message)
            v = parmas_message.encode('utf-8')
            k = key.encode('utf-8')
            print("send msg:(k,v)",k,v)
            producer.send(self.kafkatopic, key=k, value= v)
            producer.flush()
        except KafkaError as e:
            print (e)

class Kafka_consumer():
    '''
    消费模块: 通过不同groupid消费topic里面的消息
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.key = key
        self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                    kafka_host=self.kafkaHost,
                    kafka_port=self.kafkaPort )
                )

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
                print("1")
                print(message)
        except KeyboardInterrupt as e:
            print (e)


def main(xtype, group, key):
    '''
    测试consumer和producer
    '''
    if xtype == "p":
        # 生产模块
        producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
        print ("===========> producer:", producer)
        for _id in range(100):
          #  params = '{"msg" : "%s"}' % str(_id)
           params=[{
     "msg0" :_id},{
     "msg1" :_id}]
           producer.sendjsondata(params)
           time.sleep(1)

    if xtype == 'c':
        # 消费模块
        consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
        print ("===========> consumer:", consumer)
        message = consumer.consume_data()
        print('2')
        print(message)
        for msg in message:
            print ('msg---------------->k,v', msg.key,msg.value)
            print ('offset---------------->', msg.offset)

if __name__ == '__main__':
    xtype = sys.argv[1]
    group = sys.argv[2]
    key = sys.argv[3]
    main(xtype, group, key)

  • 启动kafka服务
kafka-server-start /usr/local/etc/kafka/server.properties

作者:宁静深远
来源链接:https://blog.csdn.net/u012477435/article/details/106834187/

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/69109.html

标签: Kafka
分享给朋友:

“Kafka的使用” 的相关文章

SpringBoot整合消息队列工具kafka

SpringBoot整合消息队列工具kafka

一、前言 之前整理了kafka在windows下的安装过程,也通过shell命令进行了消息产生者和消息消费者的创建及消息发送,所以想到把kafka与最流行的SpringBoot的框架进行整合,与项目结合,进行消息的发送。 二、整合开始 1.SpringBoot工程搭建,此处不多讲,可以...

Kafka 安装和测试

Kafka 安装和测试

1. 简介 kafka (官网地址: http://kafka.apache.org)是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。 i. 消息的发布(publish)称作produ...

spring-kafka 实战

spring-kafka 实战

spring-kafka实战 1.       kafka介绍   1.1.       主要功能 根据官网的介绍,Apach...

kafka安装步骤

kafka安装步骤

一、kafka介绍 1,kafka简单介绍 kafka是一款分布式、支持分区的、多副本,基于zookeeper协调的分布式消息系统。最大的特性就是可以实时处理大量数据来满足需求。 2,kafka使用场景 1,日志收集:可以用kafka收集各种服务的日志 ,...

Kafka安装之二 在CentOS 7上安装Kafka

Kafka安装之一  Zookeeper Kafka安装之二 在CentOS 7上安装Kafka 一、简介         Kafka是由Apache软件基金会开发的一个开...

centos8 kafka集群安装

##kafka运行涉及zookeeper,kafka和zookeeper都运行在jvm之上。 ###准备三台服务器,ip地址如下: 192.168.137.31 192.168.137.32 192.168.137.33 jdk安装 安...

大聪明教你学kafka | Windows10系统下kafka安装及使用

大聪明教你学kafka | Windows10系统下kafka安装及使用

前言 Apache Kafka 是一款开源的消息系统,在开发各类系统的时候,我们经常会选择使用Kafka来帮助我们削峰、做异步处理、解耦,那么正好借此机会跟大家说说kafka的安装部署、应用场景以及简单的使用。 kafka的安装部署 想...

Kafka中错误:Unrecognized VM option ‘UseCompressedOops’ Error: Clould not create the Java Vritual Machine. Error: A fatal exception has occurres . Program will exit.

Kafka中错误:Unrecognized VM option ‘UseCompressedOops’ Error: Clould not create the Java Vritual Machine. Error: A fatal exception has occurres . Program will exit.

     错误的描述:   在kafka安装目录下,执行 $ bin/zookeeper-server-start.sh config/zookeeper.properties &    Unrecognized VM op...

kafka监控工具:kafka eagle安装踩坑

kafka监控工具:kafka eagle安装踩坑

1、背景: 本人kafka使用小白,使用kafka produce生产数据的时候,从后台看kafka topic有没有数据很不方便,从网上找了找,发现有个开源工具kafka eagle比较好使。 2、软件准备: 官方网站链接:EFAK 刚开始下载了一个官方...

Ubuntu20.04安装kafka

Ubuntu20.04安装kafka

文章目录 简介 为什么需要消息中间件 Kafka一代 - 消息队列 Kafka二代 - Partition Kafka三代 - Broker集群...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。