001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.security.PrivilegedAction;
023import java.util.ArrayList;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Set;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.hbase.master.HMaster;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.Region;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.test.MetricsAssertHelper;
037import org.apache.hadoop.hbase.util.JVMClusterUtil;
038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
040import org.apache.hadoop.hbase.util.Threads;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
049
050/**
051 * This class creates a single process HBase cluster.
052 * each server.  The master uses the 'default' FileSystem.  The RegionServers,
053 * if we are running on DistributedFilesystem, create a FileSystem instance
054 * each and will close down their instance on the way out.
055 */
056@InterfaceAudience.Public
057public class MiniHBaseCluster extends HBaseCluster {
058  private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName());
059  public LocalHBaseCluster hbaseCluster;
060  private static int index;
061
062  /**
063   * Start a MiniHBaseCluster.
064   * @param conf Configuration to be used for cluster
065   * @param numRegionServers initial number of region servers to start.
066   * @throws IOException
067   */
068  public MiniHBaseCluster(Configuration conf, int numRegionServers)
069  throws IOException, InterruptedException {
070    this(conf, 1, numRegionServers);
071  }
072
073  /**
074   * Start a MiniHBaseCluster.
075   * @param conf Configuration to be used for cluster
076   * @param numMasters initial number of masters to start.
077   * @param numRegionServers initial number of region servers to start.
078   * @throws IOException
079   */
080  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
081      throws IOException, InterruptedException {
082    this(conf, numMasters, numRegionServers, null, null);
083  }
084
085  /**
086   * Start a MiniHBaseCluster.
087   * @param conf Configuration to be used for cluster
088   * @param numMasters initial number of masters to start.
089   * @param numRegionServers initial number of region servers to start.
090   */
091  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
092         Class<? extends HMaster> masterClass,
093         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
094      throws IOException, InterruptedException {
095    this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass);
096  }
097
098  /**
099   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
100   *   restart where for sure the regionservers come up on same address+port (but
101   *   just with different startcode); by default mini hbase clusters choose new
102   *   arbitrary ports on each cluster start.
103   * @throws IOException
104   * @throws InterruptedException
105   */
106  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
107         List<Integer> rsPorts,
108         Class<? extends HMaster> masterClass,
109         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
110      throws IOException, InterruptedException {
111    super(conf);
112
113    // Hadoop 2
114    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
115
116    init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass);
117    this.initialClusterStatus = getClusterStatus();
118  }
119
120  public Configuration getConfiguration() {
121    return this.conf;
122  }
123
124  /**
125   * Subclass so can get at protected methods (none at moment).  Also, creates
126   * a FileSystem instance per instantiation.  Adds a shutdown own FileSystem
127   * on the way out. Shuts down own Filesystem only, not All filesystems as
128   * the FileSystem system exit hook does.
129   */
130  public static class MiniHBaseClusterRegionServer extends HRegionServer {
131    private Thread shutdownThread = null;
132    private User user = null;
133    /**
134     * List of RegionServers killed so far. ServerName also comprises startCode of a server,
135     * so any restarted instances of the same server will have different ServerName and will not
136     * coincide with past dead ones. So there's no need to cleanup this list.
137     */
138    static Set<ServerName> killedServers = new HashSet<>();
139
140    public MiniHBaseClusterRegionServer(Configuration conf)
141        throws IOException, InterruptedException {
142      super(conf);
143      this.user = User.getCurrent();
144    }
145
146    /*
147     * @param c
148     * @param currentfs We return this if we did not make a new one.
149     * @param uniqueName Same name used to help identify the created fs.
150     * @return A new fs instance if we are up on DistributeFileSystem.
151     * @throws IOException
152     */
153
154    @Override
155    protected void handleReportForDutyResponse(
156        final RegionServerStartupResponse c) throws IOException {
157      super.handleReportForDutyResponse(c);
158      // Run this thread to shutdown our filesystem on way out.
159      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
160    }
161
162    @Override
163    public void run() {
164      try {
165        this.user.runAs(new PrivilegedAction<Object>() {
166          @Override
167          public Object run() {
168            runRegionServer();
169            return null;
170          }
171        });
172      } catch (Throwable t) {
173        LOG.error("Exception in run", t);
174      } finally {
175        // Run this on the way out.
176        if (this.shutdownThread != null) {
177          this.shutdownThread.start();
178          Threads.shutdown(this.shutdownThread, 30000);
179        }
180      }
181    }
182
183    private void runRegionServer() {
184      super.run();
185    }
186
187    @Override
188    protected void kill() {
189      killedServers.add(getServerName());
190      super.kill();
191    }
192
193    @Override
194    public void abort(final String reason, final Throwable cause) {
195      this.user.runAs(new PrivilegedAction<Object>() {
196        @Override
197        public Object run() {
198          abortRegionServer(reason, cause);
199          return null;
200        }
201      });
202    }
203
204    private void abortRegionServer(String reason, Throwable cause) {
205      super.abort(reason, cause);
206    }
207  }
208
209  /**
210   * Alternate shutdown hook.
211   * Just shuts down the passed fs, not all as default filesystem hook does.
212   */
213  static class SingleFileSystemShutdownThread extends Thread {
214    private final FileSystem fs;
215    SingleFileSystemShutdownThread(final FileSystem fs) {
216      super("Shutdown of " + fs);
217      this.fs = fs;
218    }
219    @Override
220    public void run() {
221      try {
222        LOG.info("Hook closing fs=" + this.fs);
223        this.fs.close();
224      } catch (NullPointerException npe) {
225        LOG.debug("Need to fix these: " + npe.toString());
226      } catch (IOException e) {
227        LOG.warn("Running hook", e);
228      }
229    }
230  }
231
232  private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsPorts,
233                 Class<? extends HMaster> masterClass,
234                 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
235  throws IOException, InterruptedException {
236    try {
237      if (masterClass == null){
238        masterClass =  HMaster.class;
239      }
240      if (regionserverClass == null){
241        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
242      }
243
244      // start up a LocalHBaseCluster
245      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
246          masterClass, regionserverClass);
247
248      // manually add the regionservers as other users
249      for (int i = 0; i < nRegionNodes; i++) {
250        Configuration rsConf = HBaseConfiguration.create(conf);
251        if (rsPorts != null) {
252          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
253        }
254        User user = HBaseTestingUtility.getDifferentUser(rsConf,
255            ".hfs."+index++);
256        hbaseCluster.addRegionServer(rsConf, i, user);
257      }
258
259      hbaseCluster.startup();
260    } catch (IOException e) {
261      shutdown();
262      throw e;
263    } catch (Throwable t) {
264      LOG.error("Error starting cluster", t);
265      shutdown();
266      throw new IOException("Shutting down", t);
267    }
268  }
269
270  @Override
271  public void startRegionServer(String hostname, int port) throws IOException {
272    this.startRegionServer();
273  }
274
275  @Override
276  public void killRegionServer(ServerName serverName) throws IOException {
277    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
278    if (server instanceof MiniHBaseClusterRegionServer) {
279      LOG.info("Killing " + server.toString());
280      ((MiniHBaseClusterRegionServer) server).kill();
281    } else {
282      abortRegionServer(getRegionServerIndex(serverName));
283    }
284  }
285
286  @Override
287  public boolean isKilledRS(ServerName serverName) {
288    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
289  }
290
291  @Override
292  public void stopRegionServer(ServerName serverName) throws IOException {
293    stopRegionServer(getRegionServerIndex(serverName));
294  }
295
296  @Override
297  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
298    //ignore timeout for now
299    waitOnRegionServer(getRegionServerIndex(serverName));
300  }
301
302  @Override
303  public void startZkNode(String hostname, int port) throws IOException {
304    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
305  }
306
307  @Override
308  public void killZkNode(ServerName serverName) throws IOException {
309    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
310  }
311
312  @Override
313  public void stopZkNode(ServerName serverName) throws IOException {
314    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
315  }
316
317  @Override
318  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
319    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
320  }
321
322  @Override
323  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
324    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
325  }
326
327  @Override
328  public void startDataNode(ServerName serverName) throws IOException {
329    LOG.warn("Starting datanodes on mini cluster is not supported");
330  }
331
332  @Override
333  public void killDataNode(ServerName serverName) throws IOException {
334    LOG.warn("Aborting datanodes on mini cluster is not supported");
335  }
336
337  @Override
338  public void stopDataNode(ServerName serverName) throws IOException {
339    LOG.warn("Stopping datanodes on mini cluster is not supported");
340  }
341
342  @Override
343  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
344    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
345  }
346
347  @Override
348  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
349    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
350  }
351
352  @Override
353  public void startNameNode(ServerName serverName) throws IOException {
354    LOG.warn("Starting namenodes on mini cluster is not supported");
355  }
356
357  @Override
358  public void killNameNode(ServerName serverName) throws IOException {
359    LOG.warn("Aborting namenodes on mini cluster is not supported");
360  }
361
362  @Override
363  public void stopNameNode(ServerName serverName) throws IOException {
364    LOG.warn("Stopping namenodes on mini cluster is not supported");
365  }
366
367  @Override
368  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
369    LOG.warn("Waiting for namenodes to start on mini cluster is not supported");
370  }
371
372  @Override
373  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
374    LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
375  }
376
377  @Override
378  public void startMaster(String hostname, int port) throws IOException {
379    this.startMaster();
380  }
381
382  @Override
383  public void killMaster(ServerName serverName) throws IOException {
384    abortMaster(getMasterIndex(serverName));
385  }
386
387  @Override
388  public void stopMaster(ServerName serverName) throws IOException {
389    stopMaster(getMasterIndex(serverName));
390  }
391
392  @Override
393  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
394    //ignore timeout for now
395    waitOnMaster(getMasterIndex(serverName));
396  }
397
398  /**
399   * Starts a region server thread running
400   *
401   * @throws IOException
402   * @return New RegionServerThread
403   */
404  public JVMClusterUtil.RegionServerThread startRegionServer()
405      throws IOException {
406    final Configuration newConf = HBaseConfiguration.create(conf);
407    User rsUser =
408        HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
409    JVMClusterUtil.RegionServerThread t =  null;
410    try {
411      t = hbaseCluster.addRegionServer(
412          newConf, hbaseCluster.getRegionServers().size(), rsUser);
413      t.start();
414      t.waitForServerOnline();
415    } catch (InterruptedException ie) {
416      throw new IOException("Interrupted adding regionserver to cluster", ie);
417    }
418    return t;
419  }
420
421  /**
422   * Starts a region server thread and waits until its processed by master. Throws an exception
423   * when it can't start a region server or when the region server is not processed by master
424   * within the timeout.
425   *
426   * @return New RegionServerThread
427   */
428  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
429      throws IOException {
430
431    JVMClusterUtil.RegionServerThread t =  startRegionServer();
432    ServerName rsServerName = t.getRegionServer().getServerName();
433
434    long start = System.currentTimeMillis();
435    ClusterStatus clusterStatus = getClusterStatus();
436    while ((System.currentTimeMillis() - start) < timeout) {
437      if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) {
438        return t;
439      }
440      Threads.sleep(100);
441    }
442    if (t.getRegionServer().isOnline()) {
443      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
444    } else {
445      throw new IOException("RS: " + rsServerName + " is offline");
446    }
447  }
448
449  /**
450   * Cause a region server to exit doing basic clean up only on its way out.
451   * @param serverNumber  Used as index into a list.
452   */
453  public String abortRegionServer(int serverNumber) {
454    HRegionServer server = getRegionServer(serverNumber);
455    LOG.info("Aborting " + server.toString());
456    server.abort("Aborting for tests", new Exception("Trace info"));
457    return server.toString();
458  }
459
460  /**
461   * Shut down the specified region server cleanly
462   *
463   * @param serverNumber  Used as index into a list.
464   * @return the region server that was stopped
465   */
466  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
467    return stopRegionServer(serverNumber, true);
468  }
469
470  /**
471   * Shut down the specified region server cleanly
472   *
473   * @param serverNumber  Used as index into a list.
474   * @param shutdownFS True is we are to shutdown the filesystem as part of this
475   * regionserver's shutdown.  Usually we do but you do not want to do this if
476   * you are running multiple regionservers in a test and you shut down one
477   * before end of the test.
478   * @return the region server that was stopped
479   */
480  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
481      final boolean shutdownFS) {
482    JVMClusterUtil.RegionServerThread server =
483      hbaseCluster.getRegionServers().get(serverNumber);
484    LOG.info("Stopping " + server.toString());
485    server.getRegionServer().stop("Stopping rs " + serverNumber);
486    return server;
487  }
488
489  /**
490   * Wait for the specified region server to stop. Removes this thread from list
491   * of running threads.
492   * @param serverNumber
493   * @return Name of region server that just went down.
494   */
495  public String waitOnRegionServer(final int serverNumber) {
496    return this.hbaseCluster.waitOnRegionServer(serverNumber);
497  }
498
499
500  /**
501   * Starts a master thread running
502   *
503   * @return New RegionServerThread
504   */
505  public JVMClusterUtil.MasterThread startMaster() throws IOException {
506    Configuration c = HBaseConfiguration.create(conf);
507    User user =
508        HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
509
510    JVMClusterUtil.MasterThread t = null;
511    try {
512      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
513      t.start();
514    } catch (InterruptedException ie) {
515      throw new IOException("Interrupted adding master to cluster", ie);
516    }
517    return t;
518  }
519
520  /**
521   * Returns the current active master, if available.
522   * @return the active HMaster, null if none is active.
523   */
524  @Override
525  public MasterService.BlockingInterface getMasterAdminService() {
526    return this.hbaseCluster.getActiveMaster().getMasterRpcServices();
527  }
528
529  /**
530   * Returns the current active master, if available.
531   * @return the active HMaster, null if none is active.
532   */
533  public HMaster getMaster() {
534    return this.hbaseCluster.getActiveMaster();
535  }
536
537  /**
538   * Returns the current active master thread, if available.
539   * @return the active MasterThread, null if none is active.
540   */
541  public MasterThread getMasterThread() {
542    for (MasterThread mt: hbaseCluster.getLiveMasters()) {
543      if (mt.getMaster().isActiveMaster()) {
544        return mt;
545      }
546    }
547    return null;
548  }
549
550  /**
551   * Returns the master at the specified index, if available.
552   * @return the active HMaster, null if none is active.
553   */
554  public HMaster getMaster(final int serverNumber) {
555    return this.hbaseCluster.getMaster(serverNumber);
556  }
557
558  /**
559   * Cause a master to exit without shutting down entire cluster.
560   * @param serverNumber  Used as index into a list.
561   */
562  public String abortMaster(int serverNumber) {
563    HMaster server = getMaster(serverNumber);
564    LOG.info("Aborting " + server.toString());
565    server.abort("Aborting for tests", new Exception("Trace info"));
566    return server.toString();
567  }
568
569  /**
570   * Shut down the specified master cleanly
571   *
572   * @param serverNumber  Used as index into a list.
573   * @return the region server that was stopped
574   */
575  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
576    return stopMaster(serverNumber, true);
577  }
578
579  /**
580   * Shut down the specified master cleanly
581   *
582   * @param serverNumber  Used as index into a list.
583   * @param shutdownFS True is we are to shutdown the filesystem as part of this
584   * master's shutdown.  Usually we do but you do not want to do this if
585   * you are running multiple master in a test and you shut down one
586   * before end of the test.
587   * @return the master that was stopped
588   */
589  public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
590      final boolean shutdownFS) {
591    JVMClusterUtil.MasterThread server =
592      hbaseCluster.getMasters().get(serverNumber);
593    LOG.info("Stopping " + server.toString());
594    server.getMaster().stop("Stopping master " + serverNumber);
595    return server;
596  }
597
598  /**
599   * Wait for the specified master to stop. Removes this thread from list
600   * of running threads.
601   * @param serverNumber
602   * @return Name of master that just went down.
603   */
604  public String waitOnMaster(final int serverNumber) {
605    return this.hbaseCluster.waitOnMaster(serverNumber);
606  }
607
608  /**
609   * Blocks until there is an active master and that master has completed
610   * initialization.
611   *
612   * @return true if an active master becomes available.  false if there are no
613   *         masters left.
614   * @throws InterruptedException
615   */
616  @Override
617  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
618    List<JVMClusterUtil.MasterThread> mts;
619    long start = System.currentTimeMillis();
620    while (!(mts = getMasterThreads()).isEmpty()
621        && (System.currentTimeMillis() - start) < timeout) {
622      for (JVMClusterUtil.MasterThread mt : mts) {
623        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
624          return true;
625        }
626      }
627
628      Threads.sleep(100);
629    }
630    return false;
631  }
632
633  /**
634   * @return List of master threads.
635   */
636  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
637    return this.hbaseCluster.getMasters();
638  }
639
640  /**
641   * @return List of live master threads (skips the aborted and the killed)
642   */
643  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
644    return this.hbaseCluster.getLiveMasters();
645  }
646
647  /**
648   * Wait for Mini HBase Cluster to shut down.
649   */
650  public void join() {
651    this.hbaseCluster.join();
652  }
653
654  /**
655   * Shut down the mini HBase cluster
656   */
657  @Override
658  public void shutdown() throws IOException {
659    if (this.hbaseCluster != null) {
660      this.hbaseCluster.shutdown();
661    }
662  }
663
664  @Override
665  public void close() throws IOException {
666  }
667
668  /**
669   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
670   *             Use {@link #getClusterMetrics()} instead.
671   */
672  @Deprecated
673  public ClusterStatus getClusterStatus() throws IOException {
674    HMaster master = getMaster();
675    return master == null ? null : new ClusterStatus(master.getClusterMetrics());
676  }
677
678  @Override
679  public ClusterMetrics getClusterMetrics() throws IOException {
680    HMaster master = getMaster();
681    return master == null ? null : master.getClusterMetrics();
682  }
683
684  private void executeFlush(HRegion region) throws IOException {
685    // retry 5 times if we can not flush
686    for (int i = 0; i < 5; i++) {
687      FlushResult result = region.flush(true);
688      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
689        return;
690      }
691      Threads.sleep(1000);
692    }
693  }
694
695  /**
696   * Call flushCache on all regions on all participating regionservers.
697   */
698  public void flushcache() throws IOException {
699    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
700      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
701        executeFlush(r);
702      }
703    }
704  }
705
706  /**
707   * Call flushCache on all regions of the specified table.
708   */
709  public void flushcache(TableName tableName) throws IOException {
710    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
711      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
712        if (r.getTableDescriptor().getTableName().equals(tableName)) {
713          executeFlush(r);
714        }
715      }
716    }
717  }
718
719  /**
720   * Call flushCache on all regions on all participating regionservers.
721   * @throws IOException
722   */
723  public void compact(boolean major) throws IOException {
724    for (JVMClusterUtil.RegionServerThread t:
725        this.hbaseCluster.getRegionServers()) {
726      for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
727        r.compact(major);
728      }
729    }
730  }
731
732  /**
733   * Call flushCache on all regions of the specified table.
734   * @throws IOException
735   */
736  public void compact(TableName tableName, boolean major) throws IOException {
737    for (JVMClusterUtil.RegionServerThread t:
738        this.hbaseCluster.getRegionServers()) {
739      for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
740        if(r.getTableDescriptor().getTableName().equals(tableName)) {
741          r.compact(major);
742        }
743      }
744    }
745  }
746
747  /**
748   * @return List of region server threads. Does not return the master even though it is also
749   * a region server.
750   */
751  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
752    return this.hbaseCluster.getRegionServers();
753  }
754
755  /**
756   * @return List of live region server threads (skips the aborted and the killed)
757   */
758  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
759    return this.hbaseCluster.getLiveRegionServers();
760  }
761
762  /**
763   * Grab a numbered region server of your choice.
764   * @param serverNumber
765   * @return region server
766   */
767  public HRegionServer getRegionServer(int serverNumber) {
768    return hbaseCluster.getRegionServer(serverNumber);
769  }
770
771  public HRegionServer getRegionServer(ServerName serverName) {
772    return hbaseCluster.getRegionServers().stream()
773        .map(t -> t.getRegionServer())
774        .filter(r -> r.getServerName().equals(serverName))
775        .findFirst().orElse(null);
776  }
777
778  public List<HRegion> getRegions(byte[] tableName) {
779    return getRegions(TableName.valueOf(tableName));
780  }
781
782  public List<HRegion> getRegions(TableName tableName) {
783    List<HRegion> ret = new ArrayList<>();
784    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
785      HRegionServer hrs = rst.getRegionServer();
786      for (Region region : hrs.getOnlineRegionsLocalContext()) {
787        if (region.getTableDescriptor().getTableName().equals(tableName)) {
788          ret.add((HRegion)region);
789        }
790      }
791    }
792    return ret;
793  }
794
795  /**
796   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
797   * of HRS carrying regionName. Returns -1 if none found.
798   */
799  public int getServerWithMeta() {
800    return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
801  }
802
803  /**
804   * Get the location of the specified region
805   * @param regionName Name of the region in bytes
806   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
807   * of HRS carrying hbase:meta. Returns -1 if none found.
808   */
809  public int getServerWith(byte[] regionName) {
810    int index = -1;
811    int count = 0;
812    for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
813      HRegionServer hrs = rst.getRegionServer();
814      if (!hrs.isStopped()) {
815        Region region = hrs.getOnlineRegion(regionName);
816        if (region != null) {
817          index = count;
818          break;
819        }
820      }
821      count++;
822    }
823    return index;
824  }
825
826  @Override
827  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
828  throws IOException {
829    // Assume there is only one master thread which is the active master.
830    // If there are multiple master threads, the backup master threads
831    // should hold some regions. Please refer to #countServedRegions
832    // to see how we find out all regions.
833    HMaster master = getMaster();
834    Region region = master.getOnlineRegion(regionName);
835    if (region != null) {
836      return master.getServerName();
837    }
838    int index = getServerWith(regionName);
839    if (index < 0) {
840      return null;
841    }
842    return getRegionServer(index).getServerName();
843  }
844
845  /**
846   * Counts the total numbers of regions being served by the currently online
847   * region servers by asking each how many regions they have.  Does not look
848   * at hbase:meta at all.  Count includes catalog tables.
849   * @return number of regions being served by all region servers
850   */
851  public long countServedRegions() {
852    long count = 0;
853    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
854      count += rst.getRegionServer().getNumberOfOnlineRegions();
855    }
856    for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
857      count += mt.getMaster().getNumberOfOnlineRegions();
858    }
859    return count;
860  }
861
862  /**
863   * Do a simulated kill all masters and regionservers. Useful when it is
864   * impossible to bring the mini-cluster back for clean shutdown.
865   */
866  public void killAll() {
867    // Do backups first.
868    MasterThread activeMaster = null;
869    for (MasterThread masterThread : getMasterThreads()) {
870      if (!masterThread.getMaster().isActiveMaster()) {
871        masterThread.getMaster().abort("killAll");
872      } else {
873        activeMaster = masterThread;
874      }
875    }
876    // Do active after.
877    if (activeMaster != null) {
878      activeMaster.getMaster().abort("killAll");
879    }
880    for (RegionServerThread rst : getRegionServerThreads()) {
881      rst.getRegionServer().abort("killAll");
882    }
883  }
884
885  @Override
886  public void waitUntilShutDown() {
887    this.hbaseCluster.join();
888  }
889
890  public List<HRegion> findRegionsForTable(TableName tableName) {
891    ArrayList<HRegion> ret = new ArrayList<>();
892    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
893      HRegionServer hrs = rst.getRegionServer();
894      for (Region region : hrs.getRegions(tableName)) {
895        if (region.getTableDescriptor().getTableName().equals(tableName)) {
896          ret.add((HRegion)region);
897        }
898      }
899    }
900    return ret;
901  }
902
903
904  protected int getRegionServerIndex(ServerName serverName) {
905    //we have a small number of region servers, this should be fine for now.
906    List<RegionServerThread> servers = getRegionServerThreads();
907    for (int i=0; i < servers.size(); i++) {
908      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
909        return i;
910      }
911    }
912    return -1;
913  }
914
915  protected int getMasterIndex(ServerName serverName) {
916    List<MasterThread> masters = getMasterThreads();
917    for (int i = 0; i < masters.size(); i++) {
918      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
919        return i;
920      }
921    }
922    return -1;
923  }
924
925  @Override
926  public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
927    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
928  }
929
930  @Override
931  public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
932  throws IOException {
933    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
934  }
935}