1
2
3
4 import uuid
5 from Queue import Queue
6 from logging import Logger, NullHandler
7 import zmq
8
9
11
12
13
14
def __init__(self, config, logger=None, logger_name='Manager'):
    """Initialise the manager's job bookkeeping, ZeroMQ context and logger.

    config      -- stored unchanged on the instance as ``self.config``.
    logger      -- logger to use; when ``None`` a fresh ``Logger`` is
                   created with only a ``NullHandler`` attached.
    logger_name -- name given to that fallback logger; ignored when
                   ``logger`` is supplied.
    """
    self.job_queue = Queue()
    self.pending_job_ids = []
    self.context = zmq.Context()
    self.config = config
    if logger is not None:
        self.logger = logger
    else:
        # No logger supplied: build a private one that has a handler, so
        # the logging calls made elsewhere need no special-casing.
        fallback = Logger(logger_name)
        fallback.addHandler(NullHandler())
        self.logger = fallback
25
def bind(self, api_host_port, broadcast_host_port):
    """Create the API (REP) and broadcast (PUB) sockets and bind them.

    Both arguments are remembered on the instance and are unpacked into
    the ``tcp://{}:{}`` endpoint template, so each is expected to be a
    host/port sequence such as ``('*', 5555)``.
    """
    self.api_host_port = api_host_port
    self.broadcast_host_port = broadcast_host_port

    endpoint = 'tcp://{}:{}'
    context = self.context
    self.api = context.socket(zmq.REP)
    self.broadcast = context.socket(zmq.PUB)

    self.api.bind(endpoint.format(*api_host_port))
    self.broadcast.bind(endpoint.format(*broadcast_host_port))
35
def close_sockets(self):
    """Close the API socket, then the broadcast socket.

    Both attributes are created by ``bind()``, so this must not be called
    before the manager has been bound.
    """
    for socket in (self.api, self.broadcast):
        socket.close()
39
def get_request(self):
    """Receive the next JSON request from the API socket and return it.

    The decoded request is written to the log at INFO level before it is
    handed back to the caller.
    """
    request = self.api.recv_json()
    self.logger.info('[API] Request: {}'.format(request))
    return request
44
def reply(self, message):
    """Send ``message`` as JSON on the API socket, then log it at INFO."""
    api_socket = self.api
    api_socket.send_json(message)
    log_line = '[API] Reply: {}'.format(message)
    self.logger.info(log_line)
48
def run(self):
    """Serve API requests until interrupted with Ctrl-C.

    Every request gets exactly one reply. Recognised commands:

    * ``get configuration`` -- reply with ``self.config``.
    * ``add job`` -- tag the request with a fresh ``job id``, strip its
      ``command`` key, queue it, remember the id as pending, reply
      ``job accepted`` and broadcast ``new job``.
    * ``get job`` -- reply with the next queued job, or
      ``{'worker': None}`` when the queue is empty.
    * ``job finished`` -- needs ``job id`` and ``duration``; a pending id
      is removed, acknowledged and announced on the broadcast socket.

    A request that is not a JSON object, or has no ``command`` key, is
    answered with ``undefined command``; any other command with
    ``unknown command``. ``KeyboardInterrupt`` closes the sockets and
    ends the loop.
    """
    self.logger.info('Entering main loop')
    try:
        while True:
            message = self.get_request()
            # Clients may send any JSON value; a non-object would make the
            # membership test or key lookup below raise and kill the loop
            # with a reply still owed on the API socket.
            if not isinstance(message, dict) or 'command' not in message:
                self.reply({'answer': 'undefined command'})
                continue

            command = message['command']
            if command == 'get configuration':
                self.reply(self.config)
            elif command == 'add job':
                job_id = uuid.uuid4().hex
                message['job id'] = job_id
                # The queued job is the request itself minus its verb.
                del message['command']
                self.job_queue.put(message)
                self.pending_job_ids.append(job_id)
                self.reply({'answer': 'job accepted', 'job id': job_id})
                self.broadcast.send('new job')
                self.logger.info('[Broadcast] Sent "new job"')
            elif command == 'get job':
                if self.job_queue.empty():
                    self.reply({'worker': None})
                else:
                    self.reply(self.job_queue.get())
            elif command == 'job finished':
                if 'job id' not in message or 'duration' not in message:
                    self.reply({'answer': 'syntax error'})
                    continue
                job_id = message['job id']
                if job_id not in self.pending_job_ids:
                    self.reply({'answer': 'unknown job id'})
                    continue
                self.pending_job_ids.remove(job_id)
                self.reply({'answer': 'good job!'})
                new_message = 'job finished: {} duration: {}'.format(
                    job_id, message['duration'])
                self.broadcast.send(new_message)
                # Log what was actually broadcast (this used to claim
                # "new job" for job-finished notifications).
                self.logger.info(
                    '[Broadcast] Sent "{}"'.format(new_message))
            else:
                self.reply({'answer': 'unknown command'})
    except KeyboardInterrupt:
        self.close_sockets()
93
def main():
    """Run a Manager that logs to stdout and serves on ports 5555/5556.

    The API socket is bound to ``*:5555`` and the broadcast socket to
    ``*:5556``. The configuration handed to clients is the hard-coded
    dictionary below. The call to ``manager.run()`` only returns once the
    manager's main loop ends.
    """
    from logging import Logger, StreamHandler, Formatter
    from sys import stdout

    formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - '
                          '%(message)s')
    handler = StreamHandler(stdout)
    handler.setFormatter(formatter)
    logger = Logger('Manager')
    logger.addHandler(handler)

    api_host_port = ('*', 5555)
    broadcast_host_port = ('*', 5556)
    db_settings = {
        'host': 'localhost',
        'port': 27017,
        'database': 'pypln',
        'collection': 'documents',
        'gridfs collection': 'files',
        'monitoring collection': 'monitoring',
    }
    config = {'db': db_settings, 'monitoring interval': 60}

    manager = Manager(config, logger)
    manager.bind(api_host_port, broadcast_host_port)
    manager.run()
116
117
# Start the manager only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()
120