001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc.adapter;
018    
019    import java.io.IOException;
020    import java.sql.PreparedStatement;
021    import java.sql.ResultSet;
022    import java.sql.SQLException;
023    import java.sql.Statement;
024    import java.util.ArrayList;
025    import java.util.HashSet;
026    import java.util.LinkedList;
027    import java.util.Set;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.MessageId;
030    import org.apache.activemq.command.SubscriptionInfo;
031    import org.apache.activemq.store.jdbc.JDBCAdapter;
032    import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
033    import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
034    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
035    import org.apache.activemq.store.jdbc.Statements;
036    import org.apache.activemq.store.jdbc.TransactionContext;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    /**
041     * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
042     * encouraged to override the default implementation of methods to account for differences in JDBC Driver
043     * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
044     * The databases/JDBC drivers that use this adapter are:
045     * <ul>
046     * <li></li>
047     * </ul>
048     * 
049     * @org.apache.xbean.XBean element="defaultJDBCAdapter"
050     * 
051     * @version $Revision: 1.10 $
052     */
053    public class DefaultJDBCAdapter implements JDBCAdapter {
    /** Logger used by this adapter for SQL tracing and for schema create/drop failures. */
    private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
    /** Source of the SQL text for every operation in this adapter; assigned through setStatements(). */
    protected Statements statements;
    /**
     * When true (the default), the add-message, remove-message and update-last-ack operations cache their
     * prepared statement on the TransactionContext and queue work with addBatch() instead of executing it
     * immediately.
     */
    protected boolean batchStatments = true;
057    
058        protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
059            s.setBytes(index, data);
060        }
061    
062        protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
063            return rs.getBytes(index);
064        }
065    
066        public void doCreateTables(TransactionContext c) throws SQLException, IOException {
067            Statement s = null;
068            try {
069                // Check to see if the table already exists. If it does, then don't
070                // log warnings during startup.
071                // Need to run the scripts anyways since they may contain ALTER
072                // statements that upgrade a previous version
073                // of the table
074                boolean alreadyExists = false;
075                ResultSet rs = null;
076                try {
077                    rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
078                            new String[] { "TABLE" });
079                    alreadyExists = rs.next();
080                } catch (Throwable ignore) {
081                } finally {
082                    close(rs);
083                }
084                s = c.getConnection().createStatement();
085                String[] createStatments = this.statements.getCreateSchemaStatements();
086                for (int i = 0; i < createStatments.length; i++) {
087                    // This will fail usually since the tables will be
088                    // created already.
089                    try {
090                        LOG.debug("Executing SQL: " + createStatments[i]);
091                        s.execute(createStatments[i]);
092                    } catch (SQLException e) {
093                        if (alreadyExists) {
094                            LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
095                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
096                                    + " Vendor code: " + e.getErrorCode());
097                        } else {
098                            LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
099                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
100                                    + " Vendor code: " + e.getErrorCode());
101                            JDBCPersistenceAdapter.log("Failure details: ", e);
102                        }
103                    }
104                }
105                c.getConnection().commit();
106            } finally {
107                try {
108                    s.close();
109                } catch (Throwable e) {
110                }
111            }
112        }
113    
114        public void doDropTables(TransactionContext c) throws SQLException, IOException {
115            Statement s = null;
116            try {
117                s = c.getConnection().createStatement();
118                String[] dropStatments = this.statements.getDropSchemaStatements();
119                for (int i = 0; i < dropStatments.length; i++) {
120                    // This will fail usually since the tables will be
121                    // created already.
122                    try {
123                        LOG.debug("Executing SQL: " + dropStatments[i]);
124                        s.execute(dropStatments[i]);
125                    } catch (SQLException e) {
126                        LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
127                                + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
128                                + e.getErrorCode());
129                        JDBCPersistenceAdapter.log("Failure details: ", e);
130                    }
131                }
132                c.getConnection().commit();
133            } finally {
134                try {
135                    s.close();
136                } catch (Throwable e) {
137                }
138            }
139        }
140    
141        public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
142            PreparedStatement s = null;
143            ResultSet rs = null;
144            try {
145                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
146                rs = s.executeQuery();
147                long seq1 = 0;
148                if (rs.next()) {
149                    seq1 = rs.getLong(1);
150                }
151                rs.close();
152                s.close();
153                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
154                rs = s.executeQuery();
155                long seq2 = 0;
156                if (rs.next()) {
157                    seq2 = rs.getLong(1);
158                }
159                return Math.max(seq1, seq2);
160            } finally {
161                close(rs);
162                close(s);
163            }
164        }
165        
166        public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
167            PreparedStatement s = null;
168            ResultSet rs = null;
169            try {
170                s = c.getConnection().prepareStatement(
171                        this.statements.getFindMessageByIdStatement());
172                s.setLong(1, storeSequenceId);
173                rs = s.executeQuery();
174                if (!rs.next()) {
175                    return null;
176                }
177                return getBinaryData(rs, 1);
178            } finally {
179                close(rs);
180                close(s);
181            }
182        }
183        
184    
185        public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
186                long expiration) throws SQLException, IOException {
187            PreparedStatement s = c.getAddMessageStatement();
188            try {
189                if (s == null) {
190                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
191                    if (this.batchStatments) {
192                        c.setAddMessageStatement(s);
193                    }
194                }
195                s.setLong(1, sequence);
196                s.setString(2, messageID.getProducerId().toString());
197                s.setLong(3, messageID.getProducerSequenceId());
198                s.setString(4, destination.getQualifiedName());
199                s.setLong(5, expiration);
200                setBinaryData(s, 6, data);
201                if (this.batchStatments) {
202                    s.addBatch();
203                } else if (s.executeUpdate() != 1) {
204                    throw new SQLException("Failed add a message");
205                }
206            } finally {
207                if (!this.batchStatments) {
208                    if (s != null) {
209                        s.close();
210                    }
211                }
212            }
213        }
214    
215        public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
216                long expirationTime, String messageRef) throws SQLException, IOException {
217            PreparedStatement s = c.getAddMessageStatement();
218            try {
219                if (s == null) {
220                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
221                    if (this.batchStatments) {
222                        c.setAddMessageStatement(s);
223                    }
224                }
225                s.setLong(1, messageID.getBrokerSequenceId());
226                s.setString(2, messageID.getProducerId().toString());
227                s.setLong(3, messageID.getProducerSequenceId());
228                s.setString(4, destination.getQualifiedName());
229                s.setLong(5, expirationTime);
230                s.setString(6, messageRef);
231                if (this.batchStatments) {
232                    s.addBatch();
233                } else if (s.executeUpdate() != 1) {
234                    throw new SQLException("Failed add a message");
235                }
236            } finally {
237                if (!this.batchStatments) {
238                    s.close();
239                }
240            }
241        }
242    
243        public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
244            PreparedStatement s = null;
245            ResultSet rs = null;
246            try {
247                s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
248                s.setString(1, messageID.getProducerId().toString());
249                s.setLong(2, messageID.getProducerSequenceId());
250                rs = s.executeQuery();
251                if (!rs.next()) {
252                    return 0;
253                }
254                return rs.getLong(1);
255            } finally {
256                close(rs);
257                close(s);
258            }
259        }
260    
261        public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
262            PreparedStatement s = null;
263            ResultSet rs = null;
264            try {
265                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
266                s.setString(1, id.getProducerId().toString());
267                s.setLong(2, id.getProducerSequenceId());
268                rs = s.executeQuery();
269                if (!rs.next()) {
270                    return null;
271                }
272                return getBinaryData(rs, 1);
273            } finally {
274                close(rs);
275                close(s);
276            }
277        }
278    
279        public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
280            PreparedStatement s = null;
281            ResultSet rs = null;
282            try {
283                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
284                s.setLong(1, seq);
285                rs = s.executeQuery();
286                if (!rs.next()) {
287                    return null;
288                }
289                return rs.getString(1);
290            } finally {
291                close(rs);
292                close(s);
293            }
294        }
295    
296        public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
297            PreparedStatement s = c.getRemovedMessageStatement();
298            try {
299                if (s == null) {
300                    s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatment());
301                    if (this.batchStatments) {
302                        c.setRemovedMessageStatement(s);
303                    }
304                }
305                s.setLong(1, seq);
306                if (this.batchStatments) {
307                    s.addBatch();
308                } else if (s.executeUpdate() != 1) {
309                    throw new SQLException("Failed to remove message");
310                }
311            } finally {
312                if (!this.batchStatments && s != null) {
313                    s.close();
314                }
315            }
316        }
317    
318        public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
319                throws Exception {
320            PreparedStatement s = null;
321            ResultSet rs = null;
322            try {
323                s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
324                s.setString(1, destination.getQualifiedName());
325                rs = s.executeQuery();
326                if (this.statements.isUseExternalMessageReferences()) {
327                    while (rs.next()) {
328                        if (!listener.recoverMessageReference(rs.getString(2))) {
329                            break;
330                        }
331                    }
332                } else {
333                    while (rs.next()) {
334                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
335                            break;
336                        }
337                    }
338                }
339            } finally {
340                close(rs);
341                close(s);
342            }
343        }
344    
345        public void doMessageIdScan(TransactionContext c, int limit, 
346                JDBCMessageIdScanListener listener) throws SQLException, IOException {
347            PreparedStatement s = null;
348            ResultSet rs = null;
349            try {
350                s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
351                s.setMaxRows(limit);
352                rs = s.executeQuery();
353                // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
354                LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
355                while (rs.next()) {
356                    reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
357                }
358                if (LOG.isDebugEnabled()) {
359                    LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
360                }
361                for (MessageId id : reverseOrderIds) {
362                    listener.messageId(id);
363                }
364            } finally {
365                close(rs);
366                close(s);
367            }
368        }
369        
370        public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
371                String subscriptionName, long seq) throws SQLException, IOException {
372            PreparedStatement s = c.getUpdateLastAckStatement();
373            try {
374                if (s == null) {
375                    s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement());
376                    if (this.batchStatments) {
377                        c.setUpdateLastAckStatement(s);
378                    }
379                }
380                s.setLong(1, seq);
381                s.setString(2, destination.getQualifiedName());
382                s.setString(3, clientId);
383                s.setString(4, subscriptionName);
384                if (this.batchStatments) {
385                    s.addBatch();
386                } else if (s.executeUpdate() != 1) {
387                    throw new SQLException("Failed add a message");
388                }
389            } finally {
390                if (!this.batchStatments) {
391                    s.close();
392                }
393            }
394        }
395    
396        public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
397                String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
398            // dumpTables(c,
399            // destination.getQualifiedName(),clientId,subscriptionName);
400            PreparedStatement s = null;
401            ResultSet rs = null;
402            try {
403                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
404                s.setString(1, destination.getQualifiedName());
405                s.setString(2, clientId);
406                s.setString(3, subscriptionName);
407                rs = s.executeQuery();
408                if (this.statements.isUseExternalMessageReferences()) {
409                    while (rs.next()) {
410                        if (!listener.recoverMessageReference(rs.getString(2))) {
411                            break;
412                        }
413                    }
414                } else {
415                    while (rs.next()) {
416                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
417                            break;
418                        }
419                    }
420                }
421            } finally {
422                close(rs);
423                close(s);
424            }
425        }
426    
427        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
428                String subscriptionName, long seq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
429            PreparedStatement s = null;
430            ResultSet rs = null;
431            try {
432                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
433                s.setMaxRows(maxReturned);
434                s.setString(1, destination.getQualifiedName());
435                s.setString(2, clientId);
436                s.setString(3, subscriptionName);
437                s.setLong(4, seq);
438                rs = s.executeQuery();
439                int count = 0;
440                if (this.statements.isUseExternalMessageReferences()) {
441                    while (rs.next() && count < maxReturned) {
442                        if (listener.recoverMessageReference(rs.getString(1))) {
443                            count++;
444                        } else {
445                            break;
446                        }
447                    }
448                } else {
449                    while (rs.next() && count < maxReturned) {
450                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
451                            count++;
452                        } else {
453                            break;
454                        }
455                    }
456                }
457            } finally {
458                close(rs);
459                close(s);
460            }
461        }
462    
463        public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
464                String clientId, String subscriptionName) throws SQLException, IOException {
465            PreparedStatement s = null;
466            ResultSet rs = null;
467            int result = 0;
468            try {
469                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
470                s.setString(1, destination.getQualifiedName());
471                s.setString(2, clientId);
472                s.setString(3, subscriptionName);
473                rs = s.executeQuery();
474                if (rs.next()) {
475                    result = rs.getInt(1);
476                }
477            } finally {
478                close(rs);
479                close(s);
480            }
481            return result;
482        }
483    
484        /**
485         * @param c 
486         * @param info 
487         * @param retroactive 
488         * @throws SQLException 
489         * @throws IOException 
490         * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
491         *      org.apache.activemq.service.SubscriptionInfo)
492         */
493        public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
494                throws SQLException, IOException {
495            // dumpTables(c, destination.getQualifiedName(), clientId,
496            // subscriptionName);
497            PreparedStatement s = null;
498            try {
499                long lastMessageId = -1;
500                if (!retroactive) {
501                    s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
502                    ResultSet rs = null;
503                    try {
504                        rs = s.executeQuery();
505                        if (rs.next()) {
506                            lastMessageId = rs.getLong(1);
507                        }
508                    } finally {
509                        close(rs);
510                        close(s);
511                    }
512                }
513                s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
514                s.setString(1, info.getDestination().getQualifiedName());
515                s.setString(2, info.getClientId());
516                s.setString(3, info.getSubscriptionName());
517                s.setString(4, info.getSelector());
518                s.setLong(5, lastMessageId);
519                s.setString(6, info.getSubscribedDestination().getQualifiedName());
520                if (s.executeUpdate() != 1) {
521                    throw new IOException("Could not create durable subscription for: " + info.getClientId());
522                }
523            } finally {
524                close(s);
525            }
526        }
527    
528        public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
529                String clientId, String subscriptionName) throws SQLException, IOException {
530            PreparedStatement s = null;
531            ResultSet rs = null;
532            try {
533                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
534                s.setString(1, destination.getQualifiedName());
535                s.setString(2, clientId);
536                s.setString(3, subscriptionName);
537                rs = s.executeQuery();
538                if (!rs.next()) {
539                    return null;
540                }
541                SubscriptionInfo subscription = new SubscriptionInfo();
542                subscription.setDestination(destination);
543                subscription.setClientId(clientId);
544                subscription.setSubscriptionName(subscriptionName);
545                subscription.setSelector(rs.getString(1));
546                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
547                        ActiveMQDestination.QUEUE_TYPE));
548                return subscription;
549            } finally {
550                close(rs);
551                close(s);
552            }
553        }
554    
555        public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
556                throws SQLException, IOException {
557            PreparedStatement s = null;
558            ResultSet rs = null;
559            try {
560                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
561                s.setString(1, destination.getQualifiedName());
562                rs = s.executeQuery();
563                ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
564                while (rs.next()) {
565                    SubscriptionInfo subscription = new SubscriptionInfo();
566                    subscription.setDestination(destination);
567                    subscription.setSelector(rs.getString(1));
568                    subscription.setSubscriptionName(rs.getString(2));
569                    subscription.setClientId(rs.getString(3));
570                    subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
571                            ActiveMQDestination.QUEUE_TYPE));
572                    rc.add(subscription);
573                }
574                return rc.toArray(new SubscriptionInfo[rc.size()]);
575            } finally {
576                close(rs);
577                close(s);
578            }
579        }
580    
581        public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
582                IOException {
583            PreparedStatement s = null;
584            try {
585                s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
586                s.setString(1, destinationName.getQualifiedName());
587                s.executeUpdate();
588                s.close();
589                s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
590                s.setString(1, destinationName.getQualifiedName());
591                s.executeUpdate();
592            } finally {
593                close(s);
594            }
595        }
596    
597        public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
598                String subscriptionName) throws SQLException, IOException {
599            PreparedStatement s = null;
600            try {
601                s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
602                s.setString(1, destination.getQualifiedName());
603                s.setString(2, clientId);
604                s.setString(3, subscriptionName);
605                s.executeUpdate();
606            } finally {
607                close(s);
608            }
609        }
610    
611        public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
612            PreparedStatement s = null;
613            try {
614                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
615                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
616                s.setLong(1, System.currentTimeMillis());
617                int i = s.executeUpdate();
618                LOG.debug("Deleted " + i + " old message(s).");
619            } finally {
620                close(s);
621            }
622        }
623    
624        public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
625                String clientId, String subscriberName) throws SQLException, IOException {
626            PreparedStatement s = null;
627            ResultSet rs = null;
628            long result = -1;
629            try {
630                s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
631                s.setString(1, destination.getQualifiedName());
632                s.setString(2, clientId);
633                s.setString(3, subscriberName);
634                rs = s.executeQuery();
635                if (rs.next()) {
636                    result = rs.getLong(1);
637                }
638                rs.close();
639                s.close();
640            } finally {
641                close(rs);
642                close(s);
643            }
644            return result;
645        }
646    
647        private static void close(PreparedStatement s) {
648            try {
649                s.close();
650            } catch (Throwable e) {
651            }
652        }
653    
654        private static void close(ResultSet rs) {
655            try {
656                rs.close();
657            } catch (Throwable e) {
658            }
659        }
660    
661        public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
662            HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
663            PreparedStatement s = null;
664            ResultSet rs = null;
665            try {
666                s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
667                rs = s.executeQuery();
668                while (rs.next()) {
669                    rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
670                }
671            } finally {
672                close(rs);
673                close(s);
674            }
675            return rc;
676        }
677    
678        /**
679         * @return true if batchStements
680         */
681        public boolean isBatchStatments() {
682            return this.batchStatments;
683        }
684    
685        /**
686         * @param batchStatments
687         */
688        public void setBatchStatments(boolean batchStatments) {
689            this.batchStatments = batchStatments;
690        }
691    
692        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
693            this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
694        }
695    
696        /**
697         * @return the statements
698         */
699        public Statements getStatements() {
700            return this.statements;
701        }
702    
703        public void setStatements(Statements statements) {
704            this.statements = statements;
705        }
706    
707        /**
708         * @param c
709         * @param destination
710         * @param clientId
711         * @param subscriberName
712         * @return
713         * @throws SQLException
714         * @throws IOException
715         */
716        public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
717                String clientId, String subscriberName) throws SQLException, IOException {
718            PreparedStatement s = null;
719            ResultSet rs = null;
720            try {
721                s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
722                s.setString(1, destination.getQualifiedName());
723                s.setString(2, clientId);
724                s.setString(3, subscriberName);
725                rs = s.executeQuery();
726                if (!rs.next()) {
727                    return null;
728                }
729                return getBinaryData(rs, 1);
730            } finally {
731                close(rs);
732                close(s);
733            }
734        }
735    
736        public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
737                IOException {
738            PreparedStatement s = null;
739            ResultSet rs = null;
740            int result = 0;
741            try {
742                s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
743                s.setString(1, destination.getQualifiedName());
744                rs = s.executeQuery();
745                if (rs.next()) {
746                    result = rs.getInt(1);
747                }
748            } finally {
749                close(rs);
750                close(s);
751            }
752            return result;
753        }
754    
755        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
756                int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
757            PreparedStatement s = null;
758            ResultSet rs = null;
759            try {
760                s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
761                s.setMaxRows(maxReturned * 2);
762                s.setString(1, destination.getQualifiedName());
763                s.setLong(2, nextSeq);
764                rs = s.executeQuery();
765                int count = 0;
766                if (this.statements.isUseExternalMessageReferences()) {
767                    while (rs.next() && count < maxReturned) {
768                        if (listener.recoverMessageReference(rs.getString(1))) {
769                            count++;
770                        } else {
771                            LOG.debug("Stopped recover next messages");
772                            break;
773                        }
774                    }
775                } else {
776                    while (rs.next() && count < maxReturned) {
777                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
778                            count++;
779                        } else {
780                            LOG.debug("Stopped recover next messages");
781                            break;
782                        }
783                    }
784                }
785            } catch (Exception e) {
786                e.printStackTrace();
787            } finally {
788                close(rs);
789                close(s);
790            }
791        }
792        /*
793         * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
794         * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
795         * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
796         * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
797         * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
798         * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
799         * printQuery(s,System.out); }
800         * 
801         * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
802         * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
803         * 
804         * private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
805         * printQuery(c.prepareStatement(query), out); }
806         * 
807         * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
808         * 
809         * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
810         * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
811         * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
812         * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
813         * try { s.close(); } catch (Throwable ignore) {} } }
814         */
815    
816    }