This file is indexed.

/usr/share/pyshared/desktopcouch/records/field_registry.py is in python-desktopcouch-records 1.0.8-0ubuntu3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# Copyright 2009 Canonical Ltd.
#
# This file is part of desktopcouch.
#
#  desktopcouch is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# desktopcouch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with desktopcouch.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Eric Casteleijn <eric.casteleijn@canonical.com>
#          Vincenzo Di Somma <vincenzo.di.somma@canonical.com>

"""Registry utility classes"""

import copy
import warnings

from desktopcouch.records import MergeableList
# The string 'application_annotations', i.e. the same name as the record
# attribute that the classes in this module read and write.
# NOTE(review): nothing visible in this module references this constant;
# presumably it is the record key for per-application annotations and is
# imported by other modules -- confirm before changing or removing it.
ANNOTATION_NAMESPACE = 'application_annotations'


class Transformer(object):
    """Two-way transformation between records and application
    specific data.

    Keys found in the field registry are handled by the mapping
    registered for them; all other keys are kept in the record's
    application annotations, under this application's name.
    """

    def __init__(self, app_name, field_registry, record_class=None):
        """Remember the application name, the registry that maps
        application keys to field mappings, and the class used to
        create records when none is passed to from_app.
        """
        self.record_class = record_class
        self.field_registry = field_registry
        self.app_name = app_name

    def from_app(self, data, record=None):
        """Copy application data into a record and return the record.

        A new record is created by calling record_class without
        arguments when no record is given.
        """
        record = self.record_class() if record is None else record
        registry = self.field_registry
        for name, value in data.items():
            if name not in registry:
                # Unregistered keys are stored verbatim in the
                # application specific annotations.
                app_notes = record.application_annotations.setdefault(
                    self.app_name, {})
                app_notes.setdefault('application_fields', {})[name] = value
                continue
            registry[name].set_value(record, value)
        return record

    def to_app(self, record, data=None):
        """Copy record data into an application dictionary and return it.

        Annotation fields are copied first, so a registered field
        with the same key takes precedence over them.
        """
        data = {} if data is None else data
        app_notes = record.application_annotations.get(self.app_name, {})
        extra_fields = app_notes.get('application_fields', {})
        for name, value in extra_fields.items():
            data[name] = value
        registry = self.field_registry
        for name in registry:
            data[name] = registry[name].get_value(record)
        return data


class SimpleFieldMapping(object):
    """Map an application field straight onto one record field."""

    def __init__(self, fieldname):
        """Store the name of the record field this mapping uses."""
        self._fieldname = fieldname

    def get_value(self, record):
        """Return the record's value for the mapped field, using the
        record's own .get lookup.
        """
        name = self._fieldname
        return record.get(name)

    def getValue(self, record):         # pylint: disable=C0103
        """Deprecated spelling of get_value."""
        warnings.warn(
            ".getValue is deprecated, use .get_value instead",
            DeprecationWarning, stacklevel=2)
        return self.get_value(record)

    def delete_value(self, record):
        """Remove the mapped field from the record when present."""
        name = self._fieldname
        if name not in record:
            return
        del record[name]

    def deleteValue(self, record):      # pylint: disable=C0103
        """Deprecated spelling of delete_value."""
        warnings.warn(
            ".deleteValue is deprecated, use .delete_value instead",
            DeprecationWarning, stacklevel=2)
        return self.delete_value(record)

    def set_value(self, record, value):
        """Store value in the mapped field; None removes the field."""
        if value is not None:
            record[self._fieldname] = value
        else:
            self.delete_value(record)

    def setValue(self, record, value):  # pylint: disable=C0103
        """Deprecated spelling of set_value."""
        warnings.warn(
            ".setValue is deprecated, use .set_value instead",
            DeprecationWarning, stacklevel=2)
        return self.set_value(record, value)


class MergeableListFieldMapping(object):
    """Map one application field onto a field of a single entry in a
    mergeable list stored on the record.

    Which entry is used is remembered as a uuid in the application's
    private annotations, under the configured uuid field.
    """

    def __init__(self, app_name, uuid_field, root_list, field_name,
                 default_values=None):
        """Store the mapping configuration.

        default_values, when given, is copied into every list entry
        this mapping creates.
        """
        self._default_values = default_values or {}
        self._field_name = field_name
        self._root_list = root_list
        self._uuid_field = uuid_field
        self._app_name = app_name

    def get_application_annotations(self, record):
        """Return this application's private annotations for the
        record, creating the nested dictionaries when missing.
        """
        per_app = record.application_annotations.setdefault(
            self._app_name, {})
        return per_app.setdefault('private_application_annotations', {})

    def _uuid_lookup(self, record):
        """Return the uuid stored for this mapping, if any."""
        private = self.get_application_annotations(record)
        return private.get(self._uuid_field)

    def _append_and_register(self, entries, values, private):
        """Append values to the list and record the uuid of the new
        last entry in the private annotations.
        """
        entries.append(values)
        private[self._uuid_field] = entries.get_uuid_for_index(-1)

    def get_value(self, record):
        """Return the mapped field of the mapped list entry.

        None is returned when any lookup on the way raises KeyError.
        """
        uuid_key = self._uuid_lookup(record)
        try:
            entries = record[self._root_list]
            entry = entries.get_value_for_uuid(uuid_key)
            found = entry.get(self._field_name)
        except KeyError:
            found = None
        return found

    def getValue(self, record):         # pylint: disable=C0103
        """Deprecated spelling of get_value."""
        warnings.warn(
            ".getValue is deprecated, use .get_value instead",
            DeprecationWarning, stacklevel=2)
        return self.get_value(record)

    def delete_value(self, record):
        """Remove the mapped field from the mapped list entry.

        Nothing happens when the list is missing or empty, or when no
        uuid is stored for this mapping.
        """
        entries = record.get(self._root_list, None)
        # The annotations are only consulted when there is a list.
        uuid_key = self._uuid_lookup(record) if entries else None
        if not uuid_key:
            return
        # pylint: disable=W0212
        stored = entries._data.get(uuid_key, [])
        if self._field_name in stored:
            del entries._data[uuid_key][self._field_name]
        # pylint: enable=W0212

    def deleteValue(self, record):      # pylint: disable=C0103
        """Deprecated spelling of delete_value."""
        warnings.warn(
            ".deleteValue is deprecated, use .delete_value instead",
            DeprecationWarning, stacklevel=2)
        return self.delete_value(record)

    def set_value(self, record, value):
        """Store value in the mapped field of the mapped list entry.

        A false value deletes the field instead. When there is no
        list yet, a new MergeableList is created; when no entry is
        mapped, or the mapped entry is gone, a new entry built from
        the default values is appended and its uuid is stored.
        """
        if not value:
            self.delete_value(record)
            return
        entries = record.get(self._root_list, None)
        private = self.get_application_annotations(record)
        values = copy.deepcopy(self._default_values)
        values[self._field_name] = value
        if not entries:
            # First value ever: start a fresh list on the record.
            record[self._root_list] = MergeableList([values])
            private[self._uuid_field] = record[
                self._root_list].get_uuid_for_index(-1)
            return
        uuid_key = self._uuid_lookup(record)
        if uuid_key:
            try:
                target = entries.get_value_for_uuid(uuid_key)
            except KeyError:
                # The mapped entry is gone; fall through and add anew.
                pass
            else:
                target[self._field_name] = value
                return
        self._append_and_register(entries, values, private)

    def setValue(self, record, value):  # pylint: disable=C0103
        """Deprecated spelling of set_value."""
        warnings.warn(
            ".setValue is deprecated, use .set_value instead",
            DeprecationWarning, stacklevel=2)
        return self.set_value(record, value)


class ListToMergeableListFieldMapping(object):
    """Mapping from a list field in the application to a mergeable
    list in desktopcouch.

    """

    def __init__(self, fieldname, subfield=None):
        """Initialize the fieldmapping.

        Pass in an optional subfield if the mergeable list in
        desktopcouch consists of dictionary values where your
        application only cares about a single field in those
        dictionaries.

        """
        self._fieldname = fieldname
        self._subfield = subfield

    def get_value(self, record):
        """Get the value for the registered field.

        Returns a new list holding the items of the field, or only
        the subfield of each item when a subfield was configured.

        Returns None when the record has no such field, like the
        other field mappings in this module do, instead of failing
        while trying to iterate over a missing value.

        """
        items = record.get(self._fieldname)
        if items is None:
            return None
        if self._subfield:
            return [item.get(self._subfield) for item in items]
        return list(items)

    def delete_value(self, record):
        """Delete the whole field from the record, if present."""
        if self._fieldname in record:
            del record[self._fieldname]

    def _set_field_at(self, record, index, value):
        """Set the item (or optionally its subfield) at index."""
        if self._subfield is None:
            record[self._fieldname][index] = value
        else:
            # Only the subfield is replaced, so any other keys of the
            # existing dictionary item are kept.
            record[self._fieldname][index][self._subfield] = value

    def _extend(self, record, values):
        """Extend the field with a list of values."""
        if self._subfield is None:
            record[self._fieldname].extend(values)
        else:
            for value in values:
                record[self._fieldname].append({self._subfield: value})

    def _set_new_value(self, record, values):
        """Create and assign a new mergeable list."""
        if self._subfield is None:
            record[self._fieldname] = MergeableList(values)
        else:
            record[self._fieldname] = MergeableList(
                [{self._subfield: value} for value in values])

    def set_value(self, record, value):
        """Set the value for the field, preserving the UUIDs as much
        as possible.

        The value needs to be a sequence (any iterable) of elements;
        it is copied first, so the caller's object is never modified.
        If value is None, the field is removed from the record.

        If the field is not already present (or is empty), it will be
        set to a new MergeableList containing the new elements.

        If the field is already present, the current elements will be
        overwritten in place by the new ones.

        If the new elements are fewer than the current ones, the
        non-overwritten current elements (the tail) will be dropped.

        If the new elements are more than the current ones, the new
        ones in excess will be appended to the list, and will get new
        UUIDs.

        """
        if value is None:
            # we remove the field altogether
            self.delete_value(record)
            return
        # Work on a private copy: the caller's sequence must not be
        # consumed, and tuples or other iterables are accepted too.
        new_values = list(value)
        current = record.get(self._fieldname)
        if not current:
            # create a new mergeable list from the list
            self._set_new_value(record, new_values)
            return
        old_length = len(current)
        shared = min(old_length, len(new_values))
        # overwrite the positions both lists have in common
        for index in range(shared):
            self._set_field_at(record, index, new_values[index])
        # Drop the tail of the old list. Deleting from the end keeps
        # the remaining indexes valid; deleting by ascending index
        # would skip elements and run past the end of the shrinking
        # list.
        for index in range(old_length - 1, shared - 1, -1):
            del record[self._fieldname][index]
        # if we have new values left after the old values are
        # exhausted, extend the record field with them
        if len(new_values) > shared:
            self._extend(record, new_values[shared:])