1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.errorhandling.ForeignException;
31 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
32 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
33 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
34 import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
35
36 import com.google.common.collect.Lists;
37
38 /**
39 * A globally-barriered distributed procedure. This class encapsulates state and methods for
40 * tracking and managing a distributed procedure, as well as aborting if any member encounters
41 * a problem or if a cancellation is requested.
42 * <p>
43 * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
44 * method. The procedure contacts all members and waits for all subprocedures to execute
45 * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
46 * send acquisition info back to the coordinator. If all acquisitions at subprocedures succeed,
47 * the coordinator then will call {@link #sendGlobalBarrierReached()}. This notifies members to
48 * execute the {@link Subprocedure#insideBarrier()} method. The procedure is blocked until all
49 * {@link Subprocedure#insideBarrier} executions complete at the members. When
50 * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
51 * the coordinator. Once all members complete, the coordinator calls
52 * {@link #sendGlobalBarrierComplete()}.
53 * <p>
54 * If errors are encountered remotely, they are forwarded to the coordinator, and
55 * {@link Subprocedure#cleanup(Exception)} is called.
56 * <p>
57 * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
58 * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger
59 * an {@link ForeignException} to abort the procedure. This is particularly useful for situations
60 * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
61 * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
62 * <p>
63 * Users should generally not directly create or subclass instances of this. They are created
64 * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
65 * String, byte[], List)}}
66 */
67 @InterfaceAudience.Private
68 public class Procedure implements Callable<Void>, ForeignExceptionListener {
69 private static final Log LOG = LogFactory.getLog(Procedure.class);
70
71 //
72 // Arguments and naming
73 //
74
75 // Name of the procedure
76 final private String procName;
77 // Arguments for this procedure execution
78 final private byte[] args;
79
80 //
81 // Execution State
82 //
83 /** latch for waiting until all members have acquire in barrier state */
84 final CountDownLatch acquiredBarrierLatch;
85 /** latch for waiting until all members have executed and released their in barrier state */
86 final CountDownLatch releasedBarrierLatch;
87 /** latch for waiting until a procedure has completed */
88 final CountDownLatch completedLatch;
89 /** monitor to check for errors */
90 private final ForeignExceptionDispatcher monitor;
91
92 //
93 // Execution Timeout Handling.
94 //
95
96 /** frequency to check for errors (ms) */
97 protected final long wakeFrequency;
98 protected final TimeoutExceptionInjector timeoutInjector;
99
100 //
101 // Members' and Coordinator's state
102 //
103
104 /** lock to prevent nodes from acquiring and then releasing before we can track them */
105 private Object joinBarrierLock = new Object();
106 private final List<String> acquiringMembers;
107 private final List<String> inBarrierMembers;
108 private ProcedureCoordinator coord;
109
110 /**
111 * Creates a procedure. (FOR TESTING)
112 *
113 * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
114 * @param coord coordinator to call back to for general errors (e.g.
115 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
116 * @param monitor error monitor to check for external errors
117 * @param wakeFreq frequency to check for errors while waiting
118 * @param timeout amount of time to allow the procedure to run before cancelling
119 * @param procName name of the procedure instance
120 * @param args argument data associated with the procedure instance
121 * @param expectedMembers names of the expected members
122 */
123 public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
124 long timeout, String procName, byte[] args, List<String> expectedMembers) {
125 this.coord = coord;
126 this.acquiringMembers = new ArrayList<String>(expectedMembers);
127 this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
128 this.procName = procName;
129 this.args = args;
130 this.monitor = monitor;
131 this.wakeFrequency = wakeFreq;
132
133 int count = expectedMembers.size();
134 this.acquiredBarrierLatch = new CountDownLatch(count);
135 this.releasedBarrierLatch = new CountDownLatch(count);
136 this.completedLatch = new CountDownLatch(1);
137 this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
138 }
139
140 /**
141 * Create a procedure.
142 *
143 * Users should generally not directly create instances of this. They are created them
144 * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher,
145 * String, byte[], List)}}
146 *
147 * @param coord coordinator to call back to for general errors (e.g.
148 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
149 * @param wakeFreq frequency to check for errors while waiting
150 * @param timeout amount of time to allow the procedure to run before cancelling
151 * @param procName name of the procedure instance
152 * @param args argument data associated with the procedure instance
153 * @param expectedMembers names of the expected members
154 */
155 public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
156 String procName, byte[] args, List<String> expectedMembers) {
157 this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
158 expectedMembers);
159 }
160
161 public String getName() {
162 return procName;
163 }
164
165 /**
166 * @return String of the procedure members both trying to enter the barrier and already in barrier
167 */
168 public String getStatus() {
169 String waiting, done;
170 synchronized (joinBarrierLock) {
171 waiting = acquiringMembers.toString();
172 done = inBarrierMembers.toString();
173 }
174 return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
175 }
176
177 /**
178 * Get the ForeignExceptionDispatcher
179 * @return the Procedure's monitor.
180 */
181 public ForeignExceptionDispatcher getErrorMonitor() {
182 return monitor;
183 }
184
185 /**
186 * This call is the main execution thread of the barriered procedure. It sends messages and
187 * essentially blocks until all procedure members acquire or later complete but periodically
188 * checks for foreign exceptions.
189 */
190 @Override
191 @SuppressWarnings("finally")
192 final public Void call() {
193 LOG.info("Starting procedure '" + procName + "'");
194 // start the timer
195 timeoutInjector.start();
196
197 // run the procedure
198 try {
199 // start by checking for error first
200 monitor.rethrowException();
201 LOG.debug("Procedure '" + procName + "' starting 'acquire'");
202 sendGlobalBarrierStart();
203
204 // wait for all the members to report acquisition
205 LOG.debug("Waiting for all members to 'acquire'");
206 waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
207 monitor.rethrowException();
208
209 LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
210 sendGlobalBarrierReached();
211
212 // wait for all members to report barrier release
213 waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
214
215 // make sure we didn't get an error during in barrier execution and release
216 monitor.rethrowException();
217 LOG.info("Procedure '" + procName + "' execution completed");
218 } catch (Exception e) {
219 if (e instanceof InterruptedException) {
220 Thread.currentThread().interrupt();
221 }
222 String msg = "Procedure '" + procName +"' execution failed!";
223 LOG.error(msg, e);
224 receive(new ForeignException(getName(), e));
225 } finally {
226 LOG.debug("Running finish phase.");
227 sendGlobalBarrierComplete();
228 completedLatch.countDown();
229
230 // tell the timer we are done, if we get here successfully
231 timeoutInjector.complete();
232 return null;
233 }
234 }
235
236 /**
237 * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
238 * the {@link Subprocedure#acquireBarrier} step.
239 * @throws ForeignException
240 */
241 public void sendGlobalBarrierStart() throws ForeignException {
242 // start the procedure
243 LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
244 try {
245 // send procedure barrier start to specified list of members. cloning the list to avoid
246 // concurrent modification from the controller setting the prepared nodes
247 coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
248 } catch (IOException e) {
249 coord.rpcConnectionFailure("Can't reach controller.", e);
250 } catch (IllegalArgumentException e) {
251 throw new ForeignException(getName(), e);
252 }
253 }
254
255 /**
256 * Sends a message to all members that the global barrier condition has been satisfied. This
257 * should only be executed after all members have completed its
258 * {@link Subprocedure#acquireBarrier()} call successfully. This triggers the member
259 * {@link Subprocedure#insideBarrier} method.
260 * @throws ForeignException
261 */
262 public void sendGlobalBarrierReached() throws ForeignException {
263 try {
264 // trigger to have member run {@link Subprocedure#insideBarrier}
265 coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
266 } catch (IOException e) {
267 coord.rpcConnectionFailure("Can't reach controller.", e);
268 }
269 }
270
271 /**
272 * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
273 * After this executes, the coordinator can assume that any state resources about this barrier
274 * procedure state has been released.
275 */
276 public void sendGlobalBarrierComplete() {
277 LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
278 try {
279 coord.getRpcs().resetMembers(this);
280 } catch (IOException e) {
281 coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
282 }
283 }
284
285 //
286 // Call backs from other external processes.
287 //
288
289 /**
290 * Call back triggered by an individual member upon successful local barrier acquisition
291 * @param member
292 */
293 public void barrierAcquiredByMember(String member) {
294 LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName
295 + "' on coordinator");
296 if (this.acquiringMembers.contains(member)) {
297 synchronized (joinBarrierLock) {
298 if (this.acquiringMembers.remove(member)) {
299 this.inBarrierMembers.add(member);
300 acquiredBarrierLatch.countDown();
301 }
302 }
303 LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
304 } else {
305 LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
306 " Continuing on.");
307 }
308 }
309
310 /**
311 * Call back triggered by a individual member upon successful local in-barrier execution and
312 * release
313 * @param member
314 */
315 public void barrierReleasedByMember(String member) {
316 boolean removed = false;
317 synchronized (joinBarrierLock) {
318 removed = this.inBarrierMembers.remove(member);
319 if (removed) {
320 releasedBarrierLatch.countDown();
321 }
322 }
323 if (removed) {
324 LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
325 + "', counting down latch. Waiting for " + releasedBarrierLatch.getCount()
326 + " more");
327 } else {
328 LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
329 + "', but we weren't waiting on it to release!");
330 }
331 }
332
333 /**
334 * Waits until the entire procedure has globally completed, or has been aborted. If an
335 * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
336 * yet.
337 * @throws ForeignException
338 * @throws InterruptedException
339 */
340 public void waitForCompleted() throws ForeignException, InterruptedException {
341 waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
342 }
343
344 /**
345 * A callback that handles incoming ForeignExceptions.
346 */
347 @Override
348 public void receive(ForeignException e) {
349 monitor.receive(e);
350 }
351
352 /**
353 * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
354 * check for errors
355 * @param latch latch to wait on
356 * @param monitor monitor to check for errors while waiting
357 * @param wakeFrequency frequency to wake up and check for errors (in
358 * {@link TimeUnit#MILLISECONDS})
359 * @param latchDescription description of the latch, for logging
360 * @throws ForeignException type of error the monitor can throw, if the task fails
361 * @throws InterruptedException if we are interrupted while waiting on latch
362 */
363 public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
364 long wakeFrequency, String latchDescription) throws ForeignException,
365 InterruptedException {
366 boolean released = false;
367 while (!released) {
368 if (monitor != null) {
369 monitor.rethrowException();
370 }
371 /*
372 ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
373 + wakeFrequency + " ms)"); */
374 released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
375 }
376 // check error again in case an error raised during last wait
377 if (monitor != null) {
378 monitor.rethrowException();
379 }
380 }
381 }