aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/appengine/taskqueue/taskqueue.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/appengine/taskqueue/taskqueue.go')
-rw-r--r--vendor/google.golang.org/appengine/taskqueue/taskqueue.go496
1 files changed, 496 insertions, 0 deletions
diff --git a/vendor/google.golang.org/appengine/taskqueue/taskqueue.go b/vendor/google.golang.org/appengine/taskqueue/taskqueue.go
new file mode 100644
index 0000000..9b62fac
--- /dev/null
+++ b/vendor/google.golang.org/appengine/taskqueue/taskqueue.go
@@ -0,0 +1,496 @@
+// Copyright 2011 Google Inc. All rights reserved.
+// Use of this source code is governed by the Apache 2.0
+// license that can be found in the LICENSE file.
+
+/*
+Package taskqueue provides a client for App Engine's taskqueue service.
+Using this service, applications may perform work outside a user's request.
+
+A Task may be constructed manually; alternatively, since the most common
+taskqueue operation is to add a single POST task, NewPOSTTask makes it easy.
+
+ t := taskqueue.NewPOSTTask("/worker", url.Values{
+ "key": {key},
+ })
+ taskqueue.Add(c, t, "") // add t to the default queue
+*/
+package taskqueue // import "google.golang.org/appengine/taskqueue"
+
+import (
+ "errors"
+ "fmt"
+ "net/http"
+ "net/url"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/net/context"
+
+ "google.golang.org/appengine"
+ "google.golang.org/appengine/internal"
+ dspb "google.golang.org/appengine/internal/datastore"
+ pb "google.golang.org/appengine/internal/taskqueue"
+)
+
+var (
+ // ErrTaskAlreadyAdded is the error returned by Add and AddMulti when a task has already been added with a particular name.
+ ErrTaskAlreadyAdded = errors.New("taskqueue: task has already been added")
+)
+
+// RetryOptions let you control whether to retry a task and the backoff intervals between tries.
+type RetryOptions struct {
+ // Number of tries/leases after which the task fails permanently and is deleted.
+ // If AgeLimit is also set, both limits must be exceeded for the task to fail permanently.
+ RetryLimit int32
+
+ // Maximum time allowed since the task's first try before the task fails permanently and is deleted (only for push tasks).
+ // If RetryLimit is also set, both limits must be exceeded for the task to fail permanently.
+ AgeLimit time.Duration
+
+ // Minimum time between successive tries (only for push tasks).
+ MinBackoff time.Duration
+
+ // Maximum time between successive tries (only for push tasks).
+ MaxBackoff time.Duration
+
+ // Maximum number of times to double the interval between successive tries before the intervals increase linearly (only for push tasks).
+ MaxDoublings int32
+
+ // If MaxDoublings is zero, set ApplyZeroMaxDoublings to true to override the default non-zero value.
+ // Otherwise a zero MaxDoublings is ignored and the default is used.
+ ApplyZeroMaxDoublings bool
+}
+
+// toRetryParameter converts RetryOptions to pb.TaskQueueRetryParameters.
+func (opt *RetryOptions) toRetryParameters() *pb.TaskQueueRetryParameters {
+ params := &pb.TaskQueueRetryParameters{}
+ if opt.RetryLimit > 0 {
+ params.RetryLimit = proto.Int32(opt.RetryLimit)
+ }
+ if opt.AgeLimit > 0 {
+ params.AgeLimitSec = proto.Int64(int64(opt.AgeLimit.Seconds()))
+ }
+ if opt.MinBackoff > 0 {
+ params.MinBackoffSec = proto.Float64(opt.MinBackoff.Seconds())
+ }
+ if opt.MaxBackoff > 0 {
+ params.MaxBackoffSec = proto.Float64(opt.MaxBackoff.Seconds())
+ }
+ if opt.MaxDoublings > 0 || (opt.MaxDoublings == 0 && opt.ApplyZeroMaxDoublings) {
+ params.MaxDoublings = proto.Int32(opt.MaxDoublings)
+ }
+ return params
+}
+
+// A Task represents a task to be executed.
+type Task struct {
+ // Path is the worker URL for the task.
+ // If unset, it will default to /_ah/queue/<queue_name>.
+ Path string
+
+ // Payload is the data for the task.
+ // This will be delivered as the HTTP request body.
+ // It is only used when Method is POST, PUT or PULL.
+ // url.Values' Encode method may be used to generate this for POST requests.
+ Payload []byte
+
+ // Additional HTTP headers to pass at the task's execution time.
+ // To schedule the task to be run with an alternate app version
+ // or backend, set the "Host" header.
+ Header http.Header
+
+ // Method is the HTTP method for the task ("GET", "POST", etc.),
+ // or "PULL" if this is task is destined for a pull-based queue.
+ // If empty, this defaults to "POST".
+ Method string
+
+ // A name for the task.
+ // If empty, a name will be chosen.
+ Name string
+
+ // Delay specifies the duration the task queue service must wait
+ // before executing the task.
+ // Either Delay or ETA may be set, but not both.
+ Delay time.Duration
+
+ // ETA specifies the earliest time a task may be executed (push queues)
+ // or leased (pull queues).
+ // Either Delay or ETA may be set, but not both.
+ ETA time.Time
+
+ // The number of times the task has been dispatched or leased.
+ RetryCount int32
+
+ // Tag for the task. Only used when Method is PULL.
+ Tag string
+
+ // Retry options for this task. May be nil.
+ RetryOptions *RetryOptions
+}
+
+func (t *Task) method() string {
+ if t.Method == "" {
+ return "POST"
+ }
+ return t.Method
+}
+
+// NewPOSTTask creates a Task that will POST to a path with the given form data.
+func NewPOSTTask(path string, params url.Values) *Task {
+ h := make(http.Header)
+ h.Set("Content-Type", "application/x-www-form-urlencoded")
+ return &Task{
+ Path: path,
+ Payload: []byte(params.Encode()),
+ Header: h,
+ Method: "POST",
+ }
+}
+
+var (
+ currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
+ defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace")
+)
+
+func getDefaultNamespace(ctx context.Context) string {
+ return internal.IncomingHeaders(ctx).Get(defaultNamespace)
+}
+
+func newAddReq(c context.Context, task *Task, queueName string) (*pb.TaskQueueAddRequest, error) {
+ if queueName == "" {
+ queueName = "default"
+ }
+ path := task.Path
+ if path == "" {
+ path = "/_ah/queue/" + queueName
+ }
+ eta := task.ETA
+ if eta.IsZero() {
+ eta = time.Now().Add(task.Delay)
+ } else if task.Delay != 0 {
+ panic("taskqueue: both Delay and ETA are set")
+ }
+ req := &pb.TaskQueueAddRequest{
+ QueueName: []byte(queueName),
+ TaskName: []byte(task.Name),
+ EtaUsec: proto.Int64(eta.UnixNano() / 1e3),
+ }
+ method := task.method()
+ if method == "PULL" {
+ // Pull-based task
+ req.Body = task.Payload
+ req.Mode = pb.TaskQueueMode_PULL.Enum()
+ if task.Tag != "" {
+ req.Tag = []byte(task.Tag)
+ }
+ } else {
+ // HTTP-based task
+ if v, ok := pb.TaskQueueAddRequest_RequestMethod_value[method]; ok {
+ req.Method = pb.TaskQueueAddRequest_RequestMethod(v).Enum()
+ } else {
+ return nil, fmt.Errorf("taskqueue: bad method %q", method)
+ }
+ req.Url = []byte(path)
+ for k, vs := range task.Header {
+ for _, v := range vs {
+ req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{
+ Key: []byte(k),
+ Value: []byte(v),
+ })
+ }
+ }
+ if method == "POST" || method == "PUT" {
+ req.Body = task.Payload
+ }
+
+ // Namespace headers.
+ if _, ok := task.Header[currentNamespace]; !ok {
+ // Fetch the current namespace of this request.
+ ns := internal.NamespaceFromContext(c)
+ req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{
+ Key: []byte(currentNamespace),
+ Value: []byte(ns),
+ })
+ }
+ if _, ok := task.Header[defaultNamespace]; !ok {
+ // Fetch the X-AppEngine-Default-Namespace header of this request.
+ if ns := getDefaultNamespace(c); ns != "" {
+ req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{
+ Key: []byte(defaultNamespace),
+ Value: []byte(ns),
+ })
+ }
+ }
+ }
+
+ if task.RetryOptions != nil {
+ req.RetryParameters = task.RetryOptions.toRetryParameters()
+ }
+
+ return req, nil
+}
+
+var alreadyAddedErrors = map[pb.TaskQueueServiceError_ErrorCode]bool{
+ pb.TaskQueueServiceError_TASK_ALREADY_EXISTS: true,
+ pb.TaskQueueServiceError_TOMBSTONED_TASK: true,
+}
+
+// Add adds the task to a named queue.
+// An empty queue name means that the default queue will be used.
+// Add returns an equivalent Task with defaults filled in, including setting
+// the task's Name field to the chosen name if the original was empty.
+func Add(c context.Context, task *Task, queueName string) (*Task, error) {
+ req, err := newAddReq(c, task, queueName)
+ if err != nil {
+ return nil, err
+ }
+ res := &pb.TaskQueueAddResponse{}
+ if err := internal.Call(c, "taskqueue", "Add", req, res); err != nil {
+ apiErr, ok := err.(*internal.APIError)
+ if ok && alreadyAddedErrors[pb.TaskQueueServiceError_ErrorCode(apiErr.Code)] {
+ return nil, ErrTaskAlreadyAdded
+ }
+ return nil, err
+ }
+ resultTask := *task
+ resultTask.Method = task.method()
+ if task.Name == "" {
+ resultTask.Name = string(res.ChosenTaskName)
+ }
+ return &resultTask, nil
+}
+
+// AddMulti adds multiple tasks to a named queue.
+// An empty queue name means that the default queue will be used.
+// AddMulti returns a slice of equivalent tasks with defaults filled in, including setting
+// each task's Name field to the chosen name if the original was empty.
+// If a given task is badly formed or could not be added, an appengine.MultiError is returned.
+func AddMulti(c context.Context, tasks []*Task, queueName string) ([]*Task, error) {
+ req := &pb.TaskQueueBulkAddRequest{
+ AddRequest: make([]*pb.TaskQueueAddRequest, len(tasks)),
+ }
+ me, any := make(appengine.MultiError, len(tasks)), false
+ for i, t := range tasks {
+ req.AddRequest[i], me[i] = newAddReq(c, t, queueName)
+ any = any || me[i] != nil
+ }
+ if any {
+ return nil, me
+ }
+ res := &pb.TaskQueueBulkAddResponse{}
+ if err := internal.Call(c, "taskqueue", "BulkAdd", req, res); err != nil {
+ return nil, err
+ }
+ if len(res.Taskresult) != len(tasks) {
+ return nil, errors.New("taskqueue: server error")
+ }
+ tasksOut := make([]*Task, len(tasks))
+ for i, tr := range res.Taskresult {
+ tasksOut[i] = new(Task)
+ *tasksOut[i] = *tasks[i]
+ tasksOut[i].Method = tasksOut[i].method()
+ if tasksOut[i].Name == "" {
+ tasksOut[i].Name = string(tr.ChosenTaskName)
+ }
+ if *tr.Result != pb.TaskQueueServiceError_OK {
+ if alreadyAddedErrors[*tr.Result] {
+ me[i] = ErrTaskAlreadyAdded
+ } else {
+ me[i] = &internal.APIError{
+ Service: "taskqueue",
+ Code: int32(*tr.Result),
+ }
+ }
+ any = true
+ }
+ }
+ if any {
+ return tasksOut, me
+ }
+ return tasksOut, nil
+}
+
+// Delete deletes a task from a named queue.
+func Delete(c context.Context, task *Task, queueName string) error {
+ err := DeleteMulti(c, []*Task{task}, queueName)
+ if me, ok := err.(appengine.MultiError); ok {
+ return me[0]
+ }
+ return err
+}
+
+// DeleteMulti deletes multiple tasks from a named queue.
+// If a given task could not be deleted, an appengine.MultiError is returned.
+func DeleteMulti(c context.Context, tasks []*Task, queueName string) error {
+ taskNames := make([][]byte, len(tasks))
+ for i, t := range tasks {
+ taskNames[i] = []byte(t.Name)
+ }
+ if queueName == "" {
+ queueName = "default"
+ }
+ req := &pb.TaskQueueDeleteRequest{
+ QueueName: []byte(queueName),
+ TaskName: taskNames,
+ }
+ res := &pb.TaskQueueDeleteResponse{}
+ if err := internal.Call(c, "taskqueue", "Delete", req, res); err != nil {
+ return err
+ }
+ if a, b := len(req.TaskName), len(res.Result); a != b {
+ return fmt.Errorf("taskqueue: internal error: requested deletion of %d tasks, got %d results", a, b)
+ }
+ me, any := make(appengine.MultiError, len(res.Result)), false
+ for i, ec := range res.Result {
+ if ec != pb.TaskQueueServiceError_OK {
+ me[i] = &internal.APIError{
+ Service: "taskqueue",
+ Code: int32(ec),
+ }
+ any = true
+ }
+ }
+ if any {
+ return me
+ }
+ return nil
+}
+
+func lease(c context.Context, maxTasks int, queueName string, leaseTime int, groupByTag bool, tag []byte) ([]*Task, error) {
+ if queueName == "" {
+ queueName = "default"
+ }
+ req := &pb.TaskQueueQueryAndOwnTasksRequest{
+ QueueName: []byte(queueName),
+ LeaseSeconds: proto.Float64(float64(leaseTime)),
+ MaxTasks: proto.Int64(int64(maxTasks)),
+ GroupByTag: proto.Bool(groupByTag),
+ Tag: tag,
+ }
+ res := &pb.TaskQueueQueryAndOwnTasksResponse{}
+ if err := internal.Call(c, "taskqueue", "QueryAndOwnTasks", req, res); err != nil {
+ return nil, err
+ }
+ tasks := make([]*Task, len(res.Task))
+ for i, t := range res.Task {
+ tasks[i] = &Task{
+ Payload: t.Body,
+ Name: string(t.TaskName),
+ Method: "PULL",
+ ETA: time.Unix(0, *t.EtaUsec*1e3),
+ RetryCount: *t.RetryCount,
+ Tag: string(t.Tag),
+ }
+ }
+ return tasks, nil
+}
+
+// Lease leases tasks from a queue.
+// leaseTime is in seconds.
+// The number of tasks fetched will be at most maxTasks.
+func Lease(c context.Context, maxTasks int, queueName string, leaseTime int) ([]*Task, error) {
+ return lease(c, maxTasks, queueName, leaseTime, false, nil)
+}
+
+// LeaseByTag leases tasks from a queue, grouped by tag.
+// If tag is empty, then the returned tasks are grouped by the tag of the task with earliest ETA.
+// leaseTime is in seconds.
+// The number of tasks fetched will be at most maxTasks.
+func LeaseByTag(c context.Context, maxTasks int, queueName string, leaseTime int, tag string) ([]*Task, error) {
+ return lease(c, maxTasks, queueName, leaseTime, true, []byte(tag))
+}
+
+// Purge removes all tasks from a queue.
+func Purge(c context.Context, queueName string) error {
+ if queueName == "" {
+ queueName = "default"
+ }
+ req := &pb.TaskQueuePurgeQueueRequest{
+ QueueName: []byte(queueName),
+ }
+ res := &pb.TaskQueuePurgeQueueResponse{}
+ return internal.Call(c, "taskqueue", "PurgeQueue", req, res)
+}
+
+// ModifyLease modifies the lease of a task.
+// Used to request more processing time, or to abandon processing.
+// leaseTime is in seconds and must not be negative.
+func ModifyLease(c context.Context, task *Task, queueName string, leaseTime int) error {
+ if queueName == "" {
+ queueName = "default"
+ }
+ req := &pb.TaskQueueModifyTaskLeaseRequest{
+ QueueName: []byte(queueName),
+ TaskName: []byte(task.Name),
+ EtaUsec: proto.Int64(task.ETA.UnixNano() / 1e3), // Used to verify ownership.
+ LeaseSeconds: proto.Float64(float64(leaseTime)),
+ }
+ res := &pb.TaskQueueModifyTaskLeaseResponse{}
+ if err := internal.Call(c, "taskqueue", "ModifyTaskLease", req, res); err != nil {
+ return err
+ }
+ task.ETA = time.Unix(0, *res.UpdatedEtaUsec*1e3)
+ return nil
+}
+
+// QueueStatistics represents statistics about a single task queue.
+type QueueStatistics struct {
+ Tasks int // may be an approximation
+ OldestETA time.Time // zero if there are no pending tasks
+
+ Executed1Minute int // tasks executed in the last minute
+ InFlight int // tasks executing now
+ EnforcedRate float64 // requests per second
+}
+
+// QueueStats retrieves statistics about queues.
+func QueueStats(c context.Context, queueNames []string) ([]QueueStatistics, error) {
+ req := &pb.TaskQueueFetchQueueStatsRequest{
+ QueueName: make([][]byte, len(queueNames)),
+ }
+ for i, q := range queueNames {
+ if q == "" {
+ q = "default"
+ }
+ req.QueueName[i] = []byte(q)
+ }
+ res := &pb.TaskQueueFetchQueueStatsResponse{}
+ if err := internal.Call(c, "taskqueue", "FetchQueueStats", req, res); err != nil {
+ return nil, err
+ }
+ qs := make([]QueueStatistics, len(res.Queuestats))
+ for i, qsg := range res.Queuestats {
+ qs[i] = QueueStatistics{
+ Tasks: int(*qsg.NumTasks),
+ }
+ if eta := *qsg.OldestEtaUsec; eta > -1 {
+ qs[i].OldestETA = time.Unix(0, eta*1e3)
+ }
+ if si := qsg.ScannerInfo; si != nil {
+ qs[i].Executed1Minute = int(*si.ExecutedLastMinute)
+ qs[i].InFlight = int(si.GetRequestsInFlight())
+ qs[i].EnforcedRate = si.GetEnforcedRate()
+ }
+ }
+ return qs, nil
+}
+
+func setTransaction(x *pb.TaskQueueAddRequest, t *dspb.Transaction) {
+ x.Transaction = t
+}
+
+func init() {
+ internal.RegisterErrorCodeMap("taskqueue", pb.TaskQueueServiceError_ErrorCode_name)
+
+ // Datastore error codes are shifted by DATASTORE_ERROR when presented through taskqueue.
+ dsCode := int32(pb.TaskQueueServiceError_DATASTORE_ERROR) + int32(dspb.Error_TIMEOUT)
+ internal.RegisterTimeoutErrorCode("taskqueue", dsCode)
+
+ // Transaction registration.
+ internal.RegisterTransactionSetter(setTransaction)
+ internal.RegisterTransactionSetter(func(x *pb.TaskQueueBulkAddRequest, t *dspb.Transaction) {
+ for _, req := range x.AddRequest {
+ setTransaction(req, t)
+ }
+ })
+}