From 7920e6760745b6fee64da39121a84a72e6d1130b Mon Sep 17 00:00:00 2001 From: Winni <winnus@posteo.de> Date: Tue, 20 Jun 2023 18:47:13 +0200 Subject: [PATCH] v0.02 mpi worker implementation, local only one process possible now to speed up computation on simple objective functions --- src/PyGMA.ipynb | 277 +++++++++++++++++++++++++++++++++++++++++++++- src/PyGMA.py | 40 ++++--- src/config.py | 20 +++- src/controller.py | 183 +++++++++++++++++++++++------- src/mpi_tags.py | 12 ++ src/mpi_test.py | 261 +++++++++++++++++++++++++++++++++++++------ src/mpi_worker.py | 53 +++++++++ 7 files changed, 755 insertions(+), 91 deletions(-) create mode 100644 src/mpi_tags.py create mode 100644 src/mpi_worker.py diff --git a/src/PyGMA.ipynb b/src/PyGMA.ipynb index c45ad58..4c03386 100755 --- a/src/PyGMA.ipynb +++ b/src/PyGMA.ipynb @@ -2673,9 +2673,284 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "3a0db8f4-ce53-443c-b27e-202d704a29a1", "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([[9.5784958e-26, 3.0736081e-41, 0.0000000e+00],\n", + " [0.0000000e+00, 1.3916017e-13, 4.5758000e-41],\n", + " [2.8914486e-15, 4.5758000e-41, 3.4432028e-13],\n", + " [4.5758000e-41, 2.8382549e-15, 4.5758000e-41]], dtype=float32)" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import numpy as np\n", + "np.empty([4, 3], dtype='f')" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "80db3552-187e-48eb-9fc7-b107edfc962b", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "array([ 0, -1073741824, 257261567, 32767], dtype=int32)" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "np.empty([4], dtype='i')" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "3288567d-ac49-4e31-a400-edf3b02248d9", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "num_genomes = 3\n", + "genome = np.array([0, 1, 1, 1, 0, 0], dtype=np.bool_)\n", + "worker_tuples = np.array([[i, i, i, genome] for i in range(num_genomes)], dtype=object)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "4189934b-cbae-4674-a526-890d5d0548e9", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "array([[0, 0, 0, array([False, True, True, True, False, False])],\n", + " [1, 1, 1, array([False, True, True, True, False, False])],\n", + " [2, 2, 2, array([False, True, True, True, False, False])]],\n", + " dtype=object)" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "worker_tuples" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "57165607-8b5e-4059-b62c-150a658d8709", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "ename": "SyntaxError", + "evalue": "Missing parentheses in call to 'print'. Did you mean print(...)? (1087306869.py, line 1)", + "output_type": "error", + "traceback": [ + "\u001b[0;36m Cell \u001b[0;32mIn[13], line 1\u001b[0;36m\u001b[0m\n\u001b[0;31m print 'lil'\u001b[0m\n\u001b[0m ^\u001b[0m\n\u001b[0;31mSyntaxError\u001b[0m\u001b[0;31m:\u001b[0m Missing parentheses in call to 'print'. 
Did you mean print(...)?\n" + ] + } + ], + "source": [ + "print 'lil'" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c6d547ee-4143-4a69-b645-b2b795b1d199", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "43f2eaf3-e1c2-414d-bb94-f5d2b336c7ba", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from enum import IntEnum\n", + "class mpi_tags(IntEnum):\n", + " CONTINUE_PROCESSING = 10\n", + " DATA = 3\n", + " EXIT = 6" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "ec1403aa-a5c2-470d-a186-1b68bdc5200e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "10" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mpi_tags.CONTINUE_PROCESSING.value" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "id": "1861c8d2-14cf-4e81-9be2-bbe3ef1ec922", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "num_genomes = 3\n", + "genome = np.array([0, 1, 1, 1, 0, 0], dtype=np.bool_)\n", + "worker_tuples = [[i, i, i, genome] for i in range(num_genomes)]" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "id": "ac0c8aa9-6fb5-484e-9979-e469b3bcd52b", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "[2, 2, 2, array([False, True, True, True, False, False])]" + ] + }, + "execution_count": 33, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "worker_tuples.pop()" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "77855c83-f33d-45f3-92c9-d1b18c6dc3ba", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[1, 1, 1, array([False, True, True, True, False, False])]\n", + "[0, 0, 0, array([False, True, True, True, False, False])]\n" + ] + } + ], + "source": [ + "while worker_tuples:\n", + " data = worker_tuples.pop()\n", + " print(data)\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "id": "7fc1bf54-6df8-416e-8d9f-a44a174cf626", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "[]" + ] + }, + "execution_count": 35, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "worker_tuples" + ] + }, + { + "cell_type": "code", + "execution_count": 48, + "id": "673645d0-3c39-4182-a771-165e1751b0a1", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from mpi4py import MPI" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "id": "757216d5-ed47-4248-b861-0376352ace2f", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "<mpi4py.MPI.Intracomm at 0x7f8de69202d0>" + ] + }, + "execution_count": 49, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "MPI.COMM_WORLD" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d9612635-aaf9-4e3b-a1c2-828bab0234fc", + "metadata": {}, "outputs": [], "source": [] } diff --git a/src/PyGMA.py b/src/PyGMA.py index ffbad2b..4d9e544 100644 --- a/src/PyGMA.py +++ b/src/PyGMA.py @@ -12,13 +12,16 @@ def bool2int(x): return r -def mpi_worker(): - # IMP: - # logic for the mpi worker +def mpi_worker(config): + # instantiate the worker + from mpi_worker import MPI_worker + worker = MPI_worker(config=config, sleep_time=0) + worker.start() return 0 + def controller(): """ Main controller logic for the algorithm @@ 
-65,6 +68,7 @@ def controller(): return + def main(): # read config check_config() @@ -72,18 +76,26 @@ def main(): # check if MPI should be used if config["use_mpi"]: - comm = MPI.COMM_WORLD - size = comm.Get_size() - rank = comm.Get_rank() - rank = 0 - - # if mpi worker switch to worker mode - if rank != 0: - mpi_worker() + # if we use the mpi4py futures they will handle their + # worker spawning themselves. We just need to be the controller :) + if config["use_mpi4py_futures"]: + controller() - # rank 0 is the controller + # if mpi intercomm calls are used we need to decide if we are + # worker or controller. else: - controller() + # checking the ranks + from mpi4py import MPI + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + + # if mpi worker switch to worker mode + if rank != 0: + mpi_worker(config) + + # rank 0 is the controller + else: + controller() # for no mpi the main process is the controller else: controller() @@ -92,4 +104,4 @@ def main(): if __name__ == "__main__": print('hello') - #main() + main() diff --git a/src/config.py b/src/config.py index dee026f..02770b2 100755 --- a/src/config.py +++ b/src/config.py @@ -45,10 +45,22 @@ CONFIG = { # Processes # -------------- # use mpi parallelization - 'use_mpi': False, - - # if mpi=False, how many local parallel processe - 'num_local_processes': 3, + # If use_mpi4py_futures=False start with: + # mpiexec -n 3 python PyGMA.py + 'use_mpi': True, + + # use mpi4py.futures + # this will dynamically spawn workers. + # NOTE: you have to execute the program in this manner: + # mpiexec -n 3 python -m mpi4py.futures PyGMA.py + 'use_mpi4py_futures': False, + + # if mpi=False, how many local processes to use + # Note: if > 1 it will spawn additional processes that independently + # solve the experiment. Spawning takes time (memory copies etc.); + # if the experiment is very simple, having only one process + # handling everything might be faster. 
+ 'num_local_processes': 1, # ------------- diff --git a/src/controller.py b/src/controller.py index c229141..5106a24 100755 --- a/src/controller.py +++ b/src/controller.py @@ -14,6 +14,7 @@ class Controller: """ def __init__(self, config, rng_seed=9): + # config self.config = config # Population objects @@ -89,31 +90,119 @@ class Controller: # instantiation via mpi or local worker # --------------------------------------- if self.config['use_mpi']: - raise NotImplementedError + # use the mpi4py futures + # ---------------------- + if self.config['use_mpi4py_futures']: + # import the MPIPoolExecutor + # importing more then 1x will use chached module + from mpi4py.futures import MPIPoolExecutor + # save worker futures + futures = [] + # starting a worker for each individual + # num workers will be handled via mpi + with MPIPoolExecutor() as executor: + for w_tuple in worker_tuples: + futures.append( + executor.submit( + self._instantiate_individual_local_process, + pop_index=w_tuple[0], + indiv_index=w_tuple[1], + phase_index=w_tuple[2], + gene=w_tuple[3], + ) + ) + + # write back the fitness values into the individuals + for future in futures: + # result will be a tuple (pop_index, indiv_index, fitness) + result = future.result() + self.populations[result[0] + ].individuals[result[1]].fitness = result[2] + + # use mpi calls + # ------------- + else: + # will use the cached module and as such the global intercomm + from mpi4py import MPI + from mpi_tags import mpi_tags + comm = MPI.COMM_WORLD + world_size = comm.Get_size() + + # while we have data distribute it to free workers + # collect finished data as well + # a gene is computed once the result is written back + genes_to_compute = len(worker_tuples) + while genes_to_compute > 0: + # check all workers + for worker_index in range(1, world_size): + + # check if worker can compute new data (Is idle) and we have data to compute + if comm.iprobe(source=worker_index, tag=mpi_tags.IDLE) and worker_tuples: + # eat up that message to clean the pipeline + comm.recv(source=worker_index, tag=mpi_tags.IDLE) + # pop the next data to distribute + data = worker_tuples.pop() + # signalling the worker that it should continue to work + comm.send(True, dest=worker_index, + tag=mpi_tags.CONTINUE_PROCESSING) + # send data to worker + comm.send(data, dest=worker_index, + tag=mpi_tags.DATA) + + # check if we can retrieve a result from the worker + if comm.iprobe(source=worker_index, tag=mpi_tags.DATA_RETURN): + # retrieve the result + w_result = comm.recv( + source=worker_index, tag=mpi_tags.DATA_RETURN) + # write the fitness result into the appropriate individual + # print( + # f'-------------------root got {w_result} from worker {worker_index}') + self.populations[w_result[0] + ].individuals[w_result[1]].fitness = w_result[2] + # note that we have computed a gene + genes_to_compute -= 1 + + # use local processes + # ------------------- else: - # save worker futures - futures = [] - # starting a worker for each individual - with ProcessPoolExecutor(max_workers=self.config['num_local_processes']) as executor: + # check if we just use the main process for evaluation + if self.config['num_local_processes'] < 2: + # evaluate the worker tuples for w_tuple in worker_tuples: - futures.append( - executor.submit( - self._instantiate_individual_local_process, - pop_index=w_tuple[0], - indiv_index=w_tuple[1], - phase_index=w_tuple[2], - gene=w_tuple[3], - ) + result = self._instantiate_individual_local_process( + pop_index=w_tuple[0], + indiv_index=w_tuple[1], + 
phase_index=w_tuple[2], + gene=w_tuple[3] ) + self.populations[result[0] + ].individuals[result[1]].fitness = result[2] + + # we wanna spawn local processes yuppi <3 + else: + # save worker futures + futures = [] + # starting a worker for each individual + with ProcessPoolExecutor(max_workers=self.config['num_local_processes']) as executor: + for w_tuple in worker_tuples: + futures.append( + executor.submit( + self._instantiate_individual_local_process, + pop_index=w_tuple[0], + indiv_index=w_tuple[1], + phase_index=w_tuple[2], + gene=w_tuple[3] + ) + ) + + # write back the fitness values into the individuals + for future in futures: + # result will be a tuple (pop_index, indiv_index, fitness) + result = future.result() + self.populations[result[0] + ].individuals[result[1]].fitness = result[2] - # write back the fitness values into the individuals - for future in futures: - # result will be a tuple (pop_index, indiv_index, fitness) - result = future.result() - self.populations[result[0] - ].individuals[result[1]].fitness = result[2] - - # sort the popuPopulations based on their fitness values + # sort the Populations based on their fitness values for pop in self.populations: pop.sort_individuals() @@ -121,8 +210,8 @@ class Controller: # FIXME: # is it really the case that when a new worker is instantiated # that it will get a full deep copy of all the objects? - # As such the experiment for an individual is called - # on a single, only by this worker used instance of the + # As such the experiment for an individual is called + # on a single, only by this worker used instance of the # experiment right? # I think so but cannot find the source (despite my brain) that # verifyes that. @@ -142,17 +231,17 @@ class Controller: """ # instantiate the experiment and evaluate the individual experiment = self.config['evolutionary_phases'][phase_index].experiment - + # conduct the experiment - #print(pop_index) - #print('worker controller has',self.epochs,'epochs and is at',self) - #print(self) + # print(pop_index) + # print('worker controller has',self.epochs,'epochs and is at',self) + # print(self) fitness = experiment.conduct(gene) - + # return result tuple return (pop_index, indiv_index, fitness) - - # not used + + # not used def epoch(self): """ One epoch: @@ -191,10 +280,11 @@ class Controller: for pop in self.populations: f.append(pop.get_fitness(mean=mean)) return f - def get_fittest_individuals(n=1)->list: + + def get_fittest_individuals(n=1) -> list: """ Returns the n fittest individuals from all populations. - + Parameters ---------- n: int @@ -205,7 +295,7 @@ class Controller: # recombine them for she fittest # add a switch that one can know from which populations they are # then an additional array is returned with the pop indexes. - + def evolve(self): """ Evolves the genomes through all phases @@ -217,7 +307,7 @@ class Controller: print(f'Evolutionary phase {evolutionary_phase}') # reset epochs self.epochs = 0 - + # call the phase init # which will return a new population array self.populations = evolutionary_phase.initialize( @@ -245,19 +335,20 @@ class Controller: # (mpi and local) will have their instance of the # phase list and with it know the experiments. self.__instantiate_individuals(phase_index=p_index) - + # check if algorithm stagnates # IMP # fusion islands but what is the condition? # whenever a new phase happens the phase can do # this on their own. 
- + # epoch done # ---------- # information - print(f'epoch {self.epochs}, Pop max fitness {self.get_population_fitness()}') - + print( + f'epoch {self.epochs}, Pop max fitness {self.get_population_fitness()}') + # increment counter self.epochs += 1 @@ -266,4 +357,18 @@ class Controller: # ---------- print(f'Finished phase {evolutionary_phase}') print('--------------------------------') - #print('--------------------------------') + # print('--------------------------------') + + # Evolution finished + # ---------------- + # if mpi intercomm was used we want to shut down the workers + if self.config['use_mpi'] and not self.config['use_mpi4py_futures']: + # send stop to all workers + from mpi4py import MPI + from mpi_tags import mpi_tags + comm = MPI.COMM_WORLD + world_size = comm.Get_size() + print('stopping mpi workers') + for w_index in range(1, world_size): + comm.send(False, dest=w_index, + tag=mpi_tags.CONTINUE_PROCESSING) diff --git a/src/mpi_tags.py b/src/mpi_tags.py new file mode 100644 index 0000000..72d55e8 --- /dev/null +++ b/src/mpi_tags.py @@ -0,0 +1,12 @@ +from enum import IntEnum + + +class mpi_tags(IntEnum): + """ + Enums used to specify tags in the mpi process communication. + """ + CONTINUE_PROCESSING = 10 + DATA = 3 + DATA_RETURN = 4 + IDLE = 9 + EXIT = 6 diff --git a/src/mpi_test.py b/src/mpi_test.py index f038c78..c23f4f3 100644 --- a/src/mpi_test.py +++ b/src/mpi_test.py @@ -1,40 +1,235 @@ # size # mpiexec -n 4 python mpi_test.py import numpy as np +import time +from enum import IntEnum from mpi4py import MPI -comm = MPI.COMM_WORLD -size = comm.Get_size() -rank = comm.Get_rank() - -print(rank) -print(size) - -# so the idea is to have a dummy here which can be fusioned into the PyGMA -# this will work as follows: -# having a rank 0 which will send an numpy array of the form -# [i_pop, i_indiv, phase_index, [indiv.genome]] -# this then will be passed to the worker pool (however big the size is) -# Workers: -# will receive that array and compute the experiment (import it here) conduct method on it. 
-# one would need to send to 4 workers, wait until one is done and then receive back and send new data' -# all that in a non blocking implementation - - -if rank == 0: - # we are controller +# for pool approach +from mpi4py.futures import MPIPoolExecutor + + +def main_data_one(): + # here the data is splittet into one genome at a time and transmitted to the workers + class mpi_tags(IntEnum): + CONTINUE_PROCESSING = 10 + DATA = 3 + DATA_RETURN = 4 + IDLE = 9 + EXIT = 6 + + comm = MPI.COMM_WORLD + world_size = comm.Get_size() + rank = comm.Get_rank() + + if rank == 0: + # controller + # worker tuples + # (pop_index, indiv_index, phase_index, gene) + num_genomes = 3 + genome = np.array([0, 1, 1, 1, 0, 0], dtype=np.bool_) + worker_tuples = [[i, i, i, genome] for i in range(num_genomes)] + + # while we have data distribute it to free workers + # collect finished data as well + # a gene is computed once the result is written back + genes_to_compute = len(worker_tuples) + while genes_to_compute > 0: + # check all workers + for worker_index in range(1, world_size): + # check if worker can compute new data (Is idle) and we have data to compute + if comm.iprobe(source=worker_index, tag=mpi_tags.IDLE) and worker_tuples: + # eat up that message to clean the pipeline + comm.recv(source=worker_index, tag=mpi_tags.IDLE) + # pop the next data to distribute + data = worker_tuples.pop() + # signalling the worker that it should continue to work + comm.send(True, dest=worker_index, + tag=mpi_tags.CONTINUE_PROCESSING) + # send data to worker + comm.send(data, dest=worker_index, tag=mpi_tags.DATA) + + # check if we can retrieve a result from the worker + if comm.iprobe(source=worker_index, tag=mpi_tags.DATA_RETURN): + # retrieve the result + w_result = comm.recv( + source=worker_index, tag=mpi_tags.DATA_RETURN) + # write the fitness result into the appropriate individual + # TBA + print(f'-------------------root got {w_result} from worker {worker_index}') + # note that we have computed a gene + genes_to_compute -= 1 + + + if rank > 0: + # worker + # loop until stop is signalled + while True: + # print(f'worker {rank} loop') + # signaling that we are waiting for new work + comm.isend(None, dest=0, tag=mpi_tags.IDLE) + # we receive a bool if we shall to continue to process + # to not eat all CPU ressources we sleep a little if no transmission + while not comm.iprobe(source=0, tag=mpi_tags.CONTINUE_PROCESSING): + # sleep + print(f'worker {rank} is sleeping') + time.sleep(1) + continue_processing = comm.recv( + source=0, tag=mpi_tags.CONTINUE_PROCESSING) + # if we shall not continue stop the worker + if not continue_processing: + break + # get the working data + data = comm.recv(source=0, tag=mpi_tags.DATA) + # calculate the data + print(f'worker {rank} got {data}') + # return the result + comm.send([rank, rank, rank], dest=0, tag=mpi_tags.DATA_RETURN) + + +def main_data_lists(): + class mpi_tags(IntEnum): + CONTINUE_PROCESSING = 10 + DATA = 3 + EXIT = 6 + comm = MPI.COMM_WORLD + size = comm.Get_size() + rank = comm.Get_rank() + + # print(rank) + # print(size) + + # so the idea is to have a dummy here which can be fusioned into the PyGMA + # this will work as follows: + # having a rank 0 which will send an numpy array of the form + # [i_pop, i_indiv, phase_index, [indiv.genome]] + # this then will be passed to the worker pool (however big the size is) + # Workers: + # will receive that array and compute the experiment (import it here) conduct method on it. 
+ # will communicate back an array to rank 0 of the form: + # [pop_index, indiv_index, fitness] + # the rank 0 will then use this array to write back the results (can be done if all workers are finished) + # otherwise you have to think about a really parallel population phase operation, like a distributed population model, but we are going to do distributed islands if needed and then the islands evolve faster or not. + + # todo: + # since send() will block, how can one distribute the data to the workers in a non-blocking scheme? + # and how to receive the results back if one can not send all data at once. + # one would need to send to 4 workers, wait until one is done and then receive back and send new data + # all that in a non-blocking implementation + # maybe one can use mpi.scatter() and gather() + # then in rank 0 one gets the world size, splits the array of genes into equal halves, distributes them with scatter() to the workers and then, once finished, collects them again with gather(). + + # since the number of individuals (more islands, population growth) can change dynamically, if using numpy arrays in mpi + # they need to be dynamically resized. How can this be achieved? + # since with scatter you do not know what you are going to get as a worker, the only way is to first send + # a message to the worker that will adjust the receive buffer appropriately and then send the real data. + + # so numpy does not work. Therefore use the normal pickle approach. + # here there are two options. + # 1. Split the data you need to work on into equal parts for every worker. + # Then send the data lists and let the workers work on them, gather the results... + # If for some reason a worker works too long on its set because it is complicated, the others have to wait. + # 2. Always send single genes. 
This way the workers become idle again sooner and will only have to wait for the longest gene + sendbuf = None + recvbuff = None + if rank == 0: + # we are controller + # make dummy worker tuples + # [i_pop, i_indiv, phase_index, [indiv.genome]] + # num_genomes = 3 + # genome = np.array([0, 1, 1, 1, 0, 0], dtype=np.bool_) + # worker_tuples = np.array([[i, i, i, genome] + # for i in range(num_genomes)], dtype=object) + + # # send them + # # return will be [pop_index, indiv_index, fitness] + # sendbuf = worker_tuples + # recvbuff = np.empty([num_genomes, 3]) + # # comm.Scatter(sendbuf, recvbuff, root=0) + + # sendbuf = np.array([1, 2, 3]) + # recvbuff = np.empty([3]) + for i in range(3): + # send loop + for worker in range(1, comm.Get_size()): + print(f'sending to worker {worker}') + # tell the worker it should work + comm.send(True, dest=worker, tag=mpi_tags.CONTINUE_PROCESSING) + # send the working data + comm.send([1, 2, 3], dest=worker, tag=3) + + # receive loop + results = [] + for worker in range(1, comm.Get_size()): + r = comm.recv(source=worker, tag=4) + results.append(r) + + print(results, i) + if rank > 0: + # we are worker + # loop until stop is signalled + while True: + print(f'worker {rank} loop') + # receive a bool telling us whether to continue to process + # to not eat all CPU resources we sleep a little if no transmission + while not comm.iprobe(source=0, tag=mpi_tags.CONTINUE_PROCESSING): + # sleep + print(f'worker {rank} is sleeping') + time.sleep(1) + + continue_processing = comm.recv( + source=0, tag=mpi_tags.CONTINUE_PROCESSING) + # if we shall not continue stop the worker + if not continue_processing: + break + # get the working data + data = comm.recv(source=0, tag=3) + # calculate the data + print(data) + # return the result + comm.send([rank, rank, rank], dest=0, tag=4) + + +def main_MPIPool(): + # here the idea is to use the MPIPoolExecutor which is part of the mpi4py lib + + # make dummy worker tuples + # [i_pop, i_indiv, phase_index, [indiv.genome]] genome = np.array([0, 1, 1, 1, 0, 0], dtype=np.bool_) - worker_tuples = [[i, i, genome] for i in range(3)] - - # send them + worker_tuples = [[i, i, i, genome] for i in range(6)] + + # save worker futures + futures = [] + # starting a worker for each individual + with MPIPoolExecutor() as executor: + for w_tuple in worker_tuples: + futures.append( + executor.submit( + _instantiate_individual_mpi_pool_process, + pop_index=w_tuple[0], + indiv_index=w_tuple[1], + phase_index=w_tuple[2], + gene=w_tuple[3], + ) + ) + + # write back the fitness values into the individuals + for future in futures: + # result will be a tuple (pop_index, indiv_index, fitness) + result = future.result() + print(result) + + +def _instantiate_individual_mpi_pool_process(pop_index, indiv_index, phase_index, gene): + + # do something... 
+ print(f'worker {pop_index}') + + # (pop_index, indiv_index, fitness) + fitness = pop_index + return (pop_index, indiv_index, fitness) -if rank > 0: - # we are worker +if __name__ == '__main__': + # main_MPIPool() + # main_data_lists() + main_data_one() diff --git a/src/mpi_worker.py b/src/mpi_worker.py new file mode 100644 index 0000000..cf7034f --- /dev/null +++ b/src/mpi_worker.py @@ -0,0 +1,53 @@ +import time +from mpi4py import MPI +from mpi_tags import mpi_tags + + +class MPI_worker(): + def __init__(self, config, sleep_time=0.01): + self.config = config + self.comm = MPI.COMM_WORLD + self.rank = self.comm.Get_rank() + self.sleep_time = sleep_time + + def start(self): + # worker + # loop until stop is signalled + while True: + # print(f'worker {rank} loop') + # signaling that we are waiting for new work + self.comm.isend(None, dest=0, tag=mpi_tags.IDLE) + + # we receive a bool telling us whether we shall continue to process + # to not eat all CPU resources we sleep a little if no transmission + while not self.comm.iprobe(source=0, tag=mpi_tags.CONTINUE_PROCESSING): + # sleep + # print(f'worker {self.rank} is sleeping') + time.sleep(self.sleep_time) + continue_processing = self.comm.recv( + source=0, tag=mpi_tags.CONTINUE_PROCESSING) + + # if we shall not continue stop the worker + if not continue_processing: + break + + # get the working data + # format will be (pop_index, indiv_index, phase_index, gene) + data = self.comm.recv(source=0, tag=mpi_tags.DATA) + + # extract data + pop_index = data[0] + indiv_index = data[1] + phase_index = data[2] + gene = data[3] + + # instantiate the experiment and evaluate the individual + experiment = self.config['evolutionary_phases'][phase_index].experiment + + # conduct the experiment + fitness = experiment.conduct(gene) + + # print(f'worker {self.rank} got {data}') + # return the result tuple + self.comm.send((pop_index, indiv_index, fitness), + dest=0, tag=mpi_tags.DATA_RETURN) -- GitLab
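Note (not part of the patch): the controller/worker handshake this patch introduces (src/mpi_tags.py, src/mpi_worker.py, and the intercomm branch in src/controller.py) uses four tags. A worker announces itself with IDLE, the controller replies with a bool on CONTINUE_PROCESSING, ships a (pop_index, indiv_index, phase_index, gene) tuple on DATA, and the worker answers with (pop_index, indiv_index, fitness) on DATA_RETURN. The standalone sketch below condenses that loop for quick experimentation; it assumes mpi4py is installed and src/mpi_tags.py is importable, and the file name, the dummy work items, and the sum(gene) placeholder fitness are illustrative only, not part of the patch.

# handshake_sketch.py (illustrative name) - condensed version of the protocol used by
# src/controller.py and src/mpi_worker.py; run with: mpiexec -n 3 python handshake_sketch.py
import time
from mpi4py import MPI
from mpi_tags import mpi_tags  # tag values as defined in src/mpi_tags.py

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # controller: hand out (pop_index, indiv_index, phase_index, gene) tuples to idle workers
    work = [(i, i, 0, [0, 1, 1, 1, 0, 0]) for i in range(5)]  # dummy work items
    pending = len(work)
    while pending > 0:
        for worker in range(1, comm.Get_size()):
            # give work only to workers that have announced themselves as idle
            if comm.iprobe(source=worker, tag=mpi_tags.IDLE) and work:
                comm.recv(source=worker, tag=mpi_tags.IDLE)  # consume the IDLE message
                comm.send(True, dest=worker, tag=mpi_tags.CONTINUE_PROCESSING)
                comm.send(work.pop(), dest=worker, tag=mpi_tags.DATA)
            # collect any finished result: (pop_index, indiv_index, fitness)
            if comm.iprobe(source=worker, tag=mpi_tags.DATA_RETURN):
                print(comm.recv(source=worker, tag=mpi_tags.DATA_RETURN))
                pending -= 1
    # all results are in: tell every worker to stop
    for worker in range(1, comm.Get_size()):
        comm.send(False, dest=worker, tag=mpi_tags.CONTINUE_PROCESSING)
else:
    # worker: announce idleness, wait for the continue flag, compute, return the result
    while True:
        comm.isend(None, dest=0, tag=mpi_tags.IDLE)
        while not comm.iprobe(source=0, tag=mpi_tags.CONTINUE_PROCESSING):
            time.sleep(0.01)  # avoid busy-waiting
        if not comm.recv(source=0, tag=mpi_tags.CONTINUE_PROCESSING):
            break
        pop_index, indiv_index, phase_index, gene = comm.recv(source=0, tag=mpi_tags.DATA)
        fitness = sum(gene)  # placeholder for experiment.conduct(gene)
        comm.send((pop_index, indiv_index, fitness), dest=0, tag=mpi_tags.DATA_RETURN)

For completeness, the three execution paths that main() in src/PyGMA.py now distinguishes are launched as described in the config.py comments: python PyGMA.py for local processes (num_local_processes), mpiexec -n 3 python PyGMA.py for the tag-based workers above (use_mpi=True, use_mpi4py_futures=False), and mpiexec -n 3 python -m mpi4py.futures PyGMA.py when use_mpi4py_futures=True.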