diff options
author | janakr <janakr@google.com> | 2018-04-30 13:35:22 -0700 |
---|---|---|
committer | Copybara-Service <copybara-piper@google.com> | 2018-04-30 13:36:40 -0700 |
commit | bdd4eb2be39adb92dda0bb91cb6f2a32419e8694 (patch) | |
tree | fcb43740a40ea5b65a14fb139fb6f1b37a6296cd /src/main/java/com/google/devtools/build/lib/skyframe/serialization | |
parent | 968f87900dce45a7af749a965b72dbac51b176b3 (diff) |
Add ability for serialization to inform the SerializationContext that any remote write should block on the provided future.
PiperOrigin-RevId: 194836516
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/skyframe/serialization')
4 files changed, 193 insertions, 16 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/BUILD b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/BUILD index a2fe0b53ac..93412b31be 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/BUILD +++ b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/BUILD @@ -15,7 +15,7 @@ java_library( exclude = ["SerializationConstants.java"], ), deps = [ - ":constants", + "//src/main/java/com/google/devtools/build/lib:crash-utils", "//src/main/java/com/google/devtools/build/lib/skyframe/serialization/autocodec:registered-singleton", "//src/main/java/com/google/devtools/build/lib/skyframe/serialization/autocodec:unsafe-provider", "//third_party:guava", diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/ObjectCodecs.java b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/ObjectCodecs.java index 557ed17255..5f81394129 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/ObjectCodecs.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/ObjectCodecs.java @@ -47,16 +47,26 @@ public class ObjectCodecs { } public void serialize(Object subject, CodedOutputStream codedOut) throws SerializationException { - serializeImpl(subject, codedOut, /*memoize=*/ false); + serializeImpl(subject, codedOut, serializationContext); } public ByteString serializeMemoized(Object subject) throws SerializationException { return serializeToByteString(subject, this::serializeMemoized); } + public SerializationResult<ByteString> serializeMemoizedAndBlocking(Object subject) + throws SerializationException { + SerializationContext memoizingContext = + serializationContext.getMemoizingAndBlockingOnWriteContext(); + ByteString byteString = + serializeToByteString( + subject, (subj, codedOut) -> serializeImpl(subj, codedOut, memoizingContext)); + return SerializationResult.create(byteString, memoizingContext.createFutureToBlockWritingOn()); + } + public void serializeMemoized(Object subject, CodedOutputStream codedOut) throws SerializationException { - serializeImpl(subject, codedOut, /*memoize=*/ true); + serializeImpl(subject, codedOut, serializationContext.getMemoizingContext()); } public Object deserialize(ByteString data) throws SerializationException { @@ -75,14 +85,11 @@ public class ObjectCodecs { return deserializeImpl(codedIn, /*memoize=*/ true); } - private void serializeImpl(Object subject, CodedOutputStream codedOut, boolean memoize) + private void serializeImpl( + Object subject, CodedOutputStream codedOut, SerializationContext serializationContext) throws SerializationException { try { - if (memoize) { - serializationContext.getMemoizingContext().serialize(subject, codedOut); - } else { - serializationContext.serialize(subject, codedOut); - } + serializationContext.serialize(subject, codedOut); } catch (IOException e) { throw new SerializationException("Failed to serialize " + subject, e); } diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationContext.java b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationContext.java index 1fe8555944..2c19c6b8dc 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationContext.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationContext.java @@ -17,37 +17,51 @@ package com.google.devtools.build.lib.skyframe.serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.skyframe.serialization.Memoizer.Serializer; import com.google.devtools.build.lib.skyframe.serialization.SerializationException.NoCodecException; +import com.google.devtools.build.lib.util.BazelCrashUtils; import com.google.protobuf.CodedOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; /** * Stateful class for providing additional context to a single serialization "session". This class - * is thread-safe so long as {@link #serializer} is null. If it is not null, this class is not - * thread-safe and should only be accessed on a single thread for serializing one object (that may - * involve serializing other objects contained in it). + * is thread-safe so long as {@link #serializer} is null (which also implies that {@link + * #allowFuturesToBlockWritingOn) is false). If it is not null, this class is not thread-safe and + * should only be accessed on a single thread for serializing one object (that may involve + * serializing other objects contained in it). */ public class SerializationContext { private final ObjectCodecRegistry registry; private final ImmutableMap<Class<?>, Object> dependencies; @Nullable private final Memoizer.Serializer serializer; + /** Initialized lazily. */ + @Nullable private List<ListenableFuture<Void>> futuresToBlockWritingOn; + + private final boolean allowFuturesToBlockWritingOn; private SerializationContext( ObjectCodecRegistry registry, ImmutableMap<Class<?>, Object> dependencies, - @Nullable Serializer serializer) { + @Nullable Serializer serializer, + boolean allowFuturesToBlockWritingOn) { this.registry = registry; this.dependencies = dependencies; this.serializer = serializer; + this.allowFuturesToBlockWritingOn = allowFuturesToBlockWritingOn; } @VisibleForTesting public SerializationContext( ObjectCodecRegistry registry, ImmutableMap<Class<?>, Object> dependencies) { - this(registry, dependencies, /*serializer=*/ null); + this(registry, dependencies, /*serializer=*/ null, /*allowFuturesToBlockWritingOn=*/ false); } @VisibleForTesting @@ -92,7 +106,16 @@ public class SerializationContext { if (serializer != null) { return this; } - return getNewMemoizingContext(); + return getNewMemoizingContext(/*allowFuturesToBlockWritingOn=*/ false); + } + + @CheckReturnValue + SerializationContext getMemoizingAndBlockingOnWriteContext() { + Preconditions.checkState( + serializer == null, "Should only be called on base serializationContext"); + Preconditions.checkState( + !allowFuturesToBlockWritingOn, "Should only be called on base serializationContext"); + return getNewMemoizingContext(/*allowFuturesToBlockWritingOn=*/ true); } /** @@ -100,7 +123,51 @@ public class SerializationContext { * getMemoizingContext, this method is not idempotent - the returned context will always be fresh. */ public SerializationContext getNewMemoizingContext() { - return new SerializationContext(this.registry, this.dependencies, new Memoizer.Serializer()); + return getNewMemoizingContext(/*allowFuturesToBlockWritingOn=*/ false); + } + + private SerializationContext getNewMemoizingContext(boolean allowFuturesToBlockWritingOn) { + return new SerializationContext( + this.registry, this.dependencies, new Memoizer.Serializer(), allowFuturesToBlockWritingOn); + } + + /** + * Register a {@link ListenableFuture} that must complete successfully before the serialized bytes + * generated using this context can be written remotely. Failure of the future implies a bug or + * other unrecoverable error that should crash this JVM. + */ + public void addFutureToBlockWritingOn(ListenableFuture<Void> future) { + Preconditions.checkState(allowFuturesToBlockWritingOn, "This context cannot block on a future"); + if (futuresToBlockWritingOn == null) { + futuresToBlockWritingOn = new ArrayList<>(); + } + Futures.addCallback(future, crashTerminatingCallback, MoreExecutors.directExecutor()); + futuresToBlockWritingOn.add(future); + } + + private static final FutureCallback<Void> crashTerminatingCallback = + new FutureCallback<Void>() { + @Override + public void onSuccess(@Nullable Void result) { + // Do nothing. + } + + @Override + public void onFailure(Throwable t) { + throw BazelCrashUtils.halt(t); + } + }; + + /** + * Creates a future that succeeds when all futures stored in this context via {@link + * #addFutureToBlockWritingOn} have succeeded, or null if no such futures were stored. + */ + @Nullable + public ListenableFuture<Void> createFutureToBlockWritingOn() { + return futuresToBlockWritingOn != null + ? Futures.whenAllSucceed(futuresToBlockWritingOn) + .call(() -> null, MoreExecutors.directExecutor()) + : null; } private boolean writeNullOrConstant(@Nullable Object object, CodedOutputStream codedOut) diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationResult.java b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationResult.java new file mode 100644 index 0000000000..90a1598700 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationResult.java @@ -0,0 +1,103 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.skyframe.serialization; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; +import javax.annotation.Nullable; + +/** + * A container class that holds an {@link #object} of type {@link T} and a possibly null {@link + * ListenableFuture}. If the {@link ListenableFuture} returned by {@link #getFutureToBlockWritesOn} + * is non-null, then, if {@link #object} is the serialized representation of some Bazel object, then + * it should not be written anywhere until the {@link ListenableFuture} in {@link + * #getFutureToBlockWritesOn} completes successfully. + * + * @param <T> Some serialized representation of an object, for instance a {@code byte[]} or a {@link + * ByteString}. + */ +public abstract class SerializationResult<T> { + private final T object; + + private SerializationResult(T object) { + this.object = object; + } + + /** + * Returns a new {@link SerializationResult} with the same future (if any) and {@code newObj} + * replacing the current {@link #getObject}. + */ + public abstract <S> SerializationResult<S> with(S newObj); + + /** + * Returns a {@link ListenableFuture} that, if not null, must complete successfully before {@link + * #getObject} can be written remotely. + */ + @Nullable + public abstract ListenableFuture<Void> getFutureToBlockWritesOn(); + + /** Returns the stored object that should not be written remotely before the future completes. */ + public T getObject() { + return object; + } + + static <T> SerializationResult<T> create( + T object, @Nullable ListenableFuture<Void> futureToBlockWritesOn) { + return futureToBlockWritesOn != null + ? new ObjectWithFuture<>(object, futureToBlockWritesOn) + : createWithoutFuture(object); + } + + /** Creates an {@link SerializationResult} with a null future (no waiting necessary. */ + public static <T> SerializationResult<T> createWithoutFuture(T object) { + return new ObjectWithoutFuture<>(object); + } + + private static class ObjectWithoutFuture<T> extends SerializationResult<T> { + ObjectWithoutFuture(T obj) { + super(obj); + } + + @Override + public <S> SerializationResult<S> with(S newObj) { + return new ObjectWithoutFuture<>(newObj); + } + + @Override + public ListenableFuture<Void> getFutureToBlockWritesOn() { + return null; + } + } + + private static class ObjectWithFuture<T> extends SerializationResult<T> { + private final ListenableFuture<Void> futureToBlockWritesOn; + + ObjectWithFuture(T obj, @Nullable ListenableFuture<Void> futureToBlockWritesOn) { + super(obj); + this.futureToBlockWritesOn = Preconditions.checkNotNull(futureToBlockWritesOn, obj); + } + + @Override + public <S> SerializationResult<S> with(S newObj) { + return new ObjectWithFuture<>(newObj, futureToBlockWritesOn); + } + + @Override + public ListenableFuture<Void> getFutureToBlockWritesOn() { + return futureToBlockWritesOn; + } + } +} |