成都创新互联网站制作重庆分公司

如何进行Kafka1.0.0d代码示例分析

这篇文章将为大家详细讲解有关如何进行Kafka 1.0.0 d代码示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

创新互联建站一直在为企业提供服务,多年的磨炼,使我们在创意设计,网络营销推广到技术研发拥有了开发经验。我们擅长倾听企业需求,挖掘用户对产品需求服务价值,为企业制作有用的创意设计体验。核心团队拥有超过十载以上行业经验,涵盖创意,策化,开发等专业领域,公司涉及领域有基础互联网服务简阳服务器托管成都app软件开发公司、手机移动建站、网页设计、网络整合营销。

package kafka.demo;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * 
 *  

Description: kafka 1.0.0

   * @author guangshihao  * @date 2018年9月19日   *  */ public class KafkaProduderDemo { public static void main(String[] args) { Map props = new HashMap<>(); /*          * acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1  * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。  * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。  * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。  * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。  * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。  * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。  * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失  */ props.put("acks", "1"); //配置默认的分区方式 props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); //配置topic的序列化类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //配置value的序列化类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");         /*  * kafka broker对应的主机,格式为host1:port1,host2:port2  */ props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); //topic String topic = "test7"; KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props); for(int i = 1 ;i <= 100 ; i++) { String line = i+" this is a test "; ProducerRecord record = new ProducerRecord(topic,line ); producer.send(record); } producer.close(); } } //--------------------------------------------------------------------------------------------------------------------------- package kafka.demo; import java.util.Arrays; import java.util.Properties; import java.util.Random; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; public class KafkaConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); props.put("group.id", "group_test7"); //配置topic的序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置value的序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //自动同步offset         props.put("enable.auto.commit","true");         //自动同步offset的时间间隔         props.put("auto.commit.intervals.ms", "2000");         //当在zookeeper中发现要消费的topic没有或者topic的offset不合法时自动设置为最小值,可以设的值为 latest, earliest, none,默认为largest         props.put("auto.offset.reset", "earliest ");          KafkaConsumer consumer = new KafkaConsumer<>(props);  consumer.subscribe(Arrays.asList("test7")); //consumer.beginningOffsets(""); try { while(true) { ConsumerRecords records = consumer.poll(1000); for(ConsumerRecord record: records) { System.out.println("partition:"+record.partition()+"  "+record.value()); } //consumer.commitSync(); if((new Random(10)).nextInt()>5) { consumer.wakeup(); } } }catch(WakeupException e) { e.printStackTrace(); }finally { consumer.close(); } } }

关于如何进行Kafka 1.0.0 d代码示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


网站栏目:如何进行Kafka1.0.0d代码示例分析
文章URL:http://cxhlcq.cn/article/pccgds.html

其他资讯

在线咨询

微信咨询

电话咨询

028-86922220(工作日)

18980820575(7×24)

提交需求

返回顶部