kernel.py 2.55 KB
Newer Older
Jerico Moeyersons's avatar
Jerico Moeyersons committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
__author__ = 'Jerico Moeyersons'

import http
import logging
import sys
import os
import requests
import concurrent.futures

WORKER_URL = 'http://worker-'

class Kernel(object):

    def __init__(self):
        logging.info('Initializing API Kernel...')
        self.workers = os.getenv('WORKERS', 0)
        self.next_worker = 0
        if self.workers == 0:
            logging.error('No workers defined')
            sys.exit(1)
        else:
            self.next_worker = 1
        self.active = False

    def start_worker(self, worker=None):
        if worker is None:
            url = WORKER_URL + self.next_worker + "/start"
            self.__set_next_worker()
        else:
            # TODO: check if worker exists
            url = WORKER_URL + worker + "/start"
        logging.info('Starting working ' + str(self.next_worker))

        try:
            r = requests.get(url)
            status_code = r.status_code

            if status_code != 200:
                logging.warning("Worker returned a non 200 HTTP status code")
                return False
            else:
                logging.info("Worker finished successfully")
        except requests.ConnectionError:
            logging.error('Connection error in function start worker')
            return False
        except requests.Timeout:
            logging.error('Timeout error in function start worker')
            return False
        return True

    def multiple_start(self, number):
        if not self.active:
            if number < self.workers:
                with concurrent.futures.ThreadPoolExecutor(max_workers=number) as executor:
                    executor.submit(self.start_worker, self)
                '''for i in range(1, number+1):
                    thread = Thread(target = start_worker, args = (i, ))
                    threads.append(thread)
                    thread.start()
                for index, thread in enumerate(threads):
                    logging.info("Main    : before joining thread %d.", index)
                    thread.join()
                    logging.info("Main    : thread %d done", index)'''
            else:
               with concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) as executor:
                    executor.submit(self.start_worker, self)
            return "All workers started and finished..."
        else:
            return "Another job is running, please try again later..."

    def __set_next_worker(self):
Jerico Moeyersons's avatar
Jerico Moeyersons committed
72
        if self.next_worker == self.workers:
Jerico Moeyersons's avatar
Jerico Moeyersons committed
73
74
75
            self.next_worker = 1
        else:
            self.next_worker += 1