aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/skyframe/serialization
diff options
context:
space:
mode:
authorGravatar janakr <janakr@google.com>2018-04-30 13:35:22 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-04-30 13:36:40 -0700
commitbdd4eb2be39adb92dda0bb91cb6f2a32419e8694 (patch)
treefcb43740a40ea5b65a14fb139fb6f1b37a6296cd /src/main/java/com/google/devtools/build/lib/skyframe/serialization
parent968f87900dce45a7af749a965b72dbac51b176b3 (diff)
Add ability for serialization to inform the SerializationContext that any remote write should block on the provided future.
PiperOrigin-RevId: 194836516
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/skyframe/serialization')
-rw-r--r--src/main/java/com/google/devtools/build/lib/skyframe/serialization/BUILD2
-rw-r--r--src/main/java/com/google/devtools/build/lib/skyframe/serialization/ObjectCodecs.java23
-rw-r--r--src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationContext.java81
-rw-r--r--src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationResult.java103
4 files changed, 193 insertions, 16 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/BUILD b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/BUILD
index a2fe0b53ac..93412b31be 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/BUILD
@@ -15,7 +15,7 @@ java_library(
exclude = ["SerializationConstants.java"],
),
deps = [
- ":constants",
+ "//src/main/java/com/google/devtools/build/lib:crash-utils",
"//src/main/java/com/google/devtools/build/lib/skyframe/serialization/autocodec:registered-singleton",
"//src/main/java/com/google/devtools/build/lib/skyframe/serialization/autocodec:unsafe-provider",
"//third_party:guava",
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/ObjectCodecs.java b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/ObjectCodecs.java
index 557ed17255..5f81394129 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/ObjectCodecs.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/ObjectCodecs.java
@@ -47,16 +47,26 @@ public class ObjectCodecs {
}
public void serialize(Object subject, CodedOutputStream codedOut) throws SerializationException {
- serializeImpl(subject, codedOut, /*memoize=*/ false);
+ serializeImpl(subject, codedOut, serializationContext);
}
public ByteString serializeMemoized(Object subject) throws SerializationException {
return serializeToByteString(subject, this::serializeMemoized);
}
+ public SerializationResult<ByteString> serializeMemoizedAndBlocking(Object subject)
+ throws SerializationException {
+ SerializationContext memoizingContext =
+ serializationContext.getMemoizingAndBlockingOnWriteContext();
+ ByteString byteString =
+ serializeToByteString(
+ subject, (subj, codedOut) -> serializeImpl(subj, codedOut, memoizingContext));
+ return SerializationResult.create(byteString, memoizingContext.createFutureToBlockWritingOn());
+ }
+
public void serializeMemoized(Object subject, CodedOutputStream codedOut)
throws SerializationException {
- serializeImpl(subject, codedOut, /*memoize=*/ true);
+ serializeImpl(subject, codedOut, serializationContext.getMemoizingContext());
}
public Object deserialize(ByteString data) throws SerializationException {
@@ -75,14 +85,11 @@ public class ObjectCodecs {
return deserializeImpl(codedIn, /*memoize=*/ true);
}
- private void serializeImpl(Object subject, CodedOutputStream codedOut, boolean memoize)
+ private void serializeImpl(
+ Object subject, CodedOutputStream codedOut, SerializationContext serializationContext)
throws SerializationException {
try {
- if (memoize) {
- serializationContext.getMemoizingContext().serialize(subject, codedOut);
- } else {
- serializationContext.serialize(subject, codedOut);
- }
+ serializationContext.serialize(subject, codedOut);
} catch (IOException e) {
throw new SerializationException("Failed to serialize " + subject, e);
}
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationContext.java b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationContext.java
index 1fe8555944..2c19c6b8dc 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationContext.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationContext.java
@@ -17,37 +17,51 @@ package com.google.devtools.build.lib.skyframe.serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.skyframe.serialization.Memoizer.Serializer;
import com.google.devtools.build.lib.skyframe.serialization.SerializationException.NoCodecException;
+import com.google.devtools.build.lib.util.BazelCrashUtils;
import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
/**
* Stateful class for providing additional context to a single serialization "session". This class
- * is thread-safe so long as {@link #serializer} is null. If it is not null, this class is not
- * thread-safe and should only be accessed on a single thread for serializing one object (that may
- * involve serializing other objects contained in it).
+ * is thread-safe so long as {@link #serializer} is null (which also implies that {@link
+ * #allowFuturesToBlockWritingOn) is false). If it is not null, this class is not thread-safe and
+ * should only be accessed on a single thread for serializing one object (that may involve
+ * serializing other objects contained in it).
*/
public class SerializationContext {
private final ObjectCodecRegistry registry;
private final ImmutableMap<Class<?>, Object> dependencies;
@Nullable private final Memoizer.Serializer serializer;
+ /** Initialized lazily. */
+ @Nullable private List<ListenableFuture<Void>> futuresToBlockWritingOn;
+
+ private final boolean allowFuturesToBlockWritingOn;
private SerializationContext(
ObjectCodecRegistry registry,
ImmutableMap<Class<?>, Object> dependencies,
- @Nullable Serializer serializer) {
+ @Nullable Serializer serializer,
+ boolean allowFuturesToBlockWritingOn) {
this.registry = registry;
this.dependencies = dependencies;
this.serializer = serializer;
+ this.allowFuturesToBlockWritingOn = allowFuturesToBlockWritingOn;
}
@VisibleForTesting
public SerializationContext(
ObjectCodecRegistry registry, ImmutableMap<Class<?>, Object> dependencies) {
- this(registry, dependencies, /*serializer=*/ null);
+ this(registry, dependencies, /*serializer=*/ null, /*allowFuturesToBlockWritingOn=*/ false);
}
@VisibleForTesting
@@ -92,7 +106,16 @@ public class SerializationContext {
if (serializer != null) {
return this;
}
- return getNewMemoizingContext();
+ return getNewMemoizingContext(/*allowFuturesToBlockWritingOn=*/ false);
+ }
+
+ @CheckReturnValue
+ SerializationContext getMemoizingAndBlockingOnWriteContext() {
+ Preconditions.checkState(
+ serializer == null, "Should only be called on base serializationContext");
+ Preconditions.checkState(
+ !allowFuturesToBlockWritingOn, "Should only be called on base serializationContext");
+ return getNewMemoizingContext(/*allowFuturesToBlockWritingOn=*/ true);
}
/**
@@ -100,7 +123,51 @@ public class SerializationContext {
* getMemoizingContext, this method is not idempotent - the returned context will always be fresh.
*/
public SerializationContext getNewMemoizingContext() {
- return new SerializationContext(this.registry, this.dependencies, new Memoizer.Serializer());
+ return getNewMemoizingContext(/*allowFuturesToBlockWritingOn=*/ false);
+ }
+
+ private SerializationContext getNewMemoizingContext(boolean allowFuturesToBlockWritingOn) {
+ return new SerializationContext(
+ this.registry, this.dependencies, new Memoizer.Serializer(), allowFuturesToBlockWritingOn);
+ }
+
+ /**
+ * Register a {@link ListenableFuture} that must complete successfully before the serialized bytes
+ * generated using this context can be written remotely. Failure of the future implies a bug or
+ * other unrecoverable error that should crash this JVM.
+ */
+ public void addFutureToBlockWritingOn(ListenableFuture<Void> future) {
+ Preconditions.checkState(allowFuturesToBlockWritingOn, "This context cannot block on a future");
+ if (futuresToBlockWritingOn == null) {
+ futuresToBlockWritingOn = new ArrayList<>();
+ }
+ Futures.addCallback(future, crashTerminatingCallback, MoreExecutors.directExecutor());
+ futuresToBlockWritingOn.add(future);
+ }
+
+ private static final FutureCallback<Void> crashTerminatingCallback =
+ new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ // Do nothing.
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ throw BazelCrashUtils.halt(t);
+ }
+ };
+
+ /**
+ * Creates a future that succeeds when all futures stored in this context via {@link
+ * #addFutureToBlockWritingOn} have succeeded, or null if no such futures were stored.
+ */
+ @Nullable
+ public ListenableFuture<Void> createFutureToBlockWritingOn() {
+ return futuresToBlockWritingOn != null
+ ? Futures.whenAllSucceed(futuresToBlockWritingOn)
+ .call(() -> null, MoreExecutors.directExecutor())
+ : null;
}
private boolean writeNullOrConstant(@Nullable Object object, CodedOutputStream codedOut)
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationResult.java b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationResult.java
new file mode 100644
index 0000000000..90a1598700
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/serialization/SerializationResult.java
@@ -0,0 +1,103 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.skyframe.serialization;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import javax.annotation.Nullable;
+
+/**
+ * A container class that holds an {@link #object} of type {@link T} and a possibly null {@link
+ * ListenableFuture}. If the {@link ListenableFuture} returned by {@link #getFutureToBlockWritesOn}
+ * is non-null, then, if {@link #object} is the serialized representation of some Bazel object, then
+ * it should not be written anywhere until the {@link ListenableFuture} in {@link
+ * #getFutureToBlockWritesOn} completes successfully.
+ *
+ * @param <T> Some serialized representation of an object, for instance a {@code byte[]} or a {@link
+ * ByteString}.
+ */
+public abstract class SerializationResult<T> {
+ private final T object;
+
+ private SerializationResult(T object) {
+ this.object = object;
+ }
+
+ /**
+ * Returns a new {@link SerializationResult} with the same future (if any) and {@code newObj}
+ * replacing the current {@link #getObject}.
+ */
+ public abstract <S> SerializationResult<S> with(S newObj);
+
+ /**
+ * Returns a {@link ListenableFuture} that, if not null, must complete successfully before {@link
+ * #getObject} can be written remotely.
+ */
+ @Nullable
+ public abstract ListenableFuture<Void> getFutureToBlockWritesOn();
+
+ /** Returns the stored object that should not be written remotely before the future completes. */
+ public T getObject() {
+ return object;
+ }
+
+ static <T> SerializationResult<T> create(
+ T object, @Nullable ListenableFuture<Void> futureToBlockWritesOn) {
+ return futureToBlockWritesOn != null
+ ? new ObjectWithFuture<>(object, futureToBlockWritesOn)
+ : createWithoutFuture(object);
+ }
+
+ /** Creates an {@link SerializationResult} with a null future (no waiting necessary. */
+ public static <T> SerializationResult<T> createWithoutFuture(T object) {
+ return new ObjectWithoutFuture<>(object);
+ }
+
+ private static class ObjectWithoutFuture<T> extends SerializationResult<T> {
+ ObjectWithoutFuture(T obj) {
+ super(obj);
+ }
+
+ @Override
+ public <S> SerializationResult<S> with(S newObj) {
+ return new ObjectWithoutFuture<>(newObj);
+ }
+
+ @Override
+ public ListenableFuture<Void> getFutureToBlockWritesOn() {
+ return null;
+ }
+ }
+
+ private static class ObjectWithFuture<T> extends SerializationResult<T> {
+ private final ListenableFuture<Void> futureToBlockWritesOn;
+
+ ObjectWithFuture(T obj, @Nullable ListenableFuture<Void> futureToBlockWritesOn) {
+ super(obj);
+ this.futureToBlockWritesOn = Preconditions.checkNotNull(futureToBlockWritesOn, obj);
+ }
+
+ @Override
+ public <S> SerializationResult<S> with(S newObj) {
+ return new ObjectWithFuture<>(newObj, futureToBlockWritesOn);
+ }
+
+ @Override
+ public ListenableFuture<Void> getFutureToBlockWritesOn() {
+ return futureToBlockWritesOn;
+ }
+ }
+}