/usr/share/pyshared/ZODB/tests/IExternalGC.test is in python-zodb 1:3.9.7-2.
This file is owned by root:root, with mode 0o644.
Storage Support for external GC
===============================
A storage that provides IExternalGC supports external garbage
collectors by providing a deleteObject method that transactionally
deletes an object.
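To make the contract concrete, here is a minimal in-memory sketch of what
"transactionally deletes an object" can mean. ToyStorage and
ToyConflictError are hypothetical names for illustration, not ZODB classes,
and the transaction argument that the real deleteObject takes is omitted
for brevity.

```python
class ToyConflictError(Exception):
    """Stand-in for a conflict error (hypothetical, not ZODB's class)."""


class ToyStorage:
    """In-memory model mapping oid -> (data, serial). Not a real storage."""

    def __init__(self):
        self._current = {}     # committed state: oid -> (data, serial)
        self._pending = None   # oids staged for deletion, or None

    def store(self, oid, data, serial):
        self._current[oid] = (data, serial)

    def load(self, oid):
        # Raises KeyError if the object is unknown or has been deleted.
        return self._current[oid]

    def tpc_begin(self):
        self._pending = []

    def deleteObject(self, oid, serial):
        # Refuse the deletion unless the caller saw the current revision.
        if self._current[oid][1] != serial:
            raise ToyConflictError(oid)
        self._pending.append(oid)

    def tpc_finish(self):
        # Staged deletions take effect only when the transaction finishes.
        for oid in self._pending:
            del self._current[oid]
        self._pending = None

    def tpc_abort(self):
        # Dropping the staged list leaves the committed state untouched.
        self._pending = None
```

In this model a staged deletion stays invisible to load until tpc_finish
runs, and tpc_abort discards it without touching committed data.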
A create_storage function is provided that creates a storage.
>>> storage = create_storage()
>>> import ZODB.blob, transaction
>>> db = ZODB.DB(storage)
>>> conn = db.open()
>>> conn.root()[0] = conn.root().__class__()
>>> conn.root()[1] = ZODB.blob.Blob('some data')
>>> transaction.commit()
>>> oid0 = conn.root()[0]._p_oid
>>> oid1 = conn.root()[1]._p_oid
>>> del conn.root()[0]
>>> del conn.root()[1]
>>> transaction.commit()
At this point, objects 0 and 1 are garbage, but they're still in the storage:
>>> p0, s0 = storage.load(oid0, '')
>>> p1, s1 = storage.load(oid1, '')
The storage is configured not to gc on pack, so even if we pack, these
objects won't go away:
>>> len(storage)
3
>>> import time
>>> db.pack(time.time()+1)
>>> len(storage)
3
>>> p0, s0 = storage.load(oid0, '')
>>> p1, s1 = storage.load(oid1, '')
Now we'll use the new deleteObject API to delete the objects. We can't
go through the database to do this, so we'll have to manage the
transaction ourselves.
>>> txn = transaction.begin()
>>> storage.tpc_begin(txn)
>>> storage.deleteObject(oid0, s0, txn)
>>> storage.deleteObject(oid1, s1, txn)
>>> storage.tpc_vote(txn)
>>> storage.tpc_finish(txn)
>>> tid = storage.lastTransaction()
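The call sequence above can be wrapped in a small helper that also aborts
on failure. This is only a sketch: delete_objects is a hypothetical name,
and it assumes nothing about the storage beyond the five methods used in
this document, called with the same arguments.

```python
def delete_objects(storage, txn, oid_serial_pairs):
    """Delete each (oid, serial) pair in one storage-level transaction.

    `storage` is assumed to provide tpc_begin, deleteObject, tpc_vote,
    tpc_finish and tpc_abort with the call shapes shown in this document.
    """
    storage.tpc_begin(txn)
    try:
        for oid, serial in oid_serial_pairs:
            storage.deleteObject(oid, serial, txn)
        storage.tpc_vote(txn)
    except Exception:
        # A failure before the finish step rolls the transaction back.
        storage.tpc_abort(txn)
        raise
    storage.tpc_finish(txn)
```

With the names from this document, the deletions above would read
delete_objects(storage, txn, [(oid0, s0), (oid1, s1)]).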
Now if we try to load data for the objects, we get a POSKeyError:
>>> storage.load(oid0, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.load(oid1, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
We can still get the data if we load a revision from before the deletion:
>>> storage.loadBefore(oid0, conn.root()._p_serial) == (p0, s0, tid)
True
>>> storage.loadBefore(oid1, conn.root()._p_serial) == (p1, s1, tid)
True
>>> open(storage.loadBlob(oid1, s1)).read()
'some data'
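The three-part result checked above (data, serial, and the id of the
transaction that ended the revision) can be modelled with a per-object
revision history. The sketch below is a toy, not how any ZODB storage is
implemented; RevisionLog and load_before are hypothetical names, and a
deletion is represented by a None data marker.

```python
import bisect


class RevisionLog:
    """History of one object as tid-sorted revisions; None marks a deletion."""

    def __init__(self):
        self._tids = []
        self._data = []

    def append(self, tid, data):
        if self._tids and tid <= self._tids[-1]:
            raise ValueError("tids must be strictly increasing")
        self._tids.append(tid)
        self._data.append(data)

    def load_before(self, tid):
        """Return (data, serial, end_tid) for the newest revision older than tid."""
        i = bisect.bisect_left(self._tids, tid) - 1
        if i < 0 or self._data[i] is None:
            # No revision that old, or the object was deleted as of that time.
            raise KeyError(tid)
        end_tid = self._tids[i + 1] if i + 1 < len(self._tids) else None
        return self._data[i], self._tids[i], end_tid
```

Here the deleting transaction plays the role of end_tid for the last live
revision, which mirrors the comparison against tid in the checks above.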
If we pack, however, the old revisions are removed and the data is gone
for good:
>>> db.pack(time.time()+1)
>>> len(db.storage)
1
>>> time.sleep(.1)
>>> storage.load(oid0, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.load(oid1, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.loadBefore(oid0, conn.root()._p_serial) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.loadBefore(oid1, conn.root()._p_serial) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.loadBlob(oid1, s1) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
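Why packing finishes the job can be illustrated with a toy model of one
object's history. This is a simplified sketch, not the algorithm any real
storage uses; pack_history is a hypothetical name, and a deletion is again
represented by a None data marker.

```python
def pack_history(history, pack_tid):
    """Pack one object's history, a tid-sorted list of (tid, data) pairs.

    Revisions newer than pack_tid are kept as they are. Of the older ones,
    only the newest survives, and not even that one if it is a deletion.
    """
    old = [rev for rev in history if rev[0] <= pack_tid]
    new = [rev for rev in history if rev[0] > pack_tid]
    keep = old[-1:]   # the state of the object as of pack time, if any
    if keep and keep[0][1] is None:
        keep = []     # deleted as of pack time: nothing left to keep
    return keep + new
```

In this model an object deleted before the pack time ends up with an empty
history, so neither current nor historical loads can find it.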
A conflict error is raised if the serial we provide to deleteObject
isn't current:
>>> conn.root()[0] = conn.root().__class__()
>>> transaction.commit()
>>> oid = conn.root()[0]._p_oid
>>> bad_serial = conn.root()[0]._p_serial
>>> conn.root()[0].x = 1
>>> transaction.commit()
>>> txn = transaction.begin()
>>> storage.tpc_begin(txn)
>>> storage.deleteObject(oid, bad_serial, txn); storage.tpc_vote(txn)
... # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ConflictError: database conflict error ...
>>> storage.tpc_abort(txn)
>>> storage.close()