Package pypln :: Module broker
[hide private]

Source Code for Module pypln.broker

  1  #!/usr/bin/env python 
  2  # coding: utf-8 
  3   
  4  from multiprocessing import Process, Pipe, cpu_count 
  5  from os import kill, getpid 
  6  from time import sleep, time 
  7  from signal import SIGKILL 
  8  import zmq 
  9  from pymongo import Connection 
 10  from gridfs import GridFS 
 11  from bson.objectid import ObjectId 
 12  import pypln.workers as workers 
 13  from pypln.client import ManagerClient 
 14  from pypln.utils import get_host_info, get_outgoing_ip, get_process_info 
 15   
 16   
class Job(object):
    """A single worker task: owns the worker subprocess and its pipe."""

    def __init__(self, message):
        # `message` is a job dict from the Manager ('job id', 'document',
        # 'worker' keys).
        self.job_id = message['job id']
        self.document_id = message['document']
        self.worker = message['worker']
        self.parent_connection = None
        self.child_connection = None
        self.start_time = None
        self.process = None
        self.pid = None

    def __repr__(self):
        template = ('<Job(worker={}, document_id={}, job_id={}, pid={}, '
                    'start_time={})>')
        return template.format(self.worker, self.document_id, self.job_id,
                               self.pid, self.start_time)

    def start(self):
        """Fork the worker process and record its pid and start time."""
        self.parent_connection, self.child_connection = Pipe()
        #TODO: is there any way to *do not* connect stdout/stderr?
        self.process = Process(target=workers.wrapper,
                               args=(self.child_connection, ))
        self.start_time = time()
        self.process.start()
        self.pid = self.process.pid

    def send(self, message):
        """Push `message` to the worker through the parent pipe end."""
        self.parent_connection.send(message)

    def get_result(self):
        """Blocking read of the worker's result from the pipe."""
        return self.parent_connection.recv()

    def finished(self):
        """Non-blocking check: has the worker sent its result yet?"""
        return self.parent_connection.poll()

    def end(self):
        """Close both pipe ends and reap the worker process."""
        self.parent_connection.close()
        self.child_connection.close()
        self.process.join()
class ManagerBroker(ManagerClient):
    #TODO: should use pypln.stores instead of pymongo directly
    #TODO: use log4mongo
    def __init__(self, api_host_port, broadcast_host_port, logger=None,
                 logger_name='ManagerBroker', poll_time=50):
        """Store connection endpoints and job-tracking state.

        No sockets or database connections are opened here; that happens
        in `start()`. `poll_time` is passed to zmq socket `poll()`, so it
        is presumably in milliseconds — TODO confirm against pyzmq docs.
        """
        ManagerClient.__init__(self, logger=logger, logger_name=logger_name)
        self.api_host_port = api_host_port
        self.broadcast_host_port = broadcast_host_port
        self.jobs = []
        # At most one concurrent worker process per CPU core.
        self.max_jobs = cpu_count()
        self.poll_time = poll_time
        self.last_time_saved_monitoring_information = 0
        self.logger.info('Broker started')
71
72 - def request(self, message):
73 self.manager_api.send_json(message) 74 self.logger.info('[API] Request to manager: {}'.format(message))
75
76 - def get_reply(self):
77 message = self.manager_api.recv_json() 78 self.logger.info('[API] Reply from manager: {}'.format(message)) 79 return message
80
81 - def connect_to_database(self):
82 conf = self.config['db'] 83 self.mongo_connection = Connection(conf['host'], conf['port']) 84 self.db = self.mongo_connection[conf['database']] 85 if 'username' in conf and 'password' in conf and conf['username'] and \ 86 conf['password']: 87 self.db.authenticate(conf['username'], conf['password']) 88 self.collection = self.db[conf['collection']] 89 self.monitoring_collection = self.db[conf['monitoring collection']] 90 self.gridfs = GridFS(self.db, conf['gridfs collection'])
91
    def get_configuration(self):
        """Ask the Manager for the cluster configuration and cache the reply
        in `self.config` (used later for database and monitoring settings).
        """
        self.request({'command': 'get configuration'})
        self.config = self.get_reply()
95
    def connect_to_manager(self):
        """Connect the API and broadcast sockets via ManagerClient.connect."""
        self.logger.info('Trying to connect to manager...')
        super(ManagerBroker, self).connect(self.api_host_port,
                                           self.broadcast_host_port)
100
102 #TODO: should we send average measures insted of instant measures of 103 # some measured variables? 104 ip = get_outgoing_ip(self.api_host_port) 105 host_info = get_host_info() 106 host_info['network']['cluster ip'] = ip 107 broker_process = get_process_info(getpid()) 108 broker_process['type'] = 'broker' 109 broker_process['active workers'] = len(self.jobs) 110 processes = [broker_process] 111 for job in self.jobs: 112 process = get_process_info(job.pid) 113 if process is not None: # worker process not finished yet 114 process['worker'] = job.worker 115 process['document id'] = ObjectId(job.document_id) 116 process['type'] = 'worker' 117 processes.append(process) 118 data = {'host': host_info, 'processes': processes} 119 self.monitoring_collection.insert(data) 120 self.last_time_saved_monitoring_information = time() 121 self.logger.info('Saved monitoring information in MongoDB') 122 self.logger.debug(' Information: {}'.format(data))
123
    def start(self):
        """Bootstrap the broker: connect, subscribe, configure, then loop.

        Order matters: configuration comes from the Manager, and the
        database settings come from that configuration.
        """
        self.started_at = time()
        self.connect_to_manager()
        # Only listen for 'new job' announcements on the broadcast socket.
        self.manager_broadcast.setsockopt(zmq.SUBSCRIBE, 'new job')
        self.get_configuration()
        self.connect_to_database()
        self.save_monitoring_information()
        self.run()
132
133 - def start_job(self, job):
134 worker_input = workers.available[job.worker]['from'] 135 data = {} 136 if worker_input == 'document': 137 required_fields = workers.available[job.worker]['requires'] 138 fields = set(['_id', 'meta'] + required_fields) 139 data = self.collection.find({'_id': ObjectId(job.document_id)}, 140 fields=fields)[0] 141 elif worker_input == 'gridfs-file': 142 file_data = self.gridfs.get(ObjectId(job.document_id)) 143 data = {'_id': ObjectId(job.document_id), 144 'length': file_data.length, 145 'md5': file_data.md5, 146 'name': file_data.name, 147 'upload_date': file_data.upload_date, 148 'contents': file_data.read()} 149 #TODO: what if input is a corpus? 150 151 job.start() 152 job.send((job.worker, data)) 153 self.logger.debug('Started worker "{}" for document "{}" (PID: {})'\ 154 .format(job.worker, job.document_id, 155 job.pid))
156
157 - def get_a_job(self):
158 for i in range(self.max_jobs - len(self.jobs)): 159 self.request({'command': 'get job'}) 160 message = self.get_reply() 161 #TODO: if manager stops and doesn't answer, broker will stop here 162 if 'worker' in message and message['worker'] is None: 163 break # Don't have a job, stop asking 164 elif 'worker' in message and 'document' in message and \ 165 message['worker'] in workers.available: 166 job = Job(message) 167 self.jobs.append(job) 168 self.start_job(job) 169 else: 170 self.logger.info('Ignoring malformed job: {}'.format(message))
171 #TODO: send a 'rejecting job' request to Manager 172
173 - def manager_has_job(self):
174 if self.manager_broadcast.poll(self.poll_time): 175 message = self.manager_broadcast.recv() 176 self.logger.info('[Broadcast] Received from manager: {}'\ 177 .format(message)) 178 #TODO: what if broker subscribe to another thing? 179 return True 180 else: 181 return False
182
    def finished_jobs(self):
        """Return the jobs whose worker process already sent its result."""
        return [job for job in self.jobs if job.finished()]
185
    def full_of_jobs(self):
        """True when every job slot (one per CPU core) is occupied."""
        return len(self.jobs) >= self.max_jobs
188
    def kill_processes(self):
        """Best-effort SIGKILL of every worker process (used on shutdown)."""
        for job in self.jobs:
            try:
                kill(job.pid, SIGKILL)
            except OSError:
                # Process already exited; nothing to clean up here.
                pass
195
197 time_difference = time() - self.last_time_saved_monitoring_information 198 return time_difference >= self.config['monitoring interval']
199
    def run(self):
        """Main loop: fetch jobs, reap finished workers, persist results.

        For each finished job, the result is filtered down to the keys the
        worker declares it 'provides', written to MongoDB according to the
        worker's input/output kinds, and the Manager is notified. Exits on
        KeyboardInterrupt, closing sockets and killing worker processes.
        """
        self.logger.info('Entering main loop')
        try:
            self.get_a_job()
            while True:
                if self.should_save_monitoring_information_now():
                    self.save_monitoring_information()
                if not self.full_of_jobs() and self.manager_has_job():
                    self.get_a_job()
                for job in self.finished_jobs():
                    result = job.get_result()
                    job.end()
                    end_time = time()
                    self.logger.info('Job finished: {}'.format(job))
                    # Drop any result keys the worker is not declared to
                    # provide before touching the database.
                    update_keys = workers.available[job.worker]['provides']
                    for key in result.keys():
                        if key not in update_keys:
                            del result[key]
                    worker_input = workers.available[job.worker]['from']
                    worker_output = workers.available[job.worker]['to']
                    if worker_input == worker_output == 'document':
                        # document -> document: merge result into the
                        # existing document via $set.
                        update_data = [{'_id': ObjectId(job.document_id)},
                                       {'$set': result}]
                        self.collection.update(*update_data)
                    elif worker_input == 'gridfs-file' and \
                         worker_output == 'document':
                        # gridfs-file -> document: create a new document
                        # reusing the GridFS file's ObjectId.
                        data = {'_id': ObjectId(job.document_id)}
                        data.update(result)
                        self.collection.insert(data)
                    #TODO: safe=True
                    #TODO: what if we have other combinations of input/output?
                    self.request({'command': 'job finished',
                                  'job id': job.job_id,
                                  'duration': end_time - job.start_time})
                    result = self.get_reply()
                    self.jobs.remove(job)
                    self.get_a_job()
        except KeyboardInterrupt:
            self.logger.info('Got SIGNINT (KeyboardInterrupt), exiting.')
            self.close_sockets()
            self.kill_processes()
241
def main():
    """Run a broker against a Manager on localhost with a stdout logger."""
    from logging import Logger, StreamHandler, Formatter
    from sys import stdout


    logger = Logger('ManagerBroker')
    handler = StreamHandler(stdout)
    log_format = ('%(asctime)s - %(name)s - %(levelname)s - '
                  '%(message)s')
    handler.setFormatter(Formatter(log_format))
    logger.addHandler(handler)
    broker = ManagerBroker(('localhost', 5555), ('localhost', 5556),
                           logger=logger)
    broker.start()


if __name__ == '__main__':
    main()