1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package org.apache.jmeter.protocol.jms.client; 20 21 import javax.jms.JMSException; 22 import javax.jms.Message; 23 import javax.jms.TextMessage; 24 import javax.jms.Topic; 25 import javax.jms.TopicConnection; 26 import javax.jms.TopicSession; 27 import javax.jms.TopicSubscriber; 28 import javax.naming.Context; 29 import javax.naming.InitialContext; 30 import javax.naming.NamingException; 31 32 import org.apache.jorphan.logging.LoggingManager; 33 import org.apache.log.Logger; 34 35 /** 36 * Receives messages in a separate thread until told to stop. 37 * Run loop permanently receives messages; the sampler calls reset() 38 * when it has taken enough messages. 39 * 40 */ 41 /* 42 * TODO Needs rework - there is a window between receiving a message and calling reset() 43 * which means that a message can be lost. It's not clear why a separate thread is needed, 44 * given that the sampler loops until enough samples have been received. 45 * Also, messages are received in wait mode, so the RUN flag won't be checked until 46 * at least one more message has been received. 47 */ 48 public class ReceiveSubscriber implements Runnable { 49 50 private static final Logger log = LoggingManager.getLoggerForClass(); 51 52 private final TopicConnection CONN; 53 54 private final TopicSession SESSION; 55 56 private final Topic TOPIC; 57 58 private final TopicSubscriber SUBSCRIBER; 59 60 //@GuardedBy("this") 61 private int counter; 62 63 private int loop = 1; // TODO never read 64 65 //@GuardedBy("this") 66 private StringBuffer buffer = new StringBuffer(); 67 68 //@GuardedBy("this") 69 private volatile boolean RUN = true; 70 // Needs to be volatile to ensure value is picked up 71 72 //@GuardedBy("this") 73 private Thread CLIENTTHREAD; 74 75 public ReceiveSubscriber(boolean useProps, String jndi, String url, String connfactory, String topic, 76 boolean useAuth, String user, String pwd) { 77 Context ctx = initJNDI(useProps, jndi, url, useAuth, user, pwd); 78 TopicConnection _conn = null; 79 Topic _topic = null; 80 TopicSession _session = null; 81 TopicSubscriber _subscriber = null; 82 if (ctx != null) { 83 try { 84 ConnectionFactory.getTopicConnectionFactory(ctx,connfactory); 85 _conn = ConnectionFactory.getTopicConnection(); 86 _topic = InitialContextFactory.lookupTopic(ctx, topic); 87 _session = this.CONN.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); 88 _subscriber = this.SESSION.createSubscriber(this.TOPIC); 89 log.info("created the topic connection successfully"); 90 } catch (JMSException e) { 91 log.error("Connection error: " + e.getMessage()); 92 } 93 } else { 94 log.error("Could not initialize JNDI Initial Context Factory"); 95 } 96 this.CONN = _conn; 97 this.TOPIC = _topic; 98 this.SESSION = _session; 99 this.SUBSCRIBER = _subscriber; 100 } 101 102 /** 103 * Initialize the JNDI initial context 104 * 105 * @param useProps 106 * @param jndi 107 * @param url 108 * @param useAuth 109 * @param user 110 * @param pwd 111 * @return the JNDI initial context or null 112 */ 113 // Called by ctor 114 private Context initJNDI(boolean useProps, String jndi, String url, boolean useAuth, String user, String pwd) { 115 if (useProps) { 116 try { 117 return new InitialContext(); 118 } catch (NamingException e) { 119 log.error(e.getMessage()); 120 return null; 121 } 122 } else { 123 return InitialContextFactory.lookupContext(jndi, url, useAuth, user, pwd); 124 } 125 } 126 127 /** 128 * Set the number of iterations for each call to sample() 129 * 130 * @param loop 131 */ 132 public void setLoop(int loop) { 133 this.loop = loop; 134 } 135 136 /** 137 * Resume will call Connection.start() and begin receiving messages from the 138 * JMS provider. 139 */ 140 public void resume() { 141 if (this.CONN == null) { 142 log.error("Connection not set up"); 143 return; 144 } 145 try { 146 this.CONN.start(); 147 } catch (JMSException e) { 148 log.error("failed to start recieving"); 149 } 150 } 151 152 /** 153 * Get the message as a string 154 * 155 */ 156 public synchronized String getMessage() { 157 return this.buffer.toString(); 158 } 159 160 /** 161 * Get the message(s) as an array of byte[] 162 * 163 */ 164 public synchronized byte[] getByteResult() { 165 return this.buffer.toString().getBytes(); 166 } 167 168 /** 169 * close() will stop the connection first. Then it closes the subscriber, 170 * session and connection. 171 */ 172 public synchronized void close() { // called from testEnded() thread 173 try { 174 this.RUN = false; 175 this.CONN.stop(); 176 this.SUBSCRIBER.close(); 177 this.SESSION.close(); 178 this.CONN.close(); 179 this.CLIENTTHREAD.interrupt(); 180 this.CLIENTTHREAD = null; 181 this.buffer.setLength(0); 182 } catch (JMSException e) { 183 log.error(e.getMessage()); 184 } catch (Exception e) { 185 log.error(e.getMessage()); 186 } 187 } 188 189 /** 190 * Reset the receiver ready for receiving any further messages 191 */ 192 public synchronized void reset() { 193 counter = 0; 194 this.buffer.setLength(0); 195 } 196 197 /** 198 * Increment the count and return the new value 199 * 200 * @param increment 201 */ 202 public synchronized int count(int increment) { 203 counter += increment; 204 return counter; 205 } 206 207 /** 208 * start will create a new thread and pass this class. once the thread is 209 * created, it calls Thread.start(). 210 */ 211 public void start() { 212 // No point starting thread unless subscriber exists 213 if (SUBSCRIBER == null) { 214 log.error("Subscriber has not been set up"); 215 return; 216 } 217 this.CLIENTTHREAD = new Thread(this, "Subscriber2"); 218 this.CLIENTTHREAD.start(); 219 } 220 221 /** 222 * run calls listen to begin listening for inbound messages from the 223 * provider. 224 * 225 * Updates the count field so the caller can check how many messages have been receieved. 226 * 227 */ 228 public void run() { 229 if (SUBSCRIBER == null) { // just in case 230 log.error("Subscriber has not been set up"); 231 return; 232 } 233 while (RUN) { 234 try { 235 Message message = this.SUBSCRIBER.receive(); 236 if (message != null && message instanceof TextMessage) { 237 TextMessage msg = (TextMessage) message; 238 String text = msg.getText(); 239 if (text.trim().length() > 0) { 240 synchronized (this) { 241 this.buffer.append(text); 242 count(1); 243 } 244 } 245 } 246 } catch (JMSException e) { 247 log.error("Communication error: " + e.getMessage()); 248 } 249 } 250 } 251 }