aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
blob: bb1ca512d5201fc434e2b7d8f3fef72aaeb91d7f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.remote;

import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata;
import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
import com.google.devtools.build.lib.remote.RemoteProtocol.Output;
import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Semaphore;

/**
 * A RemoteActionCache implementation that uses a concurrent map as a distributed storage for files
 * and action output.
 *
 * <p>The thread safety is guaranteed by the underlying map.
 */
@ThreadSafe
public final class SimpleBlobStoreActionCache implements RemoteActionCache {
  private final SimpleBlobStore blobStore;

  // Upper bound, in kilobytes, both on the size of a single blob and on the combined in-memory
  // footprint of concurrently in-flight uploads.
  private static final int MAX_MEMORY_KBYTES = 512 * 1024;

  // Fair semaphore that throttles concurrent uploads so that the blobs buffered in memory at any
  // one time stay below MAX_MEMORY_KBYTES in total.
  private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true);

  public SimpleBlobStoreActionCache(SimpleBlobStore blobStore) {
    this.blobStore = blobStore;
  }

  /**
   * Uploads the Merkle tree rooted at {@code root} to the blob store: first every serialized
   * {@link FileNode}, then the contents of every leaf file.
   */
  @Override
  public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root)
      throws IOException, InterruptedException {
    repository.computeMerkleDigests(root);
    for (FileNode fileNode : repository.treeToFileNodes(root)) {
      uploadBlob(fileNode.toByteArray());
    }
    // TODO(ulfjack): Only upload files that aren't in the CAS yet?
    for (TreeNode leaf : repository.leaves(root)) {
      uploadFileContents(leaf.getActionInput(), execRoot, repository.getInputFileCache());
    }
  }

  /**
   * Recursively materializes the tree identified by {@code rootDigest} under {@code rootLocation},
   * downloading each node's file contents (if any) and then all of its children.
   *
   * @throws CacheNotFoundException if any referenced blob is missing from the store
   */
  @Override
  public void downloadTree(ContentDigest rootDigest, Path rootLocation)
      throws IOException, CacheNotFoundException {
    FileNode fileNode = FileNode.parseFrom(downloadBlob(rootDigest));
    if (fileNode.hasFileMetadata()) {
      FileMetadata meta = fileNode.getFileMetadata();
      downloadFileContents(meta.getDigest(), rootLocation, meta.getExecutable());
    }
    for (FileNode.Child child : fileNode.getChildList()) {
      downloadTree(child.getDigest(), rootLocation.getRelative(child.getPath()));
    }
  }

  /**
   * Uploads the contents of {@code file} and returns its digest.
   */
  @Override
  public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException {
    // This unconditionally reads the whole file into memory first!
    // try-with-resources: the stream from getInputStream() must be closed to avoid leaking a
    // file descriptor.
    try (InputStream in = file.getInputStream()) {
      return uploadBlob(ByteString.readFrom(in).toByteArray());
    }
  }

  /**
   * Uploads the contents of an action input, either by serializing a {@link VirtualActionInput}
   * or by reading the corresponding file under {@code execRoot}. Returns the content digest.
   */
  @Override
  public ContentDigest uploadFileContents(
      ActionInput input, Path execRoot, ActionInputFileCache inputCache)
      throws IOException, InterruptedException {
    // This unconditionally reads the whole file into memory first!
    if (input instanceof VirtualActionInput) {
      // Virtual inputs have no backing file; serialize them directly into a buffer.
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      ((VirtualActionInput) input).writeTo(buffer);
      byte[] blob = buffer.toByteArray();
      return uploadBlob(blob, ContentDigests.computeDigest(blob));
    }
    Path file = execRoot.getRelative(input.getExecPathString());
    // try-with-resources: close the input stream to avoid leaking a file descriptor.
    try (InputStream in = file.getInputStream()) {
      return uploadBlob(
          ByteString.readFrom(in).toByteArray(),
          ContentDigests.getDigestFromInputCache(input, inputCache));
    }
  }

  /**
   * Downloads every output listed in {@code result} into its place under {@code execRoot}:
   * plain files via their metadata, everything else as a tree.
   */
  @Override
  public void downloadAllResults(ActionResult result, Path execRoot)
      throws IOException, CacheNotFoundException {
    for (Output output : result.getOutputList()) {
      if (output.getContentCase() == ContentCase.FILE_METADATA) {
        FileMetadata m = output.getFileMetadata();
        downloadFileContents(
            m.getDigest(), execRoot.getRelative(output.getPath()), m.getExecutable());
      } else {
        downloadTree(output.getDigest(), execRoot.getRelative(output.getPath()));
      }
    }
  }

  /**
   * Uploads each existing file in {@code files} to the blob store and records its path (relative
   * to {@code execRoot}), digest, and executable bit in {@code result}. Missing files are
   * silently skipped; directories are not yet supported.
   *
   * @throws UnsupportedOperationException if any path is a directory
   */
  @Override
  public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result)
      throws IOException, InterruptedException {
    for (Path file : files) {
      if (!file.exists()) {
        continue;
      }
      if (file.isDirectory()) {
        // TODO(olaola): to implement this for a directory, will need to create or pass a
        // TreeNodeRepository to call uploadTree.
        throw new UnsupportedOperationException("Storing a directory is not yet supported.");
      }
      // First put the file content to cache.
      ContentDigest digest = uploadFileContents(file);
      // Add to protobuf.
      result
          .addOutputBuilder()
          .setPath(file.relativeTo(execRoot).getPathString())
          .getFileMetadataBuilder()
          .setDigest(digest)
          .setExecutable(file.isExecutable());
    }
  }

  /**
   * Downloads the blob for {@code digest} to {@code dest}, creating parent directories as needed
   * and setting the executable bit afterwards.
   */
  private void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
      throws IOException, CacheNotFoundException {
    // This unconditionally downloads the whole file into memory first!
    byte[] contents = downloadBlob(digest);
    FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
    try (OutputStream stream = dest.getOutputStream()) {
      stream.write(contents);
    }
    dest.setExecutable(executable);
  }

  /**
   * Uploads each blob in order and returns the digests in the same order.
   */
  @Override
  public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
      throws InterruptedException {
    ArrayList<ContentDigest> digests = new ArrayList<>();
    for (byte[] blob : blobs) {
      digests.add(uploadBlob(blob));
    }
    return ImmutableList.copyOf(digests);
  }

  /**
   * Rejects blobs at or above the in-memory limit; {@code type} ("Upload"/"Download") is used in
   * the error message.
   *
   * @throws IllegalArgumentException if {@code blobSizeKBytes >= MAX_MEMORY_KBYTES}
   */
  private void checkBlobSize(long blobSizeKBytes, String type) {
    Preconditions.checkArgument(
        blobSizeKBytes < MAX_MEMORY_KBYTES,
        type + ": maximum blob size exceeded: %sK > %sK.",
        blobSizeKBytes,
        MAX_MEMORY_KBYTES);
  }

  /**
   * Computes the digest of {@code blob} and uploads it, returning the digest.
   */
  @Override
  public ContentDigest uploadBlob(byte[] blob) throws InterruptedException {
    return uploadBlob(blob, ContentDigests.computeDigest(blob));
  }

  /**
   * Uploads {@code blob} under the given (precomputed) digest key, blocking on the memory
   * semaphore so that concurrent uploads stay within MAX_MEMORY_KBYTES.
   */
  private ContentDigest uploadBlob(byte[] blob, ContentDigest digest) throws InterruptedException {
    int blobSizeKBytes = blob.length / 1024;
    checkBlobSize(blobSizeKBytes, "Upload");
    uploadMemoryAvailable.acquire(blobSizeKBytes);
    try {
      blobStore.put(ContentDigests.toHexString(digest), blob);
    } finally {
      // Always release exactly what was acquired, even if put() fails.
      uploadMemoryAvailable.release(blobSizeKBytes);
    }
    return digest;
  }

  /**
   * Fetches the blob for {@code digest} into memory. A zero-size digest short-circuits to an
   * empty array without touching the store.
   *
   * @throws CacheNotFoundException if the store has no entry for the digest
   */
  @Override
  public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException {
    if (digest.getSizeBytes() == 0) {
      return new byte[0];
    }
    // This unconditionally downloads the whole blob into memory!
    checkBlobSize(digest.getSizeBytes() / 1024, "Download");
    byte[] data = blobStore.get(ContentDigests.toHexString(digest));
    if (data == null) {
      throw new CacheNotFoundException(digest);
    }
    return data;
  }

  /**
   * Fetches each blob in order and returns them in the same order.
   */
  @Override
  public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
      throws CacheNotFoundException {
    ArrayList<byte[]> blobs = new ArrayList<>();
    for (ContentDigest c : digests) {
      blobs.add(downloadBlob(c));
    }
    return ImmutableList.copyOf(blobs);
  }

  /** Returns whether the store contains an entry for {@code digest}. */
  public boolean containsKey(ContentDigest digest) {
    return blobStore.containsKey(ContentDigests.toHexString(digest));
  }

  /**
   * Looks up a cached {@link ActionResult} by action key. Returns null on a miss; a corrupt
   * (unparseable) entry is deliberately treated as a miss rather than an error.
   */
  @Override
  public ActionResult getCachedActionResult(ActionKey actionKey) {
    byte[] data = blobStore.get(ContentDigests.toHexString(actionKey.getDigest()));
    if (data == null) {
      return null;
    }
    try {
      return ActionResult.parseFrom(data);
    } catch (InvalidProtocolBufferException e) {
      // Treat a corrupt cache entry as a cache miss.
      return null;
    }
  }

  /**
   * Stores the serialized {@code result} under the action key's digest.
   */
  @Override
  public void setCachedActionResult(ActionKey actionKey, ActionResult result)
      throws InterruptedException {
    blobStore.put(ContentDigests.toHexString(actionKey.getDigest()), result.toByteArray());
  }
}