aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/appengine/blobstore/read.go
blob: 578b1f550ad16b7962dd0e159eed5dba2f67f687 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright 2012 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.

package blobstore

import (
	"errors"
	"fmt"
	"io"
	"os"
	"sync"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/context"

	"google.golang.org/appengine"
	"google.golang.org/appengine/internal"

	blobpb "google.golang.org/appengine/internal/blobstore"
)

// openBlob returns a reader for a blob. It always succeeds; if the blob does
// not exist then an error will be reported upon first read.
func openBlob(c context.Context, blobKey appengine.BlobKey) Reader {
	return &reader{
		c:       c,
		blobKey: blobKey,
	}
}

const readBufferSize = 256 * 1024

// reader is a blob reader. It implements the Reader interface.
type reader struct {
	c context.Context

	// Either blobKey or filename is set:
	blobKey  appengine.BlobKey
	filename string

	closeFunc func() // is nil if unavailable or already closed.

	// buf is the read buffer. r is how much of buf has been read.
	// off is the offset of buf[0] relative to the start of the blob.
	// An invariant is 0 <= r && r <= len(buf).
	// Reads that don't require an RPC call will increment r but not off.
	// Seeks may modify r without discarding the buffer, but only if the
	// invariant can be maintained.
	mu  sync.Mutex
	buf []byte
	r   int
	off int64
}

func (r *reader) Close() error {
	if f := r.closeFunc; f != nil {
		f()
	}
	r.closeFunc = nil
	return nil
}

func (r *reader) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.r == len(r.buf) {
		if err := r.fetch(r.off + int64(r.r)); err != nil {
			return 0, err
		}
	}
	n := copy(p, r.buf[r.r:])
	r.r += n
	return n, nil
}

func (r *reader) ReadAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	// Convert relative offsets to absolute offsets.
	ab0 := r.off + int64(r.r)
	ab1 := r.off + int64(len(r.buf))
	ap0 := off
	ap1 := off + int64(len(p))
	// Check if we can satisfy the read entirely out of the existing buffer.
	if r.off <= ap0 && ap1 <= ab1 {
		// Convert off from an absolute offset to a relative offset.
		rp0 := int(ap0 - r.off)
		return copy(p, r.buf[rp0:]), nil
	}
	// Restore the original Read/Seek offset after ReadAt completes.
	defer r.seek(ab0)
	// Repeatedly fetch and copy until we have filled p.
	n := 0
	for len(p) > 0 {
		if err := r.fetch(off + int64(n)); err != nil {
			return n, err
		}
		r.r = copy(p, r.buf)
		n += r.r
		p = p[r.r:]
	}
	return n, nil
}

func (r *reader) Seek(offset int64, whence int) (ret int64, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	switch whence {
	case os.SEEK_SET:
		ret = offset
	case os.SEEK_CUR:
		ret = r.off + int64(r.r) + offset
	case os.SEEK_END:
		return 0, errors.New("seeking relative to the end of a blob isn't supported")
	default:
		return 0, fmt.Errorf("invalid Seek whence value: %d", whence)
	}
	if ret < 0 {
		return 0, errors.New("negative Seek offset")
	}
	return r.seek(ret)
}

// fetch fetches readBufferSize bytes starting at the given offset. On success,
// the data is saved as r.buf.
func (r *reader) fetch(off int64) error {
	req := &blobpb.FetchDataRequest{
		BlobKey:    proto.String(string(r.blobKey)),
		StartIndex: proto.Int64(off),
		EndIndex:   proto.Int64(off + readBufferSize - 1), // EndIndex is inclusive.
	}
	res := &blobpb.FetchDataResponse{}
	if err := internal.Call(r.c, "blobstore", "FetchData", req, res); err != nil {
		return err
	}
	if len(res.Data) == 0 {
		return io.EOF
	}
	r.buf, r.r, r.off = res.Data, 0, off
	return nil
}

// seek seeks to the given offset with an effective whence equal to SEEK_SET.
// It discards the read buffer if the invariant cannot be maintained.
func (r *reader) seek(off int64) (int64, error) {
	delta := off - r.off
	if delta >= 0 && delta < int64(len(r.buf)) {
		r.r = int(delta)
		return off, nil
	}
	r.buf, r.r, r.off = nil, 0, off
	return off, nil
}