Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » kahadaptor » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.kahadaptor;
   18   
   19   import java.io.IOException;
   20   import java.util.Iterator;
   21   import java.util.Map;
   22   import java.util.concurrent.ConcurrentHashMap;
   23   import org.apache.activemq.broker.ConnectionContext;
   24   import org.apache.activemq.command.ActiveMQDestination;
   25   import org.apache.activemq.command.Message;
   26   import org.apache.activemq.command.MessageId;
   27   import org.apache.activemq.command.SubscriptionInfo;
   28   import org.apache.activemq.kaha.ListContainer;
   29   import org.apache.activemq.kaha.MapContainer;
   30   import org.apache.activemq.kaha.Marshaller;
   31   import org.apache.activemq.kaha.Store;
   32   import org.apache.activemq.kaha.StoreEntry;
   33   import org.apache.activemq.store.MessageRecoveryListener;
   34   import org.apache.activemq.store.TopicMessageStore;
   35   
   36   /**
   37    * @version $Revision: 1.5 $
   38    */
   39   public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {
   40   
   41       protected ListContainer<TopicSubAck> ackContainer;
   42       protected Map<Object, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<Object, TopicSubContainer>();
   43       private Map<String, SubscriptionInfo> subscriberContainer;
   44       private Store store;
   45   
   46       public KahaTopicMessageStore(Store store, MapContainer<MessageId, Message> messageContainer,
   47                                    ListContainer<TopicSubAck> ackContainer, MapContainer<String, SubscriptionInfo> subsContainer,
   48                                    ActiveMQDestination destination) throws IOException {
   49           super(messageContainer, destination);
   50           this.store = store;
   51           this.ackContainer = ackContainer;
   52           subscriberContainer = subsContainer;
   53           // load all the Ack containers
   54           for (Iterator<String> i = subscriberContainer.keySet().iterator(); i.hasNext();) {
   55               Object key = i.next();
   56               addSubscriberMessageContainer(key);
   57           }
   58       }
   59   
   60       @Override
   61       public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
   62           int subscriberCount = subscriberMessages.size();
   63           if (subscriberCount > 0) {
   64               MessageId id = message.getMessageId();
   65               StoreEntry messageEntry = messageContainer.place(id, message);
   66               TopicSubAck tsa = new TopicSubAck();
   67               tsa.setCount(subscriberCount);
   68               tsa.setMessageEntry(messageEntry);
   69               StoreEntry ackEntry = ackContainer.placeLast(tsa);
   70               for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
   71                   TopicSubContainer container = i.next();
   72                   ConsumerMessageRef ref = new ConsumerMessageRef();
   73                   ref.setAckEntry(ackEntry);
   74                   ref.setMessageEntry(messageEntry);
   75                   ref.setMessageId(id);
   76                   container.add(ref);
   77               }
   78           }
   79       }
   80   
   81       public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
   82                                            MessageId messageId) throws IOException {
   83           String subcriberId = getSubscriptionKey(clientId, subscriptionName);
   84           TopicSubContainer container = subscriberMessages.get(subcriberId);
   85           if (container != null) {
   86               ConsumerMessageRef ref = container.remove(messageId);
   87               if (container.isEmpty()) {
   88                   container.reset();
   89               }
   90               if (ref != null) {
   91                   TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
   92                   if (tsa != null) {
   93                       if (tsa.decrementCount() <= 0) {
   94                           StoreEntry entry = ref.getAckEntry();
   95                           entry = ackContainer.refresh(entry);
   96                           ackContainer.remove(entry);
   97                           entry = tsa.getMessageEntry();
   98                           entry = messageContainer.refresh(entry);
   99                           messageContainer.remove(entry);
  100                       } else {
  101                           ackContainer.update(ref.getAckEntry(), tsa);
  102                       }
  103                   }
  104               }
  105           }
  106       }
  107   
  108       public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
  109           return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
  110       }
  111   
  112       public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
  113           String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
  114           // if already exists - won't add it again as it causes data files
  115           // to hang around
  116           if (!subscriberContainer.containsKey(key)) {
  117               subscriberContainer.put(key, info);
  118           }
  119           // add the subscriber
  120           addSubscriberMessageContainer(key);
  121           /*
  122            * if(retroactive){ for(StoreEntry
  123            * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
  124            * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
  125            * ConsumerMessageRef ref=new ConsumerMessageRef();
  126            * ref.setAckEntry(entry); ref.setMessageEntry(tsa.getMessageEntry());
  127            * container.add(ref); } }
  128            */
  129       }
  130   
  131       public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
  132           String key = getSubscriptionKey(clientId, subscriptionName);
  133           removeSubscriberMessageContainer(key);
  134       }
  135   
  136       public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
  137           throws Exception {
  138           String key = getSubscriptionKey(clientId, subscriptionName);
  139           TopicSubContainer container = subscriberMessages.get(key);
  140           if (container != null) {
  141               for (Iterator i = container.iterator(); i.hasNext();) {
  142                   ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
  143                   Message msg = messageContainer.get(ref.getMessageEntry());
  144                   if (msg != null) {
  145                       if (!recoverMessage(listener, msg)) {
  146                           break;
  147                       }
  148                   }
  149               }
  150           }
  151       }
  152   
  153       public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
  154                                       MessageRecoveryListener listener) throws Exception {
  155           String key = getSubscriptionKey(clientId, subscriptionName);
  156           TopicSubContainer container = subscriberMessages.get(key);
  157           if (container != null) {
  158               int count = 0;
  159               StoreEntry entry = container.getBatchEntry();
  160               if (entry == null) {
  161                   entry = container.getEntry();
  162               } else {
  163                   entry = container.refreshEntry(entry);
  164                   if (entry != null) {
  165                       entry = container.getNextEntry(entry);
  166                   }
  167               }
  168               if (entry != null) {
  169                   do {
  170                       ConsumerMessageRef consumerRef = container.get(entry);
  171                       Message msg = messageContainer.getValue(consumerRef.getMessageEntry());
  172                       if (msg != null) {
  173                           recoverMessage(listener, msg);
  174                           count++;
  175                           container.setBatchEntry(msg.getMessageId().toString(), entry);
  176                       } else {
  177                           container.reset();
  178                       }
  179   
  180                       entry = container.getNextEntry(entry);
  181                   } while (entry != null && count < maxReturned && listener.hasSpace());
  182               }
  183           }
  184       }
  185   
  186       public synchronized void delete() {
  187           super.delete();
  188           ackContainer.clear();
  189           subscriberContainer.clear();
  190       }
  191   
  192       public SubscriptionInfo[] getAllSubscriptions() throws IOException {
  193           return subscriberContainer.values()
  194               .toArray(new SubscriptionInfo[subscriberContainer.size()]);
  195       }
  196   
  197       protected String getSubscriptionKey(String clientId, String subscriberName) {
  198           String result = clientId + ":";
  199           result += subscriberName != null ? subscriberName : "NOT_SET";
  200           return result;
  201       }
  202   
  203       protected MapContainer addSubscriberMessageContainer(Object key) throws IOException {
  204           MapContainer container = store.getMapContainer(key, "topic-subs");
  205           container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
  206           Marshaller marshaller = new ConsumerMessageRefMarshaller();
  207           container.setValueMarshaller(marshaller);
  208           TopicSubContainer tsc = new TopicSubContainer(container);
  209           subscriberMessages.put(key, tsc);
  210           return container;
  211       }
  212   
  213       protected synchronized void removeSubscriberMessageContainer(Object key)
  214               throws IOException {
  215           subscriberContainer.remove(key);
  216           TopicSubContainer container = subscriberMessages.remove(key);
  217           if (container != null) {
  218               for (Iterator i = container.iterator(); i.hasNext();) {
  219                   ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
  220                   if (ref != null) {
  221                       TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
  222                       if (tsa != null) {
  223                           if (tsa.decrementCount() <= 0) {
  224                               ackContainer.remove(ref.getAckEntry());
  225                               messageContainer.remove(tsa.getMessageEntry());
  226                           } else {
  227                               ackContainer.update(ref.getAckEntry(), tsa);
  228                           }
  229                       }
  230                   }
  231               }
  232               container.clear();
  233           }
  234           store.deleteListContainer(key, "topic-subs");
  235   
  236       }
  237   
  238       public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
  239           String key = getSubscriptionKey(clientId, subscriberName);
  240           TopicSubContainer container = subscriberMessages.get(key);
  241           return container != null ? container.size() : 0;
  242       }
  243   
  244       /**
  245        * @param context
  246        * @throws IOException
  247        * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
  248        */
  249       public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
  250           messageContainer.clear();
  251           ackContainer.clear();
  252           for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
  253               TopicSubContainer container = i.next();
  254               container.clear();
  255           }
  256       }
  257   
  258       public synchronized void resetBatching(String clientId, String subscriptionName) {
  259           String key = getSubscriptionKey(clientId, subscriptionName);
  260           TopicSubContainer topicSubContainer = subscriberMessages.get(key);
  261           if (topicSubContainer != null) {
  262               topicSubContainer.reset();
  263           }
  264       }
  265   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » kahadaptor » [javadoc | source]