Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » kahadaptor » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.kahadaptor;
   18   
   19   import java.io.IOException;
   20   import java.util.HashSet;
   21   import java.util.Iterator;
   22   import java.util.List;
   23   import java.util.Map;
   24   import java.util.Set;
   25   import java.util.Map.Entry;
   26   import java.util.concurrent.ConcurrentHashMap;
   27   
   28   import org.apache.activemq.broker.ConnectionContext;
   29   import org.apache.activemq.command.ActiveMQDestination;
   30   import org.apache.activemq.command.Message;
   31   import org.apache.activemq.command.MessageId;
   32   import org.apache.activemq.command.SubscriptionInfo;
   33   import org.apache.activemq.kaha.ListContainer;
   34   import org.apache.activemq.kaha.MapContainer;
   35   import org.apache.activemq.kaha.Marshaller;
   36   import org.apache.activemq.kaha.Store;
   37   import org.apache.activemq.kaha.StoreEntry;
   38   import org.apache.activemq.store.MessageRecoveryListener;
   39   import org.apache.activemq.store.TopicReferenceStore;
   40   import org.apache.commons.logging.Log;
   41   import org.apache.commons.logging.LogFactory;
   42   
   43   public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
   44       private static final Log LOG = LogFactory.getLog(KahaTopicReferenceStore.class);
   45       protected ListContainer<TopicSubAck> ackContainer;
   46       protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
   47       private MapContainer<String, SubscriptionInfo> subscriberContainer;
   48       private Store store;
   49       private static final String TOPIC_SUB_NAME = "tsn";
   50   
   51       public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
   52                                      MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
   53                                      MapContainer<String, SubscriptionInfo> subsContainer, ActiveMQDestination destination)
   54           throws IOException {
   55           super(adapter, messageContainer, destination);
   56           this.store = store;
   57           this.ackContainer = ackContainer;
   58           subscriberContainer = subsContainer;
   59           // load all the Ack containers
   60           for (Iterator<SubscriptionInfo> i = subscriberContainer.values().iterator(); i.hasNext();) {
   61               SubscriptionInfo info = i.next();
   62               addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
   63           }
   64       }
   65   
   66       public void dispose(ConnectionContext context) {
   67           super.dispose(context);
   68           subscriberContainer.delete();
   69       }
   70   
   71       protected MessageId getMessageId(Object object) {
   72           return new MessageId(((ReferenceRecord)object).getMessageId());
   73       }
   74   
   75       public void addMessage(ConnectionContext context, Message message) throws IOException {
   76           throw new RuntimeException("Use addMessageReference instead");
   77       }
   78   
   79       public Message getMessage(MessageId identity) throws IOException {
   80           throw new RuntimeException("Use addMessageReference instead");
   81       }
   82   
   83       public boolean addMessageReference(final ConnectionContext context, final MessageId messageId,
   84                                       final ReferenceData data) {
   85           boolean uniqueReferenceAdded = false;
   86           lock.lock();
   87           try {
   88               final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
   89               final int subscriberCount = subscriberMessages.size();
   90               if (subscriberCount > 0 && !isDuplicate(messageId)) {
   91                   final StoreEntry messageEntry = messageContainer.place(messageId, record);
   92                   addInterest(record);
   93                   uniqueReferenceAdded = true;
   94                   final TopicSubAck tsa = new TopicSubAck();
   95                   tsa.setCount(subscriberCount);
   96                   tsa.setMessageEntry(messageEntry);
   97                   final StoreEntry ackEntry = ackContainer.placeLast(tsa);
   98                   for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
   99                       final TopicSubContainer container = i.next();
  100                       final ConsumerMessageRef ref = new ConsumerMessageRef();
  101                       ref.setAckEntry(ackEntry);
  102                       ref.setMessageEntry(messageEntry);
  103                       ref.setMessageId(messageId);
  104                       container.add(ref);
  105                   }
  106                   if (LOG.isTraceEnabled()) {
  107                       LOG.trace(destination.getPhysicalName() + " add reference: " + messageId);
  108                   }
  109               }
  110           } finally {
  111               lock.unlock();
  112           }
  113           return uniqueReferenceAdded;
  114       }
  115   
  116       public ReferenceData getMessageReference(final MessageId identity) throws IOException {
  117           final ReferenceRecord result = messageContainer.get(identity);
  118           if (result == null) {
  119               return null;
  120           }
  121           return result.getData();
  122       }
  123   
  124       public void addReferenceFileIdsInUse() {
  125           for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) {
  126               TopicSubAck subAck = ackContainer.get(entry);
  127               if (subAck.getCount() > 0) {
  128                   ReferenceRecord rr = messageContainer.getValue(subAck.getMessageEntry());
  129                   addInterest(rr);
  130               }
  131           }
  132       }
  133   
  134       
  135       protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
  136           String containerName = getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName));
  137           MapContainer container = store.getMapContainer(containerName,containerName);
  138           container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
  139           Marshaller marshaller = new ConsumerMessageRefMarshaller();
  140           container.setValueMarshaller(marshaller);
  141           TopicSubContainer tsc = new TopicSubContainer(container);
  142           subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc);
  143           return container;
  144       }
  145   
  146       public boolean acknowledgeReference(ConnectionContext context,
  147               String clientId, String subscriptionName, MessageId messageId)
  148               throws IOException {
  149           boolean removeMessage = false;
  150           lock.lock();
  151               try {
  152               String key = getSubscriptionKey(clientId, subscriptionName);
  153       
  154               TopicSubContainer container = subscriberMessages.get(key);
  155               if (container != null) {
  156                   ConsumerMessageRef ref = null;
  157                   if((ref = container.remove(messageId)) != null) {
  158                       StoreEntry entry = ref.getAckEntry();
  159                       //ensure we get up to-date pointers
  160                       entry = ackContainer.refresh(entry);
  161                       TopicSubAck tsa = ackContainer.get(entry);
  162                       if (tsa != null) {
  163                           if (tsa.decrementCount() <= 0) {
  164                               ackContainer.remove(entry);
  165                               ReferenceRecord rr = messageContainer.get(messageId);
  166                               if (rr != null) {
  167                                   entry = tsa.getMessageEntry();
  168                                   entry = messageContainer.refresh(entry);
  169                                   messageContainer.remove(entry);
  170                                   removeInterest(rr);
  171                                   removeMessage = true;
  172                                   dispatchAudit.isDuplicate(messageId);
  173                               }
  174                           }else {
  175                               ackContainer.update(entry,tsa);
  176                           }
  177                       }
  178                       if (LOG.isTraceEnabled()) {
  179                           LOG.trace(destination.getPhysicalName() + " remove: " + messageId);
  180                       }
  181                   }else{
  182                       if (ackContainer.isEmpty() || subscriberMessages.size() == 1 || isUnreferencedBySubscribers(key, subscriberMessages, messageId)) {
  183                           // no message reference held        
  184                           removeMessage = true;
  185                           if (LOG.isDebugEnabled()) {
  186                               LOG.debug(destination.getPhysicalName() + " remove with no outstanding reference (dup ack): " + messageId);
  187                           }
  188                       }
  189                   }
  190               }
  191           }finally {
  192               lock.unlock();
  193           }
  194           return removeMessage;
  195       }
  196       
  197       // verify that no subscriber has a reference to this message. In the case where the subscribers
  198       // references are persisted but more than the persisted consumers get the message, the ack from the non
  199       // persisted consumer would remove the message in error
  200       //
  201       // see: https://issues.apache.org/activemq/browse/AMQ-2123
  202       private boolean isUnreferencedBySubscribers(
  203               String key, Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) {
  204           boolean isUnreferenced = true;
  205           for (Entry<String, TopicSubContainer> entry : subscriberContainers.entrySet()) {
  206               if (!key.equals(entry.getKey()) && !entry.getValue().isEmpty()) {
  207                   TopicSubContainer container = entry.getValue();
  208                   for (Iterator i = container.iterator(); i.hasNext();) {
  209                       ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
  210                       if (messageId.equals(ref.getMessageId())) {
  211                           isUnreferenced = false;
  212                           break;
  213                       }
  214                   }
  215               }
  216           }
  217           return isUnreferenced; 
  218       }
  219   
  220       public void acknowledge(ConnectionContext context,
  221   			String clientId, String subscriptionName, MessageId messageId) throws IOException {
  222   	    acknowledgeReference(context, clientId, subscriptionName, messageId);
  223   	}
  224   
  225       public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
  226           String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
  227           lock.lock();
  228           try {
  229               // if already exists - won't add it again as it causes data files
  230               // to hang around
  231               if (!subscriberContainer.containsKey(key)) {
  232                   subscriberContainer.put(key, info);
  233                   adapter.addSubscriberState(info);
  234               }
  235               // add the subscriber
  236               addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
  237               if (retroactive) {
  238                   /*
  239                    * for(StoreEntry
  240                    * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
  241                    * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
  242                    * ConsumerMessageRef ref=new ConsumerMessageRef();
  243                    * ref.setAckEntry(entry);
  244                    * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
  245                    */
  246               }
  247           }finally {
  248               lock.unlock();
  249           }
  250       }
  251   
  252       public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
  253           lock.lock();
  254           try {
  255               SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
  256               if (info != null) {
  257                   adapter.removeSubscriberState(info);
  258               }
  259           removeSubscriberMessageContainer(clientId,subscriptionName);
  260           }finally {
  261               lock.unlock();
  262           }
  263       }
  264   
  265       public SubscriptionInfo[] getAllSubscriptions() throws IOException {
  266           SubscriptionInfo[] result = subscriberContainer.values()
  267               .toArray(new SubscriptionInfo[subscriberContainer.size()]);
  268           return result;
  269       }
  270   
  271       public int getMessageCount(String clientId, String subscriberName) throws IOException {
  272           String key = getSubscriptionKey(clientId, subscriberName);
  273           TopicSubContainer container = subscriberMessages.get(key);
  274           return container != null ? container.size() : 0;
  275       }
  276   
  277       public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
  278           return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
  279       }
  280   
  281       public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
  282                                                    MessageRecoveryListener listener) throws Exception {
  283           String key = getSubscriptionKey(clientId, subscriptionName);
  284           lock.lock();
  285           try {
  286               TopicSubContainer container = subscriberMessages.get(key);
  287               if (container != null) {
  288                   int count = 0;
  289                   StoreEntry entry = container.getBatchEntry();
  290                   if (entry == null) {
  291                       entry = container.getEntry();
  292                   } else {
  293                       entry = container.refreshEntry(entry);
  294                       if (entry != null) {
  295                           entry = container.getNextEntry(entry);
  296                       }
  297                   }
  298                  
  299                   if (entry != null) {
  300                       do {
  301                           ConsumerMessageRef consumerRef = container.get(entry);
  302                           ReferenceRecord msg = messageContainer.getValue(consumerRef
  303                                   .getMessageEntry());
  304                           if (msg != null) {
  305                               if (recoverReference(listener, msg)) {
  306                                   count++;
  307                                   container.setBatchEntry(msg.getMessageId(), entry);
  308                               } else {
  309                                   break;
  310                               }
  311                           } else {
  312                               container.reset();
  313                           }
  314       
  315                           entry = container.getNextEntry(entry);
  316                       } while (entry != null && count < maxReturned && listener.hasSpace());
  317                   }
  318               }
  319           }finally {
  320               lock.unlock();
  321           }
  322       }
  323   
  324       public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
  325           throws Exception {
  326           String key = getSubscriptionKey(clientId, subscriptionName);
  327           TopicSubContainer container = subscriberMessages.get(key);
  328           if (container != null) {
  329               for (Iterator i = container.iterator(); i.hasNext();) {
  330                   ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
  331                   ReferenceRecord msg = messageContainer.get(ref.getMessageEntry());
  332                   if (msg != null) {
  333                       if (!recoverReference(listener, msg)) {
  334                           break;
  335                       }
  336                   }
  337               }
  338           }
  339       }
  340   
  341       public void resetBatching(String clientId, String subscriptionName) {
  342           lock.lock();
  343           try {
  344               String key = getSubscriptionKey(clientId, subscriptionName);
  345               TopicSubContainer topicSubContainer = subscriberMessages.get(key);
  346               if (topicSubContainer != null) {
  347                   topicSubContainer.reset();
  348               }
  349           }finally {
  350               lock.unlock();
  351           }
  352       }
  353       
  354       public void removeAllMessages(ConnectionContext context) throws IOException {
  355           lock.lock();
  356           try {
  357               Set<String> tmpSet = new HashSet<String>(subscriberContainer.keySet());
  358               for (String key:tmpSet) {
  359                   TopicSubContainer container = subscriberMessages.get(key);
  360                   if (container != null) {
  361                       container.clear();
  362                   }
  363               }
  364               ackContainer.clear();
  365           }finally {
  366               lock.unlock();
  367           }
  368           super.removeAllMessages(context);
  369       }
  370   
  371       protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
  372           String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
  373           String containerName = getSubscriptionContainerName(subscriberKey);
  374           subscriberContainer.remove(subscriberKey);
  375           TopicSubContainer container = subscriberMessages.remove(subscriberKey);
  376           if (container != null) {
  377               for (Iterator i = container.iterator(); i.hasNext();) {
  378                   ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
  379                   if (ref != null) {
  380                       TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
  381                       if (tsa != null) {
  382                           if (tsa.decrementCount() <= 0) {
  383                               ackContainer.remove(ref.getAckEntry());
  384                               messageContainer.remove(tsa.getMessageEntry());
  385                           } else {
  386                               ackContainer.update(ref.getAckEntry(), tsa);
  387                           }
  388                       }
  389                   }
  390               }
  391           }
  392           store.deleteMapContainer(containerName,containerName);
  393       }
  394   
  395       protected String getSubscriptionKey(String clientId, String subscriberName) {
  396           StringBuffer buffer = new StringBuffer();
  397           buffer.append(clientId).append(":");  
  398           String name = subscriberName != null ? subscriberName : "NOT_SET";
  399           return buffer.append(name).toString();
  400       }
  401       
  402       private String getSubscriptionContainerName(String subscriptionKey) {
  403           StringBuffer result = new StringBuffer(TOPIC_SUB_NAME);
  404           result.append(destination.getQualifiedName());
  405           result.append(subscriptionKey);
  406           return result.toString();
  407       }
  408   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » kahadaptor » [javadoc | source]