forked from souyang/ActiveMQ-Demo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathReceiver.java
More file actions
104 lines (98 loc) · 3.08 KB
/
Receiver.java
File metadata and controls
104 lines (98 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package com.activeMQ;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import java.io.IOException;
/**
 * Console demo consumer: connects to the broker at
 * {@link ActiveMQConnection#DEFAULT_BROKER_URL}, registers itself as the
 * {@link MessageListener} for one queue and prints every message it receives
 * until something is read from standard input.
 */
public class Receiver implements MessageListener {
    private ConnectionFactory factory = null;
    private Connection connection = null;
    private Session session = null;
    private Destination destination = null;
    /** Name of the queue this receiver consumes from. */
    private final String currentQueueName;

    /**
     * Starts a receiver on the queue named {@code TESTQueue}.
     *
     * @param args ignored
     * @throws JMSException if the connection, session or consumer cannot be set up or closed
     * @throws IOException  if reading from standard input fails
     */
    public static void main(String[] args) throws JMSException, IOException {
        Receiver receiver = new Receiver("TESTQueue");
        receiver.run();
    }

    /**
     * Creates a receiver for the given queue. No connection is opened here;
     * that happens in {@link #run()}.
     *
     * @param queueName name of the queue to consume from
     */
    public Receiver(String queueName) {
        this.currentQueueName = queueName;
    }

    /**
     * Creates the consumer, registers this object as its message listener and
     * then blocks on {@code System.in.read()}. When that call returns (or
     * anything fails along the way) the consumer, session and connection are
     * closed, so a failure can no longer leave an open broker connection behind.
     *
     * @throws JMSException if setting up the consumer fails, or if closing a
     *                      JMS resource fails after an otherwise successful run
     * @throws IOException  if reading from standard input fails
     */
    public void run() throws JMSException, IOException {
        MessageConsumer consumer1 = null;
        JMSException closeFailure;
        try {
            consumer1 = createConsumer();
            System.out.println("consumer1 created.");
            consumer1.setMessageListener(this);
            // Keep the process alive so the listener can be invoked; any input ends the demo.
            System.in.read();
        } catch (JMSException e) {
            e.printStackTrace();
            throw e;
        } catch (IOException e2) {
            e2.printStackTrace();
            throw e2;
        } catch (RuntimeException e) {
            System.out.println("Exception while receiving messages from the queue" + e);
            throw e;
        } finally {
            // Runs on success and on failure alike; close errors are reported
            // inside closeResources and never mask an exception already in flight.
            closeFailure = closeResources(consumer1);
        }
        // Only reached when the try block completed normally.
        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /**
     * Closes the consumer, the session and the connection, in that order.
     * Every close is attempted even if an earlier one fails; each failure is
     * printed to standard error.
     *
     * @param consumer the consumer to close, or {@code null} if none was created
     * @return the first {@link JMSException} raised while closing, or
     *         {@code null} if every close succeeded
     */
    private JMSException closeResources(MessageConsumer consumer) {
        JMSException first = null;
        if (consumer != null) {
            try {
                consumer.close();
            } catch (JMSException e) {
                e.printStackTrace();
                first = e;
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
                if (first == null) {
                    first = e;
                }
            }
            session = null;
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
                if (first == null) {
                    first = e;
                }
            }
            connection = null;
        }
        return first;
    }

    /**
     * Opens and starts a connection to the default broker URL, creates a
     * non-transacted, auto-acknowledge session and a consumer on the
     * configured queue. The connection and session are stored in fields so
     * that {@link #run()} can close them, including after a partial failure.
     *
     * @return a consumer bound to the configured queue
     * @throws JMSException if any of the JMS calls fails
     */
    private MessageConsumer createConsumer() throws JMSException {
        MessageConsumer consumer = null;
        try {
            // create the connection factory using URL
            factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            // create connection
            connection = factory.createConnection();
            connection.start();
            System.out.println("connection.start() successfully");
            // create session (not transacted, messages acknowledged automatically)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // obtain the queue destination for the configured name
            destination = session.createQueue(this.currentQueueName);
            // create consumer
            consumer = session.createConsumer(destination);
        } catch (JMSException e) {
            e.printStackTrace();
            throw e;
        }
        return consumer;
    }

    /**
     * Prints the text of a received {@link TextMessage}; any other message
     * type is reported as invalid. A {@link JMSException} raised while reading
     * the text is printed and not rethrown.
     *
     * @param message the message delivered to this listener
     */
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("Messages received for the consumer");
            System.out.println("onMessage triggered: --start ");
            if (message instanceof TextMessage) {
                TextMessage txtMessage = (TextMessage) message;
                System.out.println("Message received: " + txtMessage.getText());
            } else {
                System.out.println("Invalid message received.");
            }
            System.out.println("onMessage triggered: --end ");
        } catch (JMSException e1) {
            e1.printStackTrace();
        }
    }
}