diff --git a/CHANGELOG.md b/CHANGELOG.md index 88332da8..547ee067 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## latest +- Refactored interaction with preCICE and domain decomposition into `CouplingHandler`, `Profiler` and `DomainDecomposition` hierarchy [#289](https://github.com/precice/micro-manager/pull/289) - Refactored and generalized the methods in `p2p.py`. Added `MPIHandler` to manage all MPI related aspects. [#287](https://github.com/precice/micro-manager/pull/287) - Applied fixes for previously missed test cases due to #290 [#291](https://github.com/precice/micro-manager/pull/291) - Fixed non-visible CI test cases on PR interface [#290](https://github.com/precice/micro-manager/pull/290) diff --git a/micro_manager/adaptivity/adaptivity_selection.py b/micro_manager/adaptivity/adaptivity_selection.py index b438421e..3c9669df 100644 --- a/micro_manager/adaptivity/adaptivity_selection.py +++ b/micro_manager/adaptivity/adaptivity_selection.py @@ -6,7 +6,7 @@ def create_adaptivity_calculator( config, sim_container, - participant, + profiler, logger, mpi, micro_problem_cls, @@ -28,7 +28,7 @@ def create_adaptivity_calculator( return GlobalAdaptivityCalculator( config, sim_container, - participant, + profiler, logger, mpi, micro_problem_cls, diff --git a/micro_manager/adaptivity/global_adaptivity.py b/micro_manager/adaptivity/global_adaptivity.py index da11b005..07567399 100644 --- a/micro_manager/adaptivity/global_adaptivity.py +++ b/micro_manager/adaptivity/global_adaptivity.py @@ -13,6 +13,7 @@ from micro_manager.config import Config from micro_manager.tools.logging_wrapper import Logger from micro_manager.tools.mpi_handler import MPIHandler, MPI +from micro_manager.tools.profiling import Profiler from micro_manager.micro_simulation import MicroSimulationClass from micro_manager.model_manager import ModelManager from micro_manager.interpolation import RBF_PU @@ -24,7 +25,7 @@ def __init__( self, config: Config, sim_container: SimulationContainer, - participant, + profiler: Profiler, base_logger: Logger, mpi: MPIHandler, micro_problem_cls: MicroSimulationClass, @@ -39,8 +40,8 @@ def __init__( Object which has getter functions to get parameters defined in the configuration file. sim_container : SimulationContainer Simulation container object. - participant : object of class Participant - Object of the class Participant using which the preCICE API is called. + profiler : Profiler + Profiling object. base_logger : object of class Logger Logger object to log messages. mpi : MPIHandler @@ -61,7 +62,7 @@ def __init__( ) self._interpolation = RBF_PU(base_logger, mpi) - self._precice_participant = participant + self._profiler = profiler buffer_size = self._sim_container.global_num_sims**2 array_buffer, mpi_node = self._mpi.create_node_buffer(MPI.FLOAT, buffer_size) @@ -225,24 +226,19 @@ def get_full_field_micro_output( micro_output : list List of dicts having individual output of each simulation. Active and inactive simulation outputs are entered. """ - self._precice_participant.start_profiling_section( + with self._profiler.measure( "micro_manager.global_adaptivity.get_full_field_micro_output" - ) - - micro_sims_output = deepcopy(micro_output) - num_active = np.sum(self._is_sim_active) - if num_active == self._is_sim_active.shape[0]: - self._precice_participant.stop_last_profiling_section() - return micro_sims_output - - self._communicate_micro_output(micro_sims_output) - if num_active <= self._interp_min: - self._precice_participant.stop_last_profiling_section() - return micro_sims_output + ): + micro_sims_output = deepcopy(micro_output) + num_active = np.sum(self._is_sim_active) + if num_active == self._is_sim_active.shape[0]: + return micro_sims_output - self._interpolate_output(micro_input, micro_sims_output) + self._communicate_micro_output(micro_sims_output) + if num_active <= self._interp_min: + return micro_sims_output - self._precice_participant.stop_last_profiling_section() + self._interpolate_output(micro_input, micro_sims_output) return micro_sims_output @@ -469,7 +465,7 @@ def _update_inactive_sims(self) -> None: else: to_be_activated_map[assoc_active_gid] = [to_be_activated_lid] - self._precice_participant.start_profiling_section( + self._profiler.begin( "micro_manager.global_adaptivity.update_inactive_sims.communication" ) @@ -512,7 +508,7 @@ def _update_inactive_sims(self) -> None: self._sim_container[lid].destroy() self._sim_container[lid] = None - self._precice_participant.stop_last_profiling_section() + self._profiler.end() self._sim_is_associated_to = np.copy(_sim_is_associated_to_updated) diff --git a/micro_manager/config.py b/micro_manager/config.py index 270e35f8..27c4fdde 100644 --- a/micro_manager/config.py +++ b/micro_manager/config.py @@ -1049,7 +1049,7 @@ def write_data_names(self) -> List[str]: pass @config_entry - def macro_domain_bounds(self) -> List[List[float]]: + def macro_domain_bounds(self) -> List[float]: """ Get the upper and lower bounds of the macro domain. diff --git a/micro_manager/domain_decomposition.py b/micro_manager/domain_decomposition.py index 9be5c7ea..df2607ff 100644 --- a/micro_manager/domain_decomposition.py +++ b/micro_manager/domain_decomposition.py @@ -1,97 +1,309 @@ """ Class DomainDecomposer provides the method decompose_macro_domain which returns partitioned bounds """ - +from typing import List, Tuple, Optional +from abc import ABC, abstractmethod import numpy as np from scipy.optimize import brentq -from micro_manager.config import Config -from typing import Callable +from micro_manager.config import Config from micro_manager.tools.mpi_handler import MPIHandler, MPI +from micro_manager.tools.logging_wrapper import Logger -class DomainDecomposer: - def __init__(self, configurator: Config, mpi: MPIHandler) -> None: +class DomainDecomposer(ABC): + def __init__(self, config: Config, mpi: MPIHandler, log: Logger) -> None: + self._mpi: MPIHandler = mpi + self._log: Logger = log + # Check if ranks per axis is provided in the configuration file for parallel runs + self._ranks_per_axis: List[int] = config.ranks_per_axis() + self._dims: int = len(self._ranks_per_axis) + self._macro_bounds: List[float] = config.macro_domain_bounds() + + # initial checks + if self._dims not in [2, 3]: + raise ValueError("Domain decomposition only supports 2D and 3D cases.") + if self._dims * 2 != len(self._macro_bounds): + raise ValueError("Provided macro mesh bounds are of incorrect dimension") + if self._dims != len(self._ranks_per_axis): + raise ValueError( + "Provided ranks combination is of incorrect dimension " + "and does not match the dimensions of the macro mesh." + ) + + @abstractmethod + def get_mesh_bounds(self) -> List[float]: """ - Class constructor. + Get the local mesh bounds for this rank based on the domain decomposition type + specified in the configuration file. + + Returns + ------- + mesh_bounds : List[float] + List containing the upper and lower bounds of the domain pertaining to this rank. + Format is same as input parameter macro_bounds. + """ + pass + + @abstractmethod + def partition( + self, + vertex_coords: List[np.ndarray], + vertex_ids: List[int], + access_region: List[float], + ) -> Tuple[List[np.ndarray], List[int]]: + """ + Decompose the micro simulations among all ranks based on their positions in the macro domain. Parameters ---------- - configurator : object of class Config - Object which has getter functions to get parameters defined in the configuration file. - mpi : MPIHandler - MPIHandler object. + vertex_coords : List[np.ndarray] + Array containing the coordinates of the macro points corresponding to the micro simulations. + vertex_ids : List[int] + Array containing IDs associated with the coordinates. + access_region : List[float] + Either the mesh_bounds as returned by get_mesh_bounds or the macro_domain. + + Returns + ------- + local_vertex_coords : List[np.ndarray] + List of coordinates of the macro points pertaining to this rank. + local_vertex_ids : List[int] + List of corresponding vertex IDs """ - self._mpi = mpi + pass - self._ranks_per_axis = ( - configurator.ranks_per_axis() - ) # Check if ranks per axis is provided in the configuration file for parallel runs + def filter_duplicates( + self, + global_vertex_coords: List[List[np.ndarray]], + global_vertex_ids: List[List[int]], + ) -> Tuple[List[np.ndarray], List[int]]: + """ + Filter out vertex coordinates that are already owned by a lower-ranked rank. - self._dims = len(self._ranks_per_axis) + When a macro-point lies exactly on the boundary between two rank bounding + boxes, preCICE returns it to both ranks. This function ensures every vertex + is processed by exactly one rank — the lowest-ranked rank that received it — + while preserving the preCICE ID-coord pairing. - self._is_minimum_access_region_size_specified = False + Parameters + ---------- + global_vertex_coords : List[List[np.ndarray]] + Lists of rank local vertex coordinates, one per rank. + global_vertex_ids : List[List[int]] + Lists of rank local vertex IDs (preCICE), one per rank. - self._decomposition_type = configurator.decomposition_type() + Returns + ------- + filtered_coords : List[np.ndarray] + Local vertex coordinates with duplicates removed. + filtered_ids : List[np.ndarray] + Local preCICE vertex IDs corresponding to the filtered coordinates. + """ + mesh_vertex_coords = np.array(global_vertex_coords[self._mpi.rank]) + mesh_vertex_ids = np.array(global_vertex_ids[self._mpi.rank]) - self._macro_bounds = configurator.macro_domain_bounds() + seen_coords = set() + keep_mask = np.ones(len(mesh_vertex_coords), dtype=bool) - self._get_local_mesh_bounds = self._get_local_mesh_bounds_variant() + for rank in range(self._mpi.size): + for i, coord in enumerate(global_vertex_coords[rank]): + coord_key = tuple(np.round(coord, decimals=10)) + if rank < self._mpi.rank: + # Mark coords already claimed by earlier ranks + seen_coords.add(coord_key) + elif rank == self._mpi.rank: + # Only keep coords not already claimed by earlier ranks + if coord_key in seen_coords: + keep_mask[i] = False + else: + seen_coords.add(coord_key) - self._minimum_access_region_size: list = ( - configurator.minimum_access_region_size() - ) - if self._minimum_access_region_size: # if list is not empty - self._is_minimum_access_region_size_specified = True + return mesh_vertex_coords[keep_mask], mesh_vertex_ids[keep_mask] - def get_local_mesh_bounds(self) -> list: + def finalize( + self, local_vertex_coords: List[np.ndarray] + ) -> Tuple[int, int, List[int]]: """ - Get the local mesh bounds for this rank based on the domain decomposition type specified in the configuration file. + Prints decomposition statistics of all ranks and computes local and global counts. + + Parameters + ---------- + local_vertex_coords : List[np.ndarray] + Vertex coords after partitioning. Returns ------- - mesh_bounds : list - List containing the upper and lower bounds of the domain pertaining to this rank. - Format is same as input parameter macro_bounds. + local_num_sims : int + Local number of simulations + global_num_sims : int + Global number of simulations + sims_per_rank : List[int] """ - return self._get_local_mesh_bounds() + local_num_sims = len(local_vertex_coords) + nms_all_ranks = np.zeros(self._mpi.size, dtype=np.int64) + # Gather number of micro simulations that each rank has, because this rank needs to know how many micro + # simulations have been created by previous ranks, so that it can set + # the correct global IDs + self._mpi.comm.Allgatherv(np.array(local_num_sims), nms_all_ranks) + + max_nms = np.max(nms_all_ranks) + min_nms = np.min(nms_all_ranks) + + if max_nms != min_nms: + # if the number of maximum and minimum micro simulations per rank are different + self._log.log_info_rank_zero( + "The following ranks have the maximum number of micro simulations " + f"({max_nms}): {np.where(nms_all_ranks == max_nms)[0]}" + ) + self._log.log_info_rank_zero( + "The following ranks have the minimum number of micro simulations " + f"({min_nms}): {np.where(nms_all_ranks == min_nms)[0]}" + ) + else: + # if the number of maximum and minimum micro simulations per rank are the same + self._log.log_info_rank_zero( + f"All ranks have the same number of micro simulations: {max_nms}" + ) - def _get_uniform_local_mesh_bounds(self) -> list: - """ - Decompose the macro domain equally among all ranks, if the Micro Manager is run in parallel. + # Get global number of micro simulations + global_num_sims = np.sum(nms_all_ranks) + self._log.log_info_rank_zero( + f"Total number of micro simulations: {global_num_sims}" + ) - Returns - ------- - mesh_bounds : list - List containing the upper and lower bounds of the domain pertaining to this rank. - Format is same as input parameter macro_bounds. - """ + return local_num_sims, global_num_sims, nms_all_ranks + + +class NoOpDecomp(DomainDecomposer): + """ + Performs no decomposition. Assigns full domain to local rank. + """ + + def __init__(self, config: Config, mpi: MPIHandler, log: Logger) -> None: + super().__init__(config, mpi, log) + self._bounds: List[float] = config.macro_domain_bounds() + + def get_mesh_bounds(self) -> List[float]: + return self._bounds + + def partition( + self, + vertex_coords: List[np.ndarray], + vertex_ids: List[int], + access_region: List[float], + ) -> Tuple[List[np.ndarray], List[int]]: + if len(vertex_ids) == 0: + raise RuntimeError( + "The macro mesh has no vertices in the specified access region." + ) + + return vertex_coords, vertex_ids + + +class GridDecomp(DomainDecomposer, ABC): + def __init__(self, config: Config, mpi: MPIHandler, log: Logger) -> None: + super().__init__(config, mpi, log) + + def partition( + self, + vertex_coords: List[np.ndarray], + vertex_ids: List[int], + access_region: List[float], + ) -> Tuple[List[np.ndarray], List[int]]: + mesh_bounds: List[float] = self.get_mesh_bounds() + + # Apply filtering if the access region is equal to the local mesh bounds + # Filtering is required as in this case duplicates can arise due to numerical issues. + if np.all(np.array(mesh_bounds) == np.array(access_region)): + # Gather all vertex coords and IDs from all ranks onto all ranks, + # filter out coords already claimed by lower-ranked ranks. + # When load balancing, all ranks receive all coords. No duplicates can arise. + # TODO: Avoid the allgather by smartly selecting the relevant coordinates + global_vertex_coords: List[List[np.ndarray]] = self._mpi.comm.allgather( + vertex_coords + ) + global_vertex_ids: List[List[int]] = self._mpi.comm.allgather(vertex_ids) + vertex_coords, vertex_ids = self.filter_duplicates( + global_vertex_coords, global_vertex_ids + ) + + if len(vertex_ids) == 0: + self._log.log_warning( + "The access region of this rank has no macro-scale vertices. " + "This rank will not have any micro simulations. " + "To avoid this, change the domain decomposition" + ) + + # perform actual partitioning + local_coords = [] + local_ids = [] + + for idx, coord in enumerate(vertex_coords): + inside = True + for d in range(self._dims): + if not ( + coord[d] >= mesh_bounds[d * 2] + and coord[d] <= mesh_bounds[d * 2 + 1] + ): + inside = False + break + if inside: + local_coords.append(coord) + local_ids.append(vertex_ids[idx]) + + return local_coords, local_ids + + def _calc_rank_in_axis(self) -> List[int]: if np.prod(self._ranks_per_axis) != self._mpi.size: raise ValueError( - "Total number of processors provided in the Micro Manager configuration and in the MPI execution command do not match." + "Total number of processors provided in the Micro Manager " + "configuration and in the MPI execution command do not match." ) - if self._dims == 3: - for z in range(self._ranks_per_axis[2]): - for y in range(self._ranks_per_axis[1]): - for x in range(self._ranks_per_axis[0]): - n = ( - x - + y * self._ranks_per_axis[0] - + z * self._ranks_per_axis[0] * self._ranks_per_axis[1] - ) - if n == self._mpi.rank: - rank_in_axis = [x, y, z] - elif self._dims == 2: + rank_in_axis: Optional[List[int]] = None + # force ranks_per_axis to be 3D for 2D case with value 1 + if self._dims == 2: + self._ranks_per_axis.append(1) + + for z in range(self._ranks_per_axis[2]): for y in range(self._ranks_per_axis[1]): for x in range(self._ranks_per_axis[0]): - n = x + y * self._ranks_per_axis[0] + n = ( + x + + y * self._ranks_per_axis[0] + + z * self._ranks_per_axis[0] * self._ranks_per_axis[1] + ) if n == self._mpi.rank: - rank_in_axis = [x, y] - else: - raise ValueError("Domain decomposition only supports 2D and 3D cases.") + rank_in_axis = [x, y, z] - mesh_bounds = [] + if rank_in_axis is None: + raise ValueError("Provided invalid values for ranks per axis.") + # extract data for 2D case and restore ranks_per_axis + if self._dims == 2: + rank_in_axis = rank_in_axis[:-1] + self._ranks_per_axis.pop() + + return rank_in_axis + + +class UniformGridDecomp(GridDecomp): + def __init__(self, config: Config, mpi: MPIHandler, log: Logger) -> None: + super().__init__(config, mpi, log) + + def get_mesh_bounds(self) -> List[float]: + """ + Decompose the macro domain equally among all ranks, if the Micro Manager is run in parallel. + + Returns + ------- + mesh_bounds : List[float] + List containing the upper and lower bounds of the domain pertaining to this rank. + Format is same as input parameter macro_bounds. + """ + rank_in_axis: List[int] = self._calc_rank_in_axis() + + mesh_bounds: List[int] = [] for d in range(self._dims): dx = ( abs(self._macro_bounds[d * 2 + 1] - self._macro_bounds[d * 2]) @@ -113,7 +325,17 @@ def _get_uniform_local_mesh_bounds(self) -> list: return mesh_bounds - def _get_nonuniform_local_mesh_bounds(self) -> list: + +class NonUniformGridDecomp(GridDecomp): + def __init__(self, config: Config, mpi: MPIHandler, log: Logger) -> None: + super().__init__(config, mpi, log) + + self._minimum_access_region_size: List[ + int + ] = config.minimum_access_region_size() + self._has_minimum_access_region_size = len(self._minimum_access_region_size) > 0 + + def get_mesh_bounds(self) -> List[float]: """ Decompose the macro domain among all ranks with an non-uniform distribution, if the Micro Manager is run in parallel. The non-uniform distribution is based on a geometric progression, where the size of the local mesh bounds increases @@ -121,34 +343,11 @@ def _get_nonuniform_local_mesh_bounds(self) -> list: Returns ------- - mesh_bounds : list + mesh_bounds : List[float List containing the upper and lower bounds of the domain pertaining to this rank. Format is same as input parameter macro_bounds. """ - if np.prod(self._ranks_per_axis) != self._mpi.size: - raise ValueError( - "Total number of processors provided in the Micro Manager configuration and in the MPI execution command do not match." - ) - - if self._dims == 3: - for z in range(self._ranks_per_axis[2]): - for y in range(self._ranks_per_axis[1]): - for x in range(self._ranks_per_axis[0]): - n = ( - x - + y * self._ranks_per_axis[0] - + z * self._ranks_per_axis[0] * self._ranks_per_axis[1] - ) - if n == self._mpi.rank: - rank_in_axis = [x, y, z] - elif self._dims == 2: - for y in range(self._ranks_per_axis[1]): - for x in range(self._ranks_per_axis[0]): - n = x + y * self._ranks_per_axis[0] - if n == self._mpi.rank: - rank_in_axis = [x, y] - else: - raise ValueError("Domain decomposition only supports 2D and 3D cases.") + rank_in_axis: List[int] = self._calc_rank_in_axis() mesh_bounds = [] multiplier = 2 # factor by which the local mesh bounds increase in each rank. 2 means geometric progression. @@ -163,7 +362,7 @@ def _get_nonuniform_local_mesh_bounds(self) -> list: / (multiplier ** self._ranks_per_axis[d] - 1) ) - if self._is_minimum_access_region_size_specified: + if self._has_minimum_access_region_size: if dx0 < self._minimum_access_region_size[d]: dx0 = self._minimum_access_region_size[d] n_ranks = self._ranks_per_axis[d] @@ -206,105 +405,35 @@ def _geom_sum_residual(r): return mesh_bounds - def _get_local_mesh_bounds_variant(self) -> Callable: - """ - Get uniform or nonuniform variant of calculating local mesh bounds - - Returns - ------- - get_local_mesh_bounds_variant : function - Function to calculate local mesh bounds based on the decomposition type specified in the configuration file. - """ - if self._decomposition_type == "uniform": - return self._get_uniform_local_mesh_bounds - elif self._decomposition_type == "nonuniform": - return self._get_nonuniform_local_mesh_bounds - else: - raise ValueError( - "Decomposition type can be either 'uniform' or 'nonuniform'." - ) - - def get_local_sims_and_macro_coords( - self, macro_coords: np.ndarray - ) -> tuple[int, list[np.ndarray]]: - """ - Decompose the micro simulations among all ranks based on their positions in the macro domain. - Parameters - ---------- - macro_coords : numpy.ndarray - Array containing the coordinates of the macro points corresponding to the micro simulations. - - Returns - ------- - micro_sims_on_rank : int - Number of micro simulations pertaining to this rank. - macro_coords_on_this_rank : list of numpy.ndarray - List of coordinates of the macro points pertaining to this rank. - """ - local_mesh_bounds = self.get_local_mesh_bounds() - - macro_coords_on_this_rank = [] - - micro_sims_on_rank = 0 - for position in macro_coords: - inside = True - for d in range(self._dims): - if not ( - position[d] >= local_mesh_bounds[d * 2] - and position[d] <= local_mesh_bounds[d * 2 + 1] - ): - inside = False - break - if inside: - macro_coords_on_this_rank.append(position) - micro_sims_on_rank += 1 - - return micro_sims_on_rank, macro_coords_on_this_rank - - def filter_duplicate_coords( - self, - all_coords: list, - all_ids: list, - ) -> tuple[np.ndarray, np.ndarray]: - """ - Filter out vertex coordinates that are already owned by a lower-ranked rank. - - When a macro-point lies exactly on the boundary between two rank bounding - boxes, preCICE returns it to both ranks. This function ensures every vertex - is processed by exactly one rank — the lowest-ranked rank that received it — - while preserving the preCICE ID-coord pairing. - Parameters - ---------- - all_coords : list - List of numpy arrays, one per rank, containing vertex coordinates. - all_ids : list - List of arrays, one per rank, containing preCICE vertex IDs. - - Returns - ------- - filtered_coords : numpy.ndarray - Vertex coordinates with duplicates removed. - filtered_ids : numpy.ndarray - preCICE vertex IDs corresponding to the filtered coordinates. - """ - mesh_vertex_coords = np.array(all_coords[self._mpi.rank]) - mesh_vertex_ids = np.array(all_ids[self._mpi.rank]) - - seen_coords = set() - keep_mask = np.ones(len(mesh_vertex_coords), dtype=bool) - - for rank in range(self._mpi.size): - for i, coord in enumerate(all_coords[rank]): - coord_key = tuple(np.round(coord, decimals=10)) - if rank < self._mpi.rank: - # Mark coords already claimed by earlier ranks - seen_coords.add(coord_key) - elif rank == self._mpi.rank: - # Only keep coords not already claimed by earlier ranks - if coord_key in seen_coords: - keep_mask[i] = False - else: - seen_coords.add(coord_key) - - return mesh_vertex_coords[keep_mask], mesh_vertex_ids[keep_mask] +def create_domain_decomposer( + config: Config, mpi: MPIHandler, log: Logger +) -> DomainDecomposer: + """ + Creates a decomposition object according to the current configuration. + + Parameters + ---------- + config : Config + configuration object + mpi : MPIHandler + MPIHandler object + log : Logger + Logger object + + Returns + ------- + domain_decomposer : DomainDecomposer + decomposition object + """ + if not mpi.is_parallel(): + return NoOpDecomp(config, mpi, log) + + decomp_type = config.decomposition_type() + match decomp_type: + case "uniform": + return UniformGridDecomp(config, mpi, log) + case "nonuniform": + return NonUniformGridDecomp(config, mpi, log) + + raise ValueError(f"Unknown decomposition type: {decomp_type}") diff --git a/micro_manager/load_balancing.py b/micro_manager/load_balancing.py index fa5fc841..7309d2db 100644 --- a/micro_manager/load_balancing.py +++ b/micro_manager/load_balancing.py @@ -11,6 +11,7 @@ from micro_manager.model_manager import ModelManager from micro_manager.tools.logging_wrapper import Logger from micro_manager.tools.mpi_handler import MPIHandler +from micro_manager.tools.profiling import Profiler from micro_manager.simulation_container import SimulationContainer @@ -31,7 +32,7 @@ class LoadBalancerSimData: class LoadBalancer: def __init__( self, - precice_participant: Participant, + profiler: Profiler, model_manager: ModelManager, adaptivity_controller: Optional[AdaptivityCalculator], sim_container: SimulationContainer, @@ -45,8 +46,8 @@ def __init__( Parameters ---------- - precice_participant: Participant - preCICE participant object from coupling + profiler: Profiler + Profiling object model_manager: ModelManager model manager object to construct instances adaptivity_controller: Optional[AdaptivityCalculator] @@ -59,7 +60,7 @@ def __init__( MPIHandler object. """ self._enabled = config.enable_load_balancing() - self._precice_participant = precice_participant + self._profiler = profiler self._model_manager = model_manager self._adaptivity_controller = adaptivity_controller self._sim_container = sim_container @@ -90,11 +91,9 @@ def balance(self): if not self._enabled: return - # self._precice_participant.start_profiling_section("micro_manager.solve.load_balancing.redistribute") if np.allclose(self._balance_metric_global, 0): self._balance_metric_global = self._balance_metric_global + 1 self._redistribute() - # self._precice_participant.stop_last_profiling_section() def pre_sim_solve(self, gid: int): """ @@ -216,27 +215,19 @@ def _redistribute(self) -> None: Main implementation of load balancing. First computes the new partitioning. Then send/receives micro simulations accordingly. """ - self._precice_participant.start_profiling_section( - "micro_manager.solve.load_balancing.init" - ) - current_partitioning = self._sim_container.get_ranks_of_sims() - # self._precice_participant.start_profiling_section("micro_manager.solve.load_balancing.init.partition") - target_partitioning, work_loads = self._partition_impl( - self._mpi.comm.size, current_partitioning - ) - # self._precice_participant.stop_last_profiling_section() - active_gids = self._get_global_active_gids() - inactive_gids = self._get_global_inactive_gids(active_gids) - send_map = self._get_communication_map( - current_partitioning, target_partitioning, active_gids, inactive_gids - ) - self._precice_participant.stop_last_profiling_section() + with self._profiler.measure("micro_manager.solve.load_balancing.init"): + current_partitioning = self._sim_container.get_ranks_of_sims() + target_partitioning, work_loads = self._partition_impl( + self._mpi.comm.size, current_partitioning + ) + active_gids = self._get_global_active_gids() + inactive_gids = self._get_global_inactive_gids(active_gids) + send_map = self._get_communication_map( + current_partitioning, target_partitioning, active_gids, inactive_gids + ) - self._precice_participant.start_profiling_section( - "micro_manager.solve.load_balancing.comm" - ) - self._exchange_sims(send_map) - self._precice_participant.stop_last_profiling_section() + with self._profiler.measure("micro_manager.solve.load_balancing.comm"): + self._exchange_sims(send_map) sims_per_rank = self._mpi.comm.gather(len(self._sim_container), 0) self._log.log_info_rank_zero( @@ -408,7 +399,7 @@ class ActiveBalancer(LoadBalancer): def __init__( self, - precice_participant: Participant, + profiler: Profiler, model_manager: ModelManager, adaptivity_controller: Optional[AdaptivityCalculator], sim_container: SimulationContainer, @@ -417,7 +408,7 @@ def __init__( mpi: MPIHandler, ): super().__init__( - precice_participant, + profiler, model_manager, adaptivity_controller, sim_container, @@ -701,7 +692,7 @@ def _get_communication_map( def create_load_balancer( - precice_participant: Participant, + profiler: Profiler, model_manager: ModelManager, adaptivity_controller: Optional[AdaptivityCalculator], sim_container: SimulationContainer, @@ -719,7 +710,7 @@ def create_load_balancer( raise RuntimeError(f"Unknown load balancing type: {lb_type}") return lb_cls( - precice_participant, + profiler, model_manager, adaptivity_controller, sim_container, diff --git a/micro_manager/micro_manager.py b/micro_manager/micro_manager.py index a144053b..2a63422f 100644 --- a/micro_manager/micro_manager.py +++ b/micro_manager/micro_manager.py @@ -21,8 +21,6 @@ import subprocess from functools import partial -import precice - from .simulation_container import SimulationContainer from .model_manager import ModelManager from .micro_manager_base import MicroManager @@ -30,12 +28,13 @@ from .adaptivity.model_adaptivity import ModelAdaptivity from .adaptivity.adaptivity_selection import create_adaptivity_calculator -from .domain_decomposition import DomainDecomposer +from .domain_decomposition import DomainDecomposer, create_domain_decomposer from .tasking.connection import spawn_local_workers -from .micro_simulation import create_simulation_class, load_backend_class from .tools.logging_wrapper import Logger from .load_balancing import create_load_balancer from .tools.mpi_handler import MPIHandler, MPI +from .tools.coupling import CouplingHandler +from .tools.profiling import Profiler try: from .interpolation import Interpolation @@ -76,15 +75,14 @@ def __init__(self, config_file: str, log_file: str = "") -> None: else: self._output_dir = os.path.abspath(os.getcwd()) + "/" - # Data names of data to output to the snapshot database - self._write_data_names = self._config.write_data_names() - - # Data names of data to read as input parameter to the simulations - self._read_data_names = self._config.read_data_names() - - self._micro_dt = self._config.micro_dt() + self._sim_container = SimulationContainer(self._mpi) - self._macro_mesh_name = self._config.macro_mesh_name() + self._coupling: CouplingHandler = CouplingHandler( + self._config, + self._mpi, + self._sim_container, + ) + self._profiler: Profiler = Profiler(self._coupling.participant) # Parameter for interpolation in case of a simulation crash self._interpolate_crashed_sims = self._config.enable_crashed_sim_interpolation() @@ -116,9 +114,9 @@ def __init__(self, config_file: str, log_file: str = "") -> None: # Names of micro data to be used for adaptivity computation self._adaptivity_micro_data_names: list = [] for name in self._adaptivity_data_names: - if name in self._read_data_names: + if name in self._coupling.read_data_names: self._adaptivity_macro_data_names.append(name) - if name in self._write_data_names: + if name in self._coupling.write_data_names: self._adaptivity_micro_data_names.append(name) self._adaptivity_in_every_implicit_step = ( @@ -133,14 +131,6 @@ def __init__(self, config_file: str, log_file: str = "") -> None: self._is_model_adaptivity_on = self._config.enable_model_adaptivity() - # Define the preCICE Participant - self._participant = precice.Participant( - "Micro-Manager", - self._config.precice_config_file_name(), - self._mpi.rank, - self._mpi.size, - ) - self._t = 0 # global time self._n = 0 # sim-step @@ -152,8 +142,6 @@ def __init__(self, config_file: str, log_file: str = "") -> None: self._load_balancing_n = self._config.load_balancing_n() self.load_balancing = None - self._sim_container = SimulationContainer(self._mpi) - # ************** # Public methods # ************** @@ -175,8 +163,6 @@ def solve(self) -> None: # call _solve_micro_simulations or _solve_micro_simulations_with_adaptivity internally # should use ModelAdaptivity methods to coordinate - dt = min(self._participant.get_max_time_step_size(), self._micro_dt) - first_iteration = True lb_counter = -1 @@ -184,33 +170,28 @@ def solve(self) -> None: # Log initial adaptivity metrics self._adaptivity_controller.log_metrics(self._n) - while self._participant.is_coupling_ongoing(): - - dt = min(self._participant.get_max_time_step_size(), self._micro_dt) + while self._coupling.is_ongoing(): + dt = self._coupling.dt if self._is_adaptivity_on: if (self._adaptivity_in_every_implicit_step or first_iteration) and ( self._n % self._adaptivity_n == 0 ): - self._participant.start_profiling_section( + with self._profiler.measure( "micro_manager.solve.adaptivity_computation" - ) - - self._adaptivity_controller.compute_adaptivity( - dt, - self._data_for_adaptivity, - ) - active_sim_gids = ( - self._adaptivity_controller.get_active_sim_global_ids() - ) - for gid in active_sim_gids: - self._micro_sims_active_steps[gid] += 1 - - # Write a checkpoint if a simulation is just activated. - # This checkpoint will be asynchronous to the checkpoints written at the start of the time window. - self._sim_container.write_checkpoints(only_none=True) + ): + self._adaptivity_controller.compute_adaptivity( + dt, + self._data_for_adaptivity, + ) + for ( + gid + ) in self._adaptivity_controller.get_active_sim_global_ids(): + self._micro_sims_active_steps[gid] += 1 - self._participant.stop_last_profiling_section() + # Write a checkpoint if a simulation is just activated. + # This checkpoint will be asynchronous to the checkpoints written at the start of the time window. + self._sim_container.write_checkpoints(only_none=True) # handle load balancing, in first iteration all sims are assumed to have same cost performed_lb = False @@ -231,20 +212,22 @@ def solve(self) -> None: performed_lb = True lb_counter += 1 - # Write a checkpoint - if self._participant.requires_writing_checkpoint() or performed_lb: + if self._coupling.requires_writing_checkpoint() or performed_lb: self._sim_container.write_checkpoints() - micro_sims_input = self._read_data_from_precice(dt) - - self._participant.start_profiling_section( - "micro_manager.solve.solve_micro_simulations" + read_buffer = {} if self._is_adaptivity_on else None + micro_sims_input = self._coupling.read_data_from_precice( + read_buffer=read_buffer ) + # TODO: will be removed in adaptivity refactoring + if self._is_adaptivity_on and read_buffer is not None: + for name in read_buffer.keys(): + if name in self._adaptivity_macro_data_names: + self._data_for_adaptivity[name] = read_buffer[name] - micro_sims_output = micro_sim_solve(micro_sims_input, dt) - self.load_balancing.update() - - self._participant.stop_last_profiling_section() + with self._profiler.measure("micro_manager.solve.solve_micro_simulations"): + micro_sims_output = micro_sim_solve(micro_sims_input, dt) + self.load_balancing.update() if self._is_load_balancing: for i in range(self._sim_container.local_num_sims): @@ -274,17 +257,16 @@ def solve(self) -> None: ) sys.exit() - self._write_data_to_precice(micro_sims_output) - - self._participant.advance(dt) + self._coupling.write_to_precice(micro_sims_output) + self._coupling.advance(dt) # Revert micro simulations to their last checkpoints if required - if self._participant.requires_reading_checkpoint(): + if self._coupling.requires_reading_checkpoint(): self._sim_container.load_checkpoints() first_iteration = False # Time window has converged, now micro output can be generated - if self._participant.is_time_window_complete(): + if self._coupling.is_time_window_complete(): self._t += dt # Update time to the end of the time window self._n += 1 # Update time step to the end of the time window @@ -378,7 +360,7 @@ def solve(self) -> None: if self._conn is not None: self._conn.close() - self._participant.finalize() + self._coupling.finalize() def initialize(self) -> None: """ @@ -389,162 +371,54 @@ def initialize(self) -> None: - Create all micro simulation objects and initialize them if an initialize() method is available. - If required, write initial data to preCICE. """ - self._participant.start_profiling_section( - "micro_manager.initialize.direct_access" - ) - - # Decompose the macro-domain and set the mesh access region for each partition in preCICE - if not len( - self._config.macro_domain_bounds() - ) / 2 == self._participant.get_mesh_dimensions(self._macro_mesh_name): - raise Exception("Provided macro mesh bounds are of incorrect dimension") - - if self._mpi.is_parallel(): - if not len( - self._config.ranks_per_axis() - ) == self._participant.get_mesh_dimensions(self._macro_mesh_name): - raise Exception( - "Provided ranks combination is of incorrect dimension" - " and does not match the dimensions of the macro mesh." - ) + with self._profiler.measure("micro_manager.initialize.direct_access"): + # Decompose the macro-domain and set the mesh access region for each partition in preCICE + domain_decomposer: DomainDecomposer = create_domain_decomposer( + self._config, self._mpi, self._logger + ) + access_region = self._config.macro_domain_bounds() + if not self._is_load_balancing: + access_region = domain_decomposer.get_mesh_bounds() + self._coupling.set_access_region(access_region) - domain_decomposer = DomainDecomposer(self._config, self._mpi) + # initialize preCICE + self._coupling.initialize() - if self._mpi.is_parallel() and not self._is_load_balancing: - coupling_mesh_bounds = domain_decomposer.get_local_mesh_bounds() - else: # When serial or load balancing, the whole macro domain is assigned to one/each rank - coupling_mesh_bounds = self._config.macro_domain_bounds() + self._profiler.begin("micro_manager.initialize.initialize_micro_sims") + self._coupling.load_access_region() - self._participant.set_mesh_access_region( - self._macro_mesh_name, coupling_mesh_bounds + local_macro_coords, local_macro_ids = domain_decomposer.partition( + self._coupling.registered_vertex_coords, + self._coupling.registered_vertex_ids, + access_region, ) - - self._participant.stop_last_profiling_section() - - # initialize preCICE - self._participant.initialize() - - self._participant.start_profiling_section( - "micro_manager.initialize.initialize_micro_sims" + local_num_sims, global_num_sims, sims_per_rank = domain_decomposer.finalize( + local_macro_coords ) - ( - self._mesh_vertex_ids, - self._mesh_vertex_coords, - ) = self._participant.get_mesh_vertex_ids_and_coordinates(self._macro_mesh_name) - - if self._mpi.is_parallel() and not self._is_load_balancing: - # Gather all vertex coords and IDs from all ranks onto all ranks, - # filter out coords already claimed by lower-ranked ranks. - # When load balancing, all ranks receive all coords. No duplicates can arise. - # TODO: Avoid the allgather by smartly selecting the relevant coordinates - all_coords = self._mpi.comm.allgather(self._mesh_vertex_coords) - all_ids = self._mpi.comm.allgather(self._mesh_vertex_ids) - - ( - self._mesh_vertex_coords, - self._mesh_vertex_ids, - ) = domain_decomposer.filter_duplicate_coords(all_coords, all_ids) - - # Global coordinates that are necessary for model adaptivity - global_mesh_vertex_coords = self._mpi.comm.allgather( - self._mesh_vertex_coords - ) - self._global_mesh_vertex_coords = np.array( - global_mesh_vertex_coords - ).reshape((-1, self._mesh_vertex_coords.shape[-1])) - else: - # For a serial run or when load balancing, the local mesh vertex coordinates are the global mesh vertex coordinates - self._global_mesh_vertex_coords = self._mesh_vertex_coords - - if self._mesh_vertex_coords.size == 0: - if self._mpi.is_parallel(): - self._logger.log_warning( - "The access region of this rank has no macro-scale vertices. This rank will not have any micro simulations. To avoid this, change the domain decomposition" - ) - if self._lazy_init: - raise Exception( - "The macro mesh has no vertices in the specified access region, but lazy initialization is turned on. Lazy initialization cannot be used if there are no vertices in the access region, as there would be no data to compute the adaptivity and determine which simulations to initialize." - ) - else: - raise Exception( - "The macro mesh has no vertices in the specified access region." - ) - - if self._is_load_balancing and self._mpi.is_parallel(): - ( - local_number_of_sims, - local_macro_coords, - ) = domain_decomposer.get_local_sims_and_macro_coords( - self._mesh_vertex_coords - ) - else: - local_number_of_sims, _ = self._mesh_vertex_coords.shape - local_macro_coords = self._mesh_vertex_coords - - nms_all_ranks = np.zeros(self._mpi.size, dtype=np.int64) - # Gather number of micro simulations that each rank has, because this rank needs to know how many micro - # simulations have been created by previous ranks, so that it can set - # the correct global IDs - self._mpi.comm.Allgatherv(np.array(local_number_of_sims), nms_all_ranks) - - max_nms = np.max(nms_all_ranks) - min_nms = np.min(nms_all_ranks) - - if ( - max_nms != min_nms - ): # if the number of maximum and minimum micro simulations per rank are different - self._logger.log_info_rank_zero( - "The following ranks have the maximum number of micro simulations ({}): {}".format( - max_nms, np.where(nms_all_ranks == max_nms)[0] - ) - ) - self._logger.log_info_rank_zero( - "The following ranks have the minimum number of micro simulations ({}): {}".format( - min_nms, np.where(nms_all_ranks == min_nms)[0] - ) - ) - else: # if the number of maximum and minimum micro simulations per rank are the same - self._logger.log_info_rank_zero( - "All ranks have the same number of micro simulations: {}".format( - max_nms - ) + if local_num_sims == 0 and self._mpi.is_parallel() and self._lazy_init: + raise Exception( + "The macro mesh has no vertices in the specified access region, " + "but lazy initialization is turned on. Lazy initialization cannot be used " + "if there are no vertices in the access region, as there would be no data to compute " + "the adaptivity and determine which simulations to initialize." ) - # Get global number of micro simulations - global_number_of_sims = np.sum(nms_all_ranks) - - self._logger.log_info_rank_zero( - "Total number of micro simulations: {}".format(global_number_of_sims) - ) - if self._is_adaptivity_on: for name in self._adaptivity_data_names: - self._data_for_adaptivity[name] = [0] * local_number_of_sims + self._data_for_adaptivity[name] = [0] * local_num_sims # Create lists of global IDs - global_ids_of_local_sims = [] # DECLARATION - - if self._is_load_balancing: - # Create a set of global coordinate indices for faster lookup - coord_to_index = { - tuple(coord): i for i, coord in enumerate(self._mesh_vertex_coords) - } - - # Set global IDs based on the coordinate ordering in preCICE to be consistent with the scenario without load balancing - for coords in local_macro_coords: - coord_tuple = tuple(coords) - global_ids_of_local_sims.append(coord_to_index[coord_tuple]) - else: - sim_id = np.sum(nms_all_ranks[: self._mpi.rank]) - for i in range(local_number_of_sims): - global_ids_of_local_sims.append(sim_id) - sim_id += 1 + local_gids = self._coupling.generate_gids( + local_macro_coords, + local_macro_ids, + sims_per_rank, + ) self._sim_container.initialize( - glob_num_sims=global_number_of_sims, - local_num_sims=local_number_of_sims, - local_gids=global_ids_of_local_sims, + glob_num_sims=global_num_sims, + local_num_sims=local_num_sims, + local_gids=local_gids, local_coords=local_macro_coords, ) @@ -590,7 +464,7 @@ def initialize(self) -> None: self._adaptivity_controller = create_adaptivity_calculator( self._config, self._sim_container, - self._participant, + self._profiler, self._logger, self._mpi, micro_problem_cls, @@ -604,7 +478,13 @@ def initialize(self) -> None: self._micro_sims_init = False # DECLARATION # Read initial data from preCICE, if it is available - initial_macro_data = self._read_data_from_precice(dt=0) + read_buffer = {} if self._is_adaptivity_on else None + initial_macro_data = self._coupling.read_data_from_precice(0, read_buffer) + # TODO: will be removed in adaptivity refactoring + if self._is_adaptivity_on and read_buffer is not None: + for name in read_buffer.keys(): + if name in self._adaptivity_macro_data_names: + self._data_for_adaptivity[name] = read_buffer[name] first_id = 0 # 0 if lazy initialization is off, otherwise the first active simulation ID micro_sims_to_init = range( @@ -631,7 +511,7 @@ def initialize(self) -> None: self._data_for_adaptivity[name][i] = initial_macro_data[i][name] self._adaptivity_controller.compute_adaptivity( - self._micro_dt, self._data_for_adaptivity + self._coupling.micro_dt, self._data_for_adaptivity ) active_sim_lids = self._adaptivity_controller.get_active_sim_local_ids() @@ -765,7 +645,7 @@ def initialize(self) -> None: self._micro_sims_have_output = micro_problem_cls.check_output() self.load_balancing = create_load_balancer( - self._participant, + self._profiler, self._model_manager, self._adaptivity_controller if self._is_adaptivity_on else None, self._sim_container, @@ -790,7 +670,7 @@ def initialize(self) -> None: initial_micro_data_list = ( self._adaptivity_controller.get_full_field_micro_output( - initial_data, initial_micro_data_list + initial_macro_data, initial_micro_data_list ) ) @@ -800,87 +680,12 @@ def initialize(self) -> None: name ] - self._participant.stop_last_profiling_section() + self._profiler.end() # *************** # Private methods # *************** - def _read_data_from_precice(self, dt) -> list: - """ - Read data from preCICE. - - Parameters - ---------- - dt : float - Time step size at which data is to be read from preCICE. - - Returns - ------- - local_read_data : list - List of dicts in which keys are names of data being read and the values are the data from preCICE. - """ - read_data: dict[str, list] = dict() - - if self._is_load_balancing: - read_vertex_ids = self._sim_container.local_gids - else: - read_vertex_ids = self._mesh_vertex_ids - - for name in self._read_data_names: - read_data[name] = [] - - for name in self._read_data_names: - read_data.update( - { - name: self._participant.read_data( - self._macro_mesh_name, name, read_vertex_ids, dt - ) - } - ) - - if self._is_adaptivity_on: - if name in self._adaptivity_macro_data_names: - self._data_for_adaptivity[name] = read_data[name] - - return [dict(zip(read_data, t)) for t in zip(*read_data.values())] - - def _write_data_to_precice(self, data: list) -> None: - """ - Write data to preCICE. - - Parameters - ---------- - data : list - List of dicts in which keys are names of data and the values are the data to be written to preCICE. - """ - if self._is_load_balancing: - write_vertex_ids = self._sim_container.local_gids - else: - write_vertex_ids = self._mesh_vertex_ids - - data_dict: dict[str, list] = dict() - if not self._sim_container.empty(): - for name in data[0]: - data_dict[name] = [] - - for d in data: - for name, values in d.items(): - data_dict[name].append(values) - - for dname in self._write_data_names: - self._participant.write_data( - self._macro_mesh_name, - dname, - write_vertex_ids, - data_dict[dname], - ) - else: - for dname in self._write_data_names: - self._participant.write_data( - self._macro_mesh_name, dname, [], np.array([]) - ) - def _solve_micro_simulations( self, micro_sims_input: list, dt: float, computed_outputs: dict = {} ) -> list: @@ -1101,7 +906,7 @@ def _solve_micro_simulations_with_model_adaptivity( for lid, out in enumerate(output): if lid in switched_lids: continue - gid = self._sim_container.local_gids[lid] + gid = self._sim_container.local_gids[int(lid)] computed_outputs[gid] = out output = solve_variant(micro_sims_input, dt, computed_outputs) self._model_adaptivity_controller.check_convergence( diff --git a/micro_manager/tools/coupling.py b/micro_manager/tools/coupling.py new file mode 100644 index 00000000..b6448b83 --- /dev/null +++ b/micro_manager/tools/coupling.py @@ -0,0 +1,326 @@ +from micro_manager.config import Config +from micro_manager.simulation_container import SimulationContainer +from micro_manager.tools.mpi_handler import MPIHandler, MPI + +import precice as p +import numpy as np +from typing import List, Dict, Any, Optional, Set + + +class CouplingHandler: + """ + CouplingHandler manages all aspects of coupling with preCICE. + All preCICE API calls are wrapped, and preCICE-relevant data is hosted. + """ + + def __init__( + self, + config: Config, + mpi: MPIHandler, + simulation_container: SimulationContainer, + ): + """ + Creates a new CouplingHandler instance in which a connection to preCICE is established. + + Parameters + ---------- + config : Config + configuration object + mpi : MPIHandler + MPIHandler object + simulation_container : SimulationContainer + Simulation container object. + """ + self._mpi: MPIHandler = mpi + + self._micro_dt: float = config.micro_dt() + + self._macro_mesh_name: str = config.macro_mesh_name() + # Data names of data to output to the snapshot database + self._write_data_names: List[str] = config.write_data_names() + + # Data names of data to read as input parameter to the simulations + self._read_data_names: List[str] = config.read_data_names() + + # Define the preCICE Participant + self._participant: p.Participant = p.Participant( + "Micro-Manager", + config.precice_config_file_name(), + mpi.rank, + mpi.size, + ) + self._sim_container: SimulationContainer = simulation_container + + self._access_region_bounds: List[float] = [] + self._global_region_bounds: List[float] = config.macro_domain_bounds() + # Based on the access region, this rank is associated with a subset of + # IDs and coordinates of the global macro mesh within preCICE. + self._mesh_vertex_ids: List[int] = [] + self._mesh_vertex_coords: List[np.ndarray] = [] + self._mesh_dims: int = self._participant.get_mesh_dimensions( + self._macro_mesh_name + ) + self._gid_to_vertex_id: Dict[int, int] = {} + + # ========================= + # PROPERTIES + # ========================= + + @property + def write_data_names(self) -> List[str]: + return self._write_data_names + + @property + def read_data_names(self) -> List[str]: + return self._read_data_names + + @property + def macro_mesh_name(self) -> str: + return self._macro_mesh_name + + @property + def registered_vertex_ids(self) -> List[int]: + return self._mesh_vertex_ids + + @property + def registered_vertex_coords(self) -> List[np.ndarray]: + return self._mesh_vertex_coords + + @property + def dt(self) -> float: + return float( + np.minimum(self._micro_dt, self._participant.get_max_time_step_size()) + ) + + @property + def micro_dt(self) -> float: + return self._micro_dt + + @property + def participant(self): + return self._participant + + @property + def mesh_dims(self) -> int: + return self._mesh_dims + + @property + def num_registered_vertices(self) -> int: + return len(self._mesh_vertex_coords) + + # ========================= + # preCICE METHODS + # ========================= + + def initialize(self) -> None: + """ + Initializes the preCICE participant. + """ + self._participant.initialize() + + def finalize(self) -> None: + """ + Finalizes the preCICE participant. + """ + self._participant.finalize() + + def is_ongoing(self) -> bool: + """ + Returns whether the preCICE coupling is ongoing. + + Returns + ------- + ongoing : bool + True if the preCICE coupling is ongoing. + """ + return self._participant.is_coupling_ongoing() + + def is_time_window_complete(self) -> bool: + """ + Checks if the current coupling time window is completed. + + Returns + ------- + is_complete : bool + True if time window is completed. + """ + return self._participant.is_time_window_complete() + + def requires_writing_checkpoint(self) -> bool: + """ + Checks if a checkpoint should be written. + + Returns + ------- + should_write : bool + True if the checkpoint should be written. + """ + return self._participant.requires_writing_checkpoint() + + def requires_reading_checkpoint(self) -> bool: + """ + Checks if a checkpoint should be read. + + Returns + ------- + should_read : bool + True if the checkpoint should be read. + """ + return self._participant.requires_reading_checkpoint() + + def advance(self, dt: Optional[float] = None) -> None: + """ + Attempts to advance the coupling to the next time step. + + Parameters + ---------- + dt : Optional[float] + Time step size. + """ + dt = dt or self.dt + self._participant.advance(dt) + + # ========================= + # API METHODS + # ========================= + + def set_access_region(self, access_region: List[float]) -> None: + """ + Sets the region in which the local rank accesses the mesh. + + Parameters + ---------- + access_region : List[float] + List of lower and upper bound. + """ + self._access_region_bounds.clear() + self._access_region_bounds.extend(access_region) + + self._participant.set_mesh_access_region( + self._macro_mesh_name, + self._access_region_bounds, + ) + + def load_access_region(self) -> None: + """ + Reads the IDs and coordinates defined by the access region. + """ + ids, coords = self._participant.get_mesh_vertex_ids_and_coordinates( + self._macro_mesh_name + ) + self._mesh_vertex_ids.extend(ids) + self._mesh_vertex_coords.extend(coords) + + def read_data_from_precice( + self, + dt: Optional[float] = None, + read_buffer: Optional[Dict[str, List[Any]]] = None, + ) -> List[Dict[str, Any]]: + """ + Read data from preCICE. + + Parameters + ---------- + dt : Optional[float] + Time step size at which data is to be read from preCICE. + read_buffer : Optional[Dict[str, List[Any]]] + Buffer in which fields from preCICE are read into. + + Returns + ------- + local_read_data : List[Dict[str, Any]] + List of dicts in which keys are names of data being read and the values are the data from preCICE. + """ + read_data: Dict[str, List[Any]] = read_buffer or {} + read_data.clear() + read_data.update({name: [] for name in self._read_data_names}) + read_vertex_ids: List[int] = [ + self._gid_to_vertex_id[gid] for gid in self._sim_container.local_gids + ] + dt = dt or self.dt + + for name in self._read_data_names: + read_data.update( + { + name: self._participant.read_data( + self._macro_mesh_name, name, read_vertex_ids, dt + ) + } + ) + + return [dict(zip(read_data, t)) for t in zip(*read_data.values())] + + def write_to_precice(self, data: List[Dict[str, Any]]) -> None: + """ + Write data to preCICE. + + Parameters + ---------- + data : list + List of dicts in which keys are names of data and the values are the data to be written to preCICE. + """ + write_vertex_ids: List[int] = [ + self._gid_to_vertex_id[gid] for gid in self._sim_container.local_gids + ] + data_dict: Dict[str, List[Any]] = { + dname: [] for dname in self._write_data_names + } + + for d in data: + for dname in self._write_data_names: + data_dict[dname].append(d[dname]) + + if self._sim_container.empty(): + write_vertex_ids = [] + + for dname in self._write_data_names: + self._participant.write_data( + self._macro_mesh_name, + dname, + write_vertex_ids, + data_dict[dname], + ) + + def generate_gids( + self, + local_vertex_coords: List[np.ndarray], + local_vertex_ids: List[int], + sims_per_rank: List[int], + ) -> List[int]: + """ + Creates GIDs for the provided vertex coordinates and sets up internal lookup maps between GIDs and vertex IDs. + Here, vertex IDs are the IDs as defined by preCICE. + + Parameters + ---------- + local_vertex_coords : List[np.ndarray] + vertex coordinates after domain decomposition. + local_vertex_ids : List[int] + vertex IDs after domain decomposition. + sims_per_rank : List[int] + Number of simulations per rank. + + Returns + ------- + local_gids : List[int] + List of local GIDs. + """ + has_global_access = np.all( + np.array(self._global_region_bounds) == np.array(self._access_region_bounds) + ) + if not has_global_access: + self._mesh_vertex_ids = local_vertex_ids + self._mesh_vertex_coords = local_vertex_coords + + local_vertex_ids_set: Set[int] = set(local_vertex_ids) + local_gids: List[int] = [] + self._gid_to_vertex_id.clear() + gid = 0 if has_global_access else np.sum(sims_per_rank[: self._mpi.rank]) + for v_id in self._mesh_vertex_ids: + self._gid_to_vertex_id[gid] = v_id + if v_id in local_vertex_ids_set: + local_gids.append(gid) + + gid += 1 + + return local_gids diff --git a/micro_manager/tools/profiling.py b/micro_manager/tools/profiling.py new file mode 100644 index 00000000..811a0def --- /dev/null +++ b/micro_manager/tools/profiling.py @@ -0,0 +1,102 @@ +import precice as p + + +class ProfilingContext: + """ + Provides an automated context in which profiling is performed by preCICE. + When the context is entered profiling is started. + Upon exiting it, profiling is stopped. + Only one context may be active at a time. This is checked during debug mode. + """ + + CONTEXT_ACTIVE: bool = False + + def __init__(self, participant: p.Participant, name: str): + """ + Creates a new ProfilingContext instance. + + Parameters + ---------- + participant : p.Participant + preCICE participant + name : str + profiling name + """ + self._participant: p.Participant = participant + self._name: str = name + + def __enter__(self) -> "ProfilingContext": + """ + Called when entering the context in: + with obj: ... + """ + assert not self.CONTEXT_ACTIVE + self.CONTEXT_ACTIVE = True + + self._participant.start_profiling_section(self._name) + return self + + def __exit__(self, *args) -> None: + """ + Called when exiting the context in: + with obj: ... + """ + self._participant.stop_last_profiling_section() + self.CONTEXT_ACTIVE = False + + +class Profiler: + """ + Provides methods to perform profiling with preCICE. + """ + + def __init__(self, participant: p.Participant): + """ + Creates a new Profiler instance. + + Parameters + ---------- + participant : p.Participant + preCICE participant + """ + self._participant: p.Participant = participant + + def measure(self, name: str) -> ProfilingContext: + """ + Creates an automated profiling context. Usage: + + with profiler.measure("name"): + ... calls to be measured ... + + ... other code ... + + Parameters + ---------- + name : str + profiling name + + Returns + ------- + ctx : ProfilingContext + profiling context + """ + return ProfilingContext(self._participant, name) + + def begin(self, name: str) -> None: + """ + Begins manual profiling. Must be ended with a corresponding call to end. + + Parameters + ---------- + name : str + profiling name + """ + assert not ProfilingContext.CONTEXT_ACTIVE + self._participant.start_profiling_section(name) + + def end(self): + """ + Ends manual profiling. + """ + self._participant.stop_last_profiling_section() + ProfilingContext.CONTEXT_ACTIVE = False diff --git a/tests/unit/precice.py b/tests/unit/precice.py index 283dc78a..1d01fa44 100644 --- a/tests/unit/precice.py +++ b/tests/unit/precice.py @@ -49,7 +49,9 @@ def set_mesh_access_region(self, mesh_name, bounds): pass def get_mesh_vertex_ids_and_coordinates(self, mesh_name): - return np.array([0, 1, 2, 3]), np.array([[0, 0], [1, 0], [1, 1], [0, 1]]) + return np.array([0, 1, 2, 3]), np.array( + [[0, 0, 0], [1, 0, 0], [1, 1, 0], [0, 1, 0]] + ) def write_data(self, mesh_name, data_name, vertex_ids, data): if data_name == "Micro-Scalar-Data": diff --git a/tests/unit/test_adaptivity_parallel.py b/tests/unit/test_adaptivity_parallel.py index 87b50ee8..aee6efa7 100644 --- a/tests/unit/test_adaptivity_parallel.py +++ b/tests/unit/test_adaptivity_parallel.py @@ -72,7 +72,7 @@ def test_update_inactive_sims_global_adaptivity(self): adaptivity_controller = GlobalAdaptivityCalculator( self._configurator, sim_container=container, - participant=MagicMock(), + profiler=MagicMock(), base_logger=MagicMock(), mpi=self._mpi, micro_problem_cls=sim_cls, @@ -160,7 +160,7 @@ def test_update_all_active_sims_global_adaptivity(self): adaptivity_controller = GlobalAdaptivityCalculator( self._configurator, sim_container=container, - participant=MagicMock(), + profiler=MagicMock(), base_logger=MagicMock(), mpi=self._mpi, micro_problem_cls=sim_cls, @@ -226,7 +226,7 @@ def test_communicate_micro_output(self): adaptivity_controller = GlobalAdaptivityCalculator( self._configurator, sim_container=container, - participant=MagicMock(), + profiler=MagicMock(), base_logger=MagicMock(), mpi=self._mpi, micro_problem_cls=sim_cls, diff --git a/tests/unit/test_domain_decomposition.py b/tests/unit/test_domain_decomposition.py index 1ec6d08d..fc60bda4 100644 --- a/tests/unit/test_domain_decomposition.py +++ b/tests/unit/test_domain_decomposition.py @@ -1,7 +1,11 @@ from unittest import TestCase, skip from unittest.mock import MagicMock import numpy as np -from micro_manager.domain_decomposition import DomainDecomposer +from micro_manager.domain_decomposition import ( + DomainDecomposer, + NonUniformGridDecomp, + UniformGridDecomp, +) class TestDomainDecomposition(TestCase): @@ -37,8 +41,10 @@ def test_rank2_out_of_4_2d(self): mpi = MagicMock() mpi.rank = 2 mpi.size = 4 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) - mesh_bounds = domain_decomposer.get_local_mesh_bounds() + domain_decomposer = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + mesh_bounds = domain_decomposer.get_mesh_bounds() self.assertTrue(np.allclose(mesh_bounds, [0, 0.5, 1, 2])) @@ -55,8 +61,10 @@ def test_rank1_out_of_4_3d(self): mpi = MagicMock() mpi.rank = 1 mpi.size = 4 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) - mesh_bounds = domain_decomposer.get_local_mesh_bounds() + domain_decomposer = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + mesh_bounds = domain_decomposer.get_mesh_bounds() self.assertTrue(np.allclose(mesh_bounds, [0.0, 1, -2, 0.0, -2, 8])) @@ -73,8 +81,10 @@ def test_rank5_outof_10_3d(self): mpi = MagicMock() mpi.rank = 5 mpi.size = 10 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) - mesh_bounds = domain_decomposer.get_local_mesh_bounds() + domain_decomposer = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + mesh_bounds = domain_decomposer.get_mesh_bounds() self.assertTrue(np.allclose(mesh_bounds, [-1, 1, 0, 2, 2, 4])) @@ -91,8 +101,10 @@ def test_rank10_out_of_32_3d(self): mpi = MagicMock() mpi.rank = 10 mpi.size = 32 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) - mesh_bounds = domain_decomposer.get_local_mesh_bounds() + domain_decomposer = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + mesh_bounds = domain_decomposer.get_mesh_bounds() self.assertTrue(np.allclose(mesh_bounds, [0, 0.5, -2, 2, 0.5, 1.75])) @@ -109,8 +121,10 @@ def test_rank7_out_of_16_3d(self): mpi = MagicMock() mpi.rank = 7 mpi.size = 16 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) - mesh_bounds = domain_decomposer.get_local_mesh_bounds() + domain_decomposer = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + mesh_bounds = domain_decomposer.get_mesh_bounds() self.assertTrue(np.allclose(mesh_bounds, [0.75, 1, -2, 0, -2, 8])) @@ -149,8 +163,10 @@ def test_nonuniform_rank2_out_of_4_2d(self): mpi = MagicMock() mpi.rank = 2 mpi.size = 4 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) - mesh_bounds = domain_decomposer.get_local_mesh_bounds() + domain_decomposer = NonUniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + mesh_bounds = domain_decomposer.get_mesh_bounds() self.assertTrue(np.allclose(mesh_bounds, [0.0, 1.0 / 3.0, 2.0 / 3.0, 2.0])) @@ -171,8 +187,10 @@ def test_nonuniform_rank15_out_of_128_2d(self): mpi = MagicMock() mpi.rank = 15 mpi.size = 128 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) - mesh_bounds = domain_decomposer.get_local_mesh_bounds() + domain_decomposer = NonUniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + mesh_bounds = domain_decomposer.get_mesh_bounds() self.assertTrue( np.allclose(mesh_bounds, [0.756153, 1.0, 0.0, 0.019664], atol=1e-5) @@ -182,8 +200,10 @@ def test_nonuniform_rank15_out_of_128_2d(self): mpi = MagicMock() mpi.rank = 112 mpi.size = 128 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) - mesh_bounds = domain_decomposer.get_local_mesh_bounds() + domain_decomposer = NonUniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + mesh_bounds = domain_decomposer.get_mesh_bounds() self.assertTrue( np.allclose(mesh_bounds, [0.0, 1.0 / 256.0, 0.364631, 0.5], atol=1e-5) @@ -203,8 +223,10 @@ def test_nonuniform_rank1_out_of_4_3d(self): mpi = MagicMock() mpi.rank = 1 mpi.size = 4 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) - mesh_bounds = domain_decomposer.get_local_mesh_bounds() + domain_decomposer = NonUniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + mesh_bounds = domain_decomposer.get_mesh_bounds() self.assertTrue( np.allclose(mesh_bounds, [-1.0 / 3.0, 1.0, -2.0, -2.0 / 3.0, -2.0, 8.0]) @@ -219,10 +241,12 @@ def test_nonuniform_invalid_processor_count_raises(self): mpi = MagicMock() mpi.rank = 0 mpi.size = 4 - domain_decomposer = DomainDecomposer(self._configuration_mock, mpi) with self.assertRaises(ValueError): - domain_decomposer.get_local_mesh_bounds() + domain_decomposer = NonUniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ) + domain_decomposer.get_mesh_bounds() class TestDuplicateCoordFiltering(TestCase): @@ -245,21 +269,23 @@ def test_no_duplicates(self): all_ids = [[0, 1], [2, 3]] self._configuration_mock.decomposition_type.return_value = "uniform" + self._configuration_mock.ranks_per_axis.return_value = [2, 1] + self._configuration_mock.macro_domain_bounds.return_value = [-2, -2, 2, 2] mpi = MagicMock() mpi.rank = 0 mpi.size = 2 - coords, ids = DomainDecomposer( - self._configuration_mock, mpi - ).filter_duplicate_coords(all_coords, all_ids) + coords, ids = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ).filter_duplicates(all_coords, all_ids) self.assertEqual(len(coords), 2) mpi = MagicMock() mpi.rank = 1 mpi.size = 2 - coords, ids = DomainDecomposer( - self._configuration_mock, mpi - ).filter_duplicate_coords(all_coords, all_ids) + coords, ids = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ).filter_duplicates(all_coords, all_ids) self.assertEqual(len(coords), 2) def test_duplicate_on_boundary_rank0_keeps(self): @@ -275,14 +301,16 @@ def test_duplicate_on_boundary_rank0_keeps(self): all_ids = [[0, 1], [1, 2]] self._configuration_mock.decomposition_type.return_value = "uniform" + self._configuration_mock.ranks_per_axis.return_value = [2, 1] + self._configuration_mock.macro_domain_bounds.return_value = [-2, -2, 2, 2] # Rank 0 should keep both its coords mpi = MagicMock() mpi.rank = 0 mpi.size = 2 - coords0, ids0 = DomainDecomposer( - self._configuration_mock, mpi - ).filter_duplicate_coords(all_coords, all_ids) + coords0, ids0 = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ).filter_duplicates(all_coords, all_ids) self.assertEqual(len(coords0), 2) self.assertTrue(np.allclose(coords0[1], shared)) @@ -290,9 +318,9 @@ def test_duplicate_on_boundary_rank0_keeps(self): mpi = MagicMock() mpi.rank = 1 mpi.size = 2 - coords1, ids1 = DomainDecomposer( - self._configuration_mock, mpi - ).filter_duplicate_coords(all_coords, all_ids) + coords1, ids1 = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ).filter_duplicates(all_coords, all_ids) self.assertEqual(len(coords1), 1) self.assertTrue(np.allclose(coords1[0], [1.0, 0.0])) @@ -309,29 +337,31 @@ def test_duplicate_on_boundary_three_ranks(self): all_ids = [[0, 1], [0, 2], [0, 3]] self._configuration_mock.decomposition_type.return_value = "uniform" + self._configuration_mock.ranks_per_axis.return_value = [3, 1] + self._configuration_mock.macro_domain_bounds.return_value = [-3, -3, 3, 3] mpi = MagicMock() mpi.rank = 0 mpi.size = 3 - coords0, _ = DomainDecomposer( - self._configuration_mock, mpi - ).filter_duplicate_coords(all_coords, all_ids) + coords0, _ = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ).filter_duplicates(all_coords, all_ids) self.assertEqual(len(coords0), 2) mpi = MagicMock() mpi.rank = 1 mpi.size = 3 - coords1, _ = DomainDecomposer( - self._configuration_mock, mpi - ).filter_duplicate_coords(all_coords, all_ids) + coords1, _ = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ).filter_duplicates(all_coords, all_ids) self.assertEqual(len(coords1), 1) self.assertTrue(np.allclose(coords1[0], [1.0, 0.0])) mpi = MagicMock() mpi.rank = 2 mpi.size = 3 - coords2, _ = DomainDecomposer( - self._configuration_mock, mpi - ).filter_duplicate_coords(all_coords, all_ids) + coords2, _ = UniformGridDecomp( + self._configuration_mock, mpi, MagicMock() + ).filter_duplicates(all_coords, all_ids) self.assertEqual(len(coords2), 1) self.assertTrue(np.allclose(coords2[0], [2.0, 0.0])) diff --git a/tests/unit/test_micro_manager.py b/tests/unit/test_micro_manager.py index 42306031..a6aa0b82 100644 --- a/tests/unit/test_micro_manager.py +++ b/tests/unit/test_micro_manager.py @@ -60,8 +60,12 @@ def test_micromanager_constructor(self): """ manager = micro_manager.MicroManagerCoupling("micro-manager-config.json") - self.assertListEqual(manager._read_data_names, self.fake_read_data_names) - self.assertListEqual(self.fake_write_data_names, manager._write_data_names) + self.assertListEqual( + manager._coupling.read_data_names, self.fake_read_data_names + ) + self.assertListEqual( + self.fake_write_data_names, manager._coupling.write_data_names + ) self.assertEqual(manager._micro_n_out, 10) def test_initialization(self): @@ -71,15 +75,19 @@ def test_initialization(self): manager = micro_manager.MicroManagerCoupling("micro-manager-config.json") manager.initialize() - self.assertEqual(manager._micro_dt, 0.1) # from Interface.initialize + self.assertEqual(manager._coupling.micro_dt, 0.1) # from Interface.initialize self.assertEqual(manager._sim_container.global_num_sims, 4) - self.assertListEqual(manager._mesh_vertex_ids.tolist(), [0, 1, 2, 3]) + self.assertListEqual(manager._coupling.registered_vertex_ids, [0, 1, 2, 3]) self.assertEqual(len(manager._sim_container), 4) self.assertEqual( manager._sim_container[0].very_important_value, 0 ) # test inheritance - self.assertListEqual(manager._read_data_names, self.fake_read_data_names) - self.assertListEqual(self.fake_write_data_names, manager._write_data_names) + self.assertListEqual( + manager._coupling.read_data_names, self.fake_read_data_names + ) + self.assertListEqual( + self.fake_write_data_names, manager._coupling.write_data_names + ) def test_read_write_data_from_precice(self): """ @@ -93,8 +101,8 @@ def test_read_write_data_from_precice(self): manager._sim_container = container manager._mesh_vertex_ids = container.local_coords - manager._write_data_to_precice(self.fake_write_data) - read_data = manager._read_data_from_precice(1.0) + manager._coupling.write_to_precice(self.fake_write_data) + read_data = manager._coupling.read_data_from_precice(1.0) for data, fake_data in zip(read_data, self.fake_read_data): self.assertEqual(data["Macro-Scalar-Data"], 1) diff --git a/tests/unit/test_micro_simulation_crash_handling.py b/tests/unit/test_micro_simulation_crash_handling.py index 499dc513..5b49e11f 100644 --- a/tests/unit/test_micro_simulation_crash_handling.py +++ b/tests/unit/test_micro_simulation_crash_handling.py @@ -58,7 +58,7 @@ def test_crash_handling(self): 4, 4, [0, 1, 2, 3], np.array([[-2, 0, 0], [-1, 0, 0], [1, 0, 0], [2, 0, 0]]) ) manager._has_sim_crashed = [False] * 4 - manager._mesh_vertex_coords = container.local_coords + manager._coupling._mesh_vertex_coords = container.local_coords manager._is_adaptivity_on = ( False # make sure adaptivity is off overriding config ) @@ -113,7 +113,7 @@ def test_crash_handling_with_adaptivity(self): manager._number_of_nearest_neighbors = 3 # reduce number of neighbors to 3 manager._micro_sims_active_steps = np.zeros(5, dtype=np.int32) manager._has_sim_crashed = [False] * 5 - manager._mesh_vertex_coords = container.local_coords + manager._coupling._mesh_vertex_coords = container.local_coords for lid in container.range_lid: container[lid] = MicroSimulation(lid) manager._sim_container = container