From 9952550c05c9b7cc1ee63df1433fd29ca93abf69 Mon Sep 17 00:00:00 2001 From: Jack Lovell Date: Thu, 13 Mar 2025 16:34:58 +0000 Subject: [PATCH 1/2] Implement MPIEngine render engine This render engine uses SPMD (single process, multiple data) to perform renders in parallel, using MPI (message passing interface). The paradigm is that multiple independent processes are launched, all of which build their own copy of the scenegraph in parallel and all generate rendering tasks. Each process with rank > 0 then works on a subset of those tasks and sends the results to the rank = 0 process, which processes the results. After the render is complete, only process 0 has the complete results. MPI communications are blocking (send/recv rather than isend/irecv) and use mpi4py's methods for general Python objects, rather than the high performance equivalents utilising the buffer protocol (Send/Recv). The latter is done to avoid loss of generality of the render engine: it supports any function supported by the other render engines, not just ones returning objects supporting the buffer protocol. The use of blocking comms simplifies the implementation and in testing (using the MPI variant of the raysect logo demo) the MPI comms were a small fraction - < 5% - of the total run time, with the vast majority of the time - > 85 % - spent in the render function itself. So the simplicity of implementation does not have a significant performance cost. Tasks are distributed approximately equally among all workers at startup, with no adaptive scheduling. Again, this is done for simplicity of implementation as an adaptive scheduler would need some way of tracking how long each process was spending on tasks and all processes would need to communicate with one another about which tasks they were taking on. Adding an adaptive scheduler could be left to future work, but in testing any uneven runtime caused by the naive task distribution seemed to only have a small effect on the total run time, so the simplicity of the current solution is advantageous. --- demos/cornell_box_mpi.py | 210 ++++++++++++++++++ demos/raysect_logo_mpi.py | 81 +++++++ .../api_reference/core/render_engines.rst | 4 +- raysect/core/workflow.py | 111 +++++++++ 4 files changed, 404 insertions(+), 2 deletions(-) create mode 100644 demos/cornell_box_mpi.py create mode 100644 demos/raysect_logo_mpi.py diff --git a/demos/cornell_box_mpi.py b/demos/cornell_box_mpi.py new file mode 100644 index 00000000..950b5708 --- /dev/null +++ b/demos/cornell_box_mpi.py @@ -0,0 +1,210 @@ +""" +Cornell Box MPI Demo +==================== + +This demo renders a variant of the classic Cornell Box scene. + +For the original Cornell Box see: + + http://www.graphics.cornell.edu/online/box/data.html + +The wall colours and light spectrum used in this demo are the values measured +for the physical Cornell Box. + +The MPI (message passing interface) render engine is used for this demo. +To run, use `mpirun -n python cornell_box_mpi.py`. An +MPI library must be installed on your system, and the mpi4py Python +package must be installed in the Python environment. + +There are some small differences between this demo and the shared-memory +equilvalent cornell_box.py, due to details of the MPI implementation meaning +we need to communicate the results of one render pass to all the worker +processes before the next pass. Compare the last section of the two demo +scripts to see how this is done. +""" + +from numpy import array + +from raysect.primitive import Sphere, Box +from raysect.optical import World, Node, translate, rotate, Point3D +from raysect.optical.material import Lambert, UniformSurfaceEmitter +from raysect.optical.library import InterpolatedSF, schott +from raysect.optical.observer import PinholeCamera +from raysect.optical.observer import RGBPipeline2D, BayerPipeline2D, PowerPipeline2D +from raysect.optical.observer import RGBAdaptiveSampler2D +from raysect.core.workflow import MPIEngine + + +# define reflectivity for box surfaces +wavelengths = array( + [400, 404, 408, 412, 416, 420, 424, 428, 432, 436, 440, 444, 448, 452, 456, 460, 464, 468, 472, 476, 480, 484, 488, + 492, 496, 500, 504, 508, 512, 516, 520, 524, 528, 532, 536, 540, 544, 548, 552, 556, 560, 564, 568, 572, 576, 580, + 584, 588, 592, 596, 600, 604, 608, 612, 616, 620, 624, 628, 632, 636, 640, 644, 648, 652, 656, 660, 664, 668, 672, + 676, 680, 684, 688, 692, 696, 700]) + +white = array( + [0.343, 0.445, 0.551, 0.624, 0.665, 0.687, 0.708, 0.723, 0.715, 0.71, 0.745, 0.758, 0.739, 0.767, 0.777, 0.765, + 0.751, 0.745, 0.748, 0.729, 0.745, 0.757, 0.753, 0.75, 0.746, 0.747, 0.735, 0.732, 0.739, 0.734, 0.725, 0.721, + 0.733, 0.725, 0.732, 0.743, 0.744, 0.748, 0.728, 0.716, 0.733, 0.726, 0.713, 0.74, 0.754, 0.764, 0.752, 0.736, + 0.734, 0.741, 0.74, 0.732, 0.745, 0.755, 0.751, 0.744, 0.731, 0.733, 0.744, 0.731, 0.712, 0.708, 0.729, 0.73, + 0.727, 0.707, 0.703, 0.729, 0.75, 0.76, 0.751, 0.739, 0.724, 0.73, 0.74, 0.737]) + +green = array( + [0.092, 0.096, 0.098, 0.097, 0.098, 0.095, 0.095, 0.097, 0.095, 0.094, 0.097, 0.098, 0.096, 0.101, 0.103, 0.104, + 0.107, 0.109, 0.112, 0.115, 0.125, 0.14, 0.16, 0.187, 0.229, 0.285, 0.343, 0.39, 0.435, 0.464, 0.472, 0.476, 0.481, + 0.462, 0.447, 0.441, 0.426, 0.406, 0.373, 0.347, 0.337, 0.314, 0.285, 0.277, 0.266, 0.25, 0.23, 0.207, 0.186, + 0.171, 0.16, 0.148, 0.141, 0.136, 0.13, 0.126, 0.123, 0.121, 0.122, 0.119, 0.114, 0.115, 0.117, 0.117, 0.118, 0.12, + 0.122, 0.128, 0.132, 0.139, 0.144, 0.146, 0.15, 0.152, 0.157, 0.159]) + +red = array( + [0.04, 0.046, 0.048, 0.053, 0.049, 0.05, 0.053, 0.055, 0.057, 0.056, 0.059, 0.057, 0.061, 0.061, 0.06, 0.062, 0.062, + 0.062, 0.061, 0.062, 0.06, 0.059, 0.057, 0.058, 0.058, 0.058, 0.056, 0.055, 0.056, 0.059, 0.057, 0.055, 0.059, + 0.059, 0.058, 0.059, 0.061, 0.061, 0.063, 0.063, 0.067, 0.068, 0.072, 0.08, 0.09, 0.099, 0.124, 0.154, 0.192, + 0.255, 0.287, 0.349, 0.402, 0.443, 0.487, 0.513, 0.558, 0.584, 0.62, 0.606, 0.609, 0.651, 0.612, 0.61, 0.65, 0.638, + 0.627, 0.62, 0.63, 0.628, 0.642, 0.639, 0.657, 0.639, 0.635, 0.642]) + +white_reflectivity = InterpolatedSF(wavelengths, white) +red_reflectivity = InterpolatedSF(wavelengths, red) +green_reflectivity = InterpolatedSF(wavelengths, green) + +# define light spectrum +light_spectrum = InterpolatedSF(array([400, 500, 600, 700]), array([0.0, 8.0, 15.6, 18.4])) + +# set-up scenegraph +world = World() + +# enclosing box +enclosure = Node(world) + +e_back = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(0, 0, 1) * rotate(0, 0, 0), + material=Lambert(white_reflectivity)) + +e_bottom = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(0, -1, 0) * rotate(0, -90, 0), + # material=m) + material=Lambert(white_reflectivity)) + +e_top = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(0, 1, 0) * rotate(0, 90, 0), + material=Lambert(white_reflectivity)) + +e_left = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(1, 0, 0) * rotate(-90, 0, 0), + material=Lambert(red_reflectivity)) + +e_right = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(-1, 0, 0) * rotate(90, 0, 0), + material=Lambert(green_reflectivity)) + +# ceiling light +light = Box(Point3D(-0.4, -0.4, -0.01), Point3D(0.4, 0.4, 0.0), + parent=enclosure, + transform=translate(0, 1, 0) * rotate(0, 90, 0), + material=UniformSurfaceEmitter(light_spectrum, 2)) + +# alternate light #1 +# light = Box(Point3D(-0.4, -0.4, -0.01), Point3D(0.4, 0.4, 0.0), +# parent=enclosure, +# transform=translate(0, 1, 0) * rotate(0, 90, 0), +# material=UniformSurfaceEmitter(d65_white, 2)) + +# alternate light #2 +# back_light = Sphere(0.1, +# parent=enclosure, +# transform=translate(0.80, -0.85, 0.80)*rotate(0, 0, 0), +# material=UniformSurfaceEmitter(light_spectrum, 10.0)) + +# objects in enclosure +box = Box(Point3D(-0.4, 0, -0.4), Point3D(0.3, 1.4, 0.3), + parent=world, + transform=translate(0.4, -1 + 1e-6, 0.4)*rotate(30, 0, 0), + material=schott("N-BK7")) + +sphere = Sphere(0.4, + parent=world, + transform=translate(-0.4, -0.6 + 1e-6, -0.4)*rotate(0, 0, 0), + material=schott("N-BK7")) + + +filter_red = InterpolatedSF([100, 650, 660, 670, 680, 800], [0, 0, 1, 1, 0, 0]) +filter_green = InterpolatedSF([100, 530, 540, 550, 560, 800], [0, 0, 1, 1, 0, 0]) +filter_blue = InterpolatedSF([100, 480, 490, 500, 510, 800], [0, 0, 1, 1, 0, 0]) + +# create and setup the camera +power_unfiltered = PowerPipeline2D(display_unsaturated_fraction=0.96, name="Unfiltered") +power_unfiltered.display_update_time = 15 + +power_green = PowerPipeline2D(filter=filter_green, display_unsaturated_fraction=0.96, name="Green Filter") +power_green.display_update_time = 15 + +power_red = PowerPipeline2D(filter=filter_red, display_unsaturated_fraction=0.96, name="Red Filter") +power_red.display_update_time = 15 + +rgb = RGBPipeline2D(display_unsaturated_fraction=0.96, name="sRGB") + +bayer = BayerPipeline2D(filter_red, filter_green, filter_blue, display_unsaturated_fraction=0.96, name="Bayer Filter") +bayer.display_update_time = 15 + +pipelines = [rgb, power_unfiltered, power_green, power_red, bayer] + +sampler = RGBAdaptiveSampler2D(rgb, ratio=10, fraction=0.2, min_samples=500, cutoff=0.01) + + +camera = PinholeCamera((1024, 1024), parent=world, transform=translate(0, 0, -3.3) * rotate(0, 0, 0), pipelines=pipelines) +camera.frame_sampler = sampler +camera.spectral_rays = 1 +camera.spectral_bins = 15 +camera.pixel_samples = 250 +camera.ray_importance_sampling = True +camera.ray_important_path_weight = 0.25 +camera.ray_max_depth = 500 +camera.ray_extinction_min_depth = 3 +camera.ray_extinction_prob = 0.01 + +# Speedups for testing purposes. +# Fewer camera pixels for faster runtime per pass. +camera.pixels = (256, 256) +# Less strict cutoff to finish with fewer passes. +sampler.cutoff = 0.05 + + +camera.render_engine = MPIEngine() + +# Don't make plots during the render as MPI is typically run non-interactively. +for pipeline in camera.pipelines: + pipeline.display_progress = False + +# Worker processes do not have accurate statistics, only the root process has +# all the render results. So don't bother outputting statistics on the workers. +if camera.render_engine.rank != 0: + camera.quiet = True + +# start ray tracing. +print(f"Starting ray tracing on rank {camera.render_engine.rank}") +render_pass = 1 +while not camera.render_complete: + if camera.render_engine.rank == 0: + print(f"Rendering pass {render_pass}...") + camera.observe() + # Rank 0 processes the ray tracing results so is the only process which + # knows the true progress of the render. In order to know when the render + # is complete, we need to ensure the frame sampler on each of the worker + # processes has an up-to-date copy of the pipeline used for sampling. + # We also need to use a different variable name for the received broadcast + # object else it is not broadcast properly. + root_rgb = camera.render_engine.comm.bcast(rgb, root=0) + camera.frame_sampler.pipeline = root_rgb + camera.render_engine.comm.Barrier() + render_pass += 1 +print(f"Finished ray tracing on rank {camera.render_engine.rank}") + +# Again, only rank 0 has all the ray tracing results so is the only one which +# can produce a correct image. +if camera.render_engine.rank == 0: + rgb.save('CornellMPI_rgb.png') diff --git a/demos/raysect_logo_mpi.py b/demos/raysect_logo_mpi.py new file mode 100644 index 00000000..95df1d7a --- /dev/null +++ b/demos/raysect_logo_mpi.py @@ -0,0 +1,81 @@ +""" +Renders the raysect logo: a top-down view of 6 coloured slabs. + +MPI (message passing interface) is used to parallelise the workflow. +Running this demo therefore requires an MPI library, such as MPICH, +OpenMPI or MicrosoftMPI. The mpi4py Python package must also +be installed. + +To run this demo, call `mpirun -n python raysect_logo_mpi.py`. +If a batch job scheduler is used on a cluster, the render may be spread +across multiple nodes of the cluster. If run interactively at the command +line, all rendering will be done on the same machine as the command line. +""" + +from matplotlib.pyplot import * +from numpy import array + +from raysect.primitive import Sphere, Box + +from raysect.optical import World, Node, translate, rotate, Point3D, d65_white, ConstantSF, InterpolatedSF +from raysect.optical.observer import PinholeCamera +from raysect.optical.material.emitter import UniformSurfaceEmitter +from raysect.optical.material.dielectric import Dielectric +from raysect.core.workflow import MPIEngine + + +world = World() + +wavelengths = array([300, 490, 510, 590, 610, 800]) +red_attn = array([0.0, 0.0, 0.0, 0.0, 1.0, 1.0]) * 0.98 +green_attn = array([0.0, 0.0, 1.0, 1.0, 0.0, 0.0]) * 0.85 +blue_attn = array([1.0, 1.0, 0.0, 0.0, 0.0, 0.0]) * 0.98 +yellow_attn = array([0.0, 0.0, 1.0, 1.0, 1.0, 1.0]) * 0.85 +cyan_attn = array([1.0, 1.0, 1.0, 1.0, 0.0, 0.0]) * 0.85 +purple_attn = array([1.0, 1.0, 0.0, 0.0, 1.0, 1.0]) * 0.95 + +red_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, red_attn)) +green_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, green_attn)) +blue_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, blue_attn)) +yellow_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, yellow_attn)) +cyan_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, cyan_attn)) +purple_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, purple_attn)) + +Sphere(1000, world, material=UniformSurfaceEmitter(d65_white, 1.0)) + +node = Node(parent=world, transform=rotate(0, 0, 90)) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 0) * translate(0, 1, -0.500001), red_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 60) * translate(0, 1, -0.500001), yellow_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 120) * translate(0, 1, -0.500001), green_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 180) * translate(0, 1, -0.500001), cyan_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 240) * translate(0, 1, -0.500001), blue_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 300) * translate(0, 1, -0.500001), purple_glass) + +camera = PinholeCamera((256, 256), fov=45, parent=world, transform=translate(0, 0, -6.5) * rotate(0, 0, 0)) + +camera.ray_max_depth = 500 +camera.ray_extinction_prob = 0.01 +camera.pixel_samples = 100 +camera.spectral_rays = 1 +camera.spectral_bins = 21 +camera.render_engine = MPIEngine() + +# With the MPI engine, only the rank 0 process has all the results available +# to generate sampling statistics. So don't output stats on the workers. +if camera.render_engine.rank != 0: + camera.quiet = True + +# Don't try making plots during the render: this will likely fail when run +# through a non-interactive job scheduler. +for pipeline in camera.pipelines: + pipeline.display_progress = False + +camera.observe() + +# Again, only rank 0 has all the results, so this is the only process that +# should produce any output. +if camera.render_engine.rank == 0: + # Comment out if you don't want to save the result to a file: + camera.pipelines[0].save("raysect_logo.png") + # Uncomment if running interactively for a plot of the result: + camera.pipelines[0].display() diff --git a/docs/source/api_reference/core/render_engines.rst b/docs/source/api_reference/core/render_engines.rst index c0a8154a..ecca20f4 100644 --- a/docs/source/api_reference/core/render_engines.rst +++ b/docs/source/api_reference/core/render_engines.rst @@ -11,5 +11,5 @@ Render Engines .. autoclass:: raysect.core.workflow.MulticoreEngine :show-inheritance: - - +.. autoclass:: raysect.core.workflow.MPIEngine + :show-inheritance: diff --git a/raysect/core/workflow.py b/raysect/core/workflow.py index beedff2a..814372f5 100644 --- a/raysect/core/workflow.py +++ b/raysect/core/workflow.py @@ -27,10 +27,17 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +from collections import defaultdict from multiprocessing import get_context, cpu_count from raysect.core.math import random import time +try: + from mpi4py import MPI + HAVE_MPI = True +except ImportError: + HAVE_MPI = False + class RenderEngine: """ @@ -326,6 +333,110 @@ def _worker(self, render, args, kwargs, job_queue, result_queue): result_queue.put(results) +class MPIEngine(RenderEngine): + """ + Render engine for running in an MPI context. + + This engine is useful for distributed memory systems, and for shared + memory systems where the overhead of inter-process communication of + the scenegraph is large compared with the time taken for a single + process to produce the scenegraph (e.g. on Windows). + + This render engine requires mpi4py to be installed, and the program + to be run using ``mpirun -n python ``. + + >>> from raysect.core import MPIEngine + >>> from raysect.optical.observer import PinholeCamera + >>> + >>> camera = PinholeCamera((512, 512)) + >>> camera.render_engine = MPIEngine() + + The render engine uses the single process, multiple data (SPMD) paradigm. + Each process is treated as a separate serial renderer. It is assumed that + the scene graph is created in every process, and each process processes a + subset of the total rendering tasks sequentially. The results are then + gathered back to the root (rank 0) process. + + The SPMD paradigm means there are many copies of the program running and + each copy runs the same instructions, so programs must be written with + this in mind. While all copies build the scene to render, only one copy + (the process with rank 0) actually receives all the render results + and calls the update function. This means that after an observe, + only the rank 0 process contains the results of the call to the render + function for all tasks, and so any post processing (including plotting + or saving images) should only be done on the rank 0 process. + + Also, any further renders which depend on the results of a previous render + (for example, using adapive samplers) will require communication of the + results from rank 0 to all the other processes: + + >>> pipeline = RGBPipeline2d() + >>> camera.pipelines = [pipeline] + >>> camera.sampler = RGBAdaptivesampler2d(pipeline) + >>> camera.observe() + >>> # Update the sampler in each worker process with the results + >>> # of the previous render for the next pass. + >>> root_pipeline = camera.render_engine.comm.bcast(pipeline, root=0) + >>> camera.sampler.pipeline = root_pipeline + >>> # Now subsequent observes will have the correct statistics. + >>> camera.observe() + + The class contains some attributes relevant to the MPI environment: + + :ivar comm: the MPI communicator (``MPI_COMM_WORLD``). + :ivar rank: the process rank in the communicator. Only rank 0 contains the + full results after a call ``to RenderEngine.run()``. + :ivar size: the number of processes in the communicator. + """ + def __init__(self): + if not HAVE_MPI: + raise RuntimeError("The mpi4py package is required to use this engine.") + comm = MPI.COMM_WORLD + self.comm = comm + self.rank = comm.Get_rank() + self.size = comm.Get_size() + if self.size < 2: + raise RuntimeError("At least 2 separate processes are required to use this engine.") + + def run(self, tasks, render, update, render_args=(), render_kwargs=None, update_args=(), update_kwargs=None): + # Avoid mutable default arguments. + if render_kwargs is None: + render_kwargs = {} + if update_kwargs is None: + update_kwargs = {} + # All processes must have the same tasks, in the same order, so that + # all processes agree on which subset of tasks each is to work on. + tasks = self.comm.bcast(tasks, root=0) + ntasks = len(tasks) + nworkers = self.size - 1 + worker_tasks = defaultdict(list) + for i, task in enumerate(tasks): + worker_tasks[i % nworkers].append(task) + if self.rank == 0: # The root node processes the results. + remaining = ntasks + while remaining: + result = self.comm.recv() + if isinstance(result, Exception): + raise result + update(result, *update_args, **update_kwargs) + remaining -= 1 + else: # Each worker renders the subset of tasks assigned to them. + # Unlike MulticoreEngine, there is no need to re-seed the random + # number generator to prevent all workers inheriting the same sequence + # since all processes are independent. + for task in worker_tasks[self.rank - 1]: + try: + result = render(task, *render_args, **render_kwargs) + except Exception as e: + result = e + self.comm.send(result, 0) + self.comm.Barrier() + + def worker_count(self): + return self.size + + + if __name__ == '__main__': class Job: From f4f61980dc6fab419dbbca7a15ebb32a546939fd Mon Sep 17 00:00:00 2001 From: Jack Lovell Date: Tue, 1 Apr 2025 17:05:18 +0100 Subject: [PATCH 2/2] Implement hybrid render engine This render engine uses a combination of distributed and shared memory tools to perform parallel rendering. MPI (mesage passing interface) is used to communicate between processes with separate memory, for example on different nodes of a compute cluster. Within each MPI process a secondary render engine is used for shared memory computation. The secondary (or sub) render engine may be any other render engine. `SerialEngine` and `MulticoreEngine` are currently supported, though the former is less efficient than using the MPIEngine due to additional IPC overhead. The engine+subengine architecture leaves room to expand to additional engines in future, such as a `MultiThreadEngine` in a free-threaded (no GIL) Python build. Gathering of render results is done in the main process on MPI rank 0. A sub process is spawned on rank 0 in which the sub render engine runs. For this to work efficiently the operating system must support cheap process forking without memcopy. At present only Linux is known to support this properly. MacOS may work if you're lucky. This limitation means the render engine is not truly cross platform: if cross-platform compatibility is required then SerialEngine and MPIEngine are still the only suitable choices. Ideally the result gathering and updates should not require spawning a new process: a separate thread would make more logical sense. But doing it in the same thread as the sub-engine is run causes resource contention and therefore significant performance degradation. Suggestions for fixing this would be most helpful. Hybrid MPI+multiprocessing has many gotchas, and care must be taken when launching the program especially with a job scheduler. The documentation includes some hints about how the program should be run with a few popular schedulers, and there is also a static helper method for getting an appropriate number of processes for a parallel sub worker. But ultimately it's the responsibility of the end user to configure the engine and start the job properly: the render engine itself does very little of this automatically. 2 new demos have been added showcasing this new render engine: one to render the Raysect logo and one doing a high fidelity, multi-pass adaptive sampling render of the Cornell Box. --- demos/cornell_box_hybrid.py | 213 +++++++++++++++++++++++++++ demos/raysect_logo_hybrid.py | 90 ++++++++++++ raysect/core/workflow.py | 276 +++++++++++++++++++++++++++++++++++ 3 files changed, 579 insertions(+) create mode 100644 demos/cornell_box_hybrid.py create mode 100644 demos/raysect_logo_hybrid.py diff --git a/demos/cornell_box_hybrid.py b/demos/cornell_box_hybrid.py new file mode 100644 index 00000000..9fe547c1 --- /dev/null +++ b/demos/cornell_box_hybrid.py @@ -0,0 +1,213 @@ +""" +Cornell Box MPI Demo +==================== + +This demo renders a variant of the classic Cornell Box scene. + +For the original Cornell Box see: + + http://www.graphics.cornell.edu/online/box/data.html + +The wall colours and light spectrum used in this demo are the values measured +for the physical Cornell Box. + +The MPI (message passing interface) render engine is used for this demo. +To run, use `mpirun -n python cornell_box_mpi.py`. An +MPI library must be installed on your system, and the mpi4py Python +package must be installed in the Python environment. + +There are some small differences between this demo and the shared-memory +equilvalent cornell_box.py, due to details of the MPI implementation meaning +we need to communicate the results of one render pass to all the worker +processes before the next pass. Compare the last section of the two demo +scripts to see how this is done. +""" + +from numpy import array + +from raysect.primitive import Sphere, Box +from raysect.optical import World, Node, translate, rotate, Point3D +from raysect.optical.material import Lambert, UniformSurfaceEmitter +from raysect.optical.library import InterpolatedSF, schott +from raysect.optical.observer import PinholeCamera +from raysect.optical.observer import RGBPipeline2D, BayerPipeline2D, PowerPipeline2D +from raysect.optical.observer import RGBAdaptiveSampler2D +from raysect.core.workflow import HybridEngine, MulticoreEngine + + +# define reflectivity for box surfaces +wavelengths = array( + [400, 404, 408, 412, 416, 420, 424, 428, 432, 436, 440, 444, 448, 452, 456, 460, 464, 468, 472, 476, 480, 484, 488, + 492, 496, 500, 504, 508, 512, 516, 520, 524, 528, 532, 536, 540, 544, 548, 552, 556, 560, 564, 568, 572, 576, 580, + 584, 588, 592, 596, 600, 604, 608, 612, 616, 620, 624, 628, 632, 636, 640, 644, 648, 652, 656, 660, 664, 668, 672, + 676, 680, 684, 688, 692, 696, 700]) + +white = array( + [0.343, 0.445, 0.551, 0.624, 0.665, 0.687, 0.708, 0.723, 0.715, 0.71, 0.745, 0.758, 0.739, 0.767, 0.777, 0.765, + 0.751, 0.745, 0.748, 0.729, 0.745, 0.757, 0.753, 0.75, 0.746, 0.747, 0.735, 0.732, 0.739, 0.734, 0.725, 0.721, + 0.733, 0.725, 0.732, 0.743, 0.744, 0.748, 0.728, 0.716, 0.733, 0.726, 0.713, 0.74, 0.754, 0.764, 0.752, 0.736, + 0.734, 0.741, 0.74, 0.732, 0.745, 0.755, 0.751, 0.744, 0.731, 0.733, 0.744, 0.731, 0.712, 0.708, 0.729, 0.73, + 0.727, 0.707, 0.703, 0.729, 0.75, 0.76, 0.751, 0.739, 0.724, 0.73, 0.74, 0.737]) + +green = array( + [0.092, 0.096, 0.098, 0.097, 0.098, 0.095, 0.095, 0.097, 0.095, 0.094, 0.097, 0.098, 0.096, 0.101, 0.103, 0.104, + 0.107, 0.109, 0.112, 0.115, 0.125, 0.14, 0.16, 0.187, 0.229, 0.285, 0.343, 0.39, 0.435, 0.464, 0.472, 0.476, 0.481, + 0.462, 0.447, 0.441, 0.426, 0.406, 0.373, 0.347, 0.337, 0.314, 0.285, 0.277, 0.266, 0.25, 0.23, 0.207, 0.186, + 0.171, 0.16, 0.148, 0.141, 0.136, 0.13, 0.126, 0.123, 0.121, 0.122, 0.119, 0.114, 0.115, 0.117, 0.117, 0.118, 0.12, + 0.122, 0.128, 0.132, 0.139, 0.144, 0.146, 0.15, 0.152, 0.157, 0.159]) + +red = array( + [0.04, 0.046, 0.048, 0.053, 0.049, 0.05, 0.053, 0.055, 0.057, 0.056, 0.059, 0.057, 0.061, 0.061, 0.06, 0.062, 0.062, + 0.062, 0.061, 0.062, 0.06, 0.059, 0.057, 0.058, 0.058, 0.058, 0.056, 0.055, 0.056, 0.059, 0.057, 0.055, 0.059, + 0.059, 0.058, 0.059, 0.061, 0.061, 0.063, 0.063, 0.067, 0.068, 0.072, 0.08, 0.09, 0.099, 0.124, 0.154, 0.192, + 0.255, 0.287, 0.349, 0.402, 0.443, 0.487, 0.513, 0.558, 0.584, 0.62, 0.606, 0.609, 0.651, 0.612, 0.61, 0.65, 0.638, + 0.627, 0.62, 0.63, 0.628, 0.642, 0.639, 0.657, 0.639, 0.635, 0.642]) + +white_reflectivity = InterpolatedSF(wavelengths, white) +red_reflectivity = InterpolatedSF(wavelengths, red) +green_reflectivity = InterpolatedSF(wavelengths, green) + +# define light spectrum +light_spectrum = InterpolatedSF(array([400, 500, 600, 700]), array([0.0, 8.0, 15.6, 18.4])) + +# set-up scenegraph +world = World() + +# enclosing box +enclosure = Node(world) + +e_back = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(0, 0, 1) * rotate(0, 0, 0), + material=Lambert(white_reflectivity)) + +e_bottom = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(0, -1, 0) * rotate(0, -90, 0), + # material=m) + material=Lambert(white_reflectivity)) + +e_top = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(0, 1, 0) * rotate(0, 90, 0), + material=Lambert(white_reflectivity)) + +e_left = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(1, 0, 0) * rotate(-90, 0, 0), + material=Lambert(red_reflectivity)) + +e_right = Box(Point3D(-1, -1, 0), Point3D(1, 1, 0), + parent=enclosure, + transform=translate(-1, 0, 0) * rotate(90, 0, 0), + material=Lambert(green_reflectivity)) + +# ceiling light +light = Box(Point3D(-0.4, -0.4, -0.01), Point3D(0.4, 0.4, 0.0), + parent=enclosure, + transform=translate(0, 1, 0) * rotate(0, 90, 0), + material=UniformSurfaceEmitter(light_spectrum, 2)) + +# alternate light #1 +# light = Box(Point3D(-0.4, -0.4, -0.01), Point3D(0.4, 0.4, 0.0), +# parent=enclosure, +# transform=translate(0, 1, 0) * rotate(0, 90, 0), +# material=UniformSurfaceEmitter(d65_white, 2)) + +# alternate light #2 +# back_light = Sphere(0.1, +# parent=enclosure, +# transform=translate(0.80, -0.85, 0.80)*rotate(0, 0, 0), +# material=UniformSurfaceEmitter(light_spectrum, 10.0)) + +# objects in enclosure +box = Box(Point3D(-0.4, 0, -0.4), Point3D(0.3, 1.4, 0.3), + parent=world, + transform=translate(0.4, -1 + 1e-6, 0.4)*rotate(30, 0, 0), + material=schott("N-BK7")) + +sphere = Sphere(0.4, + parent=world, + transform=translate(-0.4, -0.6 + 1e-6, -0.4)*rotate(0, 0, 0), + material=schott("N-BK7")) + + +filter_red = InterpolatedSF([100, 650, 660, 670, 680, 800], [0, 0, 1, 1, 0, 0]) +filter_green = InterpolatedSF([100, 530, 540, 550, 560, 800], [0, 0, 1, 1, 0, 0]) +filter_blue = InterpolatedSF([100, 480, 490, 500, 510, 800], [0, 0, 1, 1, 0, 0]) + +# create and setup the camera +power_unfiltered = PowerPipeline2D(display_unsaturated_fraction=0.96, name="Unfiltered") +power_unfiltered.display_update_time = 15 + +power_green = PowerPipeline2D(filter=filter_green, display_unsaturated_fraction=0.96, name="Green Filter") +power_green.display_update_time = 15 + +power_red = PowerPipeline2D(filter=filter_red, display_unsaturated_fraction=0.96, name="Red Filter") +power_red.display_update_time = 15 + +rgb = RGBPipeline2D(display_unsaturated_fraction=0.96, name="sRGB") + +bayer = BayerPipeline2D(filter_red, filter_green, filter_blue, display_unsaturated_fraction=0.96, name="Bayer Filter") +bayer.display_update_time = 15 + +pipelines = [rgb, power_unfiltered, power_green, power_red, bayer] + +sampler = RGBAdaptiveSampler2D(rgb, ratio=10, fraction=0.2, min_samples=500, cutoff=0.01) + + +camera = PinholeCamera((1024, 1024), parent=world, transform=translate(0, 0, -3.3) * rotate(0, 0, 0), pipelines=pipelines) +camera.frame_sampler = sampler +camera.spectral_rays = 1 +camera.spectral_bins = 15 +camera.pixel_samples = 250 +camera.ray_importance_sampling = True +camera.ray_important_path_weight = 0.25 +camera.ray_max_depth = 500 +camera.ray_extinction_min_depth = 3 +camera.ray_extinction_prob = 0.01 + +# Speedups for testing purposes. +# Fewer camera pixels for faster runtime per pass. +# camera.pixels = (256, 256) +# Less strict cutoff to finish with fewer passes. +sampler.cutoff = 0.05 + +# Get the available parallelism for this MPI process automatically, +# or hard code it if your scheduler isn't supported. +nworkers = HybridEngine.estimate_subworker_count() +# nworkers = 16 +camera.render_engine = HybridEngine(MulticoreEngine(nworkers)) + +# Don't make plots during the render as MPI is typically run non-interactively. +for pipeline in camera.pipelines: + pipeline.display_progress = False + +# Worker processes do not have accurate statistics, only the root process has +# all the render results. So don't bother outputting statistics on the workers. +if camera.render_engine.rank != 0: + camera.quiet = True + +# start ray tracing. +print(f"Starting ray tracing on rank {camera.render_engine.rank}") +render_pass = 1 +while not camera.render_complete: + if camera.render_engine.rank == 0: + print(f"Rendering pass {render_pass}...") + camera.observe() + # Rank 0 processes the ray tracing results so is the only process which + # knows the true progress of the render. In order to know when the render + # is complete, we need to ensure the frame sampler on each of the worker + # processes has an up-to-date copy of the pipeline used for sampling. + # We also need to use a different variable name for the received broadcast + # object else it is not broadcast properly. + root_rgb = camera.render_engine.comm.bcast(rgb, root=0) + camera.frame_sampler.pipeline = root_rgb + camera.render_engine.comm.Barrier() + render_pass += 1 +print(f"Finished ray tracing on rank {camera.render_engine.rank}") + +# Again, only rank 0 has all the ray tracing results so is the only one which +# can produce a correct image. +if camera.render_engine.rank == 0: + rgb.save('CornellHybrid_rgb.png') diff --git a/demos/raysect_logo_hybrid.py b/demos/raysect_logo_hybrid.py new file mode 100644 index 00000000..a1327dde --- /dev/null +++ b/demos/raysect_logo_hybrid.py @@ -0,0 +1,90 @@ +""" +Renders the raysect logo: a top-down view of 6 coloured slabs. + +MPI (message passing interface) is used to parallelise the workflow. +Running this demo therefore requires an MPI library, such as MPICH or +OpenMPI. The mpi4py Python package must also be installed. + +To run this demo, call `mpirun -n python raysect_logo_mpi.py`. +If a batch job scheduler is used on a cluster, the render may be spread +across multiple nodes of the cluster. If run interactively at the command +line, all rendering will be done on the same machine as the command line. +""" + +from matplotlib.pyplot import * +from numpy import array + +from raysect.primitive import Sphere, Box + +from raysect.optical import World, Node, translate, rotate, Point3D, d65_white, ConstantSF, InterpolatedSF +from raysect.optical.observer import PinholeCamera +from raysect.optical.material.emitter import UniformSurfaceEmitter +from raysect.optical.material.dielectric import Dielectric +from raysect.core.workflow import MulticoreEngine, HybridEngine + + +world = World() + +wavelengths = array([300, 490, 510, 590, 610, 800]) +red_attn = array([0.0, 0.0, 0.0, 0.0, 1.0, 1.0]) * 0.98 +green_attn = array([0.0, 0.0, 1.0, 1.0, 0.0, 0.0]) * 0.85 +blue_attn = array([1.0, 1.0, 0.0, 0.0, 0.0, 0.0]) * 0.98 +yellow_attn = array([0.0, 0.0, 1.0, 1.0, 1.0, 1.0]) * 0.85 +cyan_attn = array([1.0, 1.0, 1.0, 1.0, 0.0, 0.0]) * 0.85 +purple_attn = array([1.0, 1.0, 0.0, 0.0, 1.0, 1.0]) * 0.95 + +red_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, red_attn)) +green_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, green_attn)) +blue_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, blue_attn)) +yellow_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, yellow_attn)) +cyan_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, cyan_attn)) +purple_glass = Dielectric(index=ConstantSF(1.4), transmission=InterpolatedSF(wavelengths, purple_attn)) + +Sphere(1000, world, material=UniformSurfaceEmitter(d65_white, 1.0)) + +node = Node(parent=world, transform=rotate(0, 0, 90)) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 0) * translate(0, 1, -0.500001), red_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 60) * translate(0, 1, -0.500001), yellow_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 120) * translate(0, 1, -0.500001), green_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 180) * translate(0, 1, -0.500001), cyan_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 240) * translate(0, 1, -0.500001), blue_glass) +Box(Point3D(-0.5, 0, -2.5), Point3D(0.5, 0.25, 0.5), node, rotate(0, 0, 300) * translate(0, 1, -0.500001), purple_glass) + +camera = PinholeCamera((256, 256), fov=45, parent=world, transform=translate(0, 0, -6.5) * rotate(0, 0, 0)) + +camera.ray_max_depth = 500 +camera.ray_extinction_prob = 0.01 +camera.pixel_samples = 100 +camera.spectral_rays = 1 +camera.spectral_bins = 21 + +# Work a bit harder across a cluster. +camera.pixels = (512, 512) +camera.pixel_samples = 500 + +# Either hard-code the number of sub-engine workers, or infer it from the run +# time environment if using a supported job scheduler like Slurm or Grid Engine. +# nworkers = 8 +nworkers = HybridEngine.estimate_subworker_count() +camera.render_engine = HybridEngine(MulticoreEngine(nworkers)) +print(f"MPI process {camera.render_engine.name} running with {nworkers} workers.") + +# With the hybrid engine, only the rank 0 process has all the results available +# to generate sampling statistics. So don't output stats on the workers. +if camera.render_engine.rank != 0: + camera.quiet = True + +# Don't try making plots during the render: this will likely fail when run +# through a non-interactive job scheduler. +for pipeline in camera.pipelines: + pipeline.display_progress = False + +camera.observe() + +# Again, only rank 0 has all the results, so this is the only process that +# should produce any output. +if camera.render_engine.rank == 0: + # Comment out if you don't want to save the result to a file: + camera.pipelines[0].save("raysect_logo.png") + # Uncomment if running interactively for a plot of the result: + # camera.pipelines[0].display() diff --git a/raysect/core/workflow.py b/raysect/core/workflow.py index 814372f5..747ec07a 100644 --- a/raysect/core/workflow.py +++ b/raysect/core/workflow.py @@ -29,6 +29,9 @@ from collections import defaultdict from multiprocessing import get_context, cpu_count +import platform +import os +import subprocess from raysect.core.math import random import time @@ -436,6 +439,279 @@ def worker_count(self): return self.size +class HybridEngine(RenderEngine): + """ + Render engine for combined shared and distributed memory systems. + + This render engine requires mpi4py to be installed, and the program + to be run using ``mpirun -n python ``. + + >>> from raysect.core import HybridEngine, MulticoreEngine + >>> from raysect.optical.observer import PinholeCamera + >>> + >>> nworkers = 4 + >>> camera = PinholeCamera((512, 512)) + >>> camera.render_engine = HybridEngine(MulticoreEngine(nworkers)) + + The render engine uses a variation of the single process, multiple + data (SPMD) paradigm. In this paradigm, "tasks" are independent + processes potentially running on separate computers. Each task will + have one or more "workers" which it spawns in order to do its share + of the rendering. Each worker will be on the same computer as its + parent task and should share the parent task's memory. + + When the program is started with `mpirun`, there will be `ntasks` + separate processes, all of which run concurrently and build their + own copies of the scenegraph. Then when this render engine's `run` + method is called in each process, the sub-engine will create + `nworkers` worker subprograms to perform a subset of the render: how + this is done depends entirely on which sub-engine is used. For + example, if the `MulticoreEngine` is used then `nworkers` + subprocesses will be forked from the parent task to perform the + render subset. If the `SerialEngine` is used then the render subset + will be computed in serial in the task's own process. The rendering + results for all workers in all tasks are gathered back to the root + (rank 0) task. + + The SPMD paradigm means there are many copies of the program running + and each copy runs the same instructions, so programs must be + written with this in mind. While all copies build the scene to + render, only one copy (the MPI process with rank 0) actually + receives all the render results and calls the update function. This + means that after an observe, only the rank 0 process contains the + results of the call to the render function for all tasks, and so any + post processing (including plotting or saving images) should only be + done on the rank 0 process. + + Also, any further renders which depend on the results of a previous + render (for example, using adapive samplers) will require + communication of the results from rank 0 to all the other processes: + + >>> pipeline = RGBPipeline2d() + >>> camera.pipelines = [pipeline] + >>> camera.sampler = RGBAdaptivesampler2d(pipeline) + >>> camera.observe() + >>> # Update the sampler in each worker process with the results + >>> # of the previous render for the next pass. + >>> root_pipeline = camera.render_engine.comm.bcast(pipeline, root=0) + >>> camera.sampler.pipeline = root_pipeline + >>> # Now subsequent observes will have the correct statistics. + >>> camera.observe() + + It is the end user's responsibility to ensure that the number of + workers is configured appropriately, and this will strongly depend + on the environment in which the program is launched. Examples of + running the hybrid engine for 3 popular schedulers are given here. + + Slurm: + + $ sbatch -n -c + $ # Within script.sh: + $ mpirun -n --bind-to none + + >>> # Within application.py: + >>> camera.render_engine = HybridEngine(MulticoreEngine() + + PSB/Torque: + + $ qsub -l nodes=:ppn= + $ # Within script.sh: + $ mpirun -n --map-by node --bind-to none + + >>> # Within application.py: + >>> camera.render_engine = HybridEngine(MulticoreEngine() + + Grid engine is more complicated as there is no portable way to + specify the number of slots per node, though there is an environment + variable for the number of separate nodes the job is being run on: + + $ qsub -pe + $ # Within script.sh: + $ mpirun -n $NHOSTS --map-by node --bind-to none + + >>> # Within application.py: + >>> # Find out how many slots are allocated this MPI process's node. + >>> import os, platform, subprocess + >>> host = platform.node() + >>> qstat = subprocess.run(['qstat', '-g', 't'], capture_output=True, + encoding='UTF-8') + >>> job_id = os.getenv("JOB_ID") + >>> # Loop through qstat output until we find our job id on this node. + >>> current_job = "" + >>> NSUB = 0 + >>> for line in qstat.stdout.splitlines(): + >>> fields = line.strip().split() + >>> if len(fields) >= 9: # Start of a node's entry + >>> current_job = fields[0] + >>> current_queue = fields[7] + >>> slot_type = fields[8] + >>> elif len(fields) >= 2: # Additional lines of a node's entry + >>> current_queue = fields[0] + >>> slot_type = fields[1] + >>> else: # Other decorative lines in the output are irrelevant. + >>> continue + >>> if current_job == job_id and host in current_queue and slot_type == "SLAVE": + >>> NSUB += 1 + >>> + >>> camera.render_engine = HybridEngine(MulticoreEngine(NSUB)) + + This class contains some attributes relevant to the MPI environment. + The sub-engine attributes are accessible through the sub-engine + directly. + + :ivar comm: the MPI communicator (``MPI_COMM_WORLD``). + :ivar rank: the mpi rank in the communicator. Only rank 0 contains the + full results after a call ``to RenderEngine.run()``. + :ivar nmpi: the number of MPI processes. + :ivar name: the MPI processor name. + :ivar totalsize: Total number of workers summed over all MPI processes. + """ + def __init__(self, subengine=None): + if not HAVE_MPI: + raise RuntimeError("The mpi4py package is required to use this engine.") + if subengine is None: + subengine = SerialEngine() + self.subengine = subengine + comm = MPI.COMM_WORLD + self.comm = comm + self.rank = comm.Get_rank() + self.nmpi = comm.Get_size() + self.name = MPI.Get_processor_name() + _local_size = subengine.worker_count() + self._all_sizes = comm.allgather(_local_size) + self.total_size = sum(self._all_sizes) + + def run(self, tasks, render, update, + render_args=(), render_kwargs=None, + update_args=(), update_kwargs=None): + # Avoid mutable default arguments. + if render_kwargs is None: + render_kwargs = {} + if update_kwargs is None: + update_kwargs = {} + # All processes must have the same tasks, in the same order, so that + # all processes agree on which subset of tasks each is to work on. + tasks = self.comm.bcast(tasks, root=0) + ntasks = len(tasks) + # Split tasks evenly per individual worker, as different MPI processes + # may have different numbers of workers. Then re-group the tasks into + # batches for each MPI worker. + nworkers = self.total_size + worker_tasks = [[] for _ in range(nworkers)] + for i, task in enumerate(tasks): + worker_tasks[i % nworkers].append(task) + rank_tasks = [] + idx = 0 + for rank in range(self.nmpi): + size = self._all_sizes[rank] + rank_tasks.append(sum(worker_tasks[idx:idx+size], [])) + idx += size + # On rank 0, spawn a separate process to do the rendering while gathering + # results in the main thread. We can't use mpi4py to send results from rank + # 0 to rank 0: here be segfaults. So use a multiprocessing queue instead. + # This does mean we have to gather data differently on rank 0. + # We use the 'fork' context here for memory efficiency, which makes this + # implementation unix-only (and even then, dodgy on MacOS). + if self.rank == 0: + ctx = get_context('fork') + queue = ctx.SimpleQueue() + worker = ctx.Process( + target=self.subengine.run, + kwargs=dict( + tasks=rank_tasks[self.rank], + render=render, render_args=render_args, render_kwargs=render_kwargs, + update=queue.put, update_args=(), update_kwargs={}, + ) + ) + worker.start() + remaining = ntasks + local_remaining = len(rank_tasks[self.rank]) + remote_remaining = remaining - local_remaining + while remaining: + # Data may come in either from the local queue or from remote MPI. + if local_remaining and not queue.empty(): + result = queue.get() + local_remaining -= 1 + elif remote_remaining: + result = self.comm.recv() + remote_remaining -= 1 + else: # Nothing local or remote yet. Try again. + continue + if isinstance(result, Exception): + raise result + update(result, *update_args, **update_kwargs) + remaining = local_remaining + remote_remaining + worker.join() + else: + # All other ranks render the subset of tasks assigned to them and + # send the results through MPI to rank 0 for collating. + self.subengine.run( + tasks=rank_tasks[self.rank], + render=render, render_args=render_args, render_kwargs=render_kwargs, + update=self.comm.send, update_args=(0,), update_kwargs={}, + ) + self.comm.Barrier() + + def worker_count(self): + return self.totalsize + + @staticmethod + def estimate_subworker_count(scheduler=None): + """ + Estimate the number of processes the sub-worker should use. + + This routine should be called in processes launched from + job schedulers. It will use environment variables exported + by the job scheduler to work out the degree of parallelism + available to the sub-worker. + + Currently, Slurm and Grid Engine are supported. For other + schedulers it is the user's responsibility to work out (or hard + code) the sub-worker count. + + :param scheduler: the name of the scheduler in use. + One of ("slurm", "gridengine") + :return: The number of sub-workers available in this MPI task. + """ + if scheduler is None: + # Try to work out which scheduler is in use. + if "SLURM_JOB_ID" in os.environ: + scheduler = "slurm" + elif "SGE_ROOT" in os.environ: + scheduler = "gridengine" + else: + raise RuntimeError("Can't find a supported scheduler.") + scheduler = scheduler.lower() + if scheduler == "slurm": + nsub = os.getenv("SLURM_CPUS_PER_TASK", 1) + elif scheduler == "gridengine": + # Parse qstat output to get the number of SLAVE slots + # allocated to this node. + node = platform.node() + qstat = subprocess.run(['qstat', '-g', 't'], capture_output=True, + encoding='UTF-8') + job_id = os.environ["JOB_ID"] + # Loop through qstat output until we find our job id on this node. + current_job = "" + nsub = 0 + for line in qstat.stdout.splitlines(): + fields = line.strip().split() + if len(fields) >= 9: # Start of a node's entry + current_job = fields[0] + current_queue = fields[7] + slot_type = fields[8] + elif len(fields) >= 2: # Additional lines of a node's entry + current_queue = fields[0] + slot_type = fields[1] + else: # Other decorative lines in the output are irrelevant. + continue + if current_job == job_id and node in current_queue and slot_type == "SLAVE": + nsub += 1 + else: + raise RuntimeError(f"{scheduler} is not supported by this function.") + return nsub + + if __name__ == '__main__':