// Copyright 2011 Google Inc. All rights reserved. // Use of this source code is governed by the Apache 2.0 // license that can be found in the LICENSE file. package datastore import ( "encoding/base64" "errors" "fmt" "math" "reflect" "strings" "github.com/golang/protobuf/proto" "golang.org/x/net/context" "google.golang.org/appengine/internal" pb "google.golang.org/appengine/internal/datastore" ) type operator int const ( lessThan operator = iota lessEq equal greaterEq greaterThan ) var operatorToProto = map[operator]*pb.Query_Filter_Operator{ lessThan: pb.Query_Filter_LESS_THAN.Enum(), lessEq: pb.Query_Filter_LESS_THAN_OR_EQUAL.Enum(), equal: pb.Query_Filter_EQUAL.Enum(), greaterEq: pb.Query_Filter_GREATER_THAN_OR_EQUAL.Enum(), greaterThan: pb.Query_Filter_GREATER_THAN.Enum(), } // filter is a conditional filter on query results. type filter struct { FieldName string Op operator Value interface{} } type sortDirection int const ( ascending sortDirection = iota descending ) var sortDirectionToProto = map[sortDirection]*pb.Query_Order_Direction{ ascending: pb.Query_Order_ASCENDING.Enum(), descending: pb.Query_Order_DESCENDING.Enum(), } // order is a sort order on query results. type order struct { FieldName string Direction sortDirection } // NewQuery creates a new Query for a specific entity kind. // // An empty kind means to return all entities, including entities created and // managed by other App Engine features, and is called a kindless query. // Kindless queries cannot include filters or sort orders on property values. func NewQuery(kind string) *Query { return &Query{ kind: kind, limit: -1, } } // Query represents a datastore query. 
type Query struct { kind string ancestor *Key filter []filter order []order projection []string distinct bool keysOnly bool eventual bool limit int32 offset int32 start *pb.CompiledCursor end *pb.CompiledCursor err error } func (q *Query) clone() *Query { x := *q // Copy the contents of the slice-typed fields to a new backing store. if len(q.filter) > 0 { x.filter = make([]filter, len(q.filter)) copy(x.filter, q.filter) } if len(q.order) > 0 { x.order = make([]order, len(q.order)) copy(x.order, q.order) } return &x } // Ancestor returns a derivative query with an ancestor filter. // The ancestor should not be nil. func (q *Query) Ancestor(ancestor *Key) *Query { q = q.clone() if ancestor == nil { q.err = errors.New("datastore: nil query ancestor") return q } q.ancestor = ancestor return q } // EventualConsistency returns a derivative query that returns eventually // consistent results. // It only has an effect on ancestor queries. func (q *Query) EventualConsistency() *Query { q = q.clone() q.eventual = true return q } // Filter returns a derivative query with a field-based filter. // The filterStr argument must be a field name followed by optional space, // followed by an operator, one of ">", "<", ">=", "<=", or "=". // Fields are compared against the provided value using the operator. // Multiple filters are AND'ed together. 
func (q *Query) Filter(filterStr string, value interface{}) *Query { q = q.clone() filterStr = strings.TrimSpace(filterStr) if len(filterStr) < 1 { q.err = errors.New("datastore: invalid filter: " + filterStr) return q } f := filter{ FieldName: strings.TrimRight(filterStr, " ><=!"), Value: value, } switch op := strings.TrimSpace(filterStr[len(f.FieldName):]); op { case "<=": f.Op = lessEq case ">=": f.Op = greaterEq case "<": f.Op = lessThan case ">": f.Op = greaterThan case "=": f.Op = equal default: q.err = fmt.Errorf("datastore: invalid operator %q in filter %q", op, filterStr) return q } q.filter = append(q.filter, f) return q } // Order returns a derivative query with a field-based sort order. Orders are // applied in the order they are added. The default order is ascending; to sort // in descending order prefix the fieldName with a minus sign (-). func (q *Query) Order(fieldName string) *Query { q = q.clone() fieldName = strings.TrimSpace(fieldName) o := order{ Direction: ascending, FieldName: fieldName, } if strings.HasPrefix(fieldName, "-") { o.Direction = descending o.FieldName = strings.TrimSpace(fieldName[1:]) } else if strings.HasPrefix(fieldName, "+") { q.err = fmt.Errorf("datastore: invalid order: %q", fieldName) return q } if len(o.FieldName) == 0 { q.err = errors.New("datastore: empty order") return q } q.order = append(q.order, o) return q } // Project returns a derivative query that yields only the given fields. It // cannot be used with KeysOnly. func (q *Query) Project(fieldNames ...string) *Query { q = q.clone() q.projection = append([]string(nil), fieldNames...) return q } // Distinct returns a derivative query that yields de-duplicated entities with // respect to the set of projected fields. It is only used for projection // queries. func (q *Query) Distinct() *Query { q = q.clone() q.distinct = true return q } // KeysOnly returns a derivative query that yields only keys, not keys and // entities. It cannot be used with projection queries. 
func (q *Query) KeysOnly() *Query { q = q.clone() q.keysOnly = true return q } // Limit returns a derivative query that has a limit on the number of results // returned. A negative value means unlimited. func (q *Query) Limit(limit int) *Query { q = q.clone() if limit < math.MinInt32 || limit > math.MaxInt32 { q.err = errors.New("datastore: query limit overflow") return q } q.limit = int32(limit) return q } // Offset returns a derivative query that has an offset of how many keys to // skip over before returning results. A negative value is invalid. func (q *Query) Offset(offset int) *Query { q = q.clone() if offset < 0 { q.err = errors.New("datastore: negative query offset") return q } if offset > math.MaxInt32 { q.err = errors.New("datastore: query offset overflow") return q } q.offset = int32(offset) return q } // Start returns a derivative query with the given start point. func (q *Query) Start(c Cursor) *Query { q = q.clone() if c.cc == nil { q.err = errors.New("datastore: invalid cursor") return q } q.start = c.cc return q } // End returns a derivative query with the given end point. func (q *Query) End(c Cursor) *Query { q = q.clone() if c.cc == nil { q.err = errors.New("datastore: invalid cursor") return q } q.end = c.cc return q } // toProto converts the query to a protocol buffer. 
func (q *Query) toProto(dst *pb.Query, appID string) error { if len(q.projection) != 0 && q.keysOnly { return errors.New("datastore: query cannot both project and be keys-only") } dst.Reset() dst.App = proto.String(appID) if q.kind != "" { dst.Kind = proto.String(q.kind) } if q.ancestor != nil { dst.Ancestor = keyToProto(appID, q.ancestor) if q.eventual { dst.Strong = proto.Bool(false) } } if q.projection != nil { dst.PropertyName = q.projection if q.distinct { dst.GroupByPropertyName = q.projection } } if q.keysOnly { dst.KeysOnly = proto.Bool(true) dst.RequirePerfectPlan = proto.Bool(true) } for _, qf := range q.filter { if qf.FieldName == "" { return errors.New("datastore: empty query filter field name") } p, errStr := valueToProto(appID, qf.FieldName, reflect.ValueOf(qf.Value), false) if errStr != "" { return errors.New("datastore: bad query filter value type: " + errStr) } xf := &pb.Query_Filter{ Op: operatorToProto[qf.Op], Property: []*pb.Property{p}, } if xf.Op == nil { return errors.New("datastore: unknown query filter operator") } dst.Filter = append(dst.Filter, xf) } for _, qo := range q.order { if qo.FieldName == "" { return errors.New("datastore: empty query order field name") } xo := &pb.Query_Order{ Property: proto.String(qo.FieldName), Direction: sortDirectionToProto[qo.Direction], } if xo.Direction == nil { return errors.New("datastore: unknown query order direction") } dst.Order = append(dst.Order, xo) } if q.limit >= 0 { dst.Limit = proto.Int32(q.limit) } if q.offset != 0 { dst.Offset = proto.Int32(q.offset) } dst.CompiledCursor = q.start dst.EndCompiledCursor = q.end dst.Compile = proto.Bool(true) return nil } // Count returns the number of results for the query. // // The running time and number of API calls made by Count scale linearly with // the sum of the query's offset and limit. 
Unless the result count is // expected to be small, it is best to specify a limit; otherwise Count will // continue until it finishes counting or the provided context expires. func (q *Query) Count(c context.Context) (int, error) { // Check that the query is well-formed. if q.err != nil { return 0, q.err } // Run a copy of the query, with keysOnly true (if we're not a projection, // since the two are incompatible), and an adjusted offset. We also set the // limit to zero, as we don't want any actual entity data, just the number // of skipped results. newQ := q.clone() newQ.keysOnly = len(newQ.projection) == 0 newQ.limit = 0 if q.limit < 0 { // If the original query was unlimited, set the new query's offset to maximum. newQ.offset = math.MaxInt32 } else { newQ.offset = q.offset + q.limit if newQ.offset < 0 { // Do the best we can, in the presence of overflow. newQ.offset = math.MaxInt32 } } req := &pb.Query{} if err := newQ.toProto(req, internal.FullyQualifiedAppID(c)); err != nil { return 0, err } res := &pb.QueryResult{} if err := internal.Call(c, "datastore_v3", "RunQuery", req, res); err != nil { return 0, err } // n is the count we will return. For example, suppose that our original // query had an offset of 4 and a limit of 2008: the count will be 2008, // provided that there are at least 2012 matching entities. However, the // RPCs will only skip 1000 results at a time. 
The RPC sequence is: // call RunQuery with (offset, limit) = (2012, 0) // 2012 == newQ.offset // response has (skippedResults, moreResults) = (1000, true) // n += 1000 // n == 1000 // call Next with (offset, limit) = (1012, 0) // 1012 == newQ.offset - n // response has (skippedResults, moreResults) = (1000, true) // n += 1000 // n == 2000 // call Next with (offset, limit) = (12, 0) // 12 == newQ.offset - n // response has (skippedResults, moreResults) = (12, false) // n += 12 // n == 2012 // // exit the loop // n -= 4 // n == 2008 var n int32 for { // The QueryResult should have no actual entity data, just skipped results. if len(res.Result) != 0 { return 0, errors.New("datastore: internal error: Count request returned too much data") } n += res.GetSkippedResults() if !res.GetMoreResults() { break } if err := callNext(c, res, newQ.offset-n, 0); err != nil { return 0, err } } n -= q.offset if n < 0 { // If the offset was greater than the number of matching entities, // return 0 instead of negative. n = 0 } return int(n), nil } // callNext issues a datastore_v3/Next RPC to advance a cursor, such as that // returned by a query with more results. func callNext(c context.Context, res *pb.QueryResult, offset, limit int32) error { if res.Cursor == nil { return errors.New("datastore: internal error: server did not return a cursor") } req := &pb.NextRequest{ Cursor: res.Cursor, } if limit >= 0 { req.Count = proto.Int32(limit) } if offset != 0 { req.Offset = proto.Int32(offset) } if res.CompiledCursor != nil { req.Compile = proto.Bool(true) } res.Reset() return internal.Call(c, "datastore_v3", "Next", req, res) } // GetAll runs the query in the given context and returns all keys that match // that query, as well as appending the values to dst. // // dst must have type *[]S or *[]*S or *[]P, for some struct type S or some non- // interface, non-pointer type P such that P or *P implements PropertyLoadSaver. 
//
// As a special case, *PropertyList is an invalid type for dst, even though a
// PropertyList is a slice of structs. It is treated as invalid to avoid being
// mistakenly passed when *[]PropertyList was intended.
//
// The keys returned by GetAll will be in a 1-1 correspondence with the entities
// added to dst.
//
// If q is a ``keys-only'' query, GetAll ignores dst and only returns the keys.
//
// If loading an entity fails with an *ErrFieldMismatch, GetAll keeps going and
// returns the most recent such error together with all keys. Any other error
// stops iteration, and the keys collected so far are returned with it.
//
// The running time and number of API calls made by GetAll scale linearly with
// the sum of the query's offset and limit. Unless the result count is
// expected to be small, it is best to specify a limit; otherwise GetAll will
// continue until it finishes collecting results or the provided context
// expires.
func (q *Query) GetAll(c context.Context, dst interface{}) ([]*Key, error) {
	var (
		dv               reflect.Value
		mat              multiArgType
		elemType         reflect.Type
		errFieldMismatch error
	)
	// dst is only validated (and used) for queries that load entities.
	if !q.keysOnly {
		dv = reflect.ValueOf(dst)
		if dv.Kind() != reflect.Ptr || dv.IsNil() {
			return nil, ErrInvalidEntityType
		}
		dv = dv.Elem()
		mat, elemType = checkMultiArg(dv)
		if mat == multiArgTypeInvalid || mat == multiArgTypeInterface {
			return nil, ErrInvalidEntityType
		}
	}

	var keys []*Key
	for t := q.Run(c); ; {
		k, e, err := t.next()
		if err == Done {
			break
		}
		if err != nil {
			return keys, err
		}
		if !q.keysOnly {
			// ev is a pointer to a fresh zero element of the slice's element type.
			ev := reflect.New(elemType)
			if elemType.Kind() == reflect.Map {
				// This is a special case. The zero values of a map type are
				// not immediately useful; they have to be make'd.
				//
				// Funcs and channels are similar, in that a zero value is not useful,
				// but even a freshly make'd channel isn't useful: there's no fixed
				// channel buffer size that is always going to be large enough, and
				// there's no goroutine to drain the other end. Theoretically, these
				// types could be supported, for example by sniffing for a constructor
				// method or requiring prior registration, but for now it's not a
				// frequent enough concern to be worth it. Programmers can work around
				// it by explicitly using Iterator.Next instead of the Query.GetAll
				// convenience method.
				x := reflect.MakeMap(elemType)
				ev.Elem().Set(x)
			}
			if err = loadEntity(ev.Interface(), e); err != nil {
				if _, ok := err.(*ErrFieldMismatch); ok {
					// We continue loading entities even in the face of field mismatch errors.
					// If we encounter any other error, that other error is returned. Otherwise,
					// an ErrFieldMismatch is returned.
					errFieldMismatch = err
				} else {
					return keys, err
				}
			}
			// Append the pointer itself for multiArgTypeStructPtr destinations
			// (presumably *[]*S); otherwise append the pointed-to value.
			if mat != multiArgTypeStructPtr {
				ev = ev.Elem()
			}
			dv.Set(reflect.Append(dv, ev))
		}
		keys = append(keys, k)
	}
	return keys, errFieldMismatch
}

// Run runs the query in the given context.
//
// Run issues the RunQuery RPC immediately, then issues Next RPCs until the
// query's offset has been skipped. Errors, including those from a malformed
// query, are not returned here: they are recorded in the returned Iterator and
// reported by its next call to Next.
func (q *Query) Run(c context.Context) *Iterator {
	if q.err != nil {
		return &Iterator{err: q.err}
	}
	t := &Iterator{
		c:      c,
		limit:  q.limit,
		q:      q,
		prevCC: q.start,
	}
	var req pb.Query
	if err := q.toProto(&req, internal.FullyQualifiedAppID(c)); err != nil {
		t.err = err
		return t
	}
	if err := internal.Call(c, "datastore_v3", "RunQuery", &req, &t.res); err != nil {
		t.err = err
		return t
	}
	// The server may skip fewer results per RPC than requested, so keep
	// calling Next until the whole offset has been consumed.
	offset := q.offset - t.res.GetSkippedResults()
	for offset > 0 && t.res.GetMoreResults() {
		t.prevCC = t.res.CompiledCursor
		if err := callNext(t.c, &t.res, offset, t.limit); err != nil {
			t.err = err
			break
		}
		skip := t.res.GetSkippedResults()
		if skip < 0 {
			t.err = errors.New("datastore: internal error: negative number of skipped_results")
			break
		}
		offset -= skip
	}
	if offset < 0 {
		t.err = errors.New("datastore: internal error: query offset was overshot")
	}
	return t
}

// Iterator is the result of running a query.
type Iterator struct {
	c context.Context
	// err, once set, is returned by every later call to next (Done marks the
	// normal end of results); it is never cleared.
	err error
	// res is the result of the most recent RunQuery or Next API call.
	res pb.QueryResult
	// i is how many elements of res.Result we have iterated over.
	i int
	// limit is the limit on the number of results this iterator should return.
	// A negative value means unlimited.
	limit int32
	// q is the original query which yielded this iterator.
	q *Query
	// prevCC is the compiled cursor that marks the end of the previous batch
	// of results.
	prevCC *pb.CompiledCursor
}

// Done is returned when a query iteration has completed.
var Done = errors.New("datastore: query has no more results")

// Next returns the key of the next result. When there are no more results,
// Done is returned as the error.
//
// If the query is not keys only and dst is non-nil, it also loads the entity
// stored for that key into the struct pointer or PropertyLoadSaver dst, with
// the same semantics and possible errors as for the Get function.
func (t *Iterator) Next(dst interface{}) (*Key, error) {
	k, e, err := t.next()
	if err != nil {
		return nil, err
	}
	// The key is returned even when loading into dst fails.
	if dst != nil && !t.q.keysOnly {
		err = loadEntity(dst, e)
	}
	return k, err
}

// next returns the key and raw entity of the next result, issuing Next RPCs
// whenever the current batch is exhausted. Failures recorded in t.err
// (including Done) are sticky; a bad key in a single result is reported
// without being recorded.
func (t *Iterator) next() (*Key, *pb.EntityProto, error) {
	if t.err != nil {
		return nil, nil, t.err
	}

	// Issue datastore_v3/Next RPCs as necessary.
	for t.i == len(t.res.Result) {
		if !t.res.GetMoreResults() {
			t.err = Done
			return nil, nil, t.err
		}
		// The batch about to be replaced becomes the "previous batch".
		t.prevCC = t.res.CompiledCursor
		if err := callNext(t.c, &t.res, 0, t.limit); err != nil {
			t.err = err
			return nil, nil, t.err
		}
		if t.res.GetSkippedResults() != 0 {
			t.err = errors.New("datastore: internal error: iterator has skipped results")
			return nil, nil, t.err
		}
		t.i = 0
		// Charge the new batch against the remaining limit; a server that
		// returns more than the limit allows is an internal error.
		if t.limit >= 0 {
			t.limit -= int32(len(t.res.Result))
			if t.limit < 0 {
				t.err = errors.New("datastore: internal error: query returned more results than the limit")
				return nil, nil, t.err
			}
		}
	}

	// Extract the key from the t.i'th element of t.res.Result.
	e := t.res.Result[t.i]
	t.i++
	if e.Key == nil {
		return nil, nil, errors.New("datastore: internal error: server did not return a key")
	}
	k, err := protoToKey(e.Key)
	if err != nil || k.Incomplete() {
		return nil, nil, errors.New("datastore: internal error: server returned an invalid key")
	}
	return k, e, nil
}

// Cursor returns a cursor for the iterator's current location. If iteration
// failed with an error other than Done, that error is returned instead.
func (t *Iterator) Cursor() (Cursor, error) { if t.err != nil && t.err != Done { return Cursor{}, t.err } // If we are at either end of the current batch of results, // return the compiled cursor at that end. skipped := t.res.GetSkippedResults() if t.i == 0 && skipped == 0 { if t.prevCC == nil { // A nil pointer (of type *pb.CompiledCursor) means no constraint: // passing it as the end cursor of a new query means unlimited results // (glossing over the integer limit parameter for now). // A non-nil pointer to an empty pb.CompiledCursor means the start: // passing it as the end cursor of a new query means 0 results. // If prevCC was nil, then the original query had no start cursor, but // Iterator.Cursor should return "the start" instead of unlimited. return Cursor{&zeroCC}, nil } return Cursor{t.prevCC}, nil } if t.i == len(t.res.Result) { return Cursor{t.res.CompiledCursor}, nil } // Otherwise, re-run the query offset to this iterator's position, starting from // the most recent compiled cursor. This is done on a best-effort basis, as it // is racy; if a concurrent process has added or removed entities, then the // cursor returned may be inconsistent. q := t.q.clone() q.start = t.prevCC q.offset = skipped + int32(t.i) q.limit = 0 q.keysOnly = len(q.projection) == 0 t1 := q.Run(t.c) _, _, err := t1.next() if err != Done { if err == nil { err = fmt.Errorf("datastore: internal error: zero-limit query did not have zero results") } return Cursor{}, err } return Cursor{t1.res.CompiledCursor}, nil } var zeroCC pb.CompiledCursor // Cursor is an iterator's position. It can be converted to and from an opaque // string. A cursor can be used from different HTTP requests, but only with a // query with the same kind, ancestor, filter and order constraints. type Cursor struct { cc *pb.CompiledCursor } // String returns a base-64 string representation of a cursor. 
func (c Cursor) String() string { if c.cc == nil { return "" } b, err := proto.Marshal(c.cc) if err != nil { // The only way to construct a Cursor with a non-nil cc field is to // unmarshal from the byte representation. We panic if the unmarshal // succeeds but the marshaling of the unchanged protobuf value fails. panic(fmt.Sprintf("datastore: internal error: malformed cursor: %v", err)) } return strings.TrimRight(base64.URLEncoding.EncodeToString(b), "=") } // Decode decodes a cursor from its base-64 string representation. func DecodeCursor(s string) (Cursor, error) { if s == "" { return Cursor{&zeroCC}, nil } if n := len(s) % 4; n != 0 { s += strings.Repeat("=", 4-n) } b, err := base64.URLEncoding.DecodeString(s) if err != nil { return Cursor{}, err } cc := &pb.CompiledCursor{} if err := proto.Unmarshal(b, cc); err != nil { return Cursor{}, err } return Cursor{cc}, nil }