Kafka通讯代码01

  • Producer
  • Consumer
  • GroupConsumer

1、MqProducer.java

package com.neohope.kafka.test;

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal example producer: publishes 100 keyed string messages followed by an end marker
 * to the {@code neoTopic} topic using the legacy {@code kafka.javaapi.producer.Producer} API.
 */
public class MqProducer {
    /**
     * Builds the producer configuration, sends {@code key0/value0} .. {@code key99/value99}
     * and a final {@code -=END=-} message, then closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.neohope.kafka.test.SimplePartitioner");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        try {
            // NOTE(review): these keys contain no '.', so a partitioner that derives the
            // partition from a dot-separated numeric suffix will route every message to
            // partition 0 - confirm this is intended.
            for (int i = 0; i < 100; i++) {
                producer.send(new KeyedMessage<String, String>("neoTopic", "key" + i, "value" + i));
            }

            // Sentinel message so a consumer can tell that the batch is complete.
            KeyedMessage<String, String> endMarker =
                    new KeyedMessage<String, String>("neoTopic", "-=END=-", "-=END=-");
            producer.send(endMarker);
        } finally {
            // Close even when a send fails so the producer's resources are not leaked.
            producer.close();
        }
    }
}

2、SimplePartitioner.java

package com.neohope.kafka.test;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Partitioner that derives the target partition from the numeric suffix following the last
 * {@code '.'} in the message key (for example {@code "host.7"} with 4 partitions maps to 3).
 * Keys without a usable numeric suffix are routed to partition 0.
 */
public class SimplePartitioner implements Partitioner {
    /**
     * Constructor taking the producer properties; this implementation does not read them.
     *
     * @param props producer properties (ignored)
     */
    public SimplePartitioner (VerifiableProperties props) {
    }

    /**
     * Chooses the partition for a message key.
     *
     * @param key             message key; may be {@code null}, non-String keys are converted
     *                        with {@code toString()}
     * @param a_numPartitions number of partitions available for the topic
     * @return a partition index in {@code [0, a_numPartitions)}; 0 when the key is
     *         {@code null}, has no {@code '.'} after its first character, or its suffix is
     *         not a valid integer
     */
    public int partition(Object key, int a_numPartitions) {
        if (key == null) {
            return 0;
        }

        String stringKey = key.toString();
        int offset = stringKey.lastIndexOf('.');
        // A dot at index 0 (or no dot at all) is treated as "no suffix", as before.
        if (offset <= 0) {
            return 0;
        }

        int suffix;
        try {
            suffix = Integer.parseInt(stringKey.substring(offset + 1));
        } catch (NumberFormatException e) {
            // Non-numeric or empty suffix (e.g. "a.b", "a."): fall back to the default
            // partition instead of failing the send.
            return 0;
        }

        // Java's % keeps the sign of the dividend; normalise so a negative suffix still
        // yields a valid, non-negative partition index.
        int remainder = suffix % a_numPartitions;
        return remainder < 0 ? remainder + a_numPartitions : remainder;
    }
}

3、测试

#开启zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
#开启kafka-server
bin\windows\kafka-server-start.bat config\server.properties
#开启一个consumer
bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic neoTopic --from-beginning
#运行MqProducer.main即可

Comments are closed.