processingQueue = new LinkedBlockingQueue<>();
/**
 * Time, in milliseconds, that the scheduler thread sleeps after each scheduling pass. A scheduling
 * pass is the scheduler thread deciding whether to drain all pending visits from the queue and, if
 * so, submitting tasks to perform those visits. The actual gap between two passes also includes
 * the time spent in the pass itself and is subject to the precision of {@link Thread#sleep}.
 *
 * <p>The value of 1ms was chosen experimentally, as a balance between two facts about the
 * scheduling interval:
 *
 * <p>1. A large interval adds systematic delay: a newly enqueued visit may wait up to a full
 * interval before it is scheduled, so a visit that should take only 1ms can take noticeably
 * longer. For most visits, which take longer than a few hundred milliseconds, this should not be
 * noticeable.
 *
 * <p>2. A zero interval eats too much CPU.
 *
 * <p>Although the scheduler runs roughly once every 1ms, it does not drain the queue on every pass;
 * pending visits are drained only when certain criteria are met.
 */
private static final long SCHEDULING_INTERVAL_MILLISECONDS = 1L;
/**
 * The minimum number of pending tasks the scheduler tries to maintain. The 3x multiplier was chosen
 * experimentally. Scheduling tasks too frequently forfeits the benefit of grouping a large number
 * of keys by package into each batch; on the other hand, we want to keep every thread in the pool
 * busy to achieve full capacity. A low number here causes some worker threads to go idle before
 * the next scheduling pass.
 *
 * <p>The thread count is clamped to at least 1, the same way the shared thread pool is sized. If
 * this constant were zero or negative, the scheduler's "too few pending tasks" check could never
 * fire. A queue holding fewer than {@code SkyQueryEnvironment.BATCH_CALLBACK_SIZE} keys would then
 * never be drained.
 *
 * <p>TODO(shazh): Revisit the choice of task target based on real-prod performance.
 */
private static final long MIN_PENDING_TASKS =
    3 * Math.max(1, SkyQueryEnvironment.DEFAULT_THREAD_COUNT);
/**
 * Error classifier shared by all parallel visitors, which fail fast on any {@link
 * RuntimeException}. Every {@code RuntimeException} is classified as {@code CRITICAL_AND_LOG};
 * this includes {@code RuntimeInterruptedException} and {@code RuntimeQueryException}, the
 * unchecked wrappers that tasks use for {@link InterruptedException} and {@code QueryException}.
 * Every other exception is classified as {@code NOT_CRITICAL}.
 */
static final ErrorClassifier PARALLEL_VISITOR_ERROR_CLASSIFIER =
    new ErrorClassifier() {
      @Override
      protected ErrorClassification classifyException(Exception e) {
        if (!(e instanceof RuntimeException)) {
          return ErrorClassification.NOT_CRITICAL;
        }
        return ErrorClassification.CRITICAL_AND_LOG;
      }
    };
/**
 * All visitors share a single global fixed thread pool with at least one thread.
 *
 * <p>The core and maximum pool sizes are equal, so the pool never grows beyond its core threads.
 * {@code keepAliveTime} therefore has no effect unless {@link
 * ThreadPoolExecutor#allowCoreThreadTimeOut} is enabled, which is not done here.
 */
private static final ExecutorService FIXED_THREAD_POOL_EXECUTOR =
    new ThreadPoolExecutor(
        /*corePoolSize=*/ Math.max(1, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
        /*maximumPoolSize=*/ Math.max(1, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
        /*keepAliveTime=*/ 1,
        /*unit=*/ TimeUnit.SECONDS,
        /*workQueue=*/ new BlockingStack(),
        /*threadFactory=*/ new ThreadFactoryBuilder()
            .setNameFormat("parallel-visitor %d")
            .build());
/**
 * Creates a visitor that deduplicates keys with {@code uniquifier}, reports results to {@code
 * callback}, and visits pending keys in batches of at most {@code visitBatchSize}. All work runs
 * on the shared {@code FIXED_THREAD_POOL_EXECUTOR}.
 */
protected ParallelVisitor(Uniquifier uniquifier, Callback callback, int visitBatchSize) {
  this.visitBatchSize = visitBatchSize;
  this.callback = callback;
  this.uniquifier = uniquifier;
  this.executor =
      new VisitingTaskExecutor(FIXED_THREAD_POOL_EXECUTOR, PARALLEL_VISITOR_ERROR_CLASSIFIER);
}
/** Factory for {@link ParallelVisitor} instances. */
public interface Factory {
// NOTE(review): `ParallelVisitor>` is not valid Java; the return type's type arguments appear to
// have been lost. Restore them to match ParallelVisitor's type parameters, which are declared
// outside this view. Confirm against implementations and callers.
/**
 * Creates a new {@link ParallelVisitor}. {@code ParallelVisitorCallback} requests a fresh visitor
 * on every {@code process} call.
 */
ParallelVisitor> create();
}
/**
 * Wraps {@code visitorFactory} in a {@link Callback}. Each {@link Callback#process} call on the
 * result creates a fresh visitor from the factory and blocks until that visitor's parallel
 * visitation, seeded from the processed partial result, has completed.
 */
public static Callback createParallelVisitorCallback(Factory visitorFactory) {
  Callback visitationCallback = new ParallelVisitorCallback(visitorFactory);
  return visitationCallback;
}
/**
 * The outcome of visiting one batch of keys, as returned by {@code getVisitResult}. It holds:
 *
 * <p>{@code keysToUseForResult}: keys whose targets are reported through the callback.
 *
 * <p>{@code keysToVisit}: keys that are put back on the processing queue to be visited later.
 */
protected final class Visit {
  private final Iterable keysToVisit;
  private final Iterable keysToUseForResult;

  public Visit(Iterable keysToUseForResult, Iterable keysToVisit) {
    this.keysToVisit = keysToVisit;
    this.keysToUseForResult = keysToUseForResult;
  }
}
/**
 * Seeds the processing queue with the preprocessed {@code keys}, then runs the visitation and
 * blocks until it completes or fails.
 *
 * @throws QueryException if any task failed with a {@code QueryException}
 * @throws InterruptedException if the visitation was interrupted
 */
void visitAndWaitForCompletion(Iterable keys) throws QueryException, InterruptedException {
  // Enqueue in iteration order; the executor's scheduling loop picks the keys up from here.
  Iterables.addAll(processingQueue, preprocessInitialVisit(keys));
  executor.visitAndWaitForCompletion();
}
/**
 * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in
 * the full visitation to the given {@link Callback}.
 *
 * <p>Invoked from tasks running on the shared thread pool, possibly concurrently. Each call
 * receives a batch of at most {@code SkyQueryEnvironment.BATCH_CALLBACK_SIZE} keys taken from the
 * {@code keysToUseForResult} of a {@link Visit} returned by {@link #getVisitResult}.
 *
 * <p>A {@code QueryException} or {@code InterruptedException} thrown here is wrapped by the
 * calling task and is expected to stop the visitation and be rethrown, unwrapped, to the caller
 * of the visitation.
 */
protected abstract void processResultantTargets(
Iterable keysToUseForResult, Callback callback)
throws QueryException, InterruptedException;
/**
 * Gets the {@link Visit} representing the local visitation of the given {@code values}.
 *
 * <p>Called from {@code VisitTask} with a non-empty batch of at most {@code visitBatchSize} keys
 * that has already been deduplicated by the {@code uniquifier}. May be called concurrently from
 * several threads of the shared pool.
 */
protected abstract Visit getVisitResult(Iterable values) throws InterruptedException;
/**
 * Converts the entry-level {@code keys} into the initial keys to visit. The returned keys are
 * added, in iteration order, to the processing queue before the scheduling loop starts.
 */
protected abstract Iterable preprocessInitialVisit(Iterable keys);
/**
 * Splits {@code pendingKeysToVisit} into consecutive batches of at most {@code visitBatchSize}
 * keys and returns an immutable list with one {@link VisitTask} per batch, in order.
 */
protected Iterable getVisitTasks(Collection pendingKeysToVisit) {
  return ImmutableList.copyOf(
      Iterables.transform(Iterables.partition(pendingKeysToVisit, visitBatchSize), VisitTask::new));
}
/**
 * Base class for work submitted to the executor. Subclasses implement {@link #process}. A {@code
 * QueryException} or {@link InterruptedException} thrown from it is rethrown by {@link #run},
 * wrapped in the unchecked {@code RuntimeQueryException} or {@code RuntimeInterruptedException}
 * respectively.
 */
protected abstract static class Task implements Runnable {
  /** Performs this task's work. */
  abstract void process() throws QueryException, InterruptedException;

  @Override
  public void run() {
    try {
      process();
    } catch (QueryException queryException) {
      throw new RuntimeQueryException(queryException);
    } catch (InterruptedException interruptedException) {
      throw new RuntimeInterruptedException(interruptedException);
    }
  }
}
/**
 * Visits one batch of keys. It first deduplicates the batch with the {@code uniquifier}; if any
 * keys remain, it computes their {@link Visit}. It then submits the visit's result keys for
 * processing in batches of {@code SkyQueryEnvironment.BATCH_CALLBACK_SIZE}, and appends the keys
 * still to visit to the processing queue.
 */
class VisitTask extends Task {
  private final Iterable keysToVisit;

  VisitTask(Iterable keysToVisit) {
    this.keysToVisit = keysToVisit;
  }

  @Override
  void process() throws InterruptedException {
    ImmutableList freshKeys = uniquifier.unique(keysToVisit);
    // Every key in this batch has already been visited elsewhere; nothing to do.
    if (freshKeys.isEmpty()) {
      return;
    }
    Visit result = getVisitResult(freshKeys);
    for (Iterable resultBatch :
        Iterables.partition(result.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) {
      executor.execute(new GetAndProcessResultsTask(resultBatch));
    }
    Iterables.addAll(processingQueue, result.keysToVisit);
  }
}
/** Reports one batch of result keys through the visitor's {@code callback}. */
private class GetAndProcessResultsTask extends Task {
  private final Iterable keysToUseForResult;

  private GetAndProcessResultsTask(Iterable keysToUseForResult) {
    this.keysToUseForResult = keysToUseForResult;
  }

  /** Hands this task's batch to {@link ParallelVisitor#processResultantTargets}. */
  @Override
  protected void process() throws QueryException, InterruptedException {
    ParallelVisitor.this.processResultantTargets(
        keysToUseForResult, ParallelVisitor.this.callback);
  }
}
/**
 * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and
 * scheduler for parallel visitations.
 *
 * <p>The thread that calls {@link #visitAndWaitForCompletion} acts as the scheduler. It
 * periodically drains {@code processingQueue} into {@link VisitTask}s and submits them. The tasks
 * run on the shared pool and add newly discovered keys back onto the queue.
 */
private class VisitingTaskExecutor extends AbstractQueueVisitor {
  private VisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) {
    super(
        /*executorService=*/ executor,
        // Leave the thread pool active for other current and future callers.
        /*shutdownOnCompletion=*/ false,
        /*failFastOnException=*/ true,
        /*errorClassifier=*/ errorClassifier);
  }

  /**
   * Runs the scheduling loop until the visitation completes or must stop. It then waits for all
   * submitted tasks to terminate and propagates a task failure, if any.
   *
   * @throws QueryException if a task failed with a {@code QueryException}
   * @throws InterruptedException if a task or the scheduling thread was interrupted
   */
  private void visitAndWaitForCompletion() throws QueryException, InterruptedException {
    // The scheduler keeps running until either of the following two conditions is met.
    //
    // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail
    //    fast.
    // 2. There is no pending visit in the queue and no pending task running.
    while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) {
      scheduleVisitTasksIfNeeded();
      try {
        Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
      } catch (InterruptedException e) {
        // If the main thread waiting for completion of the visitation is interrupted, we should
        // gracefully terminate all running and pending tasks before exit. If a QueryException
        // occurred in any of the worker threads, awaitTerminationAndPropagateErrorsIfAny
        // propagates the QueryException instead of the InterruptedException.
        setInterrupted();
        awaitTerminationAndPropagateErrorsIfAny();
        // Never resume scheduling after an interrupt. If the call above returned normally, the
        // interrupt would otherwise be swallowed, and the visitation could report success on a
        // partial result.
        throw e;
      }
    }
    // We reach here either because the visitation is complete, or because an error prevents us
    // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either
    // gracefully exit if the visitation is complete, or propagate the exception if an error
    // occurred.
    awaitTerminationAndPropagateErrorsIfAny();
  }

  /**
   * Drains all pending keys from {@code processingQueue} and submits visit tasks for them, but only
   * when at least one of the following holds:
   *
   * <p>1. The number of pending tasks is below {@code MIN_PENDING_TASKS}, so new tasks are needed
   * to keep the workers busy.
   *
   * <p>2. No task is pending at all. Condition 1 already covers this whenever {@code
   * MIN_PENDING_TASKS} is positive. The separate check guarantees the queue is eventually drained,
   * so the scheduling loop cannot spin forever on a small queue.
   *
   * <p>3. The processing queue holds at least {@code SkyQueryEnvironment.BATCH_CALLBACK_SIZE} keys.
   */
  private void scheduleVisitTasksIfNeeded() {
    long pendingTaskCount = getTaskCount();
    if (pendingTaskCount < MIN_PENDING_TASKS
        || pendingTaskCount == 0
        || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
      Collection pendingKeysToVisit = new ArrayList<>(processingQueue.size());
      processingQueue.drainTo(pendingKeysToVisit);
      for (Task task : getVisitTasks(pendingKeysToVisit)) {
        execute(task);
      }
    }
  }

  /**
   * Waits for all tasks to terminate, interrupting the workers. Any {@code RuntimeQueryException}
   * or {@code RuntimeInterruptedException} raised by a task is unwrapped and rethrown as the
   * checked exception it carries.
   */
  private void awaitTerminationAndPropagateErrorsIfAny()
      throws QueryException, InterruptedException {
    try {
      awaitTermination(/*interruptWorkers=*/ true);
    } catch (RuntimeQueryException e) {
      throw (QueryException) e.getCause();
    } catch (RuntimeInterruptedException e) {
      throw (InterruptedException) e.getCause();
    }
  }
}
/**
 * A {@link Callback} whose {@link Callback#process} method kicks off a visitation via a fresh
 * {@link ParallelVisitor} instance.
 */
private static class ParallelVisitorCallback implements Callback {
  private final ParallelVisitor.Factory visitorFactory;

  private ParallelVisitorCallback(ParallelVisitor.Factory visitorFactory) {
    this.visitorFactory = visitorFactory;
  }

  /**
   * Creates a new visitor from the factory and blocks until its visitation has completed. The
   * visitation is seeded with the strict transitive-traversal keys of {@code partialResult}.
   */
  @Override
  public void process(Iterable partialResult) throws QueryException, InterruptedException {
    // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on
    // another, potentially expensive computation. Refactor to something like "processAsync".
    //
    // The visitor is used exactly once, so the call is chained instead of naming a local whose
    // type would have to spell out the visitor's wildcard type arguments. The receiver is still
    // evaluated before the argument, as with the former local variable.
    visitorFactory
        .create()
        .visitAndWaitForCompletion(
            SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult));
  }
}
/**
 * Unchecked carrier that lets a {@link Task} propagate a {@code QueryException} from {@link
 * Runnable#run}. The exception is unwrapped again after the executor terminates.
 */
private static class RuntimeQueryException extends RuntimeException {
  private RuntimeQueryException(QueryException cause) {
    super(cause);
  }
}
/**
 * Unchecked carrier that lets a {@link Task} propagate an {@link InterruptedException} from
 * {@link Runnable#run}. The exception is unwrapped again after the executor terminates.
 */
private static class RuntimeInterruptedException extends RuntimeException {
  private RuntimeInterruptedException(InterruptedException cause) {
    super(cause);
  }
}
}