Home » jakarta-jmeter-2.3.4_src » org.apache.jmeter.protocol.jms.client » [javadoc | source]

    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *   http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    *
   17    */
   18   
   19   package org.apache.jmeter.protocol.jms.client;
   20   
   21   import javax.jms.JMSException;
   22   import javax.jms.Message;
   23   import javax.jms.TextMessage;
   24   import javax.jms.Topic;
   25   import javax.jms.TopicConnection;
   26   import javax.jms.TopicSession;
   27   import javax.jms.TopicSubscriber;
   28   import javax.naming.Context;
   29   import javax.naming.InitialContext;
   30   import javax.naming.NamingException;
   31   
   32   import org.apache.jorphan.logging.LoggingManager;
   33   import org.apache.log.Logger;
   34   
   35   /**
   36    * Receives messages in a separate thread until told to stop.
   37    * Run loop permanently receives messages; the sampler calls reset()
   38    * when it has taken enough messages.
   39    *
   40    */
   41   /*
   42    * TODO Needs rework - there is a window between receiving a message and calling reset()
   43    * which means that a message can be lost. It's not clear why a separate thread is needed,
   44    * given that the sampler loops until enough samples have been received.
   45    * Also, messages are received in wait mode, so the RUN flag won't be checked until
   46    * at least one more message has been received.
   47   */
   48   public class ReceiveSubscriber implements Runnable {
   49   
   50       private static final Logger log = LoggingManager.getLoggerForClass();
   51   
   52       private final TopicConnection CONN;
   53   
   54       private final TopicSession SESSION;
   55   
   56       private final Topic TOPIC;
   57   
   58       private final TopicSubscriber SUBSCRIBER;
   59   
   60       //@GuardedBy("this")
   61       private int counter;
   62   
   63       private int loop = 1; // TODO never read
   64   
   65       //@GuardedBy("this")
   66       private StringBuffer buffer = new StringBuffer();
   67   
   68       //@GuardedBy("this")
   69       private volatile boolean RUN = true;
   70       // Needs to be volatile to ensure value is picked up
   71   
   72       //@GuardedBy("this")
   73       private Thread CLIENTTHREAD;
   74   
   75       public ReceiveSubscriber(boolean useProps, String jndi, String url, String connfactory, String topic,
   76               boolean useAuth, String user, String pwd) {
   77           Context ctx = initJNDI(useProps, jndi, url, useAuth, user, pwd);
   78           TopicConnection _conn = null;
   79           Topic _topic = null;
   80           TopicSession _session = null;
   81           TopicSubscriber _subscriber = null;
   82           if (ctx != null) {
   83               try {
   84                   ConnectionFactory.getTopicConnectionFactory(ctx,connfactory);
   85                   _conn = ConnectionFactory.getTopicConnection();
   86                   _topic = InitialContextFactory.lookupTopic(ctx, topic);
   87                   _session = this.CONN.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
   88                   _subscriber = this.SESSION.createSubscriber(this.TOPIC);
   89                   log.info("created the topic connection successfully");
   90               } catch (JMSException e) {
   91                   log.error("Connection error: " + e.getMessage());
   92               }
   93           } else {
   94               log.error("Could not initialize JNDI Initial Context Factory");
   95           }
   96           this.CONN = _conn;
   97           this.TOPIC = _topic;
   98           this.SESSION = _session;
   99           this.SUBSCRIBER = _subscriber;
  100       }
  101   
  102       /**
  103        * Initialize the JNDI initial context
  104        *
  105        * @param useProps
  106        * @param jndi
  107        * @param url
  108        * @param useAuth
  109        * @param user
  110        * @param pwd
  111        * @return  the JNDI initial context or null
  112        */
  113       // Called by ctor
  114       private Context initJNDI(boolean useProps, String jndi, String url, boolean useAuth, String user, String pwd) {
  115           if (useProps) {
  116               try {
  117                   return new InitialContext();
  118               } catch (NamingException e) {
  119                   log.error(e.getMessage());
  120                   return null;
  121               }
  122           } else {
  123               return InitialContextFactory.lookupContext(jndi, url, useAuth, user, pwd);
  124           }
  125       }
  126   
  127       /**
  128        * Set the number of iterations for each call to sample()
  129        *
  130        * @param loop
  131        */
  132       public void setLoop(int loop) {
  133           this.loop = loop;
  134       }
  135   
  136       /**
  137        * Resume will call Connection.start() and begin receiving messages from the
  138        * JMS provider.
  139        */
  140       public void resume() {
  141           if (this.CONN == null) {
  142               log.error("Connection not set up");
  143               return;
  144           }
  145           try {
  146               this.CONN.start();
  147           } catch (JMSException e) {
  148               log.error("failed to start recieving");
  149           }
  150       }
  151   
  152       /**
  153        * Get the message as a string
  154        *
  155        */
  156       public synchronized String getMessage() {
  157           return this.buffer.toString();
  158       }
  159   
  160       /**
  161        * Get the message(s) as an array of byte[]
  162        * 
  163        */
  164       public synchronized byte[] getByteResult() {
  165           return this.buffer.toString().getBytes();
  166       }
  167   
  168       /**
  169        * close() will stop the connection first. Then it closes the subscriber,
  170        * session and connection.
  171        */
  172       public synchronized void close() { // called from testEnded() thread
  173           try {
  174               this.RUN = false;
  175               this.CONN.stop();
  176               this.SUBSCRIBER.close();
  177               this.SESSION.close();
  178               this.CONN.close();
  179               this.CLIENTTHREAD.interrupt();
  180               this.CLIENTTHREAD = null;
  181               this.buffer.setLength(0);
  182           } catch (JMSException e) {
  183               log.error(e.getMessage());
  184           } catch (Exception e) {
  185               log.error(e.getMessage());
  186           }
  187       }
  188   
  189       /**
  190        * Reset the receiver ready for receiving any further messages
  191        */
  192       public synchronized void reset() {
  193           counter = 0;
  194           this.buffer.setLength(0);
  195       }
  196   
  197       /**
  198        * Increment the count and return the new value
  199        *
  200        * @param increment
  201        */
  202       public synchronized int count(int increment) {
  203           counter += increment;
  204           return counter;
  205       }
  206   
  207       /**
  208        * start will create a new thread and pass this class. once the thread is
  209        * created, it calls Thread.start().
  210        */
  211       public void start() {
  212           // No point starting thread unless subscriber exists
  213           if (SUBSCRIBER == null) {
  214               log.error("Subscriber has not been set up");
  215               return;
  216           }
  217           this.CLIENTTHREAD = new Thread(this, "Subscriber2");
  218           this.CLIENTTHREAD.start();
  219       }
  220   
  221       /**
  222        * run calls listen to begin listening for inbound messages from the
  223        * provider.
  224        * 
  225        * Updates the count field so the caller can check how many messages have been receieved.
  226        * 
  227        */
  228       public void run() {
  229           if (SUBSCRIBER == null) { // just in case
  230               log.error("Subscriber has not been set up");
  231               return;
  232           }
  233           while (RUN) {
  234               try {
  235                   Message message = this.SUBSCRIBER.receive();
  236                   if (message != null && message instanceof TextMessage) {
  237                       TextMessage msg = (TextMessage) message;
  238                       String text = msg.getText();
  239                       if (text.trim().length() > 0) {
  240                           synchronized (this) {
  241                               this.buffer.append(text);
  242                               count(1);
  243                           }
  244                       }
  245                   }
  246               } catch (JMSException e) {
  247                   log.error("Communication error: " + e.getMessage());
  248               }
  249           }
  250       }
  251   }

Home » jakarta-jmeter-2.3.4_src » org.apache.jmeter.protocol.jms.client » [javadoc | source]