Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » kahadaptor » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.kahadaptor;
   18   
   19   import java.io.IOException;
   20   import java.util.HashSet;
   21   import java.util.Set;
   22   import java.util.concurrent.locks.Lock;
   23   import java.util.concurrent.locks.ReentrantLock;
   24   
   25   import org.apache.activemq.ActiveMQMessageAudit;
   26   import org.apache.activemq.broker.ConnectionContext;
   27   import org.apache.activemq.command.ActiveMQDestination;
   28   import org.apache.activemq.command.Message;
   29   import org.apache.activemq.command.MessageAck;
   30   import org.apache.activemq.command.MessageId;
   31   import org.apache.activemq.kaha.MapContainer;
   32   import org.apache.activemq.kaha.StoreEntry;
   33   import org.apache.activemq.store.AbstractMessageStore;
   34   import org.apache.activemq.store.MessageRecoveryListener;
   35   import org.apache.activemq.store.ReferenceStore;
   36   import org.apache.commons.logging.Log;
   37   import org.apache.commons.logging.LogFactory;
   38   
   39   /**
   40    * @author rajdavies
   41    *
   42    */
   43   public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
   44   
   45       private static final Log LOG = LogFactory.getLog(KahaReferenceStore.class);
   46       protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
   47       protected KahaReferenceStoreAdapter adapter;
   48       // keep track of dispatched messages so that duplicate sends that follow a successful
   49       // dispatch can be suppressed.
   50       protected ActiveMQMessageAudit dispatchAudit = new ActiveMQMessageAudit();
   51       private StoreEntry batchEntry;
   52       private String lastBatchId;
   53       protected final Lock lock = new ReentrantLock();
   54   
   55       public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
   56                                 ActiveMQDestination destination) throws IOException {
   57           super(destination);
   58           this.adapter = adapter;
   59           this.messageContainer = container;
   60       }
   61       
   62       public Lock getStoreLock() {
   63           return lock;
   64       }
   65   
   66       public void dispose(ConnectionContext context) {
   67           super.dispose(context);
   68           this.messageContainer.delete();
   69           this.adapter.removeReferenceStore(this);
   70       }
   71   
   72       protected MessageId getMessageId(Object object) {
   73           return new MessageId(((ReferenceRecord)object).getMessageId());
   74       }
   75   
   76       public void addMessage(ConnectionContext context, Message message) throws IOException {
   77           throw new RuntimeException("Use addMessageReference instead");
   78       }
   79   
   80       public Message getMessage(MessageId identity) throws IOException {
   81           throw new RuntimeException("Use addMessageReference instead");
   82       }
   83   
   84       protected final boolean recoverReference(MessageRecoveryListener listener,
   85               ReferenceRecord record) throws Exception {
   86           MessageId id = new MessageId(record.getMessageId());
   87           if (listener.hasSpace()) {
   88               return listener.recoverMessageReference(id);
   89           }
   90           return false;
   91       }
   92   
   93       public void recover(MessageRecoveryListener listener) throws Exception {
   94           lock.lock();
   95           try {
   96               for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
   97                   .getNext(entry)) {
   98                   ReferenceRecord record = messageContainer.getValue(entry);
   99                   if (!recoverReference(listener, record)) {
  100                       break;
  101                   }
  102               }
  103           }finally {
  104               lock.unlock();
  105           }
  106       }
  107   
  108       public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
  109           throws Exception {
  110           lock.lock();
  111           try {
  112               StoreEntry entry = batchEntry;
  113               if (entry == null) {
  114                   entry = messageContainer.getFirst();
  115               } else {
  116                   entry = messageContainer.refresh(entry);
  117                   if (entry != null) {
  118                       entry = messageContainer.getNext(entry);
  119                   }
  120               }
  121               if (entry != null) {      
  122                   int count = 0;
  123                   do {
  124                       ReferenceRecord msg = messageContainer.getValue(entry);
  125                       if (msg != null ) {
  126                           if (recoverReference(listener, msg)) {
  127                               count++;
  128                               lastBatchId = msg.getMessageId();
  129                           } else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) {
  130                               if (LOG.isDebugEnabled()) {
  131                                   LOG.debug(destination.getQualifiedName() + " did not recover (will retry) message: " + msg.getMessageId());
  132                               }
  133                               // give usage limits a chance to reclaim
  134                               break;
  135                           } else {
  136                               // skip duplicate and continue
  137                               if (LOG.isDebugEnabled()) {
  138                                   LOG.debug(destination.getQualifiedName() + " skipping duplicate, " + msg.getMessageId());
  139                               }
  140                           }                        
  141                       } else {
  142                           lastBatchId = null;
  143                       }
  144                       batchEntry = entry;
  145                       entry = messageContainer.getNext(entry);
  146                   } while (entry != null && count < maxReturned && listener.hasSpace());
  147               }
  148           }finally {
  149               lock.unlock();
  150           }
  151       }
  152   
  153       public boolean addMessageReference(ConnectionContext context, MessageId messageId,
  154                                                    ReferenceData data) throws IOException {
  155           
  156           boolean uniqueueReferenceAdded = false;
  157           lock.lock();
  158           try {
  159               if (!isDuplicate(messageId)) {
  160                   ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
  161                   messageContainer.put(messageId, record);
  162                   uniqueueReferenceAdded = true;
  163                   addInterest(record);
  164                   if (LOG.isDebugEnabled()) {
  165                       LOG.debug(destination.getPhysicalName() + " add: " + messageId);
  166                   }
  167               }
  168           } finally {
  169               lock.unlock();
  170           }
  171           return uniqueueReferenceAdded;
  172       }
  173   
  174       protected boolean isDuplicate(final MessageId messageId) {
  175           boolean duplicate = messageContainer.containsKey(messageId);
  176           if (!duplicate) {
  177               duplicate = dispatchAudit.isDuplicate(messageId);
  178               if (duplicate) {
  179                   if (LOG.isDebugEnabled()) {
  180                       LOG.debug(destination.getPhysicalName()
  181                           + " ignoring duplicated (add) message reference, already dispatched: "
  182                           + messageId);
  183                   }
  184               }
  185           } else if (LOG.isDebugEnabled()) {
  186               LOG.debug(destination.getPhysicalName()
  187                       + " ignoring duplicated (add) message reference, already in store: " + messageId);
  188           }
  189           return duplicate;
  190       }
  191       
  192       public ReferenceData getMessageReference(MessageId identity) throws IOException {
  193           lock.lock();
  194           try {
  195               ReferenceRecord result = messageContainer.get(identity);
  196               if (result == null) {
  197                   return null;
  198               }
  199               return result.getData();
  200           }finally {
  201               lock.unlock();
  202           }
  203       }
  204   
  205       public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
  206           removeMessage(ack.getLastMessageId());
  207       }
  208   
  209       public void removeMessage(MessageId msgId) throws IOException {  
  210           lock.lock();
  211           try {
  212               StoreEntry entry = messageContainer.getEntry(msgId);
  213               if (entry != null) {
  214                   ReferenceRecord rr = messageContainer.remove(msgId);
  215                   if (rr != null) {
  216                       removeInterest(rr);
  217                       dispatchAudit.isDuplicate(msgId);
  218                       if (LOG.isDebugEnabled()) {
  219                           LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId);
  220                       }
  221                       if (messageContainer.isEmpty()
  222                           || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
  223                           || (batchEntry != null && batchEntry.equals(entry))) {
  224                           resetBatching();
  225                       }
  226                   }
  227               }
  228           }finally {
  229               lock.unlock();
  230           }
  231       }
  232   
  233       public void removeAllMessages(ConnectionContext context) throws IOException {
  234           lock.lock();
  235           try {
  236               Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
  237               for (MessageId id:tmpSet) {
  238                   removeMessage(id);
  239               }
  240               resetBatching();
  241               messageContainer.clear();
  242           }finally {
  243               lock.unlock();
  244           }
  245       }
  246   
  247       public void delete() {
  248           lock.lock();
  249           try {
  250               messageContainer.clear();
  251           }finally {
  252               lock.unlock();
  253           }
  254       }
  255   
  256       public void resetBatching() {
  257           lock.lock();
  258           try {
  259               batchEntry = null;
  260               lastBatchId = null;
  261           }finally {
  262               lock.unlock();
  263           }
  264       }
  265   
  266       public int getMessageCount() {
  267           return messageContainer.size();
  268       }
  269   
  270       public boolean isSupportForCursors() {
  271           return true;
  272       }
  273   
  274       public boolean supportsExternalBatchControl() {
  275           return true;
  276       }
  277   
  278       void removeInterest(ReferenceRecord rr) {
  279           adapter.removeInterestInRecordFile(rr.getData().getFileId());
  280       }
  281   
  282       void addInterest(ReferenceRecord rr) {
  283           adapter.addInterestInRecordFile(rr.getData().getFileId());
  284       }
  285   
  286       /**
  287        * @param startAfter
  288        * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
  289        */
  290       public void setBatch(MessageId startAfter) {
  291           lock.lock();
  292           try {
  293               batchEntry = messageContainer.getEntry(startAfter);
  294               if (LOG.isDebugEnabled()) {
  295                   LOG.debug("setBatch: " + startAfter);
  296               }
  297           } finally {
  298               lock.unlock();
  299           }
  300       }
  301   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » kahadaptor » [javadoc | source]