This file is indexed.

/usr/share/gocode/src/github.com/hashicorp/consul/consul/prepared_query_endpoint.go is in golang-github-hashicorp-consul-dev 0.6.4~dfsg-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
package consul

import (
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/consul/consul/structs"
	"github.com/hashicorp/go-uuid"
)

// ErrQueryNotFound is the sentinel error handed back when a prepared query
// lookup comes up empty.
var ErrQueryNotFound = errors.New("Query not found")

// PreparedQuery manages the prepared query endpoint. All of its methods work
// through the server it wraps.
type PreparedQuery struct {
	// srv is the server whose state store, ACL helpers, logger and RPC
	// forwarding are used by the endpoint methods.
	srv *Server
}

// Apply is used to apply a modifying request to the data store. This should
// only be used for operations that modify the data. The ID of the prepared
// query is returned in the reply.
//
// The request is first offered to p.srv.forward; if that reports the request
// as done, its error is returned as-is. Otherwise the steps are, in order:
// assign a fresh UUID for create operations, resolve the caller's token, check
// write access against the prefix ACL of the proposed query (and, for
// non-create operations, of the stored query too), validate the definition via
// parseQuery for create/update, and finally commit the request through Raft.
func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) {
	// Note that this err shadows the named result; it is the error from the
	// forwarded call and is only returned when forward says it handled it.
	if done, err := p.srv.forward("PreparedQuery.Apply", args, args, reply); done {
		return err
	}
	// time.Now() is evaluated here, when the defer statement runs, while the
	// measurement itself is taken when Apply returns.
	defer metrics.MeasureSince([]string{"consul", "prepared-query", "apply"}, time.Now())

	// Validate the ID. We must create new IDs before applying to the Raft
	// log since it's not deterministic.
	if args.Op == structs.PreparedQueryCreate {
		if args.Query.ID != "" {
			return fmt.Errorf("ID must be empty when creating a new prepared query")
		}

		// We are relying on the fact that UUIDs are random and unlikely
		// to collide since this isn't inside a write transaction. The loop
		// keeps generating until the state store has no query with that ID.
		state := p.srv.fsm.State()
		for {
			// This assigns to the named result err (no := here).
			if args.Query.ID, err = uuid.GenerateUUID(); err != nil {
				return fmt.Errorf("UUID generation for prepared query failed: %v", err)
			}
			// This err is a new variable scoped to the loop body.
			_, query, err := state.PreparedQueryGet(args.Query.ID)
			if err != nil {
				return fmt.Errorf("Prepared query lookup failed: %v", err)
			}
			if query == nil {
				break
			}
		}
	}
	// The reply is populated before any of the checks below run, so it is
	// set even when Apply goes on to return an error.
	*reply = args.Query.ID

	// Get the ACL token for the request for the checks below.
	acl, err := p.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}

	// If prefix ACLs apply to the incoming query, then do an ACL check. We
	// need to make sure they have write access for whatever they are
	// proposing. A nil acl skips the check entirely (presumably meaning ACLs
	// are not being enforced; confirm against resolveToken).
	if prefix, ok := args.Query.GetACLPrefix(); ok {
		if acl != nil && !acl.PreparedQueryWrite(prefix) {
			p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query '%s' denied due to ACLs", args.Query.ID)
			return permissionDeniedErr
		}
	}

	// This is the second part of the check above. If they are referencing
	// an existing query then make sure it exists and that they have write
	// access to whatever they are changing, if prefix ACLs apply to it.
	// This branch runs for every op other than create, including ops that
	// the switch below will reject as unknown.
	if args.Op != structs.PreparedQueryCreate {
		state := p.srv.fsm.State()
		_, query, err := state.PreparedQueryGet(args.Query.ID)
		if err != nil {
			return fmt.Errorf("Prepared Query lookup failed: %v", err)
		}
		if query == nil {
			return fmt.Errorf("Cannot modify non-existent prepared query: '%s'", args.Query.ID)
		}

		// The prefix checked here comes from the stored query, not from the
		// incoming definition.
		if prefix, ok := query.GetACLPrefix(); ok {
			if acl != nil && !acl.PreparedQueryWrite(prefix) {
				p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query '%s' denied due to ACLs", args.Query.ID)
				return permissionDeniedErr
			}
		}
	}

	// Parse the query and prep it for the state store.
	switch args.Op {
	case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
		if err := parseQuery(args.Query); err != nil {
			return fmt.Errorf("Invalid prepared query: %v", err)
		}

	case structs.PreparedQueryDelete:
		// Nothing else to verify here, just do the delete (we only look
		// at the ID field for this op).

	default:
		return fmt.Errorf("Unknown prepared query operation: %s", args.Op)
	}

	// Commit the query to the state store.
	resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args)
	if err != nil {
		p.srv.logger.Printf("[ERR] consul.prepared_query: Apply failed %v", err)
		return err
	}
	// The apply response may itself be an error value; if so, return it to
	// the caller.
	if respErr, ok := resp.(error); ok {
		return respErr
	}

	return nil
}

// parseQuery validates a prepared query definition ahead of a create or update.
// It returns the first problem found, or nil when the definition passes.
//
// Fields that are deliberately not examined here:
//   - ID is validated by the caller.
//   - Name is optional and unrestricted; uniqueness, and the rule that a name
//     may not overlap with an ID (so nobody can "steal" a query they lack the
//     ACL rights to change), are enforced during the transaction.
//   - Session is optional and checked for integrity during the transaction.
//   - Template is checked during the transaction, which is where it gets
//     compiled.
func parseQuery(query *structs.PreparedQuery) error {
	// The token is really checked at execution time. The one thing caught
	// up front is the special redacted placeholder being pasted back in,
	// since that would be very hard to debug later.
	if query.Token == redactedToken {
		return fmt.Errorf("Bad Token '%s', it looks like a query definition with a redacted token was submitted", query.Token)
	}

	// Validate the service sub-structure first, then the DNS options.
	if svcErr := parseService(&query.Service); svcErr != nil {
		return svcErr
	}
	return parseDNS(&query.DNS)
}

// parseService validates the service portion of a prepared query for a create
// or update operation, returning an error that describes the first invalid
// field or nil when everything checked is acceptable.
//
// Fields that need no validation here:
//   - Datacenters: unknown entries are skipped at execution time.
//   - OnlyPassing: a plain boolean.
//   - Tags: a free-form list.
func parseService(svc *structs.ServiceQuery) error {
	switch {
	case svc.Service == "":
		// A service name is mandatory.
		return fmt.Errorf("Must provide a Service name to query")

	case svc.Failover.NearestN < 0:
		// Zero is allowed and means "don't fail over by RTT"; only
		// negative values are rejected.
		return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN)
	}

	return nil
}

// parseDNS validates the DNS options of a prepared query for a create or
// update operation. It only inspects the options; nothing is modified.
//
// An empty TTL is accepted without further checks. A non-empty TTL must parse
// with time.ParseDuration and must not be negative; otherwise an error naming
// the offending TTL string is returned.
func parseDNS(dns *structs.QueryDNSOptions) error {
	if dns.TTL == "" {
		return nil
	}

	ttl, err := time.ParseDuration(dns.TTL)
	if err != nil {
		return fmt.Errorf("Bad DNS TTL '%s': %v", dns.TTL, err)
	}

	if ttl < 0 {
		// Report the TTL exactly as it was submitted. Formatting the
		// parsed time.Duration with %d would print a raw nanosecond
		// count, which is not what the user typed.
		return fmt.Errorf("DNS TTL '%s', must be >=0", dns.TTL)
	}

	return nil
}

// Get looks up one prepared query by ID and places it in reply. The request is
// first offered to p.srv.forward; when it is handled there, that result is
// returned. Otherwise the lookup runs through p.srv.blockingRPC.
//
// It yields ErrQueryNotFound when no query has the given ID, and
// permissionDeniedErr when ACL filtering removes the query that was found.
func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
	reply *structs.IndexedPreparedQueries) error {
	if done, err := p.srv.forward("PreparedQuery.Get", args, args, reply); done {
		return err
	}

	state := p.srv.fsm.State()

	// lookup is the body handed to the blocking RPC helper.
	lookup := func() error {
		idx, found, err := state.PreparedQueryGet(args.QueryID)
		switch {
		case err != nil:
			return err
		case found == nil:
			return ErrQueryNotFound
		}

		reply.Index = idx
		reply.Queries = structs.PreparedQueries{found}

		// With no prefix ACL on the query, anyone holding the ID may see
		// it. The single entry still goes through the filter so tokens
		// get handled.
		_, hasPrefix := found.GetACLPrefix()
		if !hasPrefix {
			return p.srv.filterACL(args.Token, &reply.Queries[0])
		}

		// A prefix ACL applies, so filter the whole reply the usual way.
		if err := p.srv.filterACL(args.Token, reply); err != nil {
			return err
		}
		if len(reply.Queries) > 0 {
			return nil
		}

		// This was a request for one specific query that does exist, so
		// being filtered out is surfaced as permission denied rather
		// than as an empty result.
		p.srv.logger.Printf("[WARN] consul.prepared_query: Request to get prepared query '%s' denied due to ACLs", args.QueryID)
		return permissionDeniedErr
	}

	return p.srv.blockingRPC(
		&args.QueryOptions,
		&reply.QueryMeta,
		state.GetQueryWatch("PreparedQueryGet"),
		lookup)
}

// List fetches every prepared query from the state store, stores the result
// and its index in reply, and then passes the reply through the ACL filter
// with the caller's token. The request is offered to p.srv.forward first.
func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error {
	if done, err := p.srv.forward("PreparedQuery.List", args, args, reply); done {
		return err
	}

	state := p.srv.fsm.State()

	// fetch is the body handed to the blocking RPC helper.
	fetch := func() error {
		idx, all, err := state.PreparedQueryList()
		if err != nil {
			return err
		}

		reply.Index = idx
		reply.Queries = all
		return p.srv.filterACL(args.Token, reply)
	}

	return p.srv.blockingRPC(
		&args.QueryOptions,
		&reply.QueryMeta,
		state.GetQueryWatch("PreparedQueryList"),
		fetch)
}

// Explain resolves a prepared query by ID or name and hands the resolved
// definition (possibly a rendered template) back to the caller. Operators can
// use this to see which query picks up a given name. More detail about how the
// query would execute could be added here later.
//
// It yields ErrQueryNotFound when nothing resolves and permissionDeniedErr
// when the ACL filter removes the resolved query.
func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest,
	reply *structs.PreparedQueryExplainResponse) error {
	if done, err := p.srv.forward("PreparedQuery.Explain", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"consul", "prepared-query", "explain"}, time.Now())

	// This is not a blocking RPC, so the query metadata and the optional
	// consistency check have to be handled by hand.
	p.srv.setQueryMeta(&reply.QueryMeta)
	if args.RequireConsistent {
		if consistencyErr := p.srv.consistentRead(); consistencyErr != nil {
			return consistencyErr
		}
	}

	// Resolve the query from the state store.
	_, resolved, err := p.srv.fsm.State().PreparedQueryResolve(args.QueryIDOrName)
	if err != nil {
		return err
	}
	if resolved == nil {
		return ErrQueryNotFound
	}

	// Wrap the single query in a list so the standard ACL filter can run
	// over it.
	filtered := &structs.IndexedPreparedQueries{
		Queries: structs.PreparedQueries{resolved},
	}
	if filterErr := p.srv.filterACL(args.Token, filtered); filterErr != nil {
		return filterErr
	}

	// An empty list means the filter dropped the query.
	if len(filtered.Queries) == 0 {
		p.srv.logger.Printf("[WARN] consul.prepared_query: Explain on prepared query '%s' denied due to ACLs", resolved.ID)
		return permissionDeniedErr
	}

	reply.Query = *filtered.Queries[0]
	return nil
}

// Execute resolves a prepared query by ID or name, runs it in the local
// datacenter, and fills reply with the result. When the local result is empty
// after ACL filtering, sorting and limiting, the failover logic is run to try
// remote datacenters. Per the original author's notes, this is typically
// invoked for DNS lookups or from the HTTP API.
//
// It yields ErrQueryNotFound when nothing resolves.
func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
	reply *structs.PreparedQueryExecuteResponse) error {
	if done, err := p.srv.forward("PreparedQuery.Execute", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute"}, time.Now())

	// This is not a blocking RPC, so the query metadata and the optional
	// consistency check have to be handled by hand.
	p.srv.setQueryMeta(&reply.QueryMeta)
	if args.RequireConsistent {
		if consistencyErr := p.srv.consistentRead(); consistencyErr != nil {
			return consistencyErr
		}
	}

	// Resolve the query from the state store.
	_, query, err := p.srv.fsm.State().PreparedQueryResolve(args.QueryIDOrName)
	if err != nil {
		return err
	}
	if query == nil {
		return ErrQueryNotFound
	}

	// Run it against the local datacenter.
	if execErr := p.execute(query, reply); execErr != nil {
		return execErr
	}

	// A token stored on the query takes precedence; the request's token is
	// only the fallback.
	token := query.Token
	if token == "" {
		token = args.QueryOptions.Token
	}
	if filterErr := p.srv.filterACL(token, &reply.Nodes); filterErr != nil {
		return filterErr
	}

	// TODO (slackpad) A special case could skip the failover when ACLs
	// filtered out everything. It may not be worth the extra complexity and
	// behavior differences, since that is essentially a misconfiguration.

	// Shuffle first so the order is still randomized if an RTT sort was
	// requested but coordinates are not available.
	reply.Nodes.Shuffle()
	if sortErr := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); sortErr != nil {
		return sortErr
	}

	// Truncate to the requested limit, if one was given.
	if limit := args.Limit; limit > 0 && len(reply.Nodes) > limit {
		reply.Nodes = reply.Nodes[:limit]
	}

	// Happy path: local nodes were found, so there is nothing more to do.
	if len(reply.Nodes) > 0 {
		return nil
	}

	// Otherwise fail over and try remote DCs, as allowed by the query setup.
	return queryFailover(&queryServerWrapper{p.srv}, query, args.Limit, args.QueryOptions, reply)
}

// ExecuteRemote runs a query on behalf of another datacenter whose local
// lookup found no instances of the service. The full query definition arrives
// in the request because this side won't have it in its own state store. No
// failover is attempted here: the originating DC is already running that
// logic, and things should not fan out further than one level.
func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest,
	reply *structs.PreparedQueryExecuteResponse) error {
	if done, err := p.srv.forward("PreparedQuery.ExecuteRemote", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute_remote"}, time.Now())

	// This is not a blocking RPC, so the query metadata and the optional
	// consistency check have to be handled by hand.
	p.srv.setQueryMeta(&reply.QueryMeta)
	if args.RequireConsistent {
		if consistencyErr := p.srv.consistentRead(); consistencyErr != nil {
			return consistencyErr
		}
	}

	// See what this datacenter can find for the supplied query.
	if execErr := p.execute(&args.Query, reply); execErr != nil {
		return execErr
	}

	// A token stored on the query takes precedence; the request's token is
	// only the fallback.
	token := args.Query.Token
	if token == "" {
		token = args.QueryOptions.Token
	}
	if filterErr := p.srv.filterACL(token, &reply.Nodes); filterErr != nil {
		return filterErr
	}

	// No RTT sort here, since by definition the caller is in another DC.
	// A shuffle is enough to balance load across the results.
	reply.Nodes.Shuffle()

	// Truncate to the requested limit, if one was given.
	if limit := args.Limit; limit > 0 && len(reply.Nodes) > limit {
		reply.Nodes = reply.Nodes[:limit]
	}

	return nil
}

// execute performs the local-datacenter part of running a prepared query, with
// no failover. Sorting and ACL checks are not done here; callers are expected
// to take care of those.
//
// On success reply carries the service name, the filtered nodes, the query's
// DNS options, and this server's datacenter.
func (p *PreparedQuery) execute(query *structs.PreparedQuery,
	reply *structs.PreparedQueryExecuteResponse) error {
	svc := &query.Service

	_, found, err := p.srv.fsm.State().CheckServiceNodes(svc.Service)
	if err != nil {
		return err
	}

	// Drop unhealthy nodes, then narrow by tags when any were given.
	found = found.Filter(svc.OnlyPassing)
	if len(svc.Tags) != 0 {
		found = tagFilter(svc.Tags, found)
	}

	// Record the results and pass the DNS information through.
	reply.Service = svc.Service
	reply.Nodes = found
	reply.DNS = query.DNS

	// Stamp the datacenter that produced this result.
	reply.Datacenter = p.srv.config.Datacenter

	return nil
}

// tagFilter keeps only the nodes whose service tags satisfy the given tag
// list: every plain tag must be present and no tag prefixed with "!" may be
// present. Tags are compared case-insensitively. For performance the filtering
// is done in place, so the caller's slice is modified and the order of the
// surviving nodes may change.
func tagFilter(tags []string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
	// Split the requested tags into required and forbidden lists.
	var required, forbidden []string
	for _, raw := range tags {
		lowered := strings.ToLower(raw)
		if strings.HasPrefix(lowered, "!") {
			forbidden = append(forbidden, lowered[1:])
			continue
		}
		required = append(required, lowered)
	}

	// keep reports whether a node satisfies both lists.
	keep := func(node structs.CheckServiceNode) bool {
		// Build a set of the node's tags so each lookup is cheap.
		present := make(map[string]struct{})
		if node.Service != nil {
			for _, t := range node.Service.Tags {
				present[strings.ToLower(t)] = struct{}{}
			}
		}

		for _, want := range required {
			if _, ok := present[want]; !ok {
				return false
			}
		}
		for _, banned := range forbidden {
			if _, ok := present[banned]; ok {
				return false
			}
		}
		return true
	}

	remaining := len(nodes)
	for i := 0; i < remaining; {
		if keep(nodes[i]) {
			i++
			continue
		}

		// Overwrite the rejected node with the last live one, clear the
		// vacated tail slot, and shrink the live region. The index stays
		// put so the node that was swapped in gets examined next.
		nodes[i], nodes[remaining-1] = nodes[remaining-1], structs.CheckServiceNode{}
		remaining--
	}
	return nodes[:remaining]
}

// queryServer is the narrow view of a server that the failover logic depends
// on; keeping it an interface makes that logic easier to test.
type queryServer interface {
	// GetLogger returns the logger to write diagnostics to.
	GetLogger() *log.Logger

	// GetOtherDatacentersByDistance returns the names of the other
	// datacenters to consider, or an error.
	GetOtherDatacentersByDistance() ([]string, error)

	// ForwardDC sends the named RPC method with args to datacenter dc and
	// decodes the response into reply.
	ForwardDC(method, dc string, args, reply interface{}) error
}

// queryServerWrapper applies the queryServer interface to a Server.
type queryServerWrapper struct {
	// srv is the server that every method of the wrapper delegates to.
	srv *Server
}

// GetLogger returns the wrapped server's logger.
func (q *queryServerWrapper) GetLogger() *log.Logger {
	return q.srv.logger
}

// GetOtherDatacentersByDistance asks the server for its distance-ordered list
// of datacenters and returns that list, in the same order, with the server's
// own datacenter removed. An error from the server is returned unchanged
// together with a nil list.
func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
	all, err := q.srv.getDatacentersByDistance()
	if err != nil {
		return nil, err
	}

	var others []string
	for i := range all {
		// Leave out the local datacenter.
		if all[i] == q.srv.config.Datacenter {
			continue
		}
		others = append(others, all[i])
	}
	return others, nil
}

// ForwardDC calls into the server's RPC forwarder, passing the method name,
// target datacenter, arguments and reply holder straight through to forwardDC
// and returning whatever error it reports.
func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, reply interface{}) error {
	return q.srv.forwardDC(method, dc, args, reply)
}

// queryFailover runs an algorithm to determine which DCs to try and then calls
// them, in priority order, to try to locate alternative services.
//
// The candidate list is built from the first Failover.NearestN entries of the
// list returned by q.GetOtherDatacentersByDistance, followed by any entries of
// Failover.Datacenters that are known and not already selected. Each
// datacenter is tried at most once, even if it is listed repeatedly. The
// candidates are queried via "PreparedQuery.ExecuteRemote" until one returns
// at least one node. A failure talking to one datacenter is logged and the
// next candidate is tried.
//
// On return reply holds the last remote response that was decoded (with Nodes
// nil if no candidate was tried or the last attempt failed) and
// reply.Failovers holds the number of datacenters that were attempted. The
// only error returned is a failure to obtain the datacenter list.
func queryFailover(q queryServer, query *structs.PreparedQuery,
	limit int, options structs.QueryOptions,
	reply *structs.PreparedQueryExecuteResponse) error {

	// Pull the list of other DCs. This is sorted by RTT in case the user
	// has selected that.
	nearest, err := q.GetOtherDatacentersByDistance()
	if err != nil {
		return err
	}

	// This will help us filter unknown DCs supplied by the user.
	known := make(map[string]struct{}, len(nearest))
	for _, dc := range nearest {
		known[dc] = struct{}{}
	}

	// Build a candidate list of DCs to try, starting with the nearest N
	// from RTTs. selected records everything placed on the candidate list.
	var dcs []string
	selected := make(map[string]struct{})
	if n := query.Service.Failover.NearestN; n > 0 {
		for i, dc := range nearest {
			if i >= n {
				break
			}

			dcs = append(dcs, dc)
			selected[dc] = struct{}{}
		}
	}

	// Then add any DCs explicitly listed that weren't selected above.
	for _, dc := range query.Service.Failover.Datacenters {
		// This will prevent a lot of other log spammage if we do not
		// attempt to talk to datacenters we don't know about.
		if _, ok := known[dc]; !ok {
			q.GetLogger().Printf("[DEBUG] consul.prepared_query: Skipping unknown datacenter '%s' in prepared query", dc)
			continue
		}

		// This will make sure we don't re-try something that fails
		// from the NearestN list, and that a DC repeated in the explicit
		// list is only queried once.
		if _, ok := selected[dc]; ok {
			continue
		}
		dcs = append(dcs, dc)
		selected[dc] = struct{}{}
	}

	// Now try the selected DCs in priority order.
	failovers := 0
	for _, dc := range dcs {
		// This keeps track of how many iterations we actually run.
		failovers++

		// Be super paranoid and set the nodes slice to nil since it's
		// the same slice we used before. We know there's nothing in
		// there, but the underlying msgpack library has a policy of
		// updating the slice when it's non-nil, and that feels dirty.
		// Let's just set it to nil so there's no way to communicate
		// through this slice across successive RPC calls.
		reply.Nodes = nil

		// Note that we pass along the limit since it can be applied
		// remotely to save bandwidth. We also pass along the consistency
		// mode information and token we were given, so that applies to
		// the remote query as well.
		remote := &structs.PreparedQueryExecuteRemoteRequest{
			Datacenter:   dc,
			Query:        *query,
			Limit:        limit,
			QueryOptions: options,
		}
		if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
			q.GetLogger().Printf("[WARN] consul.prepared_query: Failed querying for service '%s' in datacenter '%s': %s", query.Service.Service, dc, err)
			continue
		}

		// We can stop if we found some nodes.
		if len(reply.Nodes) > 0 {
			break
		}
	}

	// Set this at the end because the response from the remote doesn't have
	// this information.
	reply.Failovers = failovers

	return nil
}