This file is indexed.

/usr/lib/python2.7/dist-packages/ZODB/Connection.py is in python-zodb 1:3.9.7-5.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Database connection support

$Id: Connection.py 117033 2010-09-28 20:27:49Z jim $"""

import logging
import sys
import tempfile
import threading
import warnings
import os
import shutil
import time

from persistent import PickleCache

# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage
from ZODB.interfaces import IMVCCStorage
from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer
from zope.interface import implements

import transaction

import ZODB
from ZODB.blob import SAVEPOINT_SUFFIX
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.POSException import Unsupported, ReadOnlyHistoryError
from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
from ZODB import utils

global_reset_counter = 0

def resetCaches():
    """Ask for all connection caches to be reset when connections reopen.

    This exists for Zope's refresh feature: after Python modules are
    reloaded, already-loaded instances keep using the old class
    definitions.  Calling resetCaches() asks ZODB to clear its caches
    so that instances loaded by later connections pick up the new
    class definitions.

    The request is recorded by bumping the module-level
    global_reset_counter.
    """
    global global_reset_counter
    global_reset_counter = global_reset_counter + 1

class Connection(ExportImport, object):
    """Connection to ZODB for loading and storing objects."""

    implements(IConnection,
               ISavepointDataManager,
               IPersistentDataManager,
               ISynchronizer)


    _code_timestamp = 0

    ##########################################################################
    # Connection methods, ZODB.IConnection

    def __init__(self, db, cache_size=400, before=None, cache_size_bytes=0):
        """Create a new Connection.

        db -- the database object this connection belongs to; its
            'storage', 'database_name' and 'classFactory' attributes
            are read here.
        cache_size, cache_size_bytes -- passed straight through to the
            PickleCache constructor.
        before -- stored as self.before; a value other than None marks
            this as a historical connection.

        The connection starts out closed (self.opened is None) and not
        joined to any transaction manager.
        """

        self._log = logging.getLogger('ZODB.Connection')
        self._debug_info = ()

        self._db = db

        # historical connection
        self.before = before

        # Multi-database support
        self.connections = {self._db.database_name: self}

        storage = db.storage
        if IMVCCStorage.providedBy(storage):
            # Use a connection-specific storage instance.
            self._mvcc_storage = True
            storage = storage.new_instance()
        else:
            self._mvcc_storage = False

        # _storage is the storage currently in use; _normal_storage keeps
        # a reference to the regular (non-savepoint) storage.
        self._normal_storage = self._storage = storage
        self.new_oid = storage.new_oid
        self._savepoint_storage = None

        # Do we need to join a txn manager?
        self._needs_to_join = True
        self.transaction_manager = None
        self.opened = None # time.time() when DB.open() opened us

        # Snapshot of the module-level counter bumped by resetCaches().
        self._reset_counter = global_reset_counter
        self._load_count = 0   # Number of objects unghosted
        self._store_count = 0  # Number of objects stored

        # Cache which can ghostify (forget the state of) objects not
        # recently used. Its API is roughly that of a dict, with
        # additional gc-related and invalidation-related methods.
        self._cache = PickleCache(self, cache_size, cache_size_bytes)

        # The pre-cache is used by get to avoid infinite loops when
        # objects immediately load their state when they get their
        # persistent data set.
        self._pre_cache = {}

        # List of all objects (not oids) registered as modified by the
        # persistence machinery, or by add(), or whose access caused a
        # ReadConflictError (just to be able to clean them up from the
        # cache on abort with the other modified objects). All objects
        # of this list are either in _cache or in _added.
        self._registered_objects = []

        # Dict of oid->obj added explicitly through add(). Used as a
        # preliminary cache until commit time when objects are all moved
        # to the real _cache. The objects are moved to _creating at
        # commit time.
        self._added = {}

        # During commit this is turned into a list, which receives
        # objects added as a side-effect of storing a modified object.
        self._added_during_commit = None

        # During commit, all objects go to either _modified or _creating:

        # Dict of oid->flag of new objects (without serial), either
        # added by add() or implicitly added (discovered by the
        # serializer during commit). The flag is True for implicit
        # adding. Used during abort to remove created objects from the
        # _cache, and by persistent_id to check that a new object isn't
        # reachable from multiple databases.
        self._creating = {}

        # List of oids of modified objects, which have to be invalidated
        # in the cache on abort and in other connections on finish.
        self._modified = []

        # _invalidated queues invalidate messages delivered from the DB
        # _inv_lock prevents one thread from modifying the set while
        # another is processing invalidations.  All the invalidations
        # from a single transaction should be applied atomically, so
        # the lock must be held when reading _invalidated.

        # It sucks that we have to hold the lock to read _invalidated.
        # Normally, _invalidated is written by calling dict.update, which
        # will execute atomically by virtue of the GIL.  But some storage
        # might generate oids where hash or compare invokes Python code.  In
        # that case, the GIL can't save us.
        # Note:  since that was written, it was officially declared that the
        # type of an oid is str.  TODO:  remove the related now-unnecessary
        # critical sections (if any -- this needs careful thought).

        self._inv_lock = threading.Lock()
        self._invalidated = set()

        # Flag indicating whether the cache has been invalidated:
        self._invalidatedCache = False

        # We intend to prevent committing a transaction in which
        # ReadConflictError occurs.  _conflicts is the set of oids that
        # experienced ReadConflictError.  Any time we raise ReadConflictError,
        # the oid should be added to this set, and we should be sure that the
        # object is registered.  Because it's registered, Connection.commit()
        # will raise ReadConflictError again (because the oid is in
        # _conflicts).
        # NOTE(review): although described as a set, this is created as a
        # dict; the code visible here only tests membership and calls
        # clear() on it.
        self._conflicts = {}

        # _txn_time stores the upper bound on transactions visible to
        # this connection. That is, all object revisions must be
        # written before _txn_time. If it is None, then the current
        # revisions are acceptable.
        self._txn_time = None

        # To support importFile(), implemented in the ExportImport base
        # class, we need to run _importDuringCommit() from our commit()
        # method.  If _import is not None, it is a two-tuple of arguments
        # to pass to _importDuringCommit().
        self._import = None

        # Created last: the reader is handed this connection and its cache.
        self._reader = ObjectReader(self, self._cache, self._db.classFactory)


    def add(self, obj):
        """Add a new object 'obj' to the database and assign it an oid."""
        if self.opened is None:
            raise ConnectionStateError("The database connection is closed")

        marker = object()
        oid = getattr(obj, "_p_oid", marker)
        if oid is marker:
            raise TypeError("Only first-class persistent objects may be"
                            " added to a Connection.", obj)
        elif obj._p_jar is None:
            assert obj._p_oid is None
            oid = obj._p_oid = self._storage.new_oid()
            obj._p_jar = self
            if self._added_during_commit is not None:
                self._added_during_commit.append(obj)
            self._register(obj)
            # Add to _added after calling register(), so that _added
            # can be used as a test for whether the object has been
            # registered with the transaction.
            self._added[oid] = obj
        elif obj._p_jar is not self:
            raise InvalidObjectReference(obj, obj._p_jar)

    def get(self, oid):
        """Return the persistent object with oid 'oid'."""
        if self.opened is None:
            raise ConnectionStateError("The database connection is closed")

        obj = self._cache.get(oid, None)
        if obj is not None:
            return obj
        obj = self._added.get(oid, None)
        if obj is not None:
            return obj
        obj = self._pre_cache.get(oid, None)
        if obj is not None:
            return obj

        # This appears to be an MVCC violation because we are loading
        # the must recent data when perhaps we shouldnt. The key is
        # that we are only creating a ghost!
        # A disadvantage to this optimization is that _p_serial cannot be
        # trusted until the object has been loaded, which affects both MVCC
        # and historical connections.
        p, serial = self._storage.load(oid, '')
        obj = self._reader.getGhost(p)

        # Avoid infiniate loop if obj tries to load its state before
        # it is added to the cache and it's state refers to it.
        self._pre_cache[oid] = obj
        obj._p_oid = oid
        obj._p_jar = self
        obj._p_changed = None
        obj._p_serial = serial
        self._pre_cache.pop(oid)
        self._cache[oid] = obj
        return obj

    def cacheMinimize(self):
        """Deactivate all unmodified objects in the cache.
        """
        for connection in self.connections.itervalues():
            connection._cache.minimize()

    # TODO: we should test what happens when cacheGC is called mid-transaction.
    def cacheGC(self):
        """Reduce cache size to target size.
        """
        for connection in self.connections.itervalues():
            connection._cache.incrgc()

    __onCloseCallbacks = None
    def onCloseCallback(self, f):
        """Register a callable, f, to be called by close()."""
        if self.__onCloseCallbacks is None:
            self.__onCloseCallbacks = []
        self.__onCloseCallbacks.append(f)

    def close(self, primary=True):
        """Close the Connection.

        primary -- when true (the default), every other connection in
            self.connections is closed with primary=False and, if this
            connection is open, it is handed back to the database with
            _returnToPool().  When false, the connection is only marked
            as closed (self.opened = None).

        Registered close callbacks are called first; a callback that
        raises is logged and does not stop the close.

        Raises ConnectionStateError if the connection is currently
        joined to a transaction.
        """
        if not self._needs_to_join:
            # We're currently joined to a transaction.
            raise ConnectionStateError("Cannot close a connection joined to "
                                       "a transaction")

        if self._cache is not None:
            self._cache.incrgc() # This is a good time to do some GC

        # Call the close callbacks.
        if self.__onCloseCallbacks is not None:
            for f in self.__onCloseCallbacks:
                try:
                    f()
                except: # except what?
                    # For a bound method, log the object it is bound to
                    # rather than the method itself.
                    f = getattr(f, 'im_self', f)
                    self._log.error("Close callback failed for %s", f,
                                    exc_info=sys.exc_info())
            # Callbacks are one-shot: they are dropped once called.
            self.__onCloseCallbacks = None

        self._debug_info = ()

        if self.opened:
            self.transaction_manager.unregisterSynch(self)

        if self._mvcc_storage:
            self._storage.sync(force=False)

        if primary:
            # Secondary connections are closed with primary=False so that
            # they neither recurse over self.connections nor return
            # themselves to a pool.
            for connection in self.connections.values():
                if connection is not self:
                    connection.close(False)

            # Return the connection to the pool.
            if self.opened is not None:
                self._db._returnToPool(self)

                # _returnToPool() set self.opened to None.
                # However, we can't assert that here, because self may
                # have been reused (by another thread) by the time we
                # get back here.
        else:
            self.opened = None

    def db(self):
        """Returns a handle to the database this connection belongs to."""
        return self._db

    def isReadOnly(self):
        """Returns True if this connection is read only."""
        if self.opened is None:
            raise ConnectionStateError("The database connection is closed")
        return self.before is not None or self._storage.isReadOnly()

    def invalidate(self, tid, oids):
        """Notify the Connection that transaction 'tid' invalidated oids."""
        if self.before is not None:
            # This is a historical connection.  Invalidations are irrelevant.
            return
        self._inv_lock.acquire()
        try:
            if self._txn_time is None:
                self._txn_time = tid
            elif (tid < self._txn_time) and (tid is not None):
                raise AssertionError("invalidations out of order, %r < %r"
                                     % (tid, self._txn_time))

            self._invalidated.update(oids)
        finally:
            self._inv_lock.release()

    def invalidateCache(self):
        self._inv_lock.acquire()
        try:
            self._invalidatedCache = True
        finally:
            self._inv_lock.release()

    @property
    def root(self):
        """Return the database root object.

        The object stored under oid z64 is fetched with get() and
        returned wrapped in a RootConvenience.
        """
        root_object = self.get(z64)
        return RootConvenience(root_object)

    def get_connection(self, database_name):
        """Return a Connection for the named database."""
        connection = self.connections.get(database_name)
        if connection is None:
            new_con = self._db.databases[database_name].open(
                transaction_manager=self.transaction_manager,
                before=self.before,
                )
            self.connections.update(new_con.connections)
            new_con.connections = self.connections
            connection = new_con
        return connection

    def _implicitlyAdding(self, oid):
        """Are we implicitly adding an object within the current transaction

        This is used in a check to avoid implicitly adding an object
        to a database in a multi-database situation.
        See serialize.ObjectWriter.persistent_id.

        """
        return (self._creating.get(oid, 0)
                or
                ((self._savepoint_storage is not None)
                 and
                 self._savepoint_storage.creating.get(oid, 0)
                 )
                )

    def sync(self):
        """Manually update the view on the database."""
        self.transaction_manager.abort()
        self._storage_sync()

    def getDebugInfo(self):
        """Returns a tuple with different items for debugging the
        connection.
        """
        return self._debug_info

    def setDebugInfo(self, *args):
        """Add the given items to the debug information of this connection."""
        self._debug_info = self._debug_info + args

    def getTransferCounts(self, clear=False):
        """Returns the number of objects loaded and stored."""
        res = self._load_count, self._store_count
        if clear:
            self._load_count = 0
            self._store_count = 0
        return res

    # Connection methods
    ##########################################################################

    ##########################################################################
    # Data manager (ISavepointDataManager) methods

    def abort(self, transaction):
        """Abort a transaction and forget all changes."""

        # The order is important here.  We want to abort registered
        # objects before we process the cache.  Otherwise, we may un-add
        # objects added in savepoints.  If they've been modified since
        # the savepoint, then they won't have _p_oid or _p_jar after
        # they've been unadded. This will make the code in _abort
        # confused.

        self._abort()

        if self._savepoint_storage is not None:
            self._abort_savepoint()

        self._tpc_cleanup()

    def _abort(self):
        """Abort a transaction and forget all changes."""

        for obj in self._registered_objects:
            oid = obj._p_oid
            assert oid is not None
            if oid in self._added:
                del self._added[oid]
                if self._cache.get(oid) is not None:
                    del self._cache[oid]
                del obj._p_jar
                del obj._p_oid
                if obj._p_changed:
                    obj._p_changed = False
            else:

                # Note: If we invalidate a non-ghostifiable object
                # (i.e. a persistent class), the object will
                # immediately reread its state.  That means that the
                # following call could result in a call to
                # self.setstate, which, of course, must succeed.
                # In general, it would be better if the read could be
                # delayed until the start of the next transaction.  If
                # we read at the end of a transaction and if the
                # object was invalidated during this transaction, then
                # we'll read non-current data, which we'll discard
                # later in transaction finalization.  Unfortnately, we
                # can only delay the read if this abort corresponds to
                # a top-level-transaction abort.  We can't tell if
                # this is a top-level-transaction abort, so we have to
                # go ahead and invalidate now.  Fortunately, it's
                # pretty unlikely that the object we are invalidating
                # was invalidated by another thread, so the risk of a
                # reread is pretty low.

                self._cache.invalidate(oid)

    def _tpc_cleanup(self):
        """Performs cleanup operations to support tpc_finish and tpc_abort."""
        self._conflicts.clear()
        self._needs_to_join = True
        self._registered_objects = []
        self._creating.clear()

    # Process pending invalidations.
    def _flush_invalidations(self):
        if self._mvcc_storage:
            # Poll the storage for invalidations.
            invalidated = self._storage.poll_invalidations()
            if invalidated is None:
                # special value: the transaction is so old that
                # we need to flush the whole cache.
                self._cache.invalidate(self._cache.cache_data.keys())
            elif invalidated:
                self._cache.invalidate(invalidated)

        self._inv_lock.acquire()
        try:
            # Non-ghostifiable objects may need to read when they are
            # invalidated, so we'll quickly just replace the
            # invalidating dict with a new one.  We'll then process
            # the invalidations after freeing the lock *and* after
            # resetting the time.  This means that invalidations will
            # happen after the start of the transactions.  They are
            # subject to conflict errors and to reading old data.

            # TODO: There is a potential problem lurking for persistent
            # classes.  Suppose we have an invalidation of a persistent
            # class and of an instance.  If the instance is
            # invalidated first and if the invalidation logic uses
            # data read from the class, then the invalidation could
            # be performed with stale data.  Or, suppose that there
            # are instances of the class that are freed as a result of
            # invalidating some object.  Perhaps code in their __del__
            # uses class data.  Really, the only way to properly fix
            # this is to, in fact, make classes ghostifiable.  Then
            # we'd have to reimplement attribute lookup to check the
            # class state and, if necessary, activate the class.  It's
            # much worse than that though, because we'd also need to
            # deal with slots.  When a class is ghostified, we'd need
            # to replace all of the slot operations with versions that
            # reloaded the object when called. It's hard to say which
            # is better or worse.  For now, it seems the risk of
            # using a class while objects are being invalidated seems
            # small enough to be acceptable.

            invalidated = dict.fromkeys(self._invalidated)
            self._invalidated = set()
            self._txn_time = None
            if self._invalidatedCache:
                self._invalidatedCache = False
                invalidated = self._cache.cache_data.copy()
        finally:
            self._inv_lock.release()

        self._cache.invalidate(invalidated)

        # Now is a good time to collect some garbage.
        self._cache.incrgc()

    def tpc_begin(self, transaction):
        """Begin commit of a transaction, starting the two-phase commit."""
        self._modified = []

        # _creating is a list of oids of new objects, which is used to
        # remove them from the cache if a transaction aborts.
        self._creating.clear()
        self._normal_storage.tpc_begin(transaction)

    def commit(self, transaction):
        """Commit changes to an object"""

        if self._savepoint_storage is not None:

            # We first checkpoint the current changes to the savepoint
            self.savepoint()

            # then commit all of the savepoint changes at once
            self._commit_savepoint(transaction)

            # No need to call _commit since savepoint did.

        else:
            self._commit(transaction)

    def _commit(self, transaction):
        """Commit changes to an object

        Runs a pending import, if any, then stores every registered
        object that was added or changed, followed by any objects that
        were added while those were being stored.

        Raises ReadOnlyHistoryError on a historical connection,
        ConflictError if the whole cache has been invalidated or a
        changed object was invalidated and has no _p_resolveConflict,
        ReadConflictError if a registered object had a read conflict,
        and InvalidObjectReference if a registered object belongs to
        another jar.
        """

        if self.before is not None:
            raise ReadOnlyHistoryError()

        if self._import:
            # We are importing an export file. We always do this
            # while making a savepoint so we can copy export data
            # directly to our storage, typically a TmpStore.
            self._importDuringCommit(transaction, *self._import)
            self._import = None

        # Just in case an object is added as a side-effect of storing
        # a modified object.  If, for example, a __getstate__() method
        # calls add(), the newly added objects will show up in
        # _added_during_commit.  This sounds insane, but has actually
        # happened.

        self._added_during_commit = []

        if self._invalidatedCache:
            raise ConflictError()

        for obj in self._registered_objects:
            oid = obj._p_oid
            assert oid
            if oid in self._conflicts:
                raise ReadConflictError(object=obj)

            if obj._p_jar is not self:
                raise InvalidObjectReference(obj, obj._p_jar)
            elif oid in self._added:
                # Explicitly added objects have no serial yet.
                assert obj._p_serial == z64
            elif obj._p_changed:
                if oid in self._invalidated:
                    # The object was changed here and invalidated from
                    # elsewhere: only acceptable if the object offers
                    # _p_resolveConflict.
                    resolve = getattr(obj, "_p_resolveConflict", None)
                    if resolve is None:
                        raise ConflictError(object=obj)
                self._modified.append(oid)
            else:
                # Nothing to do.  It's been said that it's legal, e.g., for
                # an object to set _p_changed to false after it's been
                # changed and registered.
                continue

            self._store_objects(ObjectWriter(obj), transaction)

        # Store whatever add() collected while the loop above was running.
        for obj in self._added_during_commit:
            self._store_objects(ObjectWriter(obj), transaction)
        self._added_during_commit = None

    def _store_objects(self, writer, transaction):
        """Serialize and store every object yielded by `writer`.

        For each object this decides whether it is new or modified,
        pickles it, hands the pickle to the current storage, puts the
        object in the cache and finally processes the serial returned by
        the storage.

        Parameters:
        writer: iterable of the objects to store; it must also provide
            serialize(obj).  The callers visible here pass an ObjectWriter.
        transaction: passed through unchanged to the storage's store() /
            storeBlob().

        Raises ConflictError for an already-stored object that was
        invalidated and has no _p_resolveConflict, Unsupported when a Blob
        is stored in a storage that does not provide IBlobStorage, and
        ValueError when a Blob is still opened.
        """
        for obj in writer:
            oid = obj._p_oid
            serial = getattr(obj, "_p_serial", z64)

            # A z64 serial means the object has never been given a real
            # serial.  It is treated as new unless the savepoint storage
            # already lists the oid in `creating` with a false flag.
            if ((serial == z64)
                and
                ((self._savepoint_storage is None)
                 or (oid not in self._savepoint_storage.creating)
                 or self._savepoint_storage.creating[oid]
                 )
                ):

                # obj is a new object

                # Because obj was added, it is now in _creating, so it
                # can be removed from _added.  If oid wasn't in
                # adding, then we are adding it implicitly.

                implicitly_adding = self._added.pop(oid, None) is None

                self._creating[oid] = implicitly_adding

            else:
                # An existing object: refuse to overwrite a revision we
                # know is stale unless the object can resolve conflicts.
                if (oid in self._invalidated
                    and not hasattr(obj, '_p_resolveConflict')):
                    raise ConflictError(object=obj)
                self._modified.append(oid)
            p = writer.serialize(obj)  # This calls __getstate__ of obj

            if isinstance(obj, Blob):
                if not IBlobStorage.providedBy(self._storage):
                    raise Unsupported(
                        "Storing Blobs in %s is not supported." %
                        repr(self._storage))
                if obj.opened():
                    raise ValueError("Can't commit with opened blobs.")
                s = self._storage.storeBlob(oid, serial, p,
                                            obj._uncommitted(),
                                            '', transaction)
                # we invalidate the object here in order to ensure
                # that the next attribute access of its name
                # unghostify it, which will cause its blob data
                # to be reattached "cleanly"
                obj._p_invalidate()
            else:
                s = self._storage.store(oid, serial, p, '', transaction)

            self._store_count += 1
            # Put the object in the cache before handling the
            # response, just in case the response contains the
            # serial number for a newly created object
            try:
                self._cache[oid] = obj
            except:
                # Dang, I bet it's wrapped:
                # TODO:  Deprecate, then remove, this.
                if hasattr(obj, 'aq_base'):
                    self._cache[oid] = obj.aq_base
                else:
                    raise

            # Record the pickle size both in the cache and on the object.
            self._cache.update_object_size_estimation(oid, len(p))
            obj._p_estimated_size = len(p)

            self._handle_serial(s, oid)

    def _handle_serial(self, store_return, oid=None, change=1):
        """Handle the returns from store() and tpc_vote() calls."""

        # These calls can return different types depending on whether
        # ZEO is used.  ZEO uses asynchronous returns that may be
        # returned in batches by the ClientStorage.  ZEO1 can also
        # return an exception object and expect that the Connection
        # will raise the exception.

        # When conflict resolution occurs, the object state held by
        # the connection does not match what is written to the
        # database.  Invalidate the object here to guarantee that
        # the new state is read the next time the object is used.

        if not store_return:
            return
        if isinstance(store_return, str):
            assert oid is not None
            self._handle_one_serial(oid, store_return, change)
        else:
            for oid, serial in store_return:
                self._handle_one_serial(oid, serial, change)

    def _handle_one_serial(self, oid, serial, change):
        if not isinstance(serial, str):
            raise serial
        obj = self._cache.get(oid, None)
        if obj is None:
            return
        if serial == ResolvedSerial:
            del obj._p_changed # transition from changed to ghost
        else:
            if change:
                obj._p_changed = 0 # transition from changed to up-to-date
            obj._p_serial = serial

    def tpc_abort(self, transaction):
        """Abort the two-phase commit of `transaction`.

        Drops any pending import, discards savepoint data, tells the
        storage to abort, invalidates the objects recorded as modified,
        disowns objects that were being created or added, and finally
        runs _tpc_cleanup().
        """
        if self._import:
            self._import = None

        if self._savepoint_storage is not None:
            self._abort_savepoint()

        self._storage.tpc_abort(transaction)

        # Note: If we invalidate a non-ghostifiable object (i.e. a
        # persistent class), the object will immediately reread its
        # state.  That means that the following call could result in a
        # call to self.setstate, which, of course, must succeed.  In
        # general, it would be better if the read could be delayed
        # until the start of the next transaction.  If we read at the
        # end of a transaction and if the object was invalidated
        # during this transaction, then we'll read non-current data,
        # which we'll discard later in transaction finalization.  We
        # could, theoretically queue this invalidation by calling
        # self.invalidate.  Unfortunately, attempts to make that
        # change resulted in mysterious test failures.  It's pretty
        # unlikely that the object we are invalidating was invalidated
        # by another thread, so the risk of a reread is pretty low.
        # It's really not worth the effort to pursue this.

        self._cache.invalidate(self._modified)
        self._invalidate_creating()
        # Objects that were added but never stored still carry our oid and
        # jar; strip both so they no longer belong to this connection.
        while self._added:
            oid, obj = self._added.popitem()
            if obj._p_changed:
                obj._p_changed = False
            del obj._p_oid
            del obj._p_jar
        self._tpc_cleanup()

    def _invalidate_creating(self, creating=None):
        """Disown any objects newly saved in an uncommitted transaction."""
        if creating is None:
            creating = self._creating

        for oid in creating:
            o = self._cache.get(oid)
            if o is not None:
                del self._cache[oid]
                if o._p_changed:
                    o._p_changed = False
                del o._p_jar
                del o._p_oid

        creating.clear()

    def tpc_vote(self, transaction):
        """Verify that a data manager can commit the transaction.

        Storages without a tpc_vote attribute are skipped; otherwise the
        value returned by the vote is passed to _handle_serial().
        """
        try:
            vote = self._storage.tpc_vote
        except AttributeError:
            # This storage does not take part in voting.
            pass
        else:
            self._handle_serial(vote(transaction))

    def tpc_finish(self, transaction):
        """Indicate confirmation that the transaction is done."""

        def notify_other_connections(tid):
            # Inter-connection invalidation is not needed when the
            # storage provides MVCC.
            if not self._mvcc_storage:
                modified = dict.fromkeys(self._modified)
                self._db.invalidate(tid, modified, self)

        # It's important that the storage calls the passed function
        # while it still has its lock.  We don't want another thread
        # to be able to read any updated data until we've had a chance
        # to send an invalidation message to all of the other
        # connections!
        self._storage.tpc_finish(transaction, notify_other_connections)
        self._tpc_cleanup()

    def sortKey(self):
        """Return a consistent sort key for this connection.

        The key is the storage's sort key followed by ':' and id(self).
        """
        storage_key = self._storage.sortKey()
        return "%s:%s" % (storage_key, id(self))

    # Data manager (ISavepointDataManager) methods
    ##########################################################################

    ##########################################################################
    # Transaction-manager synchronization -- ISynchronizer

    def beforeCompletion(self, txn):
        """Do nothing: no work is needed before a commit starts."""
        return None

    # Call the underlying storage's sync() method (if any), and process
    # pending invalidations regardless.  Of course this should only be
    # called at transaction boundaries.
    def _storage_sync(self, *ignored):
        sync = getattr(self._storage, 'sync', 0)
        if sync:
            sync()
        self._flush_invalidations()

    afterCompletion =  _storage_sync
    newTransaction = _storage_sync

     # Transaction-manager synchronization -- ISynchronizer
    ##########################################################################

    ##########################################################################
    # persistent.interfaces.IPersistentDatamanager

    def oldstate(self, obj, tid):
        """Return copy of 'obj' that was written by transaction 'tid'.

        obj must belong to this connection.
        """
        assert obj._p_jar is self
        pickle = self._storage.loadSerial(obj._p_oid, tid)
        state = self._reader.getState(pickle)
        return state

    def setstate(self, obj):
        """Turn the ghost 'obj' into a real object by loading its state.

        Raises ConnectionStateError when self.opened is None.  A
        ConflictError from the load propagates silently; any other
        failure is logged first and then re-raised.
        """
        oid = obj._p_oid

        if self.opened is None:
            complaint = ("Shouldn't load state for %s when the connection "
                         "is closed" % oid_repr(oid))
            self._log.error(complaint)
            raise ConnectionStateError(complaint)

        try:
            self._setstate(obj)
        except ConflictError:
            # Conflicts are expected events and are not worth logging.
            raise
        except:
            self._log.error("Couldn't load state for %s", oid_repr(oid),
                            exc_info=sys.exc_info())
            raise

    def _setstate(self, obj):
        """Load the state of the ghost `obj` from the storage.

        Helper for setstate(), which provides logging of failures.

        For a historical connection (self.before is set) the revision
        current before that time is loaded.  Otherwise the current
        revision is loaded, unless the object has been invalidated; in
        that case _p_independent(), then non-current (MVCC) data are
        tried before a conflict is raised.

        Raises POSKeyError when a historical connection finds no
        revision, and ReadConflictError when the whole cache has been
        invalidated or no usable revision exists.
        """

        # The control flow is complicated here to avoid loading an
        # object revision that we are sure we aren't going to use.  As
        # a result, invalidation tests occur before and after the
        # load.  We can only be sure about invalidations after the
        # load.

        # If an object has been invalidated, there are several cases
        # to consider:
        # 1. Check _p_independent()
        # 2. Try MVCC
        # 3. Raise ConflictError.

        # Does anything actually use _p_independent()?  It would simplify
        # the code if we could drop support for it.
        # (BTrees.Length does.)


        if self.before is not None:
            # Load data that was current before the time we have.
            before = self.before
            t = self._storage.loadBefore(obj._p_oid, before)
            if t is None:
                # historical connection!  Pass the oid along so that the
                # error identifies which object had no revision.
                raise POSKeyError(obj._p_oid)
            p, serial, end = t

        else:
            # There is a harmless data race with self._invalidated.  An
            # update could go on in another thread, but we don't care
            # because we have to check again after the load anyway.

            if self._invalidatedCache:
                raise ReadConflictError()

            if (obj._p_oid in self._invalidated and
                    not myhasattr(obj, "_p_independent")):
                # If the object has _p_independent(), we will handle it below.
                self._load_before_or_conflict(obj)
                return

            p, serial = self._storage.load(obj._p_oid, '')
            self._load_count += 1

            # Re-check under the lock: the object may have been
            # invalidated while the load was in progress.
            self._inv_lock.acquire()
            try:
                invalid = obj._p_oid in self._invalidated
            finally:
                self._inv_lock.release()

            if invalid:
                if myhasattr(obj, "_p_independent"):
                    # This call will raise a ReadConflictError if something
                    # goes wrong
                    self._handle_independent(obj)
                else:
                    self._load_before_or_conflict(obj)
                    return

        self._reader.setGhostState(obj, p)
        obj._p_serial = serial
        # Record the pickle size both in the cache and on the object.
        self._cache.update_object_size_estimation(obj._p_oid, len(p))
        obj._p_estimated_size = len(p)

        # Blob support
        if isinstance(obj, Blob):
            obj._p_blob_uncommitted = None
            obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, serial)

    def _load_before_or_conflict(self, obj):
        """Load non-current state for obj or raise ReadConflictError."""
        if not self._setstate_noncurrent(obj):
            self._register(obj)
            self._conflicts[obj._p_oid] = True
            raise ReadConflictError(object=obj)

    def _setstate_noncurrent(self, obj):
        """Set state using non-current data.

        Return True if state was available, False if not.
        """
        try:
            # Load data that was current before the commit at txn_time.
            t = self._storage.loadBefore(obj._p_oid, self._txn_time)
        except KeyError:
            return False
        if t is None:
            return False
        data, start, end = t
        # The non-current transaction must have been written before
        # txn_time.  It must be current at txn_time, but could have
        # been modified at txn_time.

        assert start < self._txn_time, (u64(start), u64(self._txn_time))
        assert end is not None
        assert self._txn_time <= end, (u64(self._txn_time), u64(end))
        self._reader.setGhostState(obj, data)
        obj._p_serial = start

        # MVCC Blob support
        if isinstance(obj, Blob):
            obj._p_blob_uncommitted = None
            obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, start)

        return True

    def _handle_independent(self, obj):
        # Helper method for setstate() handles possibly independent objects
        # Call _p_independent(), if it returns True, setstate() wins.
        # Otherwise, raise a ConflictError.

        if obj._p_independent():
            self._inv_lock.acquire()
            try:
                try:
                    self._invalidated.remove(obj._p_oid)
                except KeyError:
                    pass
            finally:
                self._inv_lock.release()
        else:
            self._conflicts[obj._p_oid] = 1
            self._register(obj)
            raise ReadConflictError(object=obj)

    def register(self, obj):
        """Register obj with the current transaction manager.

        A subclass could override this method to customize the default
        policy of one transaction manager for each thread.

        obj must be an object loaded from this Connection.  Raises
        ValueError when obj has no oid.
        """
        assert obj._p_jar is self
        oid = obj._p_oid

        if oid is None:
            # The actual complaint here is that an object without
            # an oid is being registered.  I can't think of any way to
            # achieve that without assignment to _p_jar.  If there is
            # a way, this will be a very confusing exception.
            raise ValueError("assigning to _p_jar is not supported")

        if oid in self._added:
            # It was registered before it was added to _added.
            return

        self._register(obj)

    def _register(self, obj=None):

        # The order here is important.  We need to join before
        # registering the object, because joining may take a
        # savepoint, and the savepoint should not reflect the change
        # to the object.

        if self._needs_to_join:
            self.transaction_manager.get().join(self)
            self._needs_to_join = False

        if obj is not None:
            self._registered_objects.append(obj)


    # persistent.interfaces.IPersistentDatamanager
    ##########################################################################

    ##########################################################################
    # PROTECTED stuff (used by e.g. ZODB.DB.DB)

    def _cache_items(self):
        # find all items on the lru list
        items = self._cache.lru_items()
        # fine everything. some on the lru list, some not
        everything = self._cache.cache_data
        # remove those items that are on the lru list
        for k,v in items:
            del everything[k]
        # return a list of [ghosts....not recently used.....recently used]
        return everything.items() + items

    def open(self, transaction_manager=None, delegate=True):
        """Prepare this Connection for use with a transaction manager.

        The open time is recorded.  If the global reset counter has
        moved on since this connection last looked, the cache is rebuilt
        (see resetCaches()); otherwise invalidations that are pending
        are flushed.  The connection then registers itself as a
        synchronizer and gives the cache a chance to collect garbage.

        Parameters:
        transaction_manager: transaction manager to use.  None means
            use the default transaction manager.
        delegate: when true, the other connections in self.connections
            are opened too, with the same transaction manager and with
            delegate=False.
        """
        self.opened = time.time()

        if transaction_manager is None:
            transaction_manager = transaction.manager
        self.transaction_manager = transaction_manager

        if self._reset_counter == global_reset_counter:
            self._flush_invalidations()
        else:
            # New code is in place.  Start a new cache.
            self._resetCache()

        transaction_manager.registerSynch(self)

        if self._cache is not None:
            # This is a good time to do some GC
            self._cache.incrgc()

        if not delegate:
            return

        # delegate open to secondary connections
        for connection in self.connections.values():
            if connection is not self:
                connection.open(transaction_manager, False)

    def _resetCache(self):
        """Creates a new cache, discarding the old one.

        The new cache keeps the old cache's cache_size and
        cache_size_bytes.  See the docstring for the resetCaches()
        function.
        """
        self._reset_counter = global_reset_counter
        self._invalidated.clear()
        self._invalidatedCache = False

        old_cache = self._cache
        size = old_cache.cache_size
        size_bytes = old_cache.cache_size_bytes
        fresh_cache = PickleCache(self, size, size_bytes)
        self._cache = fresh_cache

        # Keep the reader, if there is one, pointed at the new cache.
        if getattr(self, '_reader', None) is not None:
            self._reader._cache = fresh_cache

    def _releaseStorage(self):
        """Tell the storage to release resources it's using"""
        if self._mvcc_storage:
            self._storage.release()

    ##########################################################################
    # Python protocol

    def __repr__(self):
        """Show the connection's positive id as at least 8 hex digits."""
        address = positive_id(self)
        return '<Connection at %08x>' % (address,)

    # Python protocol
    ##########################################################################

    ##########################################################################
    # DEPRECATION candidates

    # Subscripting a connection (conn[oid]) is an alias for get().
    __getitem__ = get

    def exchange(self, old, new):
        """Make `new` take over the oid of `old` in this connection.

        `new` is given old's oid and this connection as its jar, marked
        as changed, registered, and stored in the cache under that oid.
        """
        # called by a ZClasses method that isn't executed by the test suite
        shared_oid = old._p_oid
        new._p_oid = shared_oid
        new._p_jar = self
        new._p_changed = 1
        self._register(new)
        self._cache[shared_oid] = new

    # DEPRECATION candidates
    ##########################################################################

    ##########################################################################
    # DEPRECATED methods

    # None at present.

    # DEPRECATED methods
    ##########################################################################

    #####################################################################
    # Savepoint support

    def savepoint(self):
        """Commit pending changes to a temporary storage; return a Savepoint.

        The returned Savepoint remembers the temporary storage's position
        together with copies of its index and `creating` mappings.
        """
        if self._savepoint_storage is None:
            # First savepoint: route all further stores through a TmpStore
            # layered over the normal storage.
            self._savepoint_storage = TmpStore(self._normal_storage)
            self._storage = self._savepoint_storage

        self._creating.clear()
        self._commit(None)

        # Move what was created by this commit over to the storage.
        store = self._storage
        store.creating.update(self._creating)
        self._creating.clear()
        self._registered_objects = []

        snapshot = (
            store.position,
            store.index.copy(),
            store.creating.copy(),
            )
        result = Savepoint(self, snapshot)

        # While the interface doesn't guarantee this, savepoints are
        # sometimes used just to "break up" very long transactions, and as
        # a pragmatic matter this is a good time to reduce the cache
        # memory burden.
        self.cacheGC()
        return result

    def _rollback(self, state):
        self._abort()
        self._registered_objects = []
        src = self._storage
        self._invalidate_creating(src.creating)
        index = src.index
        src.reset(*state)
        self._cache.invalidate(index)

    def _commit_savepoint(self, transaction):
        """Commit all changes made in savepoints and begin 2-phase commit

        The connection is switched back to its normal storage and every
        record held by the savepoint storage is copied into it with
        store() / storeBlob().  The savepoint storage is closed
        afterwards, even if copying fails.
        """
        src = self._savepoint_storage
        self._storage = self._normal_storage
        self._savepoint_storage = None

        try:
            self._log.debug("Committing savepoints of size %s", src.getSize())
            oids = src.index.keys()

            # Copy invalidating and creating info from temporary storage:
            self._modified.extend(oids)
            self._creating.update(src.creating)

            for oid in oids:
                data, serial = src.load(oid, src)
                obj = self._cache.get(oid, None)
                if obj is not None:
                    # Keep the size estimates in step with the data that
                    # is about to be stored.
                    self._cache.update_object_size_estimation(
                        obj._p_oid, len(data))
                    obj._p_estimated_size = len(data)
                if isinstance(self._reader.getGhost(data), Blob):
                    blobfilename = src.loadBlob(oid, serial)
                    s = self._storage.storeBlob(
                        oid, serial, data, blobfilename,
                        '', transaction)
                    # we invalidate the object here in order to ensure
                    # that the next attribute access of its name
                    # unghostify it, which will cause its blob data
                    # to be reattached "cleanly"
                    self.invalidate(None, (oid, ))
                else:
                    s = self._storage.store(oid, serial, data,
                                            '', transaction)

                # change=False: the serial is recorded without resetting
                # the object's changed flag.
                self._handle_serial(s, oid, change=False)
        finally:
            src.close()

    def _abort_savepoint(self):
        """Discard all savepoint data.

        Objects the savepoint storage lists as being created are
        disowned, the connection goes back to its normal storage, every
        oid in the savepoint storage's index is invalidated in the cache,
        and the savepoint storage is closed.
        """
        src = self._savepoint_storage
        self._invalidate_creating(src.creating)
        self._storage = self._normal_storage
        self._savepoint_storage = None

        # Note: If we invalidate a non-ghostifiable object (i.e. a
        # persistent class), the object will immediately reread its
        # state.  That means that the following call could result in a
        # call to self.setstate, which, of course, must succeed.  In
        # general, it would be better if the read could be delayed
        # until the start of the next transaction.  If we read at the
        # end of a transaction and if the object was invalidated
        # during this transaction, then we'll read non-current data,
        # which we'll discard later in transaction finalization.  We
        # could, theoretically queue this invalidation by calling
        # self.invalidate.  Unfortunately, attempts to make that
        # change resulted in mysterious test failures.  It's pretty
        # unlikely that the object we are invalidating was invalidated
        # by another thread, so the risk of a reread is pretty low.
        # It's really not worth the effort to pursue this.

        # Note that we do this *after* resetting the storage so that, if
        # data are read, we read it from the reset storage!

        self._cache.invalidate(src.index)

        src.close()

    # Savepoint support
    #####################################################################

class Savepoint:
    """Pairs a data manager with a state it can be rolled back to."""

    implements(IDataManagerSavepoint)

    def __init__(self, datamanager, state):
        # `state` is opaque here; it is handed back verbatim on rollback.
        self.datamanager, self.state = datamanager, state

    def rollback(self):
        """Ask the data manager to return to the remembered state."""
        manager = self.datamanager
        manager._rollback(self.state)

class TmpStore:
    """A storage-like thing to support savepoints."""

    implements(IBlobStorage)

    def __init__(self, storage):
        self._storage = storage
        for method in (
            'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
            'isReadOnly'
            ):
            setattr(self, method, getattr(storage, method))

        self._file = tempfile.TemporaryFile()
        # position: current file position
        # _tpos: file position at last commit point
        self.position = 0L
        # index: map oid to pos of last committed version
        self.index = {}
        self.creating = {}

        self._blob_dir = None

    def __len__(self):
        return len(self.index)

    def close(self):
        self._file.close()
        if self._blob_dir is not None:
            remove_committed_dir(self._blob_dir)
            self._blob_dir = None

    def load(self, oid, version):
        pos = self.index.get(oid)
        if pos is None:
            return self._storage.load(oid, '')
        self._file.seek(pos)
        h = self._file.read(8)
        oidlen = u64(h)
        read_oid = self._file.read(oidlen)
        if read_oid != oid:
            raise POSException.StorageSystemError('Bad temporary storage')
        h = self._file.read(16)
        size = u64(h[8:])
        serial = h[:8]
        return self._file.read(size), serial

    def store(self, oid, serial, data, version, transaction):
        # we have this funny signature so we can reuse the normal non-commit
        # commit logic
        assert version == ''
        self._file.seek(self.position)
        l = len(data)
        if serial is None:
            serial = z64
        header = p64(len(oid)) + oid + serial + p64(l)
        self._file.write(header)
        self._file.write(data)
        self.index[oid] = self.position
        self.position += l + len(header)
        return serial

    def storeBlob(self, oid, serial, data, blobfilename, version,
                  transaction):
        assert version == ''
        serial = self.store(oid, serial, data, '', transaction)

        targetpath = self._getBlobPath()
        if not os.path.exists(targetpath):
            os.makedirs(targetpath, 0700)

        targetname = self._getCleanFilename(oid, serial)
        rename_or_copy_blob(blobfilename, targetname, chmod=False)

    def loadBlob(self, oid, serial):
        """Return the filename where the blob file can be found.
        """
        if not IBlobStorage.providedBy(self._storage):
            raise Unsupported(
                "Blobs are not supported by the underlying storage %r." %
                self._storage)
        filename = self._getCleanFilename(oid, serial)
        if not os.path.exists(filename):
            return self._storage.loadBlob(oid, serial)
        return filename

    def openCommittedBlobFile(self, oid, serial, blob=None):
        blob_filename = self.loadBlob(oid, serial)
        if blob is None:
            return open(blob_filename, 'rb')
        else:
            return ZODB.blob.BlobFile(blob_filename, 'r', blob)

    def _getBlobPath(self):
        blob_dir = self._blob_dir
        if blob_dir is None:
            blob_dir = tempfile.mkdtemp(dir=self.temporaryDirectory(),
                                        prefix='savepoints')
            self._blob_dir = blob_dir
        return blob_dir

    def _getCleanFilename(self, oid, tid):
        return os.path.join(
            self._getBlobPath(),
            "%s-%s%s" % (utils.oid_repr(oid), utils.tid_repr(tid),
                         SAVEPOINT_SUFFIX,)
            )

    def temporaryDirectory(self):
        return self._storage.temporaryDirectory()

    def reset(self, position, index, creating):
        self._file.truncate(position)
        self.position = position
        # Caution:  We're typically called as part of a savepoint rollback.
        # Other machinery remembers the index to restore, and passes it to
        # us.  If we simply bind self.index to `index`, then if the caller
        # didn't pass a copy of the index, the caller's index will mutate
        # when self.index mutates.  This can be a disaster if the caller is a
        # savepoint to which the user rolls back again later (the savepoint
        # loses the original index it passed).  Therefore, to be safe, we make
        # a copy of the index here.  An alternative would be to ensure that
        # all callers pass copies.  As is, our callers do not make copies.
        self.index = index.copy()
        self.creating = creating

class RootConvenience(object):
    """Attribute-style access to the items of a root mapping.

    Getting, setting and deleting an attribute is translated into the
    same operation on the item of that name in the wrapped mapping.
    Calling the instance returns the mapping itself.
    """

    def __init__(self, root):
        # Write through __dict__ because __setattr__ is redirected to
        # the root mapping.
        self.__dict__['_root'] = root

    def __getattr__(self, name):
        """Return root[name]; raise AttributeError if there is no such key."""
        # __getattr__ only runs when normal lookup has failed, so being
        # asked for '_root' means the instance was created without
        # __init__ (for example by copy or unpickling).  Evaluating
        # self._root below would then recurse without end.
        if name == '_root':
            raise AttributeError(name)
        try:
            return self._root[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, v):
        """Store v in the root mapping under name."""
        self._root[name] = v

    def __delattr__(self, name):
        """Delete root[name]; raise AttributeError if there is no such key."""
        try:
            del self._root[name]
        except KeyError:
            raise AttributeError(name)

    def __call__(self):
        """Return the wrapped root mapping."""
        return self._root

    def __repr__(self):
        """List the sorted key names, abbreviated beyond 60 characters."""
        names = " ".join(sorted(self._root))
        if len(names) > 60:
            # Cut at the last space within the first 57 characters so
            # that no name is shown partially.
            names = names[:57].rsplit(' ', 1)[0] + ' ...'
        return "<root: %s>" % names