This file is indexed.

/usr/share/arc/examples/PythonBroker/ACIXBroker.py is in nordugrid-arc-python 5.3.0~rc1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
'''
Broker using ACIX information. This broker queries ACIX for cached locations
of input files specified in the job. It then matches those locations against
execution targets and ranks the targets by the number of cached files they
have.

Implements the following methods from BrokerPlugin:
 - bool operator() (const ExecutionTarget&, const ExecutionTarget&) const;
   - Used for sorting targets, here the method is lessthan()
 - bool match(const ExecutionTarget& et) const;
   - Returns whether the target matches
 - void set(const JobDescription& _j) const;
   - Set the job description which is to be brokered

Invoke the broker with:

arcsub -b PythonBroker:ACIXBroker.ACIXBroker:CacheIndexURL

e.g.
arcsub -b PythonBroker:ACIXBroker.ACIXBroker:https://cacheindex.ndgf.org:6443/data/index

or by setting in client.conf:
[common]
brokername=PythonBroker
brokerarguments=ACIXBroker.ACIXBroker:https://cacheindex.ndgf.org:6443/data/index

The PYTHONPATH must contain the path to this file, with a default installation
this is /usr/share/arc/examples/PythonBroker

The log level of this broker can only be set in client.conf, because that is
the only way that verbosity is set in the UserConfig object, eg
verbosity=DEBUG

The default level is WARNING.
'''

import random
import httplib
import re
import logging

# Check if we can use json module (python >= 2.6 only)
try:
    import json
except ImportError:
    json = False

import arc

class ACIXBroker:
    '''
    Broker plugin that ranks execution targets by the number of a job's
    input files already cached there, according to an ACIX cache index
    service. See the module docstring for how to invoke it.
    '''

    def __init__(self, usercfg):
        '''
        Set up internal fields and read configuration from the given
        arc.UserConfig object (verbosity and broker arguments).
        '''
        self.inputfiles = []      # list of remote input file URLs to check
        self.cachelocations = {}  # dict of file URL -> cache locations (hostnames)
        self.targetranking = {}   # dict of hostname -> ranking (no of cached files)
        self.cacheindex = ''      # URL of ACIX index service
        self.uc = usercfg         # UserConfig object

        # Map ARC verbosity (set in client.conf) to python log level names;
        # FATAL and VERBOSE have no direct python equivalent
        loglevel = 'WARNING'
        if usercfg.Verbosity():
            loglevel = usercfg.Verbosity().upper()
            if loglevel == 'FATAL':
                loglevel = 'CRITICAL'
            if loglevel == 'VERBOSE':
                loglevel = 'INFO'

        logging.basicConfig(format='%(levelname)s: ACIXBroker: %(message)s',
                            level=getattr(logging, loglevel))

        # Broker argument looks like "ACIXBroker.ACIXBroker:<index URL>";
        # everything after the first ':' is the ACIX index URL
        brokerarg = usercfg.Broker()[1]
        if brokerarg.find(':') != -1:
            self.cacheindex = brokerarg[brokerarg.find(':') + 1:]
        logging.info('cache index: %s', self.cacheindex)

        # Check the ACIX URL is valid; an empty cacheindex disables querying
        (protocol, host, port, path) = self.splitURL(self.cacheindex)
        if not host or not path:
            logging.error('Invalid URL for ACIX index: %s', self.cacheindex)
            self.cacheindex = ''

    def match(self, target):
        '''
        Check the number of cached files at target and record its ranking.
        All targets which match the job description are acceptable even if
        no files are cached. We assume only one A-REX per hostname, so
        multiple interfaces at the same hostname are ignored.
        '''
        (protocol, host, port, path) = self.splitURL(target.ComputingEndpoint.URLString)

        # A host seen before was already matched and ranked - accept it
        # again without re-evaluating
        if not host or host in self.targetranking:
            return True

        # First do generic matching of job requirements against the target
        if not arc.Broker.genericMatch(target, self.job, self.uc):
            return False

        # Rank by how many of the job's input files ACIX reported as cached here
        cached = 0
        for locations in self.cachelocations.values():
            if host in locations:
                cached += 1

        self.targetranking[host] = cached
        logging.debug('host: %s, cached files: %i', host, cached)

        # Accept target in all cases even if no files are cached
        return True

    def set(self, jobdescription):
        '''
        Extract the remote input files from the job description and call
        ACIX to find their cached locations.
        '''
        self.job = jobdescription

        # Nothing to do without a job or a valid index service
        if not self.job or not self.cacheindex:
            return

        self.getInputFiles(self.job.DataStaging.InputFiles)

        if not self.inputfiles:
            return

        self.queryACIX(0)

    def lessthan(self, lhs, rhs):
        '''
        Comparison used to sort targets: a target with more cached files
        sorts before one with fewer. Unranked or unparsable targets are
        ordered randomly.
        '''
        (lprotocol, lhost, lport, lpath) = self.splitURL(lhs.ComputingEndpoint.URLString)
        (rprotocol, rhost, rport, rpath) = self.splitURL(rhs.ComputingEndpoint.URLString)

        if not lhost or not rhost or lhost not in self.targetranking or rhost not in self.targetranking:
            return random.randint(0, 1)

        return self.targetranking[lhost] > self.targetranking[rhost]

    # Internal methods

    def getInputFiles(self, inputfilelist):
        '''
        Extract remote input files from inputfilelist and append their
        canonical URLs to self.inputfiles.
        '''
        for inputfile in inputfilelist:
            # Remote input files only.
            # Some job desc languages allow multiple sources - for now
            # choose the first
            if inputfile.Sources and inputfile.Sources[0].Protocol() != 'file':
                url = inputfile.Sources[0]

                # Use the same string representation of URL as the cache code
                # does, including making sure LFC guids are used properly
                canonic_url = url.plainstr()
                if url.MetaDataOption("guid"):
                    canonic_url += ":guid=" + url.MetaDataOption("guid")

                logging.debug('input file: %s', canonic_url)
                self.inputfiles.append(canonic_url)

    def queryACIX(self, index):
        '''
        Call ACIX index to get cached locations of self.inputfiles[index:]
        and merge them into self.cachelocations. It seems like ACIX has a
        limit of 64k character URLs, so if we exceed that then call
        recursively in batches.
        '''
        maxACIXurl = 60000
        (protocol, host, port, path) = self.splitURL(self.cacheindex)

        # Build the query string of comma-separated input file URLs
        path += '?url=' + self.inputfiles[index]
        index += 1
        for inputurl in self.inputfiles[index:]:
            path += ',' + inputurl
            index += 1
            if len(path) > maxACIXurl and index != len(self.inputfiles):
                logging.debug('URL length (%i) for ACIX query exceeds maximum (%i), will call in batches', len(path), maxACIXurl)
                # Query the remaining files in a separate request
                self.queryACIX(index)
                break

        conn = httplib.HTTPSConnection(host, port)
        try:
            try:
                conn.request('GET', path)
            except Exception as e:
                logging.error('Error connecting to service at %s: %s', host, str(e))
                return

            try:
                resp = conn.getresponse()
            except httplib.HTTPException as e:
                logging.error('Bad response from ACIX: %s', str(e))
                return

            logging.info('ACIX returned %s %s', resp.status, resp.reason)
            data = resp.read()
        finally:
            # Always release the connection, including on the error paths
            # above, which previously leaked it
            conn.close()

        logging.debug('ACIX response: %s', data)

        if json:
            try:
                self.cachelocations.update(json.loads(data))
            except ValueError:
                logging.error('Unexpected response from ACIX: %s', data)
        else:
            # Using eval() is unsafe but it will only be used on old OSes
            # (like RHEL5). At least check response is non-empty and looks
            # like a python dictionary (empty data would raise IndexError)
            if not data or data[0] != '{' or data[-1] != '}':
                logging.error('Unexpected response from ACIX: %s', data)
            else:
                self.cachelocations.update(eval(data))

    def splitURL(self, url):
        """
        Split url into (protocol, host, port, path) and return this tuple.
        Port is None when absent from the URL; a malformed url yields
        ('', '', 0, '').
        """
        match = re.match(r'(\w*)://([^/?#:]*):?(\d*)/?(.*)', url)
        if match is None:
            logging.warning('URL %s is malformed', url)
            return ('', '', 0, '')

        port_s = match.group(3)
        port = int(port_s) if port_s else None

        return (match.group(1), match.group(2), port, '/' + match.group(4))