001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.security.PrivilegedAction; 023import java.util.ArrayList; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Set; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.hbase.master.HMaster; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.Region; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.test.MetricsAssertHelper; 037import org.apache.hadoop.hbase.util.JVMClusterUtil; 038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 040import org.apache.hadoop.hbase.util.Threads; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 049 050/** 051 * This class creates a single process HBase cluster. 052 * each server. The master uses the 'default' FileSystem. The RegionServers, 053 * if we are running on DistributedFilesystem, create a FileSystem instance 054 * each and will close down their instance on the way out. 055 */ 056@InterfaceAudience.Public 057public class MiniHBaseCluster extends HBaseCluster { 058 private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName()); 059 public LocalHBaseCluster hbaseCluster; 060 private static int index; 061 062 /** 063 * Start a MiniHBaseCluster. 064 * @param conf Configuration to be used for cluster 065 * @param numRegionServers initial number of region servers to start. 066 * @throws IOException 067 */ 068 public MiniHBaseCluster(Configuration conf, int numRegionServers) 069 throws IOException, InterruptedException { 070 this(conf, 1, numRegionServers); 071 } 072 073 /** 074 * Start a MiniHBaseCluster. 075 * @param conf Configuration to be used for cluster 076 * @param numMasters initial number of masters to start. 077 * @param numRegionServers initial number of region servers to start. 078 * @throws IOException 079 */ 080 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 081 throws IOException, InterruptedException { 082 this(conf, numMasters, numRegionServers, null, null); 083 } 084 085 /** 086 * Start a MiniHBaseCluster. 087 * @param conf Configuration to be used for cluster 088 * @param numMasters initial number of masters to start. 089 * @param numRegionServers initial number of region servers to start. 090 */ 091 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 092 Class<? extends HMaster> masterClass, 093 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 094 throws IOException, InterruptedException { 095 this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass); 096 } 097 098 /** 099 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 100 * restart where for sure the regionservers come up on same address+port (but 101 * just with different startcode); by default mini hbase clusters choose new 102 * arbitrary ports on each cluster start. 103 * @throws IOException 104 * @throws InterruptedException 105 */ 106 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 107 List<Integer> rsPorts, 108 Class<? extends HMaster> masterClass, 109 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 110 throws IOException, InterruptedException { 111 super(conf); 112 113 // Hadoop 2 114 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 115 116 init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass); 117 this.initialClusterStatus = getClusterStatus(); 118 } 119 120 public Configuration getConfiguration() { 121 return this.conf; 122 } 123 124 /** 125 * Subclass so can get at protected methods (none at moment). Also, creates 126 * a FileSystem instance per instantiation. Adds a shutdown own FileSystem 127 * on the way out. Shuts down own Filesystem only, not All filesystems as 128 * the FileSystem system exit hook does. 129 */ 130 public static class MiniHBaseClusterRegionServer extends HRegionServer { 131 private Thread shutdownThread = null; 132 private User user = null; 133 /** 134 * List of RegionServers killed so far. ServerName also comprises startCode of a server, 135 * so any restarted instances of the same server will have different ServerName and will not 136 * coincide with past dead ones. So there's no need to cleanup this list. 137 */ 138 static Set<ServerName> killedServers = new HashSet<>(); 139 140 public MiniHBaseClusterRegionServer(Configuration conf) 141 throws IOException, InterruptedException { 142 super(conf); 143 this.user = User.getCurrent(); 144 } 145 146 /* 147 * @param c 148 * @param currentfs We return this if we did not make a new one. 149 * @param uniqueName Same name used to help identify the created fs. 150 * @return A new fs instance if we are up on DistributeFileSystem. 151 * @throws IOException 152 */ 153 154 @Override 155 protected void handleReportForDutyResponse( 156 final RegionServerStartupResponse c) throws IOException { 157 super.handleReportForDutyResponse(c); 158 // Run this thread to shutdown our filesystem on way out. 159 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); 160 } 161 162 @Override 163 public void run() { 164 try { 165 this.user.runAs(new PrivilegedAction<Object>() { 166 @Override 167 public Object run() { 168 runRegionServer(); 169 return null; 170 } 171 }); 172 } catch (Throwable t) { 173 LOG.error("Exception in run", t); 174 } finally { 175 // Run this on the way out. 176 if (this.shutdownThread != null) { 177 this.shutdownThread.start(); 178 Threads.shutdown(this.shutdownThread, 30000); 179 } 180 } 181 } 182 183 private void runRegionServer() { 184 super.run(); 185 } 186 187 @Override 188 protected void kill() { 189 killedServers.add(getServerName()); 190 super.kill(); 191 } 192 193 @Override 194 public void abort(final String reason, final Throwable cause) { 195 this.user.runAs(new PrivilegedAction<Object>() { 196 @Override 197 public Object run() { 198 abortRegionServer(reason, cause); 199 return null; 200 } 201 }); 202 } 203 204 private void abortRegionServer(String reason, Throwable cause) { 205 super.abort(reason, cause); 206 } 207 } 208 209 /** 210 * Alternate shutdown hook. 211 * Just shuts down the passed fs, not all as default filesystem hook does. 212 */ 213 static class SingleFileSystemShutdownThread extends Thread { 214 private final FileSystem fs; 215 SingleFileSystemShutdownThread(final FileSystem fs) { 216 super("Shutdown of " + fs); 217 this.fs = fs; 218 } 219 @Override 220 public void run() { 221 try { 222 LOG.info("Hook closing fs=" + this.fs); 223 this.fs.close(); 224 } catch (NullPointerException npe) { 225 LOG.debug("Need to fix these: " + npe.toString()); 226 } catch (IOException e) { 227 LOG.warn("Running hook", e); 228 } 229 } 230 } 231 232 private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsPorts, 233 Class<? extends HMaster> masterClass, 234 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 235 throws IOException, InterruptedException { 236 try { 237 if (masterClass == null){ 238 masterClass = HMaster.class; 239 } 240 if (regionserverClass == null){ 241 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class; 242 } 243 244 // start up a LocalHBaseCluster 245 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, 246 masterClass, regionserverClass); 247 248 // manually add the regionservers as other users 249 for (int i = 0; i < nRegionNodes; i++) { 250 Configuration rsConf = HBaseConfiguration.create(conf); 251 if (rsPorts != null) { 252 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 253 } 254 User user = HBaseTestingUtility.getDifferentUser(rsConf, 255 ".hfs."+index++); 256 hbaseCluster.addRegionServer(rsConf, i, user); 257 } 258 259 hbaseCluster.startup(); 260 } catch (IOException e) { 261 shutdown(); 262 throw e; 263 } catch (Throwable t) { 264 LOG.error("Error starting cluster", t); 265 shutdown(); 266 throw new IOException("Shutting down", t); 267 } 268 } 269 270 @Override 271 public void startRegionServer(String hostname, int port) throws IOException { 272 this.startRegionServer(); 273 } 274 275 @Override 276 public void killRegionServer(ServerName serverName) throws IOException { 277 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 278 if (server instanceof MiniHBaseClusterRegionServer) { 279 LOG.info("Killing " + server.toString()); 280 ((MiniHBaseClusterRegionServer) server).kill(); 281 } else { 282 abortRegionServer(getRegionServerIndex(serverName)); 283 } 284 } 285 286 @Override 287 public boolean isKilledRS(ServerName serverName) { 288 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 289 } 290 291 @Override 292 public void stopRegionServer(ServerName serverName) throws IOException { 293 stopRegionServer(getRegionServerIndex(serverName)); 294 } 295 296 @Override 297 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 298 //ignore timeout for now 299 waitOnRegionServer(getRegionServerIndex(serverName)); 300 } 301 302 @Override 303 public void startZkNode(String hostname, int port) throws IOException { 304 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 305 } 306 307 @Override 308 public void killZkNode(ServerName serverName) throws IOException { 309 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 310 } 311 312 @Override 313 public void stopZkNode(ServerName serverName) throws IOException { 314 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 315 } 316 317 @Override 318 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 319 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 320 } 321 322 @Override 323 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 324 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 325 } 326 327 @Override 328 public void startDataNode(ServerName serverName) throws IOException { 329 LOG.warn("Starting datanodes on mini cluster is not supported"); 330 } 331 332 @Override 333 public void killDataNode(ServerName serverName) throws IOException { 334 LOG.warn("Aborting datanodes on mini cluster is not supported"); 335 } 336 337 @Override 338 public void stopDataNode(ServerName serverName) throws IOException { 339 LOG.warn("Stopping datanodes on mini cluster is not supported"); 340 } 341 342 @Override 343 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 344 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 345 } 346 347 @Override 348 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 349 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 350 } 351 352 @Override 353 public void startNameNode(ServerName serverName) throws IOException { 354 LOG.warn("Starting namenodes on mini cluster is not supported"); 355 } 356 357 @Override 358 public void killNameNode(ServerName serverName) throws IOException { 359 LOG.warn("Aborting namenodes on mini cluster is not supported"); 360 } 361 362 @Override 363 public void stopNameNode(ServerName serverName) throws IOException { 364 LOG.warn("Stopping namenodes on mini cluster is not supported"); 365 } 366 367 @Override 368 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 369 LOG.warn("Waiting for namenodes to start on mini cluster is not supported"); 370 } 371 372 @Override 373 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 374 LOG.warn("Waiting for namenodes to stop on mini cluster is not supported"); 375 } 376 377 @Override 378 public void startMaster(String hostname, int port) throws IOException { 379 this.startMaster(); 380 } 381 382 @Override 383 public void killMaster(ServerName serverName) throws IOException { 384 abortMaster(getMasterIndex(serverName)); 385 } 386 387 @Override 388 public void stopMaster(ServerName serverName) throws IOException { 389 stopMaster(getMasterIndex(serverName)); 390 } 391 392 @Override 393 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 394 //ignore timeout for now 395 waitOnMaster(getMasterIndex(serverName)); 396 } 397 398 /** 399 * Starts a region server thread running 400 * 401 * @throws IOException 402 * @return New RegionServerThread 403 */ 404 public JVMClusterUtil.RegionServerThread startRegionServer() 405 throws IOException { 406 final Configuration newConf = HBaseConfiguration.create(conf); 407 User rsUser = 408 HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++); 409 JVMClusterUtil.RegionServerThread t = null; 410 try { 411 t = hbaseCluster.addRegionServer( 412 newConf, hbaseCluster.getRegionServers().size(), rsUser); 413 t.start(); 414 t.waitForServerOnline(); 415 } catch (InterruptedException ie) { 416 throw new IOException("Interrupted adding regionserver to cluster", ie); 417 } 418 return t; 419 } 420 421 /** 422 * Starts a region server thread and waits until its processed by master. Throws an exception 423 * when it can't start a region server or when the region server is not processed by master 424 * within the timeout. 425 * 426 * @return New RegionServerThread 427 */ 428 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 429 throws IOException { 430 431 JVMClusterUtil.RegionServerThread t = startRegionServer(); 432 ServerName rsServerName = t.getRegionServer().getServerName(); 433 434 long start = System.currentTimeMillis(); 435 ClusterStatus clusterStatus = getClusterStatus(); 436 while ((System.currentTimeMillis() - start) < timeout) { 437 if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) { 438 return t; 439 } 440 Threads.sleep(100); 441 } 442 if (t.getRegionServer().isOnline()) { 443 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 444 } else { 445 throw new IOException("RS: " + rsServerName + " is offline"); 446 } 447 } 448 449 /** 450 * Cause a region server to exit doing basic clean up only on its way out. 451 * @param serverNumber Used as index into a list. 452 */ 453 public String abortRegionServer(int serverNumber) { 454 HRegionServer server = getRegionServer(serverNumber); 455 LOG.info("Aborting " + server.toString()); 456 server.abort("Aborting for tests", new Exception("Trace info")); 457 return server.toString(); 458 } 459 460 /** 461 * Shut down the specified region server cleanly 462 * 463 * @param serverNumber Used as index into a list. 464 * @return the region server that was stopped 465 */ 466 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 467 return stopRegionServer(serverNumber, true); 468 } 469 470 /** 471 * Shut down the specified region server cleanly 472 * 473 * @param serverNumber Used as index into a list. 474 * @param shutdownFS True is we are to shutdown the filesystem as part of this 475 * regionserver's shutdown. Usually we do but you do not want to do this if 476 * you are running multiple regionservers in a test and you shut down one 477 * before end of the test. 478 * @return the region server that was stopped 479 */ 480 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 481 final boolean shutdownFS) { 482 JVMClusterUtil.RegionServerThread server = 483 hbaseCluster.getRegionServers().get(serverNumber); 484 LOG.info("Stopping " + server.toString()); 485 server.getRegionServer().stop("Stopping rs " + serverNumber); 486 return server; 487 } 488 489 /** 490 * Wait for the specified region server to stop. Removes this thread from list 491 * of running threads. 492 * @param serverNumber 493 * @return Name of region server that just went down. 494 */ 495 public String waitOnRegionServer(final int serverNumber) { 496 return this.hbaseCluster.waitOnRegionServer(serverNumber); 497 } 498 499 500 /** 501 * Starts a master thread running 502 * 503 * @return New RegionServerThread 504 */ 505 public JVMClusterUtil.MasterThread startMaster() throws IOException { 506 Configuration c = HBaseConfiguration.create(conf); 507 User user = 508 HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++); 509 510 JVMClusterUtil.MasterThread t = null; 511 try { 512 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 513 t.start(); 514 } catch (InterruptedException ie) { 515 throw new IOException("Interrupted adding master to cluster", ie); 516 } 517 return t; 518 } 519 520 /** 521 * Returns the current active master, if available. 522 * @return the active HMaster, null if none is active. 523 */ 524 @Override 525 public MasterService.BlockingInterface getMasterAdminService() { 526 return this.hbaseCluster.getActiveMaster().getMasterRpcServices(); 527 } 528 529 /** 530 * Returns the current active master, if available. 531 * @return the active HMaster, null if none is active. 532 */ 533 public HMaster getMaster() { 534 return this.hbaseCluster.getActiveMaster(); 535 } 536 537 /** 538 * Returns the current active master thread, if available. 539 * @return the active MasterThread, null if none is active. 540 */ 541 public MasterThread getMasterThread() { 542 for (MasterThread mt: hbaseCluster.getLiveMasters()) { 543 if (mt.getMaster().isActiveMaster()) { 544 return mt; 545 } 546 } 547 return null; 548 } 549 550 /** 551 * Returns the master at the specified index, if available. 552 * @return the active HMaster, null if none is active. 553 */ 554 public HMaster getMaster(final int serverNumber) { 555 return this.hbaseCluster.getMaster(serverNumber); 556 } 557 558 /** 559 * Cause a master to exit without shutting down entire cluster. 560 * @param serverNumber Used as index into a list. 561 */ 562 public String abortMaster(int serverNumber) { 563 HMaster server = getMaster(serverNumber); 564 LOG.info("Aborting " + server.toString()); 565 server.abort("Aborting for tests", new Exception("Trace info")); 566 return server.toString(); 567 } 568 569 /** 570 * Shut down the specified master cleanly 571 * 572 * @param serverNumber Used as index into a list. 573 * @return the region server that was stopped 574 */ 575 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 576 return stopMaster(serverNumber, true); 577 } 578 579 /** 580 * Shut down the specified master cleanly 581 * 582 * @param serverNumber Used as index into a list. 583 * @param shutdownFS True is we are to shutdown the filesystem as part of this 584 * master's shutdown. Usually we do but you do not want to do this if 585 * you are running multiple master in a test and you shut down one 586 * before end of the test. 587 * @return the master that was stopped 588 */ 589 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, 590 final boolean shutdownFS) { 591 JVMClusterUtil.MasterThread server = 592 hbaseCluster.getMasters().get(serverNumber); 593 LOG.info("Stopping " + server.toString()); 594 server.getMaster().stop("Stopping master " + serverNumber); 595 return server; 596 } 597 598 /** 599 * Wait for the specified master to stop. Removes this thread from list 600 * of running threads. 601 * @param serverNumber 602 * @return Name of master that just went down. 603 */ 604 public String waitOnMaster(final int serverNumber) { 605 return this.hbaseCluster.waitOnMaster(serverNumber); 606 } 607 608 /** 609 * Blocks until there is an active master and that master has completed 610 * initialization. 611 * 612 * @return true if an active master becomes available. false if there are no 613 * masters left. 614 * @throws InterruptedException 615 */ 616 @Override 617 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 618 List<JVMClusterUtil.MasterThread> mts; 619 long start = System.currentTimeMillis(); 620 while (!(mts = getMasterThreads()).isEmpty() 621 && (System.currentTimeMillis() - start) < timeout) { 622 for (JVMClusterUtil.MasterThread mt : mts) { 623 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 624 return true; 625 } 626 } 627 628 Threads.sleep(100); 629 } 630 return false; 631 } 632 633 /** 634 * @return List of master threads. 635 */ 636 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 637 return this.hbaseCluster.getMasters(); 638 } 639 640 /** 641 * @return List of live master threads (skips the aborted and the killed) 642 */ 643 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 644 return this.hbaseCluster.getLiveMasters(); 645 } 646 647 /** 648 * Wait for Mini HBase Cluster to shut down. 649 */ 650 public void join() { 651 this.hbaseCluster.join(); 652 } 653 654 /** 655 * Shut down the mini HBase cluster 656 */ 657 @Override 658 public void shutdown() throws IOException { 659 if (this.hbaseCluster != null) { 660 this.hbaseCluster.shutdown(); 661 } 662 } 663 664 @Override 665 public void close() throws IOException { 666 } 667 668 /** 669 * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 670 * Use {@link #getClusterMetrics()} instead. 671 */ 672 @Deprecated 673 public ClusterStatus getClusterStatus() throws IOException { 674 HMaster master = getMaster(); 675 return master == null ? null : new ClusterStatus(master.getClusterMetrics()); 676 } 677 678 @Override 679 public ClusterMetrics getClusterMetrics() throws IOException { 680 HMaster master = getMaster(); 681 return master == null ? null : master.getClusterMetrics(); 682 } 683 684 private void executeFlush(HRegion region) throws IOException { 685 // retry 5 times if we can not flush 686 for (int i = 0; i < 5; i++) { 687 FlushResult result = region.flush(true); 688 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 689 return; 690 } 691 Threads.sleep(1000); 692 } 693 } 694 695 /** 696 * Call flushCache on all regions on all participating regionservers. 697 */ 698 public void flushcache() throws IOException { 699 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 700 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 701 executeFlush(r); 702 } 703 } 704 } 705 706 /** 707 * Call flushCache on all regions of the specified table. 708 */ 709 public void flushcache(TableName tableName) throws IOException { 710 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 711 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 712 if (r.getTableDescriptor().getTableName().equals(tableName)) { 713 executeFlush(r); 714 } 715 } 716 } 717 } 718 719 /** 720 * Call flushCache on all regions on all participating regionservers. 721 * @throws IOException 722 */ 723 public void compact(boolean major) throws IOException { 724 for (JVMClusterUtil.RegionServerThread t: 725 this.hbaseCluster.getRegionServers()) { 726 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { 727 r.compact(major); 728 } 729 } 730 } 731 732 /** 733 * Call flushCache on all regions of the specified table. 734 * @throws IOException 735 */ 736 public void compact(TableName tableName, boolean major) throws IOException { 737 for (JVMClusterUtil.RegionServerThread t: 738 this.hbaseCluster.getRegionServers()) { 739 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { 740 if(r.getTableDescriptor().getTableName().equals(tableName)) { 741 r.compact(major); 742 } 743 } 744 } 745 } 746 747 /** 748 * @return List of region server threads. Does not return the master even though it is also 749 * a region server. 750 */ 751 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 752 return this.hbaseCluster.getRegionServers(); 753 } 754 755 /** 756 * @return List of live region server threads (skips the aborted and the killed) 757 */ 758 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 759 return this.hbaseCluster.getLiveRegionServers(); 760 } 761 762 /** 763 * Grab a numbered region server of your choice. 764 * @param serverNumber 765 * @return region server 766 */ 767 public HRegionServer getRegionServer(int serverNumber) { 768 return hbaseCluster.getRegionServer(serverNumber); 769 } 770 771 public HRegionServer getRegionServer(ServerName serverName) { 772 return hbaseCluster.getRegionServers().stream() 773 .map(t -> t.getRegionServer()) 774 .filter(r -> r.getServerName().equals(serverName)) 775 .findFirst().orElse(null); 776 } 777 778 public List<HRegion> getRegions(byte[] tableName) { 779 return getRegions(TableName.valueOf(tableName)); 780 } 781 782 public List<HRegion> getRegions(TableName tableName) { 783 List<HRegion> ret = new ArrayList<>(); 784 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 785 HRegionServer hrs = rst.getRegionServer(); 786 for (Region region : hrs.getOnlineRegionsLocalContext()) { 787 if (region.getTableDescriptor().getTableName().equals(tableName)) { 788 ret.add((HRegion)region); 789 } 790 } 791 } 792 return ret; 793 } 794 795 /** 796 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} 797 * of HRS carrying regionName. Returns -1 if none found. 798 */ 799 public int getServerWithMeta() { 800 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); 801 } 802 803 /** 804 * Get the location of the specified region 805 * @param regionName Name of the region in bytes 806 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} 807 * of HRS carrying hbase:meta. Returns -1 if none found. 808 */ 809 public int getServerWith(byte[] regionName) { 810 int index = -1; 811 int count = 0; 812 for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) { 813 HRegionServer hrs = rst.getRegionServer(); 814 if (!hrs.isStopped()) { 815 Region region = hrs.getOnlineRegion(regionName); 816 if (region != null) { 817 index = count; 818 break; 819 } 820 } 821 count++; 822 } 823 return index; 824 } 825 826 @Override 827 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 828 throws IOException { 829 // Assume there is only one master thread which is the active master. 830 // If there are multiple master threads, the backup master threads 831 // should hold some regions. Please refer to #countServedRegions 832 // to see how we find out all regions. 833 HMaster master = getMaster(); 834 Region region = master.getOnlineRegion(regionName); 835 if (region != null) { 836 return master.getServerName(); 837 } 838 int index = getServerWith(regionName); 839 if (index < 0) { 840 return null; 841 } 842 return getRegionServer(index).getServerName(); 843 } 844 845 /** 846 * Counts the total numbers of regions being served by the currently online 847 * region servers by asking each how many regions they have. Does not look 848 * at hbase:meta at all. Count includes catalog tables. 849 * @return number of regions being served by all region servers 850 */ 851 public long countServedRegions() { 852 long count = 0; 853 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 854 count += rst.getRegionServer().getNumberOfOnlineRegions(); 855 } 856 for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) { 857 count += mt.getMaster().getNumberOfOnlineRegions(); 858 } 859 return count; 860 } 861 862 /** 863 * Do a simulated kill all masters and regionservers. Useful when it is 864 * impossible to bring the mini-cluster back for clean shutdown. 865 */ 866 public void killAll() { 867 // Do backups first. 868 MasterThread activeMaster = null; 869 for (MasterThread masterThread : getMasterThreads()) { 870 if (!masterThread.getMaster().isActiveMaster()) { 871 masterThread.getMaster().abort("killAll"); 872 } else { 873 activeMaster = masterThread; 874 } 875 } 876 // Do active after. 877 if (activeMaster != null) { 878 activeMaster.getMaster().abort("killAll"); 879 } 880 for (RegionServerThread rst : getRegionServerThreads()) { 881 rst.getRegionServer().abort("killAll"); 882 } 883 } 884 885 @Override 886 public void waitUntilShutDown() { 887 this.hbaseCluster.join(); 888 } 889 890 public List<HRegion> findRegionsForTable(TableName tableName) { 891 ArrayList<HRegion> ret = new ArrayList<>(); 892 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 893 HRegionServer hrs = rst.getRegionServer(); 894 for (Region region : hrs.getRegions(tableName)) { 895 if (region.getTableDescriptor().getTableName().equals(tableName)) { 896 ret.add((HRegion)region); 897 } 898 } 899 } 900 return ret; 901 } 902 903 904 protected int getRegionServerIndex(ServerName serverName) { 905 //we have a small number of region servers, this should be fine for now. 906 List<RegionServerThread> servers = getRegionServerThreads(); 907 for (int i=0; i < servers.size(); i++) { 908 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 909 return i; 910 } 911 } 912 return -1; 913 } 914 915 protected int getMasterIndex(ServerName serverName) { 916 List<MasterThread> masters = getMasterThreads(); 917 for (int i = 0; i < masters.size(); i++) { 918 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 919 return i; 920 } 921 } 922 return -1; 923 } 924 925 @Override 926 public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException { 927 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 928 } 929 930 @Override 931 public ClientService.BlockingInterface getClientProtocol(ServerName serverName) 932 throws IOException { 933 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 934 } 935}