Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jdbc » adapter » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.jdbc.adapter;
   18   
   19   import java.io.IOException;
   20   import java.sql.PreparedStatement;
   21   import java.sql.ResultSet;
   22   import java.sql.SQLException;
   23   import java.sql.Statement;
   24   import java.util.ArrayList;
   25   import java.util.HashSet;
   26   import java.util.LinkedList;
   27   import java.util.Set;
   28   import org.apache.activemq.command.ActiveMQDestination;
   29   import org.apache.activemq.command.MessageId;
   30   import org.apache.activemq.command.SubscriptionInfo;
   31   import org.apache.activemq.store.jdbc.JDBCAdapter;
   32   import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
   33   import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
   34   import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
   35   import org.apache.activemq.store.jdbc.Statements;
   36   import org.apache.activemq.store.jdbc.TransactionContext;
   37   import org.apache.commons.logging.Log;
   38   import org.apache.commons.logging.LogFactory;
   39   
   40   /**
   41    * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
   42    * encouraged to override the default implementation of methods to account for differences in JDBC Driver
   43    * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
   44    * The databases/JDBC drivers that use this adapter are:
   45    * <ul>
   46    * <li></li>
   47    * </ul>
   48    * 
   49    * @org.apache.xbean.XBean element="defaultJDBCAdapter"
   50    * 
   51    * @version $Revision: 1.10 $
   52    */
   53   public class DefaultJDBCAdapter implements JDBCAdapter {
   54       private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
   55       protected Statements statements;
   56       protected boolean batchStatments = true;
   57   
   58       protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
   59           s.setBytes(index, data);
   60       }
   61   
   62       protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
   63           return rs.getBytes(index);
   64       }
   65   
   66       public void doCreateTables(TransactionContext c) throws SQLException, IOException {
   67           Statement s = null;
   68           try {
   69               // Check to see if the table already exists. If it does, then don't
   70               // log warnings during startup.
   71               // Need to run the scripts anyways since they may contain ALTER
   72               // statements that upgrade a previous version
   73               // of the table
   74               boolean alreadyExists = false;
   75               ResultSet rs = null;
   76               try {
   77                   rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
   78                           new String[] { "TABLE" });
   79                   alreadyExists = rs.next();
   80               } catch (Throwable ignore) {
   81               } finally {
   82                   close(rs);
   83               }
   84               s = c.getConnection().createStatement();
   85               String[] createStatments = this.statements.getCreateSchemaStatements();
   86               for (int i = 0; i < createStatments.length; i++) {
   87                   // This will fail usually since the tables will be
   88                   // created already.
   89                   try {
   90                       LOG.debug("Executing SQL: " + createStatments[i]);
   91                       s.execute(createStatments[i]);
   92                   } catch (SQLException e) {
   93                       if (alreadyExists) {
   94                           LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
   95                                   + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
   96                                   + " Vendor code: " + e.getErrorCode());
   97                       } else {
   98                           LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
   99                                   + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
  100                                   + " Vendor code: " + e.getErrorCode());
  101                           JDBCPersistenceAdapter.log("Failure details: ", e);
  102                       }
  103                   }
  104               }
  105               c.getConnection().commit();
  106           } finally {
  107               try {
  108                   s.close();
  109               } catch (Throwable e) {
  110               }
  111           }
  112       }
  113   
  114       public void doDropTables(TransactionContext c) throws SQLException, IOException {
  115           Statement s = null;
  116           try {
  117               s = c.getConnection().createStatement();
  118               String[] dropStatments = this.statements.getDropSchemaStatements();
  119               for (int i = 0; i < dropStatments.length; i++) {
  120                   // This will fail usually since the tables will be
  121                   // created already.
  122                   try {
  123                       LOG.debug("Executing SQL: " + dropStatments[i]);
  124                       s.execute(dropStatments[i]);
  125                   } catch (SQLException e) {
  126                       LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
  127                               + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
  128                               + e.getErrorCode());
  129                       JDBCPersistenceAdapter.log("Failure details: ", e);
  130                   }
  131               }
  132               c.getConnection().commit();
  133           } finally {
  134               try {
  135                   s.close();
  136               } catch (Throwable e) {
  137               }
  138           }
  139       }
  140   
  141       public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
  142           PreparedStatement s = null;
  143           ResultSet rs = null;
  144           try {
  145               s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
  146               rs = s.executeQuery();
  147               long seq1 = 0;
  148               if (rs.next()) {
  149                   seq1 = rs.getLong(1);
  150               }
  151               rs.close();
  152               s.close();
  153               s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
  154               rs = s.executeQuery();
  155               long seq2 = 0;
  156               if (rs.next()) {
  157                   seq2 = rs.getLong(1);
  158               }
  159               return Math.max(seq1, seq2);
  160           } finally {
  161               close(rs);
  162               close(s);
  163           }
  164       }
  165       
  166       public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
  167           PreparedStatement s = null;
  168           ResultSet rs = null;
  169           try {
  170               s = c.getConnection().prepareStatement(
  171                       this.statements.getFindMessageByIdStatement());
  172               s.setLong(1, storeSequenceId);
  173               rs = s.executeQuery();
  174               if (!rs.next()) {
  175                   return null;
  176               }
  177               return getBinaryData(rs, 1);
  178           } finally {
  179               close(rs);
  180               close(s);
  181           }
  182       }
  183       
  184   
  185       public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
  186               long expiration) throws SQLException, IOException {
  187           PreparedStatement s = c.getAddMessageStatement();
  188           try {
  189               if (s == null) {
  190                   s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
  191                   if (this.batchStatments) {
  192                       c.setAddMessageStatement(s);
  193                   }
  194               }
  195               s.setLong(1, sequence);
  196               s.setString(2, messageID.getProducerId().toString());
  197               s.setLong(3, messageID.getProducerSequenceId());
  198               s.setString(4, destination.getQualifiedName());
  199               s.setLong(5, expiration);
  200               setBinaryData(s, 6, data);
  201               if (this.batchStatments) {
  202                   s.addBatch();
  203               } else if (s.executeUpdate() != 1) {
  204                   throw new SQLException("Failed add a message");
  205               }
  206           } finally {
  207               if (!this.batchStatments) {
  208                   if (s != null) {
  209                       s.close();
  210                   }
  211               }
  212           }
  213       }
  214   
  215       public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
  216               long expirationTime, String messageRef) throws SQLException, IOException {
  217           PreparedStatement s = c.getAddMessageStatement();
  218           try {
  219               if (s == null) {
  220                   s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
  221                   if (this.batchStatments) {
  222                       c.setAddMessageStatement(s);
  223                   }
  224               }
  225               s.setLong(1, messageID.getBrokerSequenceId());
  226               s.setString(2, messageID.getProducerId().toString());
  227               s.setLong(3, messageID.getProducerSequenceId());
  228               s.setString(4, destination.getQualifiedName());
  229               s.setLong(5, expirationTime);
  230               s.setString(6, messageRef);
  231               if (this.batchStatments) {
  232                   s.addBatch();
  233               } else if (s.executeUpdate() != 1) {
  234                   throw new SQLException("Failed add a message");
  235               }
  236           } finally {
  237               if (!this.batchStatments) {
  238                   s.close();
  239               }
  240           }
  241       }
  242   
  243       public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
  244           PreparedStatement s = null;
  245           ResultSet rs = null;
  246           try {
  247               s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
  248               s.setString(1, messageID.getProducerId().toString());
  249               s.setLong(2, messageID.getProducerSequenceId());
  250               rs = s.executeQuery();
  251               if (!rs.next()) {
  252                   return 0;
  253               }
  254               return rs.getLong(1);
  255           } finally {
  256               close(rs);
  257               close(s);
  258           }
  259       }
  260   
  261       public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
  262           PreparedStatement s = null;
  263           ResultSet rs = null;
  264           try {
  265               s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
  266               s.setString(1, id.getProducerId().toString());
  267               s.setLong(2, id.getProducerSequenceId());
  268               rs = s.executeQuery();
  269               if (!rs.next()) {
  270                   return null;
  271               }
  272               return getBinaryData(rs, 1);
  273           } finally {
  274               close(rs);
  275               close(s);
  276           }
  277       }
  278   
  279       public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
  280           PreparedStatement s = null;
  281           ResultSet rs = null;
  282           try {
  283               s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
  284               s.setLong(1, seq);
  285               rs = s.executeQuery();
  286               if (!rs.next()) {
  287                   return null;
  288               }
  289               return rs.getString(1);
  290           } finally {
  291               close(rs);
  292               close(s);
  293           }
  294       }
  295   
  296       public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
  297           PreparedStatement s = c.getRemovedMessageStatement();
  298           try {
  299               if (s == null) {
  300                   s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatment());
  301                   if (this.batchStatments) {
  302                       c.setRemovedMessageStatement(s);
  303                   }
  304               }
  305               s.setLong(1, seq);
  306               if (this.batchStatments) {
  307                   s.addBatch();
  308               } else if (s.executeUpdate() != 1) {
  309                   throw new SQLException("Failed to remove message");
  310               }
  311           } finally {
  312               if (!this.batchStatments && s != null) {
  313                   s.close();
  314               }
  315           }
  316       }
  317   
  318       public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
  319               throws Exception {
  320           PreparedStatement s = null;
  321           ResultSet rs = null;
  322           try {
  323               s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
  324               s.setString(1, destination.getQualifiedName());
  325               rs = s.executeQuery();
  326               if (this.statements.isUseExternalMessageReferences()) {
  327                   while (rs.next()) {
  328                       if (!listener.recoverMessageReference(rs.getString(2))) {
  329                           break;
  330                       }
  331                   }
  332               } else {
  333                   while (rs.next()) {
  334                       if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
  335                           break;
  336                       }
  337                   }
  338               }
  339           } finally {
  340               close(rs);
  341               close(s);
  342           }
  343       }
  344   
  345       public void doMessageIdScan(TransactionContext c, int limit, 
  346               JDBCMessageIdScanListener listener) throws SQLException, IOException {
  347           PreparedStatement s = null;
  348           ResultSet rs = null;
  349           try {
  350               s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
  351               s.setMaxRows(limit);
  352               rs = s.executeQuery();
  353               // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
  354               LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
  355               while (rs.next()) {
  356                   reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
  357               }
  358               if (LOG.isDebugEnabled()) {
  359                   LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
  360               }
  361               for (MessageId id : reverseOrderIds) {
  362                   listener.messageId(id);
  363               }
  364           } finally {
  365               close(rs);
  366               close(s);
  367           }
  368       }
  369       
  370       public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
  371               String subscriptionName, long seq) throws SQLException, IOException {
  372           PreparedStatement s = c.getUpdateLastAckStatement();
  373           try {
  374               if (s == null) {
  375                   s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement());
  376                   if (this.batchStatments) {
  377                       c.setUpdateLastAckStatement(s);
  378                   }
  379               }
  380               s.setLong(1, seq);
  381               s.setString(2, destination.getQualifiedName());
  382               s.setString(3, clientId);
  383               s.setString(4, subscriptionName);
  384               if (this.batchStatments) {
  385                   s.addBatch();
  386               } else if (s.executeUpdate() != 1) {
  387                   throw new SQLException("Failed add a message");
  388               }
  389           } finally {
  390               if (!this.batchStatments) {
  391                   s.close();
  392               }
  393           }
  394       }
  395   
  396       public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
  397               String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
  398           // dumpTables(c,
  399           // destination.getQualifiedName(),clientId,subscriptionName);
  400           PreparedStatement s = null;
  401           ResultSet rs = null;
  402           try {
  403               s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
  404               s.setString(1, destination.getQualifiedName());
  405               s.setString(2, clientId);
  406               s.setString(3, subscriptionName);
  407               rs = s.executeQuery();
  408               if (this.statements.isUseExternalMessageReferences()) {
  409                   while (rs.next()) {
  410                       if (!listener.recoverMessageReference(rs.getString(2))) {
  411                           break;
  412                       }
  413                   }
  414               } else {
  415                   while (rs.next()) {
  416                       if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
  417                           break;
  418                       }
  419                   }
  420               }
  421           } finally {
  422               close(rs);
  423               close(s);
  424           }
  425       }
  426   
  427       public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
  428               String subscriptionName, long seq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
  429           PreparedStatement s = null;
  430           ResultSet rs = null;
  431           try {
  432               s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
  433               s.setMaxRows(maxReturned);
  434               s.setString(1, destination.getQualifiedName());
  435               s.setString(2, clientId);
  436               s.setString(3, subscriptionName);
  437               s.setLong(4, seq);
  438               rs = s.executeQuery();
  439               int count = 0;
  440               if (this.statements.isUseExternalMessageReferences()) {
  441                   while (rs.next() && count < maxReturned) {
  442                       if (listener.recoverMessageReference(rs.getString(1))) {
  443                           count++;
  444                       } else {
  445                           break;
  446                       }
  447                   }
  448               } else {
  449                   while (rs.next() && count < maxReturned) {
  450                       if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
  451                           count++;
  452                       } else {
  453                           break;
  454                       }
  455                   }
  456               }
  457           } finally {
  458               close(rs);
  459               close(s);
  460           }
  461       }
  462   
  463       public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
  464               String clientId, String subscriptionName) throws SQLException, IOException {
  465           PreparedStatement s = null;
  466           ResultSet rs = null;
  467           int result = 0;
  468           try {
  469               s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
  470               s.setString(1, destination.getQualifiedName());
  471               s.setString(2, clientId);
  472               s.setString(3, subscriptionName);
  473               rs = s.executeQuery();
  474               if (rs.next()) {
  475                   result = rs.getInt(1);
  476               }
  477           } finally {
  478               close(rs);
  479               close(s);
  480           }
  481           return result;
  482       }
  483   
  484       /**
  485        * @param c 
  486        * @param info 
  487        * @param retroactive 
  488        * @throws SQLException 
  489        * @throws IOException 
  490        * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
  491        *      org.apache.activemq.service.SubscriptionInfo)
  492        */
  493       public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
  494               throws SQLException, IOException {
  495           // dumpTables(c, destination.getQualifiedName(), clientId,
  496           // subscriptionName);
  497           PreparedStatement s = null;
  498           try {
  499               long lastMessageId = -1;
  500               if (!retroactive) {
  501                   s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
  502                   ResultSet rs = null;
  503                   try {
  504                       rs = s.executeQuery();
  505                       if (rs.next()) {
  506                           lastMessageId = rs.getLong(1);
  507                       }
  508                   } finally {
  509                       close(rs);
  510                       close(s);
  511                   }
  512               }
  513               s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
  514               s.setString(1, info.getDestination().getQualifiedName());
  515               s.setString(2, info.getClientId());
  516               s.setString(3, info.getSubscriptionName());
  517               s.setString(4, info.getSelector());
  518               s.setLong(5, lastMessageId);
  519               s.setString(6, info.getSubscribedDestination().getQualifiedName());
  520               if (s.executeUpdate() != 1) {
  521                   throw new IOException("Could not create durable subscription for: " + info.getClientId());
  522               }
  523           } finally {
  524               close(s);
  525           }
  526       }
  527   
  528       public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
  529               String clientId, String subscriptionName) throws SQLException, IOException {
  530           PreparedStatement s = null;
  531           ResultSet rs = null;
  532           try {
  533               s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
  534               s.setString(1, destination.getQualifiedName());
  535               s.setString(2, clientId);
  536               s.setString(3, subscriptionName);
  537               rs = s.executeQuery();
  538               if (!rs.next()) {
  539                   return null;
  540               }
  541               SubscriptionInfo subscription = new SubscriptionInfo();
  542               subscription.setDestination(destination);
  543               subscription.setClientId(clientId);
  544               subscription.setSubscriptionName(subscriptionName);
  545               subscription.setSelector(rs.getString(1));
  546               subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
  547                       ActiveMQDestination.QUEUE_TYPE));
  548               return subscription;
  549           } finally {
  550               close(rs);
  551               close(s);
  552           }
  553       }
  554   
  555       public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
  556               throws SQLException, IOException {
  557           PreparedStatement s = null;
  558           ResultSet rs = null;
  559           try {
  560               s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
  561               s.setString(1, destination.getQualifiedName());
  562               rs = s.executeQuery();
  563               ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
  564               while (rs.next()) {
  565                   SubscriptionInfo subscription = new SubscriptionInfo();
  566                   subscription.setDestination(destination);
  567                   subscription.setSelector(rs.getString(1));
  568                   subscription.setSubscriptionName(rs.getString(2));
  569                   subscription.setClientId(rs.getString(3));
  570                   subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
  571                           ActiveMQDestination.QUEUE_TYPE));
  572                   rc.add(subscription);
  573               }
  574               return rc.toArray(new SubscriptionInfo[rc.size()]);
  575           } finally {
  576               close(rs);
  577               close(s);
  578           }
  579       }
  580   
  581       public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
  582               IOException {
  583           PreparedStatement s = null;
  584           try {
  585               s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
  586               s.setString(1, destinationName.getQualifiedName());
  587               s.executeUpdate();
  588               s.close();
  589               s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
  590               s.setString(1, destinationName.getQualifiedName());
  591               s.executeUpdate();
  592           } finally {
  593               close(s);
  594           }
  595       }
  596   
  597       public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
  598               String subscriptionName) throws SQLException, IOException {
  599           PreparedStatement s = null;
  600           try {
  601               s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
  602               s.setString(1, destination.getQualifiedName());
  603               s.setString(2, clientId);
  604               s.setString(3, subscriptionName);
  605               s.executeUpdate();
  606           } finally {
  607               close(s);
  608           }
  609       }
  610   
  611       public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
  612           PreparedStatement s = null;
  613           try {
  614               LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
  615               s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
  616               s.setLong(1, System.currentTimeMillis());
  617               int i = s.executeUpdate();
  618               LOG.debug("Deleted " + i + " old message(s).");
  619           } finally {
  620               close(s);
  621           }
  622       }
  623   
  624       public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
  625               String clientId, String subscriberName) throws SQLException, IOException {
  626           PreparedStatement s = null;
  627           ResultSet rs = null;
  628           long result = -1;
  629           try {
  630               s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
  631               s.setString(1, destination.getQualifiedName());
  632               s.setString(2, clientId);
  633               s.setString(3, subscriberName);
  634               rs = s.executeQuery();
  635               if (rs.next()) {
  636                   result = rs.getLong(1);
  637               }
  638               rs.close();
  639               s.close();
  640           } finally {
  641               close(rs);
  642               close(s);
  643           }
  644           return result;
  645       }
  646   
  647       private static void close(PreparedStatement s) {
  648           try {
  649               s.close();
  650           } catch (Throwable e) {
  651           }
  652       }
  653   
  654       private static void close(ResultSet rs) {
  655           try {
  656               rs.close();
  657           } catch (Throwable e) {
  658           }
  659       }
  660   
  661       public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
  662           HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
  663           PreparedStatement s = null;
  664           ResultSet rs = null;
  665           try {
  666               s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
  667               rs = s.executeQuery();
  668               while (rs.next()) {
  669                   rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
  670               }
  671           } finally {
  672               close(rs);
  673               close(s);
  674           }
  675           return rc;
  676       }
  677   
  678       /**
  679        * @return true if batchStements
  680        */
  681       public boolean isBatchStatments() {
  682           return this.batchStatments;
  683       }
  684   
  685       /**
  686        * @param batchStatments
  687        */
  688       public void setBatchStatments(boolean batchStatments) {
  689           this.batchStatments = batchStatments;
  690       }
  691   
  692       public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
  693           this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
  694       }
  695   
  696       /**
  697        * @return the statements
  698        */
  699       public Statements getStatements() {
  700           return this.statements;
  701       }
  702   
  703       public void setStatements(Statements statements) {
  704           this.statements = statements;
  705       }
  706   
  707       /**
  708        * @param c
  709        * @param destination
  710        * @param clientId
  711        * @param subscriberName
  712        * @return
  713        * @throws SQLException
  714        * @throws IOException
  715        */
  716       public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
  717               String clientId, String subscriberName) throws SQLException, IOException {
  718           PreparedStatement s = null;
  719           ResultSet rs = null;
  720           try {
  721               s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
  722               s.setString(1, destination.getQualifiedName());
  723               s.setString(2, clientId);
  724               s.setString(3, subscriberName);
  725               rs = s.executeQuery();
  726               if (!rs.next()) {
  727                   return null;
  728               }
  729               return getBinaryData(rs, 1);
  730           } finally {
  731               close(rs);
  732               close(s);
  733           }
  734       }
  735   
  736       public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
  737               IOException {
  738           PreparedStatement s = null;
  739           ResultSet rs = null;
  740           int result = 0;
  741           try {
  742               s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
  743               s.setString(1, destination.getQualifiedName());
  744               rs = s.executeQuery();
  745               if (rs.next()) {
  746                   result = rs.getInt(1);
  747               }
  748           } finally {
  749               close(rs);
  750               close(s);
  751           }
  752           return result;
  753       }
  754   
  755       public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
  756               int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
  757           PreparedStatement s = null;
  758           ResultSet rs = null;
  759           try {
  760               s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
  761               s.setMaxRows(maxReturned * 2);
  762               s.setString(1, destination.getQualifiedName());
  763               s.setLong(2, nextSeq);
  764               rs = s.executeQuery();
  765               int count = 0;
  766               if (this.statements.isUseExternalMessageReferences()) {
  767                   while (rs.next() && count < maxReturned) {
  768                       if (listener.recoverMessageReference(rs.getString(1))) {
  769                           count++;
  770                       } else {
  771                           LOG.debug("Stopped recover next messages");
  772                           break;
  773                       }
  774                   }
  775               } else {
  776                   while (rs.next() && count < maxReturned) {
  777                       if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
  778                           count++;
  779                       } else {
  780                           LOG.debug("Stopped recover next messages");
  781                           break;
  782                       }
  783                   }
  784               }
  785           } catch (Exception e) {
  786               e.printStackTrace();
  787           } finally {
  788               close(rs);
  789               close(s);
  790           }
  791       }
  792       /*
  793        * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
  794        * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
  795        * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
  796        * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
  797        * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
  798        * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
  799        * printQuery(s,System.out); }
  800        * 
  801        * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
  802        * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
  803        * 
  804        * private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
  805        * printQuery(c.prepareStatement(query), out); }
  806        * 
  807        * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
  808        * 
  809        * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
  810        * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
  811        * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
  812        * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
  813        * try { s.close(); } catch (Throwable ignore) {} } }
  814        */
  815   
  816   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jdbc » adapter » [javadoc | source]