Today's Summary – CKafka Learning Series – Is a Cloud Message Queue Any Good? (Part 17)

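I. First, buy, buy, buy: one instance

1. Here is the purchase link: https://buy.cloud.tencent.com/ckafka?rid=1

2. Create a CKafka instance first

Different instance specifications come with different numbers of topics and partitions.

Here you can select the private network (VPC) where your business runs. CKafka also offers several access methods, so it can be used from different environments.

This is what the created CKafka instance looks like. Give it an instance name that relates to your business.

The access methods listed here are the different types of access routes. Here I create a public-network domain name route for development and testing; for production workloads the private network is recommended.

Create a topic.

Create a user name, which is used for username authentication on the public-network domain name route.

Configure an ACL policy to restrict which user can access this topic.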
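
The user created above ends up in the client's `sasl.jaas.config` setting. In the listings later in this post the user name appears in the form `<instance ID>#<user name>`, so a small helper can assemble that string. This is only a minimal sketch: the class and method names are made up for illustration, and it assumes the values contain no double quotes or backslashes.

```java
// Minimal sketch: build the sasl.jaas.config value for SASL/PLAIN.
// JaasConfigSketch and plain(...) are hypothetical names, not part of any library.
final class JaasConfigSketch {

    // Reject characters that would need escaping inside the quoted JAAS values.
    private static void requireSimple(String value) {
        if (value.indexOf('"') >= 0 || value.indexOf('\\') >= 0) {
            throw new IllegalArgumentException("value must not contain quotes or backslashes");
        }
    }

    // instanceId, userName and password are placeholders supplied by the caller.
    static String plain(String instanceId, String userName, String password) {
        requireSimple(instanceId);
        requireSimple(userName);
        requireSimple(password);
        // Assumption taken from the listings in this post: full user = "<instance ID>#<user name>"
        String fullUser = instanceId + "#" + userName;
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + fullUser + "\""
                + " password=\"" + password + "\";";
    }

    public static void main(String[] args) {
        // Masked placeholder values, in the same spirit as the rest of the post
        System.out.println(plain("ckafka-xxxx", "Jensen", "xxxx"));
    }
}
```

The returned string can be passed directly as the value of `sasl.jaas.config`.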

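II. Set up a Maven project with IntelliJ IDEA

Download this handy tool yourself:

https://www.jetbrains.com/products.html#type=ide
I won't cover installing the JDK, Maven, and Nexus here; see the earlier tutorials in this series.

The engineer builds the project

Choose a Maven project.

Give the program a name.

Here comes the essence: pom.xml. Study it carefully. The classpath in the run output further down includes kafka-clients 1.0.2 and slf4j-simple 1.7.2.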

III. Here comes the producer: Producer.java

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Public-network access point of the CKafka instance (instance ID masked)
        props.put("bootstrap.servers", "ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002");
        // The public-network route authenticates with SASL/PLAIN
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Note: session.timeout.ms is a consumer setting; the producer ignores it and logs a warning
        props.put("session.timeout.ms", 30000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 232840);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        props.put("max.block.ms", 3000);
        // User name and password created in the console (masked)
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"ckafka-in5xxxxf#Jensen\" password=\"xxxx\";");
        Producer.asynSendRecord(props);
    }

    // Send messages asynchronously. A bit romantic, isn't it?
    public static void asynSendRecord(Properties props) {
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("jasen", Integer.toString(i), Integer.toString(i));
            System.out.println("record:" + record.value());
            // send() returns immediately; the callback runs once the broker has answered
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        // Prints "message sent: offset ... timestamp ... topic ... partition ..."
                        System.out.println("消息发送:" + "offset:" + recordMetadata.offset() + " timestamp:" + recordMetadata.timestamp() + " topic:" + recordMetadata.topic() + " partition:" + recordMetadata.partition());
                        // Prints "message sent successfully"
                        System.out.println("消息发送成功");
                    } else {
                        // Prints "message send failed: <reason>"
                        System.out.println(String.format("消息发送失败: %s", e.getMessage()));
                    }
                }
            });
        }
        // close() waits for the buffered records to be sent, so the callbacks still run
        producer.close();
    }
}
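
The connection and SASL settings in the producer above would be needed by any client that connects through the same route, so they could be collected in one place. The following is a sketch under that assumption; the class name, the method names, and the environment variable names `CKAFKA_BOOTSTRAP` and `CKAFKA_JAAS` are all hypothetical, chosen only so that the password does not have to sit in the source file.

```java
import java.util.Properties;

// Hypothetical helper: gathers the settings shared by clients on the SASL/PLAIN route.
final class CkafkaClientProps {

    // Connection and authentication settings, same keys as in the producer listing.
    static Properties common(String bootstrapServers, String jaasConfig) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig);
        return props;
    }

    // Producer-specific additions on top of the shared settings.
    static Properties producer(String bootstrapServers, String jaasConfig) {
        Properties props = common(bootstrapServers, jaasConfig);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        return props;
    }

    public static void main(String[] args) {
        // Assumed environment variable names; the defaults are placeholders only.
        String servers = System.getenv().getOrDefault("CKAFKA_BOOTSTRAP", "localhost:9092");
        String jaas = System.getenv().getOrDefault("CKAFKA_JAAS", "");
        Properties props = producer(servers, jaas);
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
    }
}
```

The resulting `Properties` object can be handed to `new KafkaProducer<String, String>(props)` exactly as in the listing above.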

Here is the result:

"C:\Program Files\Java\jdk1.8.0_161\bin\java.exe" "-javaagent:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=51023:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\programming\kafka-demo2\target\test-classes;C:\Users\Jensen\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Jensen\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Jensen\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-simple\1.7.2\slf4j-simple-1.7.2.jar" Producer
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 232840
	bootstrap.servers = [ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 10
	max.block.ms = 3000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 3
	retry.backoff.ms = 100
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = PLAIN
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'session.timeout.ms' was supplied but isn't a known config.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
record:0
record:1
record:2
record:3
record:4
record:5
record:6
record:7
record:8
record:9
record:10
record:11
......
record:90
record:91
record:92
record:93
record:94
record:95
record:96
record:97
record:98
record:99
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
消息发送:offset:1382 timestamp:1592221545987 topic:jasen partition:0
消息发送成功
消息发送:offset:1383 timestamp:1592221546002 topic:jasen partition:0
消息发送成功
消息发送:offset:1384 timestamp:1592221546003 topic:jasen partition:0
消息发送成功
消息发送:offset:1385 timestamp:1592221546003 topic:jasen partition:0
消息发送成功
消息发送:offset:1386 timestamp:1592221546003 topic:jasen partition:0
消息发送成功
消息发送:offset:1387 timestamp:1592221546003 topic:jasen partition:0
消息发送成功
消息发送:offset:1388 timestamp:1592221546003 topic:jasen partition:0
消息发送成功
消息发送:offset:1389 timestamp:1592221546003 topic:jasen partition:0
消息发送成功
消息发送:offset:1390 timestamp:1592221546004 topic:jasen partition:0
消息发送成功
消息发送:offset:1391 timestamp:1592221546004 topic:jasen partition:0
消息发送成功
消息发送:offset:1392 timestamp:1592221546004 topic:jasen partition:0
消息发送成功
消息发送:offset:1393 timestamp:1592221546004 topic:jasen partition:0
消息发送成功
消息发送:offset:1394 timestamp:1592221546004 topic:jasen partition:0
消息发送成功
消息发送:offset:1395 timestamp:1592221546004 topic:jasen partition:0
消息发送成功
消息发送:offset:1396 timestamp:1592221546005 topic:jasen partition:0
消息发送成功
消息发送:offset:1397 timestamp:1592221546005 topic:jasen partition:0
消息发送成功
......
消息发送:offset:1476 timestamp:1592221546014 topic:jasen partition:0
消息发送成功
消息发送:offset:1477 timestamp:1592221546014 topic:jasen partition:0
消息发送成功
消息发送:offset:1478 timestamp:1592221546014 topic:jasen partition:0
消息发送成功
消息发送:offset:1479 timestamp:1592221546014 topic:jasen partition:0
消息发送成功
消息发送:offset:1480 timestamp:1592221546014 topic:jasen partition:0
消息发送成功
消息发送:offset:1481 timestamp:1592221546014 topic:jasen partition:0
消息发送成功

Process finished with exit code 0
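
The `timestamp` values in the callback output above are epoch milliseconds, as returned by `RecordMetadata.timestamp()`. A small sketch like the one below, with a hypothetical class name and using only the JDK, turns them into readable instants and shows how close together the first and last record were stamped.

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the callback lines printed by Producer.java.
final class CallbackTimestampSketch {

    private static final Pattern TIMESTAMP = Pattern.compile("timestamp:(\\d+)");

    // Pulls the epoch-millisecond value out of one callback output line.
    static long extractTimestamp(String line) {
        Matcher m = TIMESTAMP.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("no timestamp field in: " + line);
        }
        return Long.parseLong(m.group(1));
    }

    // Converts epoch milliseconds to a UTC instant.
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        // First and last callback lines from the output above (prefix text omitted)
        long first = extractTimestamp("offset:1382 timestamp:1592221545987 topic:jasen partition:0");
        long last = extractTimestamp("offset:1481 timestamp:1592221546014 topic:jasen partition:0");
        System.out.println("first record: " + toInstant(first));
        System.out.println("last record:  " + toInstant(last));
        System.out.println("spread: " + (last - first) + " ms");
    }
}
```

For the run shown above, the spread between the first and the last record is 27 ms, which fits the picture of the 100 records being sent in quick succession and acknowledged afterwards.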

IV. Consume to your heart's content – Consumer.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        String topicName = "jasen";
        String groupId = "test-group";

        Properties props = new Properties();
        // Public-network access point of the CKafka instance (instance ID masked)
        props.put("bootstrap.servers", "ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002");
        props.put("group.id", groupId);
        // The public-network route authenticates with SASL/PLAIN
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("session.timeout.ms", 30000);
        // Commit offsets automatically, once per second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Start from the earliest available offset when there is no valid committed offset
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // User name and password created in the console (masked)
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"ckafka-in5yxxxx#Jensen\" password=\"xxxx\";");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topicName));
        try {
            while (true) {
                // Wait up to 1000 ms for records; poll(long) is the form offered by kafka-clients 1.0.2
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
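
The consumer above sets `auto.offset.reset` to `earliest`. That setting only matters when the group has no usable committed offset, which is what the run log that follows shows: "Fetch offset 942 is out of range for partition jasen-0, resetting offset", after which reading starts at offset 1382. The rule can be sketched in plain Java as below. This is a simplified model for illustration, not the client's actual code, and the log start and end offsets used in `main` are inferred from the output in this post rather than read from the broker.

```java
// Simplified model of how a consumer picks its starting position.
// OffsetResetSketch and its members are hypothetical names.
final class OffsetResetSketch {

    enum Policy { EARLIEST, LATEST }

    // committed: the group's stored offset for the partition
    // logStart:  first offset still available on the broker
    // logEnd:    offset that the next written record will receive
    static long startPosition(long committed, long logStart, long logEnd, Policy policy) {
        boolean inRange = committed >= logStart && committed <= logEnd;
        if (inRange) {
            return committed; // normal case: continue where the group left off
        }
        // Out of range: fall back according to auto.offset.reset
        return policy == Policy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Assumed values: 942 from the log line, 1382 and 1482 inferred from the producer output
        System.out.println(startPosition(942L, 1382L, 1482L, Policy.EARLIEST));
        System.out.println(startPosition(942L, 1382L, 1482L, Policy.LATEST));
    }
}
```

With `latest` instead of `earliest`, the same situation would leave the consumer waiting for new records instead of reading the hundred that were already there.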

Whether it works is now up to you:

"C:\Program Files\Java\jdk1.8.0_161\bin\java.exe" "-javaagent:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=51176:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\programming\kafka-demo2\target\test-classes;C:\Users\Jensen\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Jensen\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Jensen\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-simple\1.7.2\slf4j-simple-1.7.2.jar" Consumer
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
	auto.commit.interval.ms = 1000
	auto.offset.reset = earliest
	bootstrap.servers = [ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = test-group
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = PLAIN
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 30000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Discovered group coordinator 111.230.124.164:18006 (id: 2147472928 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 9
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions [jasen-0]
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=test-group] Fetch offset 942 is out of range for partition jasen-0, resetting offset
offset = 1382, key = 0, value = 0
offset = 1383, key = 1, value = 1
offset = 1384, key = 2, value = 2
offset = 1385, key = 3, value = 3
offset = 1386, key = 4, value = 4
offset = 1387, key = 5, value = 5
offset = 1388, key = 6, value = 6
offset = 1389, key = 7, value = 7
offset = 1390, key = 8, value = 8
offset = 1391, key = 9, value = 9
offset = 1392, key = 10, value = 10
offset = 1393, key = 11, value = 11
offset = 1394, key = 12, value = 12
offset = 1395, key = 13, value = 13
offset = 1396, key = 14, value = 14
offset = 1397, key = 15, value = 15
offset = 1398, key = 16, value = 16
...... (exactly one hundred records, not one missing)
offset = 1464, key = 82, value = 82
offset = 1465, key = 83, value = 83
offset = 1466, key = 84, value = 84
offset = 1467, key = 85, value = 85
offset = 1468, key = 86, value = 86
offset = 1469, key = 87, value = 87
offset = 1470, key = 88, value = 88
offset = 1471, key = 89, value = 89
offset = 1472, key = 90, value = 90
offset = 1473, key = 91, value = 91
offset = 1474, key = 92, value = 92
offset = 1475, key = 93, value = 93
offset = 1476, key = 94, value = 94
offset = 1477, key = 95, value = 95
offset = 1478, key = 96, value = 96
offset = 1479, key = 97, value = 97
offset = 1480, key = 98, value = 98
offset = 1481, key = 99, value = 99
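
The claim that all one hundred records arrived can be checked mechanically instead of by eye. The sketch below, with hypothetical class and method names and only the JDK, parses lines in the format printed by Consumer.java and verifies that the offsets form an unbroken run; the expected count is simply last offset minus first offset plus one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker for the "offset = ..., key = ..., value = ..." lines.
final class OffsetGapCheck {

    private static final Pattern LINE = Pattern.compile("^offset = (\\d+), key = .*, value = .*$");

    // Collects the offsets from all lines that match the consumer's output format.
    static List<Long> parseOffsets(List<String> lines) {
        List<Long> offsets = new ArrayList<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (m.matches()) {
                offsets.add(Long.parseLong(m.group(1)));
            }
        }
        return offsets;
    }

    // True when every offset is exactly one greater than the previous one.
    static boolean isContiguous(List<Long> offsets) {
        for (int i = 1; i < offsets.size(); i++) {
            if (offsets.get(i) != offsets.get(i - 1) + 1) {
                return false;
            }
        }
        return true;
    }

    // Number of records in an unbroken run from first to last, inclusive.
    static long expectedCount(long first, long last) {
        return last - first + 1;
    }

    public static void main(String[] args) {
        // A short excerpt; in practice the full console output would be fed in
        List<String> lines = Arrays.asList(
                "offset = 1382, key = 0, value = 0",
                "offset = 1383, key = 1, value = 1",
                "offset = 1384, key = 2, value = 2");
        List<Long> offsets = parseOffsets(lines);
        System.out.println("contiguous: " + isContiguous(offsets));
        System.out.println("records expected for 1382..1481: " + expectedCount(1382L, 1481L));
    }
}
```

For the offsets in this run, 1382 through 1481, the expected count is 100, which matches the hundred messages the producer sent.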

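V. Summary

Whether it is any good, you only find out by using it. If you don't believe me, try building a Kafka cluster yourself. You can't skip the ZooKeeper cluster either, and what will you do when you need to scale out?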
