1
2
3
4 from multiprocessing import Process, Pipe, cpu_count
5 from os import kill, getpid
6 from time import sleep, time
7 from signal import SIGKILL
8 import zmq
9 from pymongo import Connection
10 from gridfs import GridFS
11 from bson.objectid import ObjectId
12 import pypln.workers as workers
13 from pypln.client import ManagerClient
14 from pypln.utils import get_host_info, get_outgoing_ip, get_process_info
15
16
class Job(object):
    """One worker run, executed in its own child process.

    The parent talks to the child through a ``multiprocessing.Pipe``: the
    job's input is sent with :meth:`send` and the result is read back with
    :meth:`get_result`.
    """

    def __init__(self, message):
        """Build a job from a manager reply.

        :param message: dict with at least the keys ``'job id'``,
            ``'document'`` and ``'worker'``.
        """
        self.job_id = message['job id']
        self.document_id = message['document']
        self.worker = message['worker']
        # Everything below is filled in by start().
        self.parent_connection = None
        self.child_connection = None
        self.start_time = None
        self.process = None
        self.pid = None

    def __repr__(self):
        return ('<Job(worker={}, document_id={}, job_id={}, pid={}, '
                'start_time={})>'.format(self.worker, self.document_id,
                                         self.job_id, self.pid,
                                         self.start_time))

    def start(self):
        """Create the pipe and spawn a process running ``workers.wrapper``.

        The child end of the pipe is the process's only argument. The start
        time and PID are recorded for reporting.
        """
        parent_connection, child_connection = Pipe()
        # Both ends are kept so that end() can close them.
        self.parent_connection = parent_connection
        self.child_connection = child_connection

        self.process = Process(target=workers.wrapper,
                               args=(child_connection, ))
        self.start_time = time()
        self.process.start()
        self.pid = self.process.pid

    def send(self, message):
        """Send *message* to the child process through the pipe."""
        self.parent_connection.send(message)

    def get_result(self):
        """Block until the child sends an object through the pipe; return it."""
        return self.parent_connection.recv()

    def finished(self):
        """Return True if the child has sent data that can be read now."""
        return self.parent_connection.poll()

    def end(self):
        """Close both pipe ends and wait for the child process to exit."""
        self.parent_connection.close()
        self.child_connection.close()
        self.process.join()
57
class ManagerBroker(ManagerClient):
    """Manager client that runs jobs in local child processes.

    It asks the manager for jobs, runs each one in its own process (see
    ``Job``), stores the results in MongoDB and reports every finished job
    back to the manager.
    """

    def __init__(self, api_host_port, broadcast_host_port, logger=None,
                 logger_name='ManagerBroker', poll_time=50):
        """Initialise the broker's state.

        :param api_host_port: ``(host, port)`` of the manager's API endpoint.
        :param broadcast_host_port: ``(host, port)`` of the manager's
            broadcast endpoint.
        :param logger: logger handed to ``ManagerClient.__init__``.
        :param logger_name: logger name handed to ``ManagerClient.__init__``.
        :param poll_time: timeout passed to ``manager_broadcast.poll`` while
            waiting for broadcasts (milliseconds if that is a pyzmq socket --
            TODO confirm).
        """
        ManagerClient.__init__(self, logger=logger, logger_name=logger_name)
        self.api_host_port = api_host_port
        self.broadcast_host_port = broadcast_host_port
        self.jobs = []  # Job objects not yet reported as finished
        self.max_jobs = cpu_count()  # one job slot per CPU
        self.poll_time = poll_time
        self.last_time_saved_monitoring_information = 0
        self.logger.info('Broker started')
71
73 self.manager_api.send_json(message)
74 self.logger.info('[API] Request to manager: {}'.format(message))
75
77 message = self.manager_api.recv_json()
78 self.logger.info('[API] Reply from manager: {}'.format(message))
79 return message
80
# NOTE(review): this method's `def` line (original line 81) is missing from
# this copy, along with its indentation; the lines below also carry fused
# line-number prefixes. Its name cannot be confirmed from here.
# Opens the MongoDB connection described by self.config['db']: connects to
# conf['host'] / conf['port'] and selects conf['database']. It authenticates
# only when 'username' and 'password' are both present and non-empty. It then
# binds self.collection, self.monitoring_collection and self.gridfs (a GridFS
# over the conf['gridfs collection'] collection).
# self.config must already be set -- presumably by the 'get configuration'
# request made elsewhere in this class; TODO confirm the call order.
82 conf = self.config['db']
83 self.mongo_connection = Connection(conf['host'], conf['port'])
84 self.db = self.mongo_connection[conf['database']]
85 if 'username' in conf and 'password' in conf and conf['username'] and \
86 conf['password']:
87 self.db.authenticate(conf['username'], conf['password'])
88 self.collection = self.db[conf['collection']]
89 self.monitoring_collection = self.db[conf['monitoring collection']]
90 self.gridfs = GridFS(self.db, conf['gridfs collection'])
91
# NOTE(review): this method's `def` line (original line 92) is missing from
# this copy, so its name cannot be confirmed here.
# Sends the 'get configuration' command to the manager and stores the reply
# as self.config (the MongoDB set-up code in this class reads
# self.config['db']).
93 self.request({'command': 'get configuration'})
94 self.config = self.get_reply()
95
# NOTE(review): this method's `def` line (original line 96) is missing from
# this copy. Original lines 101-122 that follow are missing too, so the body
# may continue beyond what is shown. Presumably this is `connect(self)`,
# overriding ManagerClient.connect -- TODO confirm.
# Logs the attempt, then calls the parent class's connect() with the API and
# broadcast (host, port) pairs stored by __init__.
97 self.logger.info('Trying to connect to manager...')
98 super(ManagerBroker, self).connect(self.api_host_port,
99 self.broadcast_host_port)
100
123
132
134 worker_input = workers.available[job.worker]['from']
135 data = {}
136 if worker_input == 'document':
137 required_fields = workers.available[job.worker]['requires']
138 fields = set(['_id', 'meta'] + required_fields)
139 data = self.collection.find({'_id': ObjectId(job.document_id)},
140 fields=fields)[0]
141 elif worker_input == 'gridfs-file':
142 file_data = self.gridfs.get(ObjectId(job.document_id))
143 data = {'_id': ObjectId(job.document_id),
144 'length': file_data.length,
145 'md5': file_data.md5,
146 'name': file_data.name,
147 'upload_date': file_data.upload_date,
148 'contents': file_data.read()}
149
150
151 job.start()
152 job.send((job.worker, data))
153 self.logger.debug('Started worker "{}" for document "{}" (PID: {})'\
154 .format(job.worker, job.document_id,
155 job.pid))
156
158 for i in range(self.max_jobs - len(self.jobs)):
159 self.request({'command': 'get job'})
160 message = self.get_reply()
161
162 if 'worker' in message and message['worker'] is None:
163 break
164 elif 'worker' in message and 'document' in message and \
165 message['worker'] in workers.available:
166 job = Job(message)
167 self.jobs.append(job)
168 self.start_job(job)
169 else:
170 self.logger.info('Ignoring malformed job: {}'.format(message))
171
172
174 if self.manager_broadcast.poll(self.poll_time):
175 message = self.manager_broadcast.recv()
176 self.logger.info('[Broadcast] Received from manager: {}'\
177 .format(message))
178
179 return True
180 else:
181 return False
182
184 return [job for job in self.jobs if job.finished()]
185
187 return len(self.jobs) >= self.max_jobs
188
190 for job in self.jobs:
191 try:
192 kill(job.pid, SIGKILL)
193 except OSError:
194 pass
195
199
201 self.logger.info('Entering main loop')
202 try:
203 self.get_a_job()
204 while True:
205 if self.should_save_monitoring_information_now():
206 self.save_monitoring_information()
207 if not self.full_of_jobs() and self.manager_has_job():
208 self.get_a_job()
209 for job in self.finished_jobs():
210 result = job.get_result()
211 job.end()
212 end_time = time()
213 self.logger.info('Job finished: {}'.format(job))
214 update_keys = workers.available[job.worker]['provides']
215 for key in result.keys():
216 if key not in update_keys:
217 del result[key]
218 worker_input = workers.available[job.worker]['from']
219 worker_output = workers.available[job.worker]['to']
220 if worker_input == worker_output == 'document':
221 update_data = [{'_id': ObjectId(job.document_id)},
222 {'$set': result}]
223 self.collection.update(*update_data)
224 elif worker_input == 'gridfs-file' and \
225 worker_output == 'document':
226 data = {'_id': ObjectId(job.document_id)}
227 data.update(result)
228 self.collection.insert(data)
229
230
231 self.request({'command': 'job finished',
232 'job id': job.job_id,
233 'duration': end_time - job.start_time})
234 result = self.get_reply()
235 self.jobs.remove(job)
236 self.get_a_job()
237 except KeyboardInterrupt:
238 self.logger.info('Got SIGNINT (KeyboardInterrupt), exiting.')
239 self.close_sockets()
240 self.kill_processes()
241
def main():
    """Run a ManagerBroker against a manager on localhost, logging to stdout.

    The manager's API endpoint is expected at localhost:5555 and its
    broadcast endpoint at localhost:5556.
    """
    from logging import Logger, StreamHandler, Formatter
    from sys import stdout

    logger = Logger('ManagerBroker')
    handler = StreamHandler(stdout)
    formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - '
                          '%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    broker = ManagerBroker(('localhost', 5555), ('localhost', 5556),
                           logger=logger)
    # NOTE(review): no connect() call is visible before start(); confirm the
    # manager sockets are set up elsewhere before the first request is sent.
    broker.start()
256
257
if __name__ == '__main__':
    main()
260