Package pypln :: Module manager
[hide private]

Source Code for Module pypln.manager

  1  #!/usr/bin/env python 
  2  # coding: utf-8 
  3   
  4  import uuid 
  5  from Queue import Queue 
  6  from logging import Logger, NullHandler 
  7  import zmq 
  8   
  9   
class Manager(object):
    """Job manager exposing a JSON request/reply API over ZeroMQ.

    Requests are received on a ``zmq.REP`` socket (``self.api``) and
    notifications are published on a ``zmq.PUB`` socket
    (``self.broadcast``).  Jobs submitted with 'add job' wait in
    ``self.job_queue`` until a 'get job' request takes them; their ids stay
    in ``self.pending_job_ids`` until a 'job finished' request names them.
    """

    #TODO: add another queue for processing jobs
    #TODO: add a timeout for processing jobs (default or get it from client)
    #TODO: if processing job have timeout, remove from processing queue, add
    #      again in job_queue and announce pending job

    def __init__(self, config, logger=None, logger_name='Manager'):
        """Create the job queue, the ZeroMQ context and the logger.

        config -- object sent back verbatim for 'get configuration'.
        logger -- logger to use; when None, a silent ``Logger`` named
                  `logger_name` (with a ``NullHandler``) is created.
        """
        self.job_queue = Queue()
        self.pending_job_ids = []
        self.context = zmq.Context()
        self.config = config
        if logger is None:
            self.logger = Logger(logger_name)
            self.logger.addHandler(NullHandler())
        else:
            self.logger = logger

    def bind(self, api_host_port, broadcast_host_port):
        """Create the API (REP) and broadcast (PUB) sockets and bind them.

        Both arguments are ``(host, port)`` pairs used to build a
        ``tcp://host:port`` address.
        """
        self.api_host_port = api_host_port
        self.broadcast_host_port = broadcast_host_port

        self.api = self.context.socket(zmq.REP)
        self.broadcast = self.context.socket(zmq.PUB)

        self.api.bind('tcp://{}:{}'.format(*self.api_host_port))
        self.broadcast.bind('tcp://{}:{}'.format(*self.broadcast_host_port))

    def close_sockets(self):
        """Close the API and broadcast sockets."""
        self.api.close()
        self.broadcast.close()

    def get_request(self):
        """Receive one JSON request from the API socket, log and return it."""
        message = self.api.recv_json()
        self.logger.info('[API] Request: {}'.format(message))
        return message

    def reply(self, message):
        """Send `message` as JSON on the API socket and log it."""
        self.api.send_json(message)
        self.logger.info('[API] Reply: {}'.format(message))

    def run(self):
        """Serve API requests until interrupted.

        Understood commands (value of the request's 'command' key):
        'get configuration', 'add job', 'get job' and 'job finished'.
        ``KeyboardInterrupt`` ends the loop quietly; any other exception
        propagates to the caller.  In both cases the sockets are closed.
        """
        self.logger.info('Entering main loop')
        try:
            while True:
                self._handle_request(self.get_request())
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the manager.
            pass
        finally:
            # Close on every exit path so an unexpected error does not leak
            # the sockets.
            self.close_sockets()

    def _handle_request(self, message):
        """Dispatch one decoded request to the handler for its command."""
        # A JSON payload is not necessarily an object; a bare string or
        # number must get an error reply instead of crashing the loop.
        if not isinstance(message, dict) or 'command' not in message:
            self.reply({'answer': 'undefined command'})
            return
        command = message['command']
        if command == 'get configuration':
            self.reply(self.config)
        elif command == 'add job':
            self._add_job(message)
        elif command == 'get job':
            self._send_next_job()
        elif command == 'job finished':
            self._finish_job(message)
        else:
            self.reply({'answer': 'unknown command'})

    def _add_job(self, message):
        """Queue the request as a new job, acknowledge it and announce it."""
        job_id = uuid.uuid4().hex
        # The request itself (minus 'command', plus its new id) is the job.
        message['job id'] = job_id
        del message['command']
        self.job_queue.put(message)
        self.pending_job_ids.append(job_id)
        self.reply({'answer': 'job accepted', 'job id': job_id})
        self._announce('new job')

    def _send_next_job(self):
        """Reply with the oldest queued job, or a null worker when idle."""
        if self.job_queue.empty():
            self.reply({'worker': None})
        else:
            self.reply(self.job_queue.get())

    def _finish_job(self, message):
        """Mark a pending job as finished and announce its duration."""
        if 'job id' not in message or 'duration' not in message:
            self.reply({'answer': 'syntax error'})
            return
        job_id = message['job id']
        if job_id not in self.pending_job_ids:
            self.reply({'answer': 'unknown job id'})
            return
        self.pending_job_ids.remove(job_id)
        self.reply({'answer': 'good job!'})
        self._announce('job finished: {} duration: {}'
                       .format(job_id, message['duration']))

    def _announce(self, text):
        """Publish `text` on the broadcast socket and log what was sent."""
        self.broadcast.send(text)
        self.logger.info('[Broadcast] Sent "{}"'.format(text))
93
def main():
    """Run a manager that logs to stdout, using the built-in configuration.

    The API socket is bound to port 5555 and the broadcast socket to port
    5556, both on every interface ('*').
    """
    from sys import stdout
    from logging import Formatter, Logger, StreamHandler

    # Console handler with timestamped records.
    layout = ('%(asctime)s - %(name)s - %(levelname)s - '
              '%(message)s')
    console = StreamHandler(stdout)
    console.setFormatter(Formatter(layout))
    log = Logger('Manager')
    log.addHandler(console)

    # Configuration handed to clients on 'get configuration'.
    database = {
        'host': 'localhost',
        'port': 27017,
        'database': 'pypln',
        'collection': 'documents',
        'gridfs collection': 'files',
        'monitoring collection': 'monitoring',
    }
    settings = {'db': database, 'monitoring interval': 60}

    server = Manager(settings, log)
    server.bind(('*', 5555), ('*', 5556))
    server.run()


if __name__ == '__main__':
    main()