/usr/share/pyshared/soaplib/easy_client.py is in python-soaplib 0.8.1-2build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | from soaplib.serializers.clazz import ClassSerializer
from soaplib.serializers.primitive import *
from soaplib.soap import *
from soaplib.util import split_url
from soaplib.etimport import ElementTree as et
import new, datetime, httplib
_builtin = {
int : Integer,
str : String,
datetime.datetime : DateTime,
float : Float,
bool : Boolean,
}
class caller(object):
def __init__(self,host,namespace,method):
self.method = method
self.host = host
self.namespace = namespace
def __call__(self,data,raw=False):
e = data.to_xml(data,data.__class__.__name__)
envelope = make_soap_envelope(e)
body = et.tostring(envelope)
methodName = '\"%s\"'%self.method
httpHeaders = {"Content-Length":len(body),
"Content-type":'text/xml; charset="UTF-8"',
"Accept":"application/soap+xml, application/dime, multipart/related, text/*",
'User-Agent':'Soaplib/1.0',
'SOAPAction':methodName
}
scheme,host,path = split_url(self.host)
if scheme == "http":
conn = httplib.HTTPConnection(host)
elif scheme == "https":
conn = httplib.HTTPSConnection(host)
else:
raise RuntimeError("Unsupported URI connection scheme: %s" % scheme)
conn.request("POST",path,body=body,headers=httpHeaders)
response = conn.getresponse()
raw_data = response.read()
retval = et.fromstring(raw_data)
d = retval.find('{http://schemas.xmlsoap.org/soap/envelope/}Body').getchildren()[0]
if raw:
return d
return objectify(d)
class DocumentClient(object):
def __init__(self,host,methods,namespace=None):
for method in methods:
setattr(self,method,caller(host,namespace,method))
def get_serializer(value):
_type = type(value)
if value is None:
return Null
elif hasattr(value,'to_xml') and hasattr(value,'from_xml'):
# this object can act as it's own serializer
return value
elif _builtin.has_key(_type):
# found primitive: string, int, etc.
return _builtin[_type]
elif _type == list:
# must assume that all the elements in the list are of the same type
# and use the same serializer
if not len(value):
return Null
else:
return Array(get_serializer(value[0]))
else:
raise Exception("Could not find serializer for [%s]"%value)
def denamespace(tag):
if tag.find('}') > -1:
tag = tag.replace("{","")
return tag.split('}')
return None, tag
def objectify(element):
class ElementWrapper(object):
def __init__(self,element):
self.__element = element
self.__tags = {}
for child in element.getchildren():
ns,tag = denamespace(child.tag)
if self.__tags.has_key(tag):
self.__tags[tag].append(child)
else:
self.__tags[tag] = child
if hasattr(self,tag):
spot = getattr(self,tag)
if type(spot) != list:
spot = [spot]
#spot.append(objectify(child))
if len(child.getchildren()):
spot.append(new.classobj(tag,(ElementWrapper,),{})(child))
else:
spot.append(child.text)
setattr(self,tag,spot)
setattr(self,'__islist__',True)
elif len(child.getchildren()):
setattr(self,tag,new.classobj(tag,(ElementWrapper,),{})(child))
else:
setattr(self,denamespace(child.tag)[1],child.text) # marshall the type here!
def __getitem__(self,index):
if len(self.__tags.items()) == 1:
# this *could* be an array
k = self.__tags.keys()[0]
return getattr(self,k)[index]
if index == 0:
return self
raise StopIteration
ns,tag = denamespace(element.tag)
return new.classobj(tag,(ElementWrapper,),{})(element)
def make(__type_name,**kwargs):
serializer = new.classobj(__type_name,(ClassSerializer,),{})
namespace = None
setattr(serializer,'soap_members',{})
setattr(serializer,'namespace',namespace)
for k,v in kwargs.items():
serializer.soap_members[k] = get_serializer(v)
o = serializer()
for k,v in kwargs.items():
setattr(o,k,v)
return o
if __name__ == '__main__':
echoInteger = make('echoInteger',i=34)
print et.tostring(echoInteger.to_xml(echoInteger,'echoInteger'))
c = DocumentClient('http://localhost:9753/',['echoInteger','echoSimpleClass','echoSimpleClassArray'])
print c.echoInteger(make('echoInteger',i=3)).retval
print c.echoSimpleClass(make('echoSimpleClass',sc=make('SimpleClass',i=34,s='bobo'))).retval.s
d = c.echoSimpleClassArray(make('echoSimpleClassArray',
sca=[
make('SimpleClass',i=34,s='jasdf'),
make('SimpleClass',i=34,s='bobo'),
make('SimpleClass',i=34,s='poo'),
]
))
print '*'*80
for sc in d.retval:
print sc.s
|