close

Windows

https://blog.yowko.com/2017/03/windows-os-apache-kafka.html

https://blog.yowko.com/2017/05/kafka-client-produce-consume.html

 

Linux

 

 

tar -xzf kafka 2.11-0.10.1.0.tgz

bin/zookeeper-server-start.sh config/zookepper.properties

bin/kafka-server-start.sh config/zookepper.properties

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic MyFirstTopic --partitions 2 --replication-factor 1

bin/kafka-console-producer.sh --broker-list localhost:9092 --create --topic MyFirstTopic

message test

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyFirstTopic


Kafka是使用scala语言开发,类似于RabbitMQ的分布式消息系统。
Kafka是分布式的,它通过可以多个broker组成一个集群。
Kafka依赖于Zookeeper。

Topic(话题) 特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类或种子(Feed)名。


Producer(生产者) 能够发布消息到话题的任何对象。


Broker(代理) 或称Kafka集群。用于保存消息的服务器。


Consumer(消费者) 可以订阅一个或多个话题,并从Broker拉取数据,从而消费这些已发布的消息。

https://www.jianshu.com/p/425a7d8735e2

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KafkaProducer {

public static void testProducer() {
Properties props = new Properties();
props.put("metadata.broker.list", "192.168.1.1:9092");
props.put("serializer.class", StringEncoder.class.getName());
//props.put("partitioner.class", );
props.put("request.required.arks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);

String msg = new Date() + " - hello world : 测试 " ;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", msg);
producer.send(data);
producer.close();
System.out.println("--> producer sended: " + msg);
}

public static void main(String[] args) {
testProducer();
}
}


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {

private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;

public KafkaConsumer(String a_zookeeper, String a_groupId, String a_topic) {
this.consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,
a_groupId));

this.topic = a_topic;
}

private static ConsumerConfig createConsumerConfig(String a_zookeeper,
String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "1000");
props.put("zookeeper.sync.time.ms", "1000");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");

return new ConsumerConfig(props);
}

public void shutdown() {
if (consumer != null)
consumer.shutdown();
if (executor != null)
executor.shutdown();
}

public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

System.out.println("streams.size = " + streams.size());

// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);

// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}

public static void main(String[] args) {

String zooKeeper = "192.168.212.100:2181";
String groupId = "group1";
String topic = "test";

int threads = 3;

KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topic);

example.run(threads);

}

public class ConsumerTest implements Runnable {

private KafkaStream m_stream;
private int m_threadNumber;

public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}

public void run() {
System.out.println("calling ConsumerTest.run()");
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

while (it.hasNext()) {
System.out.println("--> consumer Thread " + m_threadNumber + ": "
+ new String(it.next().message()));
}

System.out.println("Shutting down Thread: " + m_threadNumber);
}
}

}

 


https://tw.saowen.com/a/0b58f28f46632fd40a482231b9cb6de7dc38346db7f2d1d848d85ad8e4f908cc

https://itw01.com/GRJSEGU.html

 

 

 

 

KafkaResult  

 

//指定 kafka 所在位置及 port
string brokerList = "localhost:9092";
//指定發送的 topic
string topicName = "milestest";

//將 kafka 位置設定給 config
var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };

//將 config 餵給 producer
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
Console.WriteLine($"{producer.Name} producing on {topicName}. q to exit.");

string text;
//持續等待 user 輸入
while ((text = Console.ReadLine()) != "q")
{
// 非同步發送訊息至 broker
var deliveryReport = producer.ProduceAsync(topicName, null, text);
//發送成功後輸出訊息
deliveryReport.ContinueWith(task =>
{
Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}, Message: {text}");
});
}

//將 producer request 保留至 disk,確保資料不會遺失
producer.Flush(10000);
}

 

 

 

//指定 kafka 所在位置及 port
string brokerList = "localhost:9092";
//指定要監聽的 topic,可以監聽多個
var topics = new List<string>() { "milestest" };
//這個 group.id 沒什麼作用,可以隨便給,將 kafka 位置設定給 config
var config = new Dictionary<string, object>
{
{ "group.id", "miles" },
{ "bootstrap.servers", brokerList }
};
//將 config 餵給 consumer
using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(topics.First(), 0, 0) });
//持續監聽
while (true)
{
Message<Null, string> msg;
//接受訊息
if (consumer.Consume(out msg, 10000))
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
}
}
}

 

 

 

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 miles0722 的頭像
    miles0722

    文言文

    miles0722 發表在 痞客邦 留言(1) 人氣()