001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.jdbc.adapter; 018 019 import java.io.IOException; 020 import java.sql.PreparedStatement; 021 import java.sql.ResultSet; 022 import java.sql.SQLException; 023 import java.sql.Statement; 024 import java.util.ArrayList; 025 import java.util.HashSet; 026 import java.util.LinkedList; 027 import java.util.Set; 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.MessageId; 030 import org.apache.activemq.command.SubscriptionInfo; 031 import org.apache.activemq.store.jdbc.JDBCAdapter; 032 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; 033 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; 034 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 035 import org.apache.activemq.store.jdbc.Statements; 036 import org.apache.activemq.store.jdbc.TransactionContext; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 /** 041 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is 042 * encouraged to override the default implementation of methods to account for differences in JDBC Driver 043 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/> 044 * The databases/JDBC drivers that use this adapter are: 045 * <ul> 046 * <li></li> 047 * </ul> 048 * 049 * @org.apache.xbean.XBean element="defaultJDBCAdapter" 050 * 051 * @version $Revision: 1.10 $ 052 */ 053 public class DefaultJDBCAdapter implements JDBCAdapter { 054 private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class); 055 protected Statements statements; 056 protected boolean batchStatments = true; 057 058 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { 059 s.setBytes(index, data); 060 } 061 062 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { 063 return rs.getBytes(index); 064 } 065 066 public void doCreateTables(TransactionContext c) throws SQLException, IOException { 067 Statement s = null; 068 try { 069 // Check to see if the table already exists. If it does, then don't 070 // log warnings during startup. 071 // Need to run the scripts anyways since they may contain ALTER 072 // statements that upgrade a previous version 073 // of the table 074 boolean alreadyExists = false; 075 ResultSet rs = null; 076 try { 077 rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), 078 new String[] { "TABLE" }); 079 alreadyExists = rs.next(); 080 } catch (Throwable ignore) { 081 } finally { 082 close(rs); 083 } 084 s = c.getConnection().createStatement(); 085 String[] createStatments = this.statements.getCreateSchemaStatements(); 086 for (int i = 0; i < createStatments.length; i++) { 087 // This will fail usually since the tables will be 088 // created already. 089 try { 090 LOG.debug("Executing SQL: " + createStatments[i]); 091 s.execute(createStatments[i]); 092 } catch (SQLException e) { 093 if (alreadyExists) { 094 LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: " 095 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() 096 + " Vendor code: " + e.getErrorCode()); 097 } else { 098 LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: " 099 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() 100 + " Vendor code: " + e.getErrorCode()); 101 JDBCPersistenceAdapter.log("Failure details: ", e); 102 } 103 } 104 } 105 c.getConnection().commit(); 106 } finally { 107 try { 108 s.close(); 109 } catch (Throwable e) { 110 } 111 } 112 } 113 114 public void doDropTables(TransactionContext c) throws SQLException, IOException { 115 Statement s = null; 116 try { 117 s = c.getConnection().createStatement(); 118 String[] dropStatments = this.statements.getDropSchemaStatements(); 119 for (int i = 0; i < dropStatments.length; i++) { 120 // This will fail usually since the tables will be 121 // created already. 122 try { 123 LOG.debug("Executing SQL: " + dropStatments[i]); 124 s.execute(dropStatments[i]); 125 } catch (SQLException e) { 126 LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i] 127 + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: " 128 + e.getErrorCode()); 129 JDBCPersistenceAdapter.log("Failure details: ", e); 130 } 131 } 132 c.getConnection().commit(); 133 } finally { 134 try { 135 s.close(); 136 } catch (Throwable e) { 137 } 138 } 139 } 140 141 public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException { 142 PreparedStatement s = null; 143 ResultSet rs = null; 144 try { 145 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 146 rs = s.executeQuery(); 147 long seq1 = 0; 148 if (rs.next()) { 149 seq1 = rs.getLong(1); 150 } 151 rs.close(); 152 s.close(); 153 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement()); 154 rs = s.executeQuery(); 155 long seq2 = 0; 156 if (rs.next()) { 157 seq2 = rs.getLong(1); 158 } 159 return Math.max(seq1, seq2); 160 } finally { 161 close(rs); 162 close(s); 163 } 164 } 165 166 public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException { 167 PreparedStatement s = null; 168 ResultSet rs = null; 169 try { 170 s = c.getConnection().prepareStatement( 171 this.statements.getFindMessageByIdStatement()); 172 s.setLong(1, storeSequenceId); 173 rs = s.executeQuery(); 174 if (!rs.next()) { 175 return null; 176 } 177 return getBinaryData(rs, 1); 178 } finally { 179 close(rs); 180 close(s); 181 } 182 } 183 184 185 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, 186 long expiration) throws SQLException, IOException { 187 PreparedStatement s = c.getAddMessageStatement(); 188 try { 189 if (s == null) { 190 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 191 if (this.batchStatments) { 192 c.setAddMessageStatement(s); 193 } 194 } 195 s.setLong(1, sequence); 196 s.setString(2, messageID.getProducerId().toString()); 197 s.setLong(3, messageID.getProducerSequenceId()); 198 s.setString(4, destination.getQualifiedName()); 199 s.setLong(5, expiration); 200 setBinaryData(s, 6, data); 201 if (this.batchStatments) { 202 s.addBatch(); 203 } else if (s.executeUpdate() != 1) { 204 throw new SQLException("Failed add a message"); 205 } 206 } finally { 207 if (!this.batchStatments) { 208 if (s != null) { 209 s.close(); 210 } 211 } 212 } 213 } 214 215 public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, 216 long expirationTime, String messageRef) throws SQLException, IOException { 217 PreparedStatement s = c.getAddMessageStatement(); 218 try { 219 if (s == null) { 220 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 221 if (this.batchStatments) { 222 c.setAddMessageStatement(s); 223 } 224 } 225 s.setLong(1, messageID.getBrokerSequenceId()); 226 s.setString(2, messageID.getProducerId().toString()); 227 s.setLong(3, messageID.getProducerSequenceId()); 228 s.setString(4, destination.getQualifiedName()); 229 s.setLong(5, expirationTime); 230 s.setString(6, messageRef); 231 if (this.batchStatments) { 232 s.addBatch(); 233 } else if (s.executeUpdate() != 1) { 234 throw new SQLException("Failed add a message"); 235 } 236 } finally { 237 if (!this.batchStatments) { 238 s.close(); 239 } 240 } 241 } 242 243 public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException { 244 PreparedStatement s = null; 245 ResultSet rs = null; 246 try { 247 s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); 248 s.setString(1, messageID.getProducerId().toString()); 249 s.setLong(2, messageID.getProducerSequenceId()); 250 rs = s.executeQuery(); 251 if (!rs.next()) { 252 return 0; 253 } 254 return rs.getLong(1); 255 } finally { 256 close(rs); 257 close(s); 258 } 259 } 260 261 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { 262 PreparedStatement s = null; 263 ResultSet rs = null; 264 try { 265 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 266 s.setString(1, id.getProducerId().toString()); 267 s.setLong(2, id.getProducerSequenceId()); 268 rs = s.executeQuery(); 269 if (!rs.next()) { 270 return null; 271 } 272 return getBinaryData(rs, 1); 273 } finally { 274 close(rs); 275 close(s); 276 } 277 } 278 279 public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException { 280 PreparedStatement s = null; 281 ResultSet rs = null; 282 try { 283 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 284 s.setLong(1, seq); 285 rs = s.executeQuery(); 286 if (!rs.next()) { 287 return null; 288 } 289 return rs.getString(1); 290 } finally { 291 close(rs); 292 close(s); 293 } 294 } 295 296 public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException { 297 PreparedStatement s = c.getRemovedMessageStatement(); 298 try { 299 if (s == null) { 300 s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatment()); 301 if (this.batchStatments) { 302 c.setRemovedMessageStatement(s); 303 } 304 } 305 s.setLong(1, seq); 306 if (this.batchStatments) { 307 s.addBatch(); 308 } else if (s.executeUpdate() != 1) { 309 throw new SQLException("Failed to remove message"); 310 } 311 } finally { 312 if (!this.batchStatments && s != null) { 313 s.close(); 314 } 315 } 316 } 317 318 public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) 319 throws Exception { 320 PreparedStatement s = null; 321 ResultSet rs = null; 322 try { 323 s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement()); 324 s.setString(1, destination.getQualifiedName()); 325 rs = s.executeQuery(); 326 if (this.statements.isUseExternalMessageReferences()) { 327 while (rs.next()) { 328 if (!listener.recoverMessageReference(rs.getString(2))) { 329 break; 330 } 331 } 332 } else { 333 while (rs.next()) { 334 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 335 break; 336 } 337 } 338 } 339 } finally { 340 close(rs); 341 close(s); 342 } 343 } 344 345 public void doMessageIdScan(TransactionContext c, int limit, 346 JDBCMessageIdScanListener listener) throws SQLException, IOException { 347 PreparedStatement s = null; 348 ResultSet rs = null; 349 try { 350 s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); 351 s.setMaxRows(limit); 352 rs = s.executeQuery(); 353 // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid 354 LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>(); 355 while (rs.next()) { 356 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3))); 357 } 358 if (LOG.isDebugEnabled()) { 359 LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids"); 360 } 361 for (MessageId id : reverseOrderIds) { 362 listener.messageId(id); 363 } 364 } finally { 365 close(rs); 366 close(s); 367 } 368 } 369 370 public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, 371 String subscriptionName, long seq) throws SQLException, IOException { 372 PreparedStatement s = c.getUpdateLastAckStatement(); 373 try { 374 if (s == null) { 375 s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement()); 376 if (this.batchStatments) { 377 c.setUpdateLastAckStatement(s); 378 } 379 } 380 s.setLong(1, seq); 381 s.setString(2, destination.getQualifiedName()); 382 s.setString(3, clientId); 383 s.setString(4, subscriptionName); 384 if (this.batchStatments) { 385 s.addBatch(); 386 } else if (s.executeUpdate() != 1) { 387 throw new SQLException("Failed add a message"); 388 } 389 } finally { 390 if (!this.batchStatments) { 391 s.close(); 392 } 393 } 394 } 395 396 public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 397 String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception { 398 // dumpTables(c, 399 // destination.getQualifiedName(),clientId,subscriptionName); 400 PreparedStatement s = null; 401 ResultSet rs = null; 402 try { 403 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement()); 404 s.setString(1, destination.getQualifiedName()); 405 s.setString(2, clientId); 406 s.setString(3, subscriptionName); 407 rs = s.executeQuery(); 408 if (this.statements.isUseExternalMessageReferences()) { 409 while (rs.next()) { 410 if (!listener.recoverMessageReference(rs.getString(2))) { 411 break; 412 } 413 } 414 } else { 415 while (rs.next()) { 416 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 417 break; 418 } 419 } 420 } 421 } finally { 422 close(rs); 423 close(s); 424 } 425 } 426 427 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, 428 String subscriptionName, long seq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 429 PreparedStatement s = null; 430 ResultSet rs = null; 431 try { 432 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); 433 s.setMaxRows(maxReturned); 434 s.setString(1, destination.getQualifiedName()); 435 s.setString(2, clientId); 436 s.setString(3, subscriptionName); 437 s.setLong(4, seq); 438 rs = s.executeQuery(); 439 int count = 0; 440 if (this.statements.isUseExternalMessageReferences()) { 441 while (rs.next() && count < maxReturned) { 442 if (listener.recoverMessageReference(rs.getString(1))) { 443 count++; 444 } else { 445 break; 446 } 447 } 448 } else { 449 while (rs.next() && count < maxReturned) { 450 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 451 count++; 452 } else { 453 break; 454 } 455 } 456 } 457 } finally { 458 close(rs); 459 close(s); 460 } 461 } 462 463 public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, 464 String clientId, String subscriptionName) throws SQLException, IOException { 465 PreparedStatement s = null; 466 ResultSet rs = null; 467 int result = 0; 468 try { 469 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement()); 470 s.setString(1, destination.getQualifiedName()); 471 s.setString(2, clientId); 472 s.setString(3, subscriptionName); 473 rs = s.executeQuery(); 474 if (rs.next()) { 475 result = rs.getInt(1); 476 } 477 } finally { 478 close(rs); 479 close(s); 480 } 481 return result; 482 } 483 484 /** 485 * @param c 486 * @param info 487 * @param retroactive 488 * @throws SQLException 489 * @throws IOException 490 * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, 491 * org.apache.activemq.service.SubscriptionInfo) 492 */ 493 public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive) 494 throws SQLException, IOException { 495 // dumpTables(c, destination.getQualifiedName(), clientId, 496 // subscriptionName); 497 PreparedStatement s = null; 498 try { 499 long lastMessageId = -1; 500 if (!retroactive) { 501 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 502 ResultSet rs = null; 503 try { 504 rs = s.executeQuery(); 505 if (rs.next()) { 506 lastMessageId = rs.getLong(1); 507 } 508 } finally { 509 close(rs); 510 close(s); 511 } 512 } 513 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 514 s.setString(1, info.getDestination().getQualifiedName()); 515 s.setString(2, info.getClientId()); 516 s.setString(3, info.getSubscriptionName()); 517 s.setString(4, info.getSelector()); 518 s.setLong(5, lastMessageId); 519 s.setString(6, info.getSubscribedDestination().getQualifiedName()); 520 if (s.executeUpdate() != 1) { 521 throw new IOException("Could not create durable subscription for: " + info.getClientId()); 522 } 523 } finally { 524 close(s); 525 } 526 } 527 528 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, 529 String clientId, String subscriptionName) throws SQLException, IOException { 530 PreparedStatement s = null; 531 ResultSet rs = null; 532 try { 533 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement()); 534 s.setString(1, destination.getQualifiedName()); 535 s.setString(2, clientId); 536 s.setString(3, subscriptionName); 537 rs = s.executeQuery(); 538 if (!rs.next()) { 539 return null; 540 } 541 SubscriptionInfo subscription = new SubscriptionInfo(); 542 subscription.setDestination(destination); 543 subscription.setClientId(clientId); 544 subscription.setSubscriptionName(subscriptionName); 545 subscription.setSelector(rs.getString(1)); 546 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), 547 ActiveMQDestination.QUEUE_TYPE)); 548 return subscription; 549 } finally { 550 close(rs); 551 close(s); 552 } 553 } 554 555 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) 556 throws SQLException, IOException { 557 PreparedStatement s = null; 558 ResultSet rs = null; 559 try { 560 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement()); 561 s.setString(1, destination.getQualifiedName()); 562 rs = s.executeQuery(); 563 ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>(); 564 while (rs.next()) { 565 SubscriptionInfo subscription = new SubscriptionInfo(); 566 subscription.setDestination(destination); 567 subscription.setSelector(rs.getString(1)); 568 subscription.setSubscriptionName(rs.getString(2)); 569 subscription.setClientId(rs.getString(3)); 570 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4), 571 ActiveMQDestination.QUEUE_TYPE)); 572 rc.add(subscription); 573 } 574 return rc.toArray(new SubscriptionInfo[rc.size()]); 575 } finally { 576 close(rs); 577 close(s); 578 } 579 } 580 581 public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, 582 IOException { 583 PreparedStatement s = null; 584 try { 585 s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement()); 586 s.setString(1, destinationName.getQualifiedName()); 587 s.executeUpdate(); 588 s.close(); 589 s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement()); 590 s.setString(1, destinationName.getQualifiedName()); 591 s.executeUpdate(); 592 } finally { 593 close(s); 594 } 595 } 596 597 public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 598 String subscriptionName) throws SQLException, IOException { 599 PreparedStatement s = null; 600 try { 601 s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement()); 602 s.setString(1, destination.getQualifiedName()); 603 s.setString(2, clientId); 604 s.setString(3, subscriptionName); 605 s.executeUpdate(); 606 } finally { 607 close(s); 608 } 609 } 610 611 public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException { 612 PreparedStatement s = null; 613 try { 614 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement()); 615 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement()); 616 s.setLong(1, System.currentTimeMillis()); 617 int i = s.executeUpdate(); 618 LOG.debug("Deleted " + i + " old message(s)."); 619 } finally { 620 close(s); 621 } 622 } 623 624 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, 625 String clientId, String subscriberName) throws SQLException, IOException { 626 PreparedStatement s = null; 627 ResultSet rs = null; 628 long result = -1; 629 try { 630 s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); 631 s.setString(1, destination.getQualifiedName()); 632 s.setString(2, clientId); 633 s.setString(3, subscriberName); 634 rs = s.executeQuery(); 635 if (rs.next()) { 636 result = rs.getLong(1); 637 } 638 rs.close(); 639 s.close(); 640 } finally { 641 close(rs); 642 close(s); 643 } 644 return result; 645 } 646 647 private static void close(PreparedStatement s) { 648 try { 649 s.close(); 650 } catch (Throwable e) { 651 } 652 } 653 654 private static void close(ResultSet rs) { 655 try { 656 rs.close(); 657 } catch (Throwable e) { 658 } 659 } 660 661 public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException { 662 HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 663 PreparedStatement s = null; 664 ResultSet rs = null; 665 try { 666 s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement()); 667 rs = s.executeQuery(); 668 while (rs.next()) { 669 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE)); 670 } 671 } finally { 672 close(rs); 673 close(s); 674 } 675 return rc; 676 } 677 678 /** 679 * @return true if batchStements 680 */ 681 public boolean isBatchStatments() { 682 return this.batchStatments; 683 } 684 685 /** 686 * @param batchStatments 687 */ 688 public void setBatchStatments(boolean batchStatments) { 689 this.batchStatments = batchStatments; 690 } 691 692 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 693 this.statements.setUseExternalMessageReferences(useExternalMessageReferences); 694 } 695 696 /** 697 * @return the statements 698 */ 699 public Statements getStatements() { 700 return this.statements; 701 } 702 703 public void setStatements(Statements statements) { 704 this.statements = statements; 705 } 706 707 /** 708 * @param c 709 * @param destination 710 * @param clientId 711 * @param subscriberName 712 * @return 713 * @throws SQLException 714 * @throws IOException 715 */ 716 public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination, 717 String clientId, String subscriberName) throws SQLException, IOException { 718 PreparedStatement s = null; 719 ResultSet rs = null; 720 try { 721 s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement()); 722 s.setString(1, destination.getQualifiedName()); 723 s.setString(2, clientId); 724 s.setString(3, subscriberName); 725 rs = s.executeQuery(); 726 if (!rs.next()) { 727 return null; 728 } 729 return getBinaryData(rs, 1); 730 } finally { 731 close(rs); 732 close(s); 733 } 734 } 735 736 public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, 737 IOException { 738 PreparedStatement s = null; 739 ResultSet rs = null; 740 int result = 0; 741 try { 742 s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement()); 743 s.setString(1, destination.getQualifiedName()); 744 rs = s.executeQuery(); 745 if (rs.next()) { 746 result = rs.getInt(1); 747 } 748 } finally { 749 close(rs); 750 close(s); 751 } 752 return result; 753 } 754 755 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, 756 int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 757 PreparedStatement s = null; 758 ResultSet rs = null; 759 try { 760 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement()); 761 s.setMaxRows(maxReturned * 2); 762 s.setString(1, destination.getQualifiedName()); 763 s.setLong(2, nextSeq); 764 rs = s.executeQuery(); 765 int count = 0; 766 if (this.statements.isUseExternalMessageReferences()) { 767 while (rs.next() && count < maxReturned) { 768 if (listener.recoverMessageReference(rs.getString(1))) { 769 count++; 770 } else { 771 LOG.debug("Stopped recover next messages"); 772 break; 773 } 774 } 775 } else { 776 while (rs.next() && count < maxReturned) { 777 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 778 count++; 779 } else { 780 LOG.debug("Stopped recover next messages"); 781 break; 782 } 783 } 784 } 785 } catch (Exception e) { 786 e.printStackTrace(); 787 } finally { 788 close(rs); 789 close(s); 790 } 791 } 792 /* 793 * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String 794 * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, 795 * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID, 796 * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND 797 * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID"); 798 * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName); 799 * printQuery(s,System.out); } 800 * 801 * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", 802 * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); } 803 * 804 * private void printQuery(Connection c, String query, PrintStream out) throws SQLException { 805 * printQuery(c.prepareStatement(query), out); } 806 * 807 * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException { 808 * 809 * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<= 810 * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); } 811 * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|"); 812 * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {} 813 * try { s.close(); } catch (Throwable ignore) {} } } 814 */ 815 816 }