aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/appengine/blobstore/read.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/appengine/blobstore/read.go')
-rw-r--r--vendor/google.golang.org/appengine/blobstore/read.go160
1 files changed, 160 insertions, 0 deletions
diff --git a/vendor/google.golang.org/appengine/blobstore/read.go b/vendor/google.golang.org/appengine/blobstore/read.go
new file mode 100644
index 0000000..578b1f5
--- /dev/null
+++ b/vendor/google.golang.org/appengine/blobstore/read.go
@@ -0,0 +1,160 @@
+// Copyright 2012 Google Inc. All rights reserved.
+// Use of this source code is governed by the Apache 2.0
+// license that can be found in the LICENSE file.
+
+package blobstore
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "sync"
+
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/net/context"
+
+ "google.golang.org/appengine"
+ "google.golang.org/appengine/internal"
+
+ blobpb "google.golang.org/appengine/internal/blobstore"
+)
+
+// openBlob returns a reader for a blob. It always succeeds; if the blob does
+// not exist then an error will be reported upon first read.
+func openBlob(c context.Context, blobKey appengine.BlobKey) Reader {
+ return &reader{
+ c: c,
+ blobKey: blobKey,
+ }
+}
+
+const readBufferSize = 256 * 1024
+
+// reader is a blob reader. It implements the Reader interface.
+type reader struct {
+ c context.Context
+
+ // Either blobKey or filename is set:
+ blobKey appengine.BlobKey
+ filename string
+
+ closeFunc func() // is nil if unavailable or already closed.
+
+ // buf is the read buffer. r is how much of buf has been read.
+ // off is the offset of buf[0] relative to the start of the blob.
+ // An invariant is 0 <= r && r <= len(buf).
+ // Reads that don't require an RPC call will increment r but not off.
+ // Seeks may modify r without discarding the buffer, but only if the
+ // invariant can be maintained.
+ mu sync.Mutex
+ buf []byte
+ r int
+ off int64
+}
+
+func (r *reader) Close() error {
+ if f := r.closeFunc; f != nil {
+ f()
+ }
+ r.closeFunc = nil
+ return nil
+}
+
+func (r *reader) Read(p []byte) (int, error) {
+ if len(p) == 0 {
+ return 0, nil
+ }
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ if r.r == len(r.buf) {
+ if err := r.fetch(r.off + int64(r.r)); err != nil {
+ return 0, err
+ }
+ }
+ n := copy(p, r.buf[r.r:])
+ r.r += n
+ return n, nil
+}
+
+func (r *reader) ReadAt(p []byte, off int64) (int, error) {
+ if len(p) == 0 {
+ return 0, nil
+ }
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ // Convert relative offsets to absolute offsets.
+ ab0 := r.off + int64(r.r)
+ ab1 := r.off + int64(len(r.buf))
+ ap0 := off
+ ap1 := off + int64(len(p))
+ // Check if we can satisfy the read entirely out of the existing buffer.
+ if r.off <= ap0 && ap1 <= ab1 {
+ // Convert off from an absolute offset to a relative offset.
+ rp0 := int(ap0 - r.off)
+ return copy(p, r.buf[rp0:]), nil
+ }
+ // Restore the original Read/Seek offset after ReadAt completes.
+ defer r.seek(ab0)
+ // Repeatedly fetch and copy until we have filled p.
+ n := 0
+ for len(p) > 0 {
+ if err := r.fetch(off + int64(n)); err != nil {
+ return n, err
+ }
+ r.r = copy(p, r.buf)
+ n += r.r
+ p = p[r.r:]
+ }
+ return n, nil
+}
+
+func (r *reader) Seek(offset int64, whence int) (ret int64, err error) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ switch whence {
+ case os.SEEK_SET:
+ ret = offset
+ case os.SEEK_CUR:
+ ret = r.off + int64(r.r) + offset
+ case os.SEEK_END:
+ return 0, errors.New("seeking relative to the end of a blob isn't supported")
+ default:
+ return 0, fmt.Errorf("invalid Seek whence value: %d", whence)
+ }
+ if ret < 0 {
+ return 0, errors.New("negative Seek offset")
+ }
+ return r.seek(ret)
+}
+
+// fetch fetches readBufferSize bytes starting at the given offset. On success,
+// the data is saved as r.buf.
+func (r *reader) fetch(off int64) error {
+ req := &blobpb.FetchDataRequest{
+ BlobKey: proto.String(string(r.blobKey)),
+ StartIndex: proto.Int64(off),
+ EndIndex: proto.Int64(off + readBufferSize - 1), // EndIndex is inclusive.
+ }
+ res := &blobpb.FetchDataResponse{}
+ if err := internal.Call(r.c, "blobstore", "FetchData", req, res); err != nil {
+ return err
+ }
+ if len(res.Data) == 0 {
+ return io.EOF
+ }
+ r.buf, r.r, r.off = res.Data, 0, off
+ return nil
+}
+
+// seek seeks to the given offset with an effective whence equal to SEEK_SET.
+// It discards the read buffer if the invariant cannot be maintained.
+func (r *reader) seek(off int64) (int64, error) {
+ delta := off - r.off
+ if delta >= 0 && delta < int64(len(r.buf)) {
+ r.r = int(delta)
+ return off, nil
+ }
+ r.buf, r.r, r.off = nil, 0, off
+ return off, nil
+}