/usr/share/gocode/src/github.com/influxdata/influxdb/services/retention/service.go is in golang-github-influxdb-influxdb-dev 1.1.1+dfsg1-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package retention // import "github.com/influxdata/influxdb/services/retention"
import (
"io"
"log"
"os"
"sync"
"time"
"github.com/influxdata/influxdb/services/meta"
)
// Service represents the retention policy enforcement service.
//
// Once opened it runs two background loops, both paced by checkInterval: one
// asks MetaClient to delete expired shard groups, the other asks TSDBStore to
// delete local shards that belong to shard groups already marked as deleted.
type Service struct {
// MetaClient provides the database and retention policy metadata that is
// scanned on every check, and deletes shard groups by ID. It must be set
// before the first check fires; the loops call it without a nil check.
MetaClient interface {
Databases() []meta.DatabaseInfo
DeleteShardGroup(database, policy string, id uint64) error
}
// TSDBStore lists the shard IDs held by the local store and deletes
// individual shards. It must be set before the first check fires.
TSDBStore interface {
ShardIDs() []uint64
DeleteShard(shardID uint64) error
}
// enabled is not read or written anywhere in this file.
// NOTE(review): possibly vestigial; confirm against the rest of the package.
enabled bool
// checkInterval is the ticker period used by both background loops.
checkInterval time.Duration
// wg tracks the background goroutines started by Open so Close can wait
// for them to exit.
wg sync.WaitGroup
// done is closed by Close to tell the background loops to return.
done chan struct{}
// logger receives all service log output, prefixed with "[retention] ".
logger *log.Logger
}
// NewService builds a retention policy enforcement service from c.
//
// The check interval is taken from c.CheckInterval, a fresh stop channel is
// allocated, and logging defaults to standard error with the "[retention] "
// prefix. MetaClient and TSDBStore are left unset for the caller to assign.
func NewService(c Config) *Service {
	svc := new(Service)
	svc.checkInterval = time.Duration(c.CheckInterval)
	svc.done = make(chan struct{})
	svc.logger = log.New(os.Stderr, "[retention] ", log.LstdFlags)
	return svc
}
// checkIntervalError is returned by Open when the configured check interval
// cannot drive a ticker. Its value is the rejected interval.
type checkIntervalError time.Duration

// Error implements the error interface.
func (e checkIntervalError) Error() string {
	return "retention: check interval must be positive, got " + time.Duration(e).String()
}

// Open starts retention policy enforcement.
//
// It logs the configured check interval and launches the two background
// loops (shard group deletion and shard deletion), registering both with the
// service's wait group so that Close can wait for them.
//
// Open returns an error, and starts nothing, when the check interval is zero
// or negative.
func (s *Service) Open() error {
	// Both background loops build a time.Ticker from checkInterval, and
	// time.NewTicker panics on a non-positive duration. Reject it here so a
	// bad configuration surfaces as an error instead of crashing the process
	// from inside a goroutine.
	if s.checkInterval <= 0 {
		return checkIntervalError(s.checkInterval)
	}

	s.logger.Println("Starting retention policy enforcement service with check interval of", s.checkInterval)

	s.wg.Add(2)
	go s.deleteShardGroups()
	go s.deleteShards()
	return nil
}
// Close stops retention policy enforcement.
//
// It signals the background loops to exit by closing the done channel and
// then blocks until every goroutine registered with the wait group has
// returned. Calling Close again after it has returned is a no-op: the
// channel is only closed once, so a repeated call no longer panics with
// "close of closed channel". Close always returns nil.
//
// Close is not safe to call concurrently with itself.
func (s *Service) Close() error {
	select {
	case <-s.done:
		// Already closed by an earlier call; nothing left to signal.
	default:
		s.logger.Println("retention policy enforcement terminating")
		close(s.done)
	}

	s.wg.Wait()
	return nil
}
// SetLogOutput redirects all service logging to w by installing a new logger
// that keeps the "[retention] " prefix and the standard timestamp flags.
// It must not be called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
	replacement := log.New(w, "[retention] ", log.LstdFlags)
	s.logger = replacement
}
// deleteShardGroups is the background loop that removes expired shard groups.
//
// On every tick of the check interval it walks all databases and retention
// policies reported by MetaClient, asks each policy for the shard groups that
// have expired as of the current UTC time, and requests deletion of each one.
// Every attempt is logged, whether it succeeds or fails; a failure does not
// stop the pass. The loop returns once the done channel is closed and marks
// itself finished on the service's wait group.
func (s *Service) deleteShardGroups() {
	defer s.wg.Done()

	// sweep performs a single enforcement pass over all retention policies.
	sweep := func() {
		for _, db := range s.MetaClient.Databases() {
			for _, rp := range db.RetentionPolicies {
				for _, sg := range rp.ExpiredShardGroups(time.Now().UTC()) {
					err := s.MetaClient.DeleteShardGroup(db.Name, rp.Name, sg.ID)
					if err == nil {
						s.logger.Printf("deleted shard group %d from database %s, retention policy %s",
							sg.ID, db.Name, rp.Name)
						continue
					}
					s.logger.Printf("failed to delete shard group %d from database %s, retention policy %s: %s",
						sg.ID, db.Name, rp.Name, err.Error())
				}
			}
		}
	}

	tick := time.NewTicker(s.checkInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			sweep()
		case <-s.done:
			return
		}
	}
}
// deleteShards is the background loop that removes local shards whose shard
// groups have already been deleted.
//
// On every tick of the check interval it logs that a check is starting,
// collects the IDs of all shards in the deleted shard groups reported by
// MetaClient, and then deletes each shard in TSDBStore whose ID is in that
// set. Every attempt is logged with the owning database and retention policy;
// a failed deletion does not stop the pass. The loop returns once the done
// channel is closed and marks itself finished on the service's wait group.
func (s *Service) deleteShards() {
	defer s.wg.Done()

	// origin records where a shard came from; it is used for log output only.
	type origin struct {
		db string
		rp string
	}

	// purge performs a single deletion pass.
	purge := func() {
		s.logger.Println("retention policy shard deletion check commencing")

		// Index every shard that belongs to a deleted shard group.
		doomed := map[uint64]origin{}
		for _, db := range s.MetaClient.Databases() {
			for _, rp := range db.RetentionPolicies {
				for _, sg := range rp.DeletedShardGroups() {
					for _, shard := range sg.Shards {
						doomed[shard.ID] = origin{db: db.Name, rp: rp.Name}
					}
				}
			}
		}

		// Only shards actually present in the local store are deleted.
		for _, id := range s.TSDBStore.ShardIDs() {
			from, found := doomed[id]
			if !found {
				continue
			}
			if err := s.TSDBStore.DeleteShard(id); err != nil {
				s.logger.Printf("failed to delete shard ID %d from database %s, retention policy %s: %s",
					id, from.db, from.rp, err.Error())
				continue
			}
			s.logger.Printf("shard ID %d from database %s, retention policy %s, deleted",
				id, from.db, from.rp)
		}
	}

	tick := time.NewTicker(s.checkInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			purge()
		case <-s.done:
			return
		}
	}
}
|