|
|||||||||||||||||||
30 day Evaluation Version distributed via the Maven Jar Repository. Clover is not free. You have 30 days to evaluate it. Please visit http://www.thecortex.net/clover to obtain a licensed version of Clover | |||||||||||||||||||
Source file | Conditionals | Statements | Methods | TOTAL | |||||||||||||||
JMS10BroadcastingListener.java | 0% | 0% | 0% | 0% |
|
1 |
/*
|
|
2 |
* Copyright (c) 2002-2003 by OpenSymphony
|
|
3 |
* All rights reserved.
|
|
4 |
*/
|
|
5 |
package com.opensymphony.oscache.plugins.clustersupport;
|
|
6 |
|
|
7 |
import com.opensymphony.oscache.base.Cache;
|
|
8 |
import com.opensymphony.oscache.base.Config;
|
|
9 |
import com.opensymphony.oscache.base.FinalizationException;
|
|
10 |
import com.opensymphony.oscache.base.InitializationException;
|
|
11 |
|
|
12 |
import org.apache.commons.logging.Log;
|
|
13 |
import org.apache.commons.logging.LogFactory;
|
|
14 |
|
|
15 |
import javax.jms.*;
|
|
16 |
|
|
17 |
import javax.naming.InitialContext;
|
|
18 |
|
|
19 |
/**
|
|
20 |
* A JMS 1.0.x based clustering implementation. This implementation is independent of
|
|
21 |
* the JMS provider and uses non-persistent messages on a publish subscribe protocol.
|
|
22 |
*
|
|
23 |
* @author <a href="mailto:chris@swebtec.com">Chris Miller</a>
|
|
24 |
*/
|
|
25 |
public class JMS10BroadcastingListener extends AbstractBroadcastingListener { |
|
26 |
private final static Log log = LogFactory.getLog(JMS10BroadcastingListener.class); |
|
27 |
|
|
28 |
/**
|
|
29 |
* The name of this cluster. Used to identify the sender of a message.
|
|
30 |
*/
|
|
31 |
private String clusterNode;
|
|
32 |
|
|
33 |
/**
|
|
34 |
*The JMS connection used
|
|
35 |
*/
|
|
36 |
private TopicConnection connection;
|
|
37 |
|
|
38 |
/**
|
|
39 |
* Th object used to publish new messages
|
|
40 |
*/
|
|
41 |
private TopicPublisher publisher;
|
|
42 |
|
|
43 |
/**
|
|
44 |
* The current JMS session
|
|
45 |
*/
|
|
46 |
private TopicSession publisherSession;
|
|
47 |
|
|
48 |
/**
|
|
49 |
* <p>Called by the cache administrator class when a cache is instantiated.</p>
|
|
50 |
* <p>The JMS broadcasting implementation requires the following configuration
|
|
51 |
* properties to be specified in <code>oscache.properties</code>:
|
|
52 |
* <ul>
|
|
53 |
* <li><b>cache.cluster.jms.topic.factory</b> - The JMS connection factory to use</li>
|
|
54 |
* <li><b>cache.cluster.jms.topic.name</b> - The JMS topic name</li>
|
|
55 |
* <li><b>cache.cluster.jms.node.name</b> - The name of this node in the cluster. This should
|
|
56 |
* be unique for each node.</li>
|
|
57 |
* Please refer to the clustering documentation for further details on configuring
|
|
58 |
* the JMS clustered caching.</p>
|
|
59 |
*
|
|
60 |
* @param cache the cache instance that this listener is attached to.
|
|
61 |
*
|
|
62 |
* @throws InitializationException thrown when there was a
|
|
63 |
* problem initializing the listener. The cache administrator will log this error and
|
|
64 |
* disable the listener.
|
|
65 |
*/
|
|
66 | 0 |
public void initialize(Cache cache, Config config) throws InitializationException { |
67 | 0 |
super.initialize(cache, config);
|
68 |
|
|
69 |
// Get the name of this node
|
|
70 | 0 |
clusterNode = config.getProperty("cache.cluster.jms.node.name");
|
71 |
|
|
72 | 0 |
String topic = config.getProperty("cache.cluster.jms.topic.name");
|
73 | 0 |
String topicFactory = config.getProperty("cache.cluster.jms.topic.factory");
|
74 |
|
|
75 | 0 |
if (log.isInfoEnabled()) {
|
76 | 0 |
log.info("Starting JMS clustering (node name=" + clusterNode + ", topic=" + topic + ", topic factory=" + topicFactory + ")"); |
77 |
} |
|
78 |
|
|
79 | 0 |
try {
|
80 |
// Make sure you have specified the necessary JNDI properties (usually in
|
|
81 |
// a jndi.properties resource file, or as system properties)
|
|
82 | 0 |
InitialContext jndi = new InitialContext();
|
83 |
|
|
84 |
// Look up a JMS connection factory
|
|
85 | 0 |
TopicConnectionFactory connectionFactory = (TopicConnectionFactory) jndi.lookup(topicFactory); |
86 |
|
|
87 |
// Create a JMS connection
|
|
88 | 0 |
connection = connectionFactory.createTopicConnection(); |
89 |
|
|
90 |
// Create session objects
|
|
91 | 0 |
publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
92 |
|
|
93 | 0 |
TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
94 |
|
|
95 |
// Look up the JMS topic
|
|
96 | 0 |
Topic chatTopic = (Topic) jndi.lookup(topic); |
97 |
|
|
98 |
// Create the publisher and subscriber
|
|
99 | 0 |
publisher = publisherSession.createPublisher(chatTopic); |
100 |
|
|
101 | 0 |
TopicSubscriber subscriber = subSession.createSubscriber(chatTopic); |
102 |
|
|
103 |
// Set the message listener
|
|
104 | 0 |
subscriber.setMessageListener(new MessageListener() {
|
105 | 0 |
public void onMessage(Message message) { |
106 | 0 |
try {
|
107 |
//check the message type
|
|
108 | 0 |
ObjectMessage objectMessage = null;
|
109 |
|
|
110 | 0 |
if (!(message instanceof ObjectMessage)) { |
111 | 0 |
log.error("Cannot handle message of type (class=" + message.getClass().getName() + "). Notification ignored."); |
112 | 0 |
return;
|
113 |
} |
|
114 |
|
|
115 | 0 |
objectMessage = (ObjectMessage) message; |
116 |
|
|
117 |
//check the message content
|
|
118 | 0 |
if (!(objectMessage.getObject() instanceof ClusterNotification)) { |
119 | 0 |
log.error("An unknown cluster notification message received (class=" + objectMessage.getObject().getClass().getName() + "). Notification ignored."); |
120 | 0 |
return;
|
121 |
} |
|
122 |
|
|
123 | 0 |
if (log.isDebugEnabled()) {
|
124 | 0 |
log.debug(objectMessage.getObject()); |
125 |
} |
|
126 |
|
|
127 |
// This prevents the notification sent by this node from being handled by itself
|
|
128 | 0 |
if (!objectMessage.getStringProperty("nodeName").equals(clusterNode)) { |
129 |
//now handle the message
|
|
130 | 0 |
ClusterNotification notification = (ClusterNotification) objectMessage.getObject(); |
131 | 0 |
handleClusterNotification(notification); |
132 |
} |
|
133 |
} catch (JMSException jmsEx) {
|
|
134 | 0 |
log.error("Cannot handle cluster Notification", jmsEx);
|
135 |
} |
|
136 |
} |
|
137 |
}); |
|
138 |
|
|
139 |
// Start the JMS connection; allows messages to be delivered
|
|
140 | 0 |
connection.start(); |
141 |
} catch (Exception e) {
|
|
142 | 0 |
throw new InitializationException("Initialization of the JMS10BroadcastingListener failed: " + e); |
143 |
} |
|
144 |
} |
|
145 |
|
|
146 |
/**
|
|
147 |
* Called by the cache administrator class when a cache is destroyed.
|
|
148 |
*
|
|
149 |
* @throws FinalizationException thrown when there was a problem finalizing the
|
|
150 |
* listener. The cache administrator will catch and log this error.
|
|
151 |
*/
|
|
152 | 0 |
public void finialize() throws FinalizationException { |
153 | 0 |
try {
|
154 | 0 |
if (log.isInfoEnabled()) {
|
155 | 0 |
log.info("Shutting down JMS clustering...");
|
156 |
} |
|
157 |
|
|
158 | 0 |
connection.close(); |
159 |
|
|
160 | 0 |
if (log.isInfoEnabled()) {
|
161 | 0 |
log.info("JMS clustering shutdown complete.");
|
162 |
} |
|
163 |
} catch (JMSException e) {
|
|
164 | 0 |
log.warn("A problem was encountered when closing the JMS connection", e);
|
165 |
} |
|
166 |
} |
|
167 |
|
|
168 | 0 |
protected void sendNotification(ClusterNotification message) { |
169 | 0 |
try {
|
170 | 0 |
ObjectMessage objectMessage = publisherSession.createObjectMessage(); |
171 | 0 |
objectMessage.setObject(message); |
172 |
|
|
173 |
//sign the message, with the name of this node
|
|
174 | 0 |
objectMessage.setStringProperty("nodeName", clusterNode);
|
175 | 0 |
publisher.publish(objectMessage); |
176 |
} catch (JMSException e) {
|
|
177 | 0 |
log.error("Cannot send notification " + message, e);
|
178 |
} |
|
179 |
} |
|
180 |
} |
|
181 |
|
|