1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package org.apache.jmeter.protocol.jms.client; 20 21 import javax.naming.Context; 22 import javax.naming.InitialContext; 23 import javax.naming.NamingException; 24 import javax.jms.JMSException; 25 import javax.jms.MessageListener; 26 import javax.jms.Topic; 27 import javax.jms.TopicConnection; 28 import javax.jms.TopicSession; 29 import javax.jms.TopicSubscriber; 30 31 import org.apache.jorphan.logging.LoggingManager; 32 import org.apache.log.Logger; 33 34 /** 35 * OnMessageSubscriber is designed to create the connection, session and 36 * subscriber. The sampler is responsible for implementing 37 * javax.jms.MessageListener interface and onMessage(Message msg) method. 38 * 39 * The implementation provides a close() method to clean up the client at the 40 * end of a test. This is important to make sure there aren't any zombie threads 41 * or odd memory leaks. 42 */ 43 public class OnMessageSubscriber { 44 45 private static final Logger log = LoggingManager.getLoggerForClass(); 46 47 private TopicConnection CONN = null; 48 49 private TopicSession SESSION = null; 50 51 private Topic TOPIC = null; 52 53 private TopicSubscriber SUBSCRIBER = null; 54 55 /** 56 * 57 */ 58 public OnMessageSubscriber() { 59 super(); 60 } 61 62 /** 63 * Constructor takes the necessary JNDI related parameters to create a 64 * connection and begin receiving messages. 65 * 66 * @param useProps 67 * @param jndi 68 * @param url 69 * @param connfactory 70 * @param topic 71 * @param useAuth 72 * @param user 73 * @param pwd 74 */ 75 public OnMessageSubscriber(boolean useProps, String jndi, String url, String connfactory, String topic, 76 boolean useAuth, String user, String pwd) { 77 Context ctx = initJNDI(useProps, jndi, url, useAuth, user, pwd); 78 if (ctx != null) { 79 initConnection(ctx, connfactory, topic); 80 } else { 81 log.error("Could not initialize JNDI Initial Context Factory"); 82 } 83 } 84 85 /** 86 * initialize the JNDI intial context 87 * 88 * @param useProps 89 * @param jndi 90 * @param url 91 * @param useAuth 92 * @param user 93 * @param pwd 94 * @return the context or null 95 */ 96 public Context initJNDI(boolean useProps, String jndi, String url, boolean useAuth, String user, String pwd) { 97 if (useProps) { 98 try { 99 return new InitialContext(); 100 } catch (NamingException e) { 101 log.error(e.getMessage()); 102 return null; 103 } 104 } else { 105 return InitialContextFactory.lookupContext(jndi, url, useAuth, user, pwd); 106 } 107 } 108 109 /** 110 * Initialize the connection, session and subscriber 111 * 112 * @param ctx 113 * @param connfactory 114 * @param topic 115 */ 116 public void initConnection(Context ctx, String connfactory, String topic) { 117 try { 118 this.CONN = ConnectionFactory.getTopicConnection(); 119 this.TOPIC = InitialContextFactory.lookupTopic(ctx, topic); 120 this.SESSION = this.CONN.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); 121 this.SUBSCRIBER = this.SESSION.createSubscriber(this.TOPIC); 122 log.info("created the topic connection successfully"); 123 } catch (JMSException e) { 124 log.error("Connection error: " + e.getMessage()); 125 } 126 } 127 128 /** 129 * resume will call Connection.start() to begin receiving inbound messages. 130 */ 131 public void resume() { 132 try { 133 this.CONN.start(); 134 } catch (JMSException e) { 135 log.error("failed to start recieving"); 136 } 137 } 138 139 /** 140 * close will close all the objects and set them to null. 141 */ 142 public void close() { 143 try { 144 log.info("Subscriber closed"); 145 this.SUBSCRIBER.close(); 146 this.SESSION.close(); 147 this.CONN.close(); 148 this.SUBSCRIBER = null; 149 this.SESSION = null; 150 this.CONN = null; 151 } catch (JMSException e) { 152 log.error(e.getMessage()); 153 } catch (Throwable e) { 154 log.error(e.getMessage()); 155 } 156 } 157 158 /** 159 * The sample uses this method to set itself as the listener. That means the 160 * sampler need to implement MessageListener interface. 161 * 162 * @param listener 163 */ 164 public void setMessageListener(MessageListener listener) { 165 try { 166 this.SUBSCRIBER.setMessageListener(listener); 167 } catch (JMSException e) { 168 log.error(e.getMessage()); 169 } 170 } 171 }