/usr/share/pyshared/ZEO/tests/zeo-fan-out.test is in python-zodb 1:3.9.7-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 | ZEO Fan Out
===========
We should be able to set up ZEO servers with ZEO clients. Let's see
if we can make it work.
We'll use some helper functions. The first is a helper that starts
ZEO servers for us and another one that picks ports.
We'll start the first server:
>>> (_, port0), adminaddr0 = start_server(
... '<filestorage>\npath fs\n</filestorage>', keep=1)
Then we'll start 2 others that use this one:
>>> addr1, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0)
>>> addr2, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0)
Now, let's create some client storages that connect to these:
>>> import ZEO, transaction
>>> db1 = ZEO.DB(addr1)
>>> tm1 = transaction.TransactionManager()
>>> c1 = db1.open(transaction_manager=tm1)
>>> r1 = c1.root()
>>> r1
{}
>>> db2 = ZEO.DB(addr2)
>>> tm2 = transaction.TransactionManager()
>>> c2 = db2.open(transaction_manager=tm2)
>>> r2 = c2.root()
>>> r2
{}
If we update c1, we'll eventually see the change in c2:
>>> import persistent.mapping
>>> r1[1] = persistent.mapping.PersistentMapping()
>>> r1[1].v = 1000
>>> r1[2] = persistent.mapping.PersistentMapping()
>>> r1[2].v = -1000
>>> tm1.commit()
>>> import time
>>> for i in range(100):
... t = tm2.begin()
... if 1 in r2:
... break
... time.sleep(0.01)
>>> r2[1].v
1000
>>> r2[2].v
-1000
Now, let's see if we can break it. :)
>>> def f():
... for i in range(100):
... r1[1].v -= 1
... r1[2].v += 1
... tm1.commit()
... time.sleep(0.01)
>>> import threading
>>> thread = threading.Thread(target=f)
>>> thread.start()
>>> for i in range(1000):
... t = tm2.begin()
... if r2[1].v + r2[2].v:
... print 'oops', r2[1], r2[2]
... if r1[1].v == 900:
... break # we caught up
... time.sleep(0.01)
>>> thread.join()
If we shutdown and restart the source server, the variables will be
invalidated:
>>> stop_server(adminaddr0)
>>> _ = start_server('<filestorage 1>\npath fs\n</filestorage>\n',
... port=port0)
>>> for i in range(1000):
... c1.sync()
... c2.sync()
... if (
... (r1[1]._p_changed is None)
... and
... (r1[2]._p_changed is None)
... and
... (r2[1]._p_changed is None)
... and
... (r2[2]._p_changed is None)
... ):
... print 'Cool'
... break
... time.sleep(0.01)
... else:
... print 'Dang'
Cool
Cleanup:
>>> db1.close()
>>> db2.close()
|