/usr/bin/trytond_import_zip is in tryton-modules-country 4.2.0-1.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | #!/usr/bin/python
# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import urllib2
import zipfile
import csv
import sys
from io import BytesIO
from argparse import ArgumentParser
try:
from progressbar import ProgressBar, Bar, ETA, SimpleProgress
except ImportError:
ProgressBar = None
from proteus import Model, config
def clean(code):
sys.stderr.write('Cleaning')
Zip = Model.get('country.zip')
Zip._proxy.delete(
[z.id for z in Zip.find([('country.code', '=', code)])], {})
print >> sys.stderr, '.'
def fetch(code):
sys.stderr.write('Fetching')
url = 'http://download.geonames.org/export/zip/%s.zip' % code
responce = urllib2.urlopen(url)
data = responce.read()
with zipfile.ZipFile(BytesIO(data)) as zf:
data = zf.read('%s.txt' % code)
print >> sys.stderr, '.'
return data
def import_(data):
Zip = Model.get('country.zip')
Country = Model.get('country.country')
Subdivision = Model.get('country.subdivision')
print >> sys.stderr, 'Importing'
def get_country(code):
country = countries.get(code)
if not country:
country, = Country.find([('code', '=', code)])
countries[code] = country
return country
countries = {}
def get_subdivision(country, code):
code = '%s-%s' % (country, code)
subdivision = subdivisions.get(code)
if not subdivision:
try:
subdivision, = Subdivision.find([('code', '=', code)])
except ValueError:
return
subdivisions[code] = subdivision
return subdivision
subdivisions = {}
if ProgressBar:
pbar = ProgressBar(
widgets=[SimpleProgress(), Bar(), ETA()])
else:
pbar = iter
for row in pbar(list(csv.DictReader(BytesIO(data),
fieldnames=_fieldnames, delimiter='\t'))):
country = get_country(row['country'])
subdivision = get_subdivision(row['country'], row['code1'])
Zip(country=country, subdivision=subdivision, zip=row['postal'],
city=row['place']).save()
_fieldnames = ['country', 'postal', 'place', 'name1', 'code1',
'name2', 'code2', 'name3', 'code3', 'latitude', 'longitude', 'accuracy']
def main(database, codes, config_file=None):
config.set_trytond(database, config_file=config_file)
for code in codes:
print >> sys.stderr, code
code = code.upper()
clean(code)
import_(fetch(code))
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('-d', '--database', dest='database')
parser.add_argument('-c', '--config', dest='config_file',
help='the trytond config file')
parser.add_argument('codes', nargs='+')
args = parser.parse_args()
if not args.database:
parser.error('Missing database')
main(args.database, args.codes, args.config_file)
|