/usr/share/pyshared/pebl/taskcontroller/ec2ipy1.py is in python-pebl 1.0.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 | import sys, os, time
import ConfigParser
from itertools import groupby
try:
import ipython1.kernel.api as kernel
except ImportError:
kernel = False
import boto
# options required in the config file
required_config_options = [
('access_key', 'Please specify your AWS access key ID.'),
('secret_access_key', 'Please specify your AWS secret access key.'),
('ami', 'Please specify the AMI to use for the controller and engines.'),
('key_name', 'Please specify the key_name to use with the EC2 instances.'),
('credential', 'Please specify the ssh credential file.'),
]
class EC2Cluster:
"""
* starts desired number of EC2 instances
* starts controller on first instance
* starts engines on all other instances
* includes methods for:
* creating and terminating cluster
* creating RemoteController and TaskController from cluster
states:
* aws_connected: have connection to AWS
* instances_reserved
* instances_running
* cluster_ready: instances are running and IPython1 controller/engines setup
"""
def __init__(self, configfile, instances=[]):
self.config = self._check_config(configfile)
self.conn = boto.connect_ec2(
self.config['access_key'],
self.config['secret_access_key']
)
self._state = ['aws_connected']
self.instances = instances if instances else []
def _check_config(self, configfile):
configp = ConfigParser.SafeConfigParser()
configp.read(configfile)
config = dict(configp.items('EC2'))
for key, error in required_config_options:
if key not in config:
print error
sys.exit(1)
return config
def _wait_till_instances_in_state(self, waitingfor, resulting_state, sleepfor=10):
print "Waiting till all instances are %s. Will check every %s seconds." % (waitingfor, sleepfor)
print "Hit Ctrl-C to stop waiting."
while True:
statuses = [i.update() for i in self.instances]
if all(status == waitingfor for status in statuses):
print "All instances %s" % waitingfor
self._state.append(resulting_state)
return
else:
print "Not all instances are %s" % waitingfor
statuses.sort()
for statustype, statuses in groupby(statuses, lambda x: x):
print "\t%s: %s instances" % (statustype, len(list(statuses)))
time.sleep(sleepfor)
def wait_till_instances_running(self, sleepfor=10):
self._wait_till_instances_in_state('running', 'instances_running', sleepfor)
def wait_till_instances_terminated(self, sleepfor=10):
self._wait_till_instances_in_state('terminated', 'instances_terminated', sleepfor)
def create_instances(self, min_count=1, max_count=None):
# if max not specified, it's the same as the min
max_count = max_count or min_count
# reserve instances
print "Reserving EC2 instances."
self.reservation = self.conn.run_instances(
self.config['ami'],
min_count, max_count,
self.config['key_name'],
)
self._state.append('instances_reserved')
self.instances = self.reservation.instances
self.wait_till_instances_running()
print "Waiting for firewall ports to open up (10 secs)"
time.sleep(10)
print "Trying to connect to worker nodes using ssh"
self._check_ssh_connection()
def _check_ssh_connection(self):
instances = [i for i in self.instances]
while instances:
for i in instances:
time.sleep(1) # so we're not bombarding the servers
if 0 == self.remote(i, "ls /"):
instances.remove(i)
def start_ipython1(self, engine_on_controller=False):
if not 'instances_running' in self._state:
print "Not all instances are running."
return False
if not hasattr(self, 'instances'):
print "Create EC2 instances before starting cluster."
return False
print "Starting ipython1 controller/engines on running instances"
# redirect stdin, stdout and stderr on remote processes so ssh terminates.
# we could use 'ssh -f' but that will fork ssh in the background
# and on large clusters that could mean many ssh background procs
cmd_postfix = "</dev/null >&0 2>&0 &"
# run ipcontroller on the first controller instance
controller_ip = self.instances[0].public_dns_name
controller_port = kernel.defaultRemoteController[1]
print "Starting controller on %s" % controller_ip
self.remote(
host = self.instances[0],
cmd = "nohup /usr/local/bin/ipcontroller -l /mnt/ipcontroller_ %s" % cmd_postfix,
)
print "Waiting for controller to start (6 secs)"
time.sleep(6)
# run engine on the same instance as controller?
engine_instances = self.instances[1:] if not engine_on_controller else self.instances
# run ipengine on selected instances
for inst in engine_instances:
print "Starting engine on %s" % inst.public_dns_name
self.remote(
host = inst,
cmd = "nohup /usr/local/bin/ipengine --controller-ip=%s -l /mnt/ipengine_ %s" % (controller_ip, cmd_postfix),
)
time.sleep(1) # so we don't bombard the controller..
print "-"*70
print "Ipython1 controller running on %s:%s" % (controller_ip, controller_port)
print "Type the following to login to controller:"
print "ssh -i %s root@%s" % (self.config['credential'], controller_ip)
self._state.append('ipython1_running')
time.sleep(6) # waiting for cluster to be setup
return True
def reboot_instances(self):
print "Rebooting all instances"
for inst in self.instances:
inst.reboot()
self._state = ['instances_reserved']
self.wait_till_instances_running()
def terminate_instances(self):
for i in self.instances:
i.stop()
self.wait_till_instances_terminated()
def authorize_access_to_controller(self, from_ip):
ports = [kernel.defaultRemoteController[1], kernel.defaultTaskController[1]]
for port in ports:
print "Authorizing access for group default for port %s from IP %s" % (port, from_ip)
self.conn.authorize_security_group('default', ip_protocol='tcp', from_port=port,
to_port=port, cidr_ip=from_ip)
@property
def remote_controller(self):
return kernel.RemoteController((
self.instances[0].public_dns_name,
kernel.defaultRemoteController[1]
))
@property
def task_controller(self):
return kernel.TaskController((
self.instances[0].public_dns_name,
kernel.defaultTaskController[1]
))
@property
def task_controller_url(self):
return "%s:%s" % (self.instances[0].public_dns_name,
kernel.defaultTaskController[1])
@property
def remote_controller_url(self):
return "%s:%s" % (self.instances[0].public_dns_name,
kernel.defaultRemoteController[1])
# from Peter Skomoroch's ec2-mpi-config.py (see http://datawrangling.com)
def remote(self, host, cmd='scp', src=None, dest=None, test=False):
""" Run a command on remote machine (or copy files) using ssh.
@param host: boto ec2 instance, ip address or dns name
"""
d = {
'cmd':cmd,
'src':src,
'dest':dest,
'host':getattr(host, 'public_dns_name', str(host)),
'switches': ''
}
d['switches'] += " -i %s " % self.config['credential']
if cmd == 'scp':
template = '%(cmd)s %(switches)s -o "StrictHostKeyChecking no" %(src)s root@%(host)s:%(dest)s'
else:
template = 'ssh %(switches)s -o "StrictHostKeyChecking no" root@%(host)s "%(cmd)s" '
cmdline = template % d
print "Trying: ", cmdline
if not test:
return os.system(cmdline)
def remote_all(self, cmd='scp', src=None, dest=None, test=False):
for i in self.instances:
self.remote(i.public_dns_name, cmd, src, dest, test)
def tofile(self, filename):
f = file(filename, 'w')
f.writelines(inst.id + "\n" for inst in self.instances)
f.close()
def fromfile(self, filename):
def _instance(id):
inst = boto.ec2.instance.Instance(self.conn)
inst.id = id
inst.update()
return inst
self.instances = [_instance(id[:-1]) for id in file(filename).readlines()]
# USAGE
#ec2 = EC2Cluster()
#ec2.create_instances()
#ec2.start_ipython1()
#tc = ec2.task_controller
#ec2.terminate_instances()
|