// Copyright 2017 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.query2.engine; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * A partial implementation of {@link QueryEnvironment} that has trivial in-thread implementations * of all the {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods. */ public abstract class AbstractQueryEnvironment implements QueryEnvironment { /** Concrete implementation of {@link QueryTaskFuture}. */ protected static final class QueryTaskFutureImpl extends QueryTaskFutureImplBase implements ListenableFuture { private final ListenableFuture delegate; private QueryTaskFutureImpl(ListenableFuture delegate) { this.delegate = delegate; } public static QueryTaskFutureImpl ofDelegate(ListenableFuture delegate) { return (delegate instanceof QueryTaskFutureImpl) ? (QueryTaskFutureImpl) delegate : new QueryTaskFutureImpl<>(delegate); } @Override public boolean cancel(boolean mayInterruptIfRunning) { return delegate.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { return delegate.isCancelled(); } @Override public boolean isDone() { return delegate.isDone(); } @Override public T get() throws InterruptedException, ExecutionException { return delegate.get(); } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } @Override public void addListener(Runnable listener, Executor executor) { delegate.addListener(listener, executor); } @Override public T getIfSuccessful() { try { return Futures.getDone(delegate); } catch (CancellationException | ExecutionException e) { throw new IllegalStateException(e); } } public T getChecked() throws InterruptedException, QueryException { try { return get(); } catch (CancellationException e) { throw new InterruptedException(); } catch (ExecutionException e) { Throwable cause = e.getCause(); Throwables.propagateIfPossible(cause, QueryException.class); Throwables.propagateIfPossible(cause, InterruptedException.class); throw new IllegalStateException(e.getCause()); } } } @Override public QueryTaskFuture immediateSuccessfulFuture(R value) { return new QueryTaskFutureImpl<>(Futures.immediateFuture(value)); } @Override public QueryTaskFuture immediateFailedFuture(QueryException e) { return new QueryTaskFutureImpl<>(Futures.immediateFailedFuture(e)); } @Override public QueryTaskFuture immediateCancelledFuture() { return new QueryTaskFutureImpl<>(Futures.immediateCancelledFuture()); } @Override public QueryTaskFuture eval( QueryExpression expr, QueryExpressionContext context, final Callback callback) { // Not all QueryEnvironment implementations embrace the async+streaming evaluation framework. In // particular, the streaming callbacks employed by functions like 'deps' use // QueryEnvironment#buildTransitiveClosure. So if the implementation of that method does some // heavyweight blocking work, then it's best to do this blocking work in a single batch. // Importantly, the callback we pass in needs to maintain order. final QueryUtil.AggregateAllCallback aggregateAllCallback = QueryUtil.newOrderedAggregateAllOutputFormatterCallback(this); QueryTaskFuture evalAllFuture = expr.eval(this, context, aggregateAllCallback); return whenSucceedsCall( evalAllFuture, new QueryTaskCallable() { @Override public Void call() throws QueryException, InterruptedException { callback.process(aggregateAllCallback.getResult()); return null; } }); } @Override public QueryTaskFuture executeAsync(QueryTaskCallable callable) { try { return immediateSuccessfulFuture(callable.call()); } catch (QueryException e) { return immediateFailedFuture(e); } catch (InterruptedException e) { return immediateCancelledFuture(); } } @Override public QueryTaskFuture whenSucceedsCall( QueryTaskFuture future, QueryTaskCallable callable) { return whenAllSucceedCall(ImmutableList.of(future), callable); } private static class Dummy implements QueryTaskCallable { public static final Dummy INSTANCE = new Dummy(); private Dummy() {} @Override public Void call() { return null; } } @Override public QueryTaskFuture whenAllSucceed(Iterable> futures) { return whenAllSucceedCall(futures, Dummy.INSTANCE); } @Override public QueryTaskFuture whenAllSucceedCall( Iterable> futures, QueryTaskCallable callable) { return QueryTaskFutureImpl.ofDelegate( Futures.whenAllSucceed(cast(futures)).call(callable, directExecutor())); } @Override public QueryTaskFuture transformAsync( QueryTaskFuture future, final Function> function) { return QueryTaskFutureImpl.ofDelegate( Futures.transformAsync( (QueryTaskFutureImpl) future, input -> (QueryTaskFutureImpl) function.apply(input), directExecutor())); } protected static Iterable> cast( Iterable> futures) { return Iterables.transform(futures, future -> (QueryTaskFutureImpl) future); } }