This file is indexed.

/usr/share/pyshared/allmydata/mutable/checker.py is in tahoe-lafs 1.9.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

from allmydata.uri import from_string
from allmydata.util import base32, log
from allmydata.check_results import CheckAndRepairResults, CheckResults

from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve # for verifying

class MutableChecker:
    SERVERMAP_MODE = MODE_CHECK

    def __init__(self, node, storage_broker, history, monitor):
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (server,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.results = CheckResults(from_string(node.get_uri()), self._storage_index)
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False, add_lease=False):
        servermap = ServerMap()
        # Updating the servermap in MODE_CHECK will stand a good chance
        # of finding all of the shares and getting a good idea of
        # recoverability, etc., without verifying.
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, self.SERVERMAP_MODE,
                             add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        d.addCallback(lambda res: servermap)
        d.addCallback(self._fill_checker_results, self.results)
        d.addCallback(lambda res: self.results)
        return d

    def _got_mapupdate_results(self, servermap):
        # The file is healthy if there is exactly one recoverable
        # version, it has at least N distinct shares, and there are no
        # unrecoverable versions (so all existing shares are for the
        # same version).
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        # The file is unhealthy and needs to be repaired if:
        # - There are unrecoverable versions.
        if servermap.unrecoverable_versions():
            self.need_repair = True
        # - There isn't exactly one recoverable version (either none,
        #   or more than one).
        if num_recoverable != 1:
            self.need_repair = True
        # - The best recoverable version is missing some shares.
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True
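        # Illustrative example (assumed numbers, for exposition): with
        # 3-of-10 encoding (k=3, N=10), a single recoverable version
        # holding 8 distinct shares is recoverable (8 >= k) but still
        # sets need_repair, because 8 < N.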

        return servermap

    def _verify_all_shares(self, servermap):
        # read every byte of each share
        #
        # This logic is very nearly the same as the downloader's, so
        # rather than duplicating it we piggyback on Retrieve: we
        # construct it with verify=True and let it download everything.
        #
        #  r = Retrieve(node, storage_broker, servermap, version,
        #               verify=True)
        #  d = r.download()
        #
        # When the download finishes, the Deferred fires with the list
        # of bad shares that were found (the Retrieve process also
        # updates the servermap we provided as it goes along).
        #
        # Passing verify=True to the constructor tells the downloader
        # two things:
        #
        #  1. It needs to download all N shares, not just K shares.
        #  2. It doesn't need to decrypt or decode the shares, only
        #     verify them.
        if not self.best_version:
            return

        r = Retrieve(self._node, self._storage_broker, servermap,
                     self.best_version, verify=True)
        d = r.download()
        d.addCallback(self._process_bad_shares)
        return d


    def _process_bad_shares(self, bad_shares):
        if bad_shares:
            self.need_repair = True
        self.bad_shares = bad_shares


    def _count_shares(self, smap, version):
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_servers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum([len(shares)
                                          for verinfo,shares in vmap.items()
                                          if verinfo != version])
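        # For example (assumed values, for exposition): a healthy
        # 3-of-10 file with all shares on the given version would yield
        # {"count-shares-good": 10, "count-shares-needed": 3,
        #  "count-shares-expected": 10, "count-good-share-hosts": 10,
        #  "count-wrong-shares": 0}.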

        return counters

    def _fill_checker_results(self, smap, r):
        self._monitor.raise_if_cancelled()
        r.set_servermap(smap.copy())
        healthy = True
        data = {}
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()
        data["count-recoverable-versions"] = len(recoverable)
        data["count-unrecoverable-versions"] = len(unrecoverable)

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        if unrecoverable:
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        needs_rebalancing = False
        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            data.update(counters)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
            hosts = smap.all_servers_for_version(best_version)
            needs_rebalancing = bool(len(hosts) < N)
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            data.update(self._count_shares(smap, first))
            # leave needs_rebalancing=False: the file being unrecoverable is
            # the bigger problem
        else:
            # couldn't find anything at all
            data["count-shares-good"] = 0
            data["count-shares-needed"] = 3 # arbitrary defaults
            data["count-shares-expected"] = 10
            data["count-good-share-hosts"] = 0
            data["count-wrong-shares"] = 0

        if self.bad_shares:
            data["count-corrupt-shares"] = len(self.bad_shares)
            data["list-corrupt-shares"] = locators = []
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
            for (server, shnum, f) in sorted(self.bad_shares):
                serverid = server.get_serverid()
                locators.append( (serverid, self._storage_index, shnum) )
                s = "%s-sh%d" % (server.get_name(), shnum)
                if f.check(CorruptShareError):
                    ft = f.value.reason
                else:
                    ft = str(f)
                report.append(" %s: %s" % (s, ft))
                summary.append(s)
                p = (serverid, self._storage_index, shnum, f)
                r.problems.append(p)
                msg = ("CorruptShareError during mutable verify, "
                       "serverid=%(serverid)s, si=%(si)s, shnum=%(shnum)d, "
                       "where=%(where)s")
                log.msg(format=msg, serverid=server.get_name(),
                        si=base32.b2a(self._storage_index),
                        shnum=shnum,
                        where=ft,
                        level=log.WEIRD, umid="EkK8QA")
        else:
            data["count-corrupt-shares"] = 0
            data["list-corrupt-shares"] = []

        sharemap = {}
        for verinfo in vmap:
            for (shnum, server, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                if shareid not in sharemap:
                    sharemap[shareid] = []
                sharemap[shareid].append(server.get_serverid())
        data["sharemap"] = sharemap
        data["servers-responding"] = [s.get_serverid() for s in
                                      list(smap.get_reachable_servers())]

        r.set_healthy(healthy)
        r.set_recoverable(bool(recoverable))
        r.set_needs_rebalancing(needs_rebalancing)
        r.set_data(data)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Unhealthy: " + " ".join(summary))
        r.set_report(report)


class MutableCheckAndRepairer(MutableChecker):
    SERVERMAP_MODE = MODE_WRITE # needed to get the privkey

    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.cr_results.pre_repair_results = self.results
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda res: self.cr_results)
        return d

    def _maybe_repair(self, res):
        self._monitor.raise_if_cancelled()
        if not self.need_repair:
            self.cr_results.post_repair_results = self.results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            self.cr_results.post_repair_results = self.results
            self.cr_results.repair_attempted = False
            return
        self.cr_results.repair_attempted = True
        d = self._node.repair(self.results, monitor=self._monitor)
        def _repair_finished(repair_results):
            self.cr_results.repair_successful = repair_results.get_successful()
            r = CheckResults(from_string(self._node.get_uri()), self._storage_index)
            self.cr_results.post_repair_results = r
            self._fill_checker_results(repair_results.servermap, r)
            self.cr_results.repair_results = repair_results # TODO?
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            self.cr_results.repair_successful = False
            self.cr_results.repair_failure = f # TODO?
            #self.cr_results.post_repair_results = ??
            return f
        d.addCallbacks(_repair_finished, _repair_error)
        return d
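
For context, here is a minimal sketch of how this checker can be driven directly. This is an illustration, not part of the packaged file: in practice the mutable filenode's check() and check_and_repair() methods construct MutableChecker and MutableCheckAndRepairer with the client's storage broker and history, and the helper name below is hypothetical.

# Hedged usage sketch (assumes a mutable filenode plus the client's
# storage_broker and history; Monitor comes from allmydata.monitor).
from allmydata.monitor import Monitor
from allmydata.mutable.checker import MutableChecker

def check_mutable_node(node, storage_broker, history=None):
    monitor = Monitor()
    checker = MutableChecker(node, storage_broker, history, monitor)
    # verify=True downloads and verifies every byte of every share;
    # the Deferred fires with a CheckResults instance.
    d = checker.check(verify=True)
    def _show(results):
        print results.get_summary()
        return results
    d.addCallback(_show)
    return d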