QPID通讯代码04

  • TextMessage
  • ListMessage
  • MapMessage
  • StreamMessage

1、MqConsumerStream.java

package com.neohope.qpid.test;

import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;

import javax.jms.*;

/**
 * Demo consumer: receives a single message from {@code message_queue} on the local
 * broker and prints every field of it as a JMS {@link StreamMessage}.
 */
public class MqConsumerStream {
    /**
     * Connects to the broker, blocks until one message arrives, dumps the message and
     * each of its stream fields to standard output, then closes the connection.
     *
     * @param args unused
     * @throws Exception if connecting, receiving or reading the message fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection =
                new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
        try {
            // Message delivery to consumers only begins once the connection is started.
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
            MessageConsumer consumer = session.createConsumer(queue);

            System.out.println("Receiving as StreamMessage");
            // receive() may return null (consumer closed) or a different message type;
            // check before casting instead of failing with NPE / ClassCastException.
            Object received = consumer.receive();
            if (!(received instanceof StreamMessage)) {
                System.out.println("Expected a StreamMessage but received: " + received);
                return;
            }
            StreamMessage m = (StreamMessage) received;

            System.out.println(m);
            System.out.println("==========================================");
            System.out.println("Printing stream contents:");
            try {
                while (true) {
                    Object field = m.readObject();
                    if (field instanceof byte[]) {
                        // Printing a byte[] directly only shows its identity ("[B@...").
                        // The companion producer writes UTF-8 encoded text, so decode it.
                        System.out.println(
                                new String((byte[]) field, java.nio.charset.StandardCharsets.UTF_8));
                    } else {
                        System.out.println(field);
                    }
                }
            } catch (MessageEOFException ignored) {
                // Expected: StreamMessage signals "no more fields" by throwing this exception.
            }
        } finally {
            // Always release the broker connection, even when receiving or reading fails.
            connection.close();
        }
    }
}

2、MqProducerStream.java

package com.neohope.qpid.test;

import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ListMessage;

import javax.jms.*;
import java.nio.charset.Charset;
import java.util.*;

/**
 * Demo producer: builds one JMS {@link StreamMessage} containing a field of each of
 * several types and sends it to {@code message_queue} on the local broker.
 */
public class MqProducerStream {
    /**
     * Connects to the broker, sends a single stream message, prints it, and closes
     * the connection.
     *
     * @param args unused
     * @throws Exception if connecting or sending fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection =
                new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
            MessageProducer producer = session.createProducer(queue);

            // createStreamMessage() is part of the standard javax.jms.Session API,
            // so no cast to the Qpid-specific session type is required.
            StreamMessage msg = session.createStreamMessage();
            msg.writeInt(99);
            msg.writeString("-=99=-");
            msg.writeDouble(0.99);
            msg.writeBoolean(true);
            msg.writeChar('c');
            // Bytes are written with an explicit charset so the payload does not depend
            // on the platform default encoding.
            msg.writeBytes("QPID你好".getBytes(Charset.forName("UTF-8")));

            producer.send(msg);
            System.out.println("Sent: " + msg);
        } finally {
            // Always release the broker connection, even when sending fails.
            connection.close();
        }
    }
}

3、测试

#开启qpid-broker
#运行MqConsumerStream.main
#运行MqProducerStream.main

Comments are closed.