Skip to content

Conversation

maarzt
Copy link
Contributor

@maarzt maarzt commented Sep 19, 2019

This is the realization of an idea how to organize imglib2 related multi-threaded code. I previously discussed it with @tpietzsch, @hanslovsky, @axtimwalde. Your thoughts are very welcome.

The two concepts in this PR, are the interface TaskExecutor and the class Parallelization:

TaskExecutor

TaskExecutor actually is an interface not a class. It's similar to ExecutorService but offers simpler methods. It's better suited to image processing algorithms. The following example uses TaskExecutor to fill an image with all ones in a multi-threaded way:

public void fillImageWithOnes( final RandomAccessibleInterval< IntType > image,
                               final TaskExecutor taskExecutor )
{
    int numTasks = taskExecutor.suggestNumberOfTasks();
    List< RandomAccessibleInterval< IntType > > chunks =
        splitImageIntoChunks( image, numTasks );

    // The TaskExecutor executes the forEach method in multi threads, if requested.
    taskExecutor.forEach( chunks, chunk -> {
        for ( IntType pixel : Views.iterable( chunk ) )
            pixel.setOne();
    } );
}

This method can be called by fillImageWithOnes( image, TaskExecutors.multiThreaded() ). And it's possible to run it single threaded by fillImageWithOnes( image, TaskExecutors.singleThreaded() ).

TaskExecutor.singleThreaded() is very lightweight. It requires now resources, threads, or memory allocation, and can therefor also be used in very tight loops. When single threaded taskExecutor.suggestNumberOfTasks() returns 1, and taskExecutor.forEach(...) is just a simple for loop. This means that splitImageIntoChunks( image, 1 ) can just return the original image, and as a consequence fillImageWithOnes will run with optimal performance, in both modes: single- and multi-threaded.

Lets imagine, what the method would look like, if ExecutorService is used:

  1. It would need to guess how many chunks are usefull, maybe take:
    Runtime.getRuntime().availableProcessors() * 4.
  2. A list of chunks need to be created.
  3. For each chunk we need to generate a Callable<Void>.
  4. Call Future<Void> futeres = executorService.invokeAll(callables).
  5. Iterate over all futures, and call `future.get()'.
  6. Handle ExecutionException and `ItenterruptedException'.

The ExecutorService based function is way more complicated. To run it single threaded one would use Executors.newSingleThreadedExecutor(), which has a big overhead.

Parallelization

In ImgLib2 it's common that a multi-threaded algorithm has some additional parameters, ExecutorService, numThreads or numberOfTask or combinations of these. This means the user needs to specify an ExecutorService, or number of threads. But most time a user wouldn't want to care about that, and just run the method quickly.

I suggest the following approach:

// If you don't care, just run:
fillImageWithOnes( image );

// Single Threaded call
Parallelization.runSingleThreaded( () -> fillImageWithOnes( image ) );

// Multi Threaded call
Parallelization.runMultiThreaded( () -> fillImageWithOnes( image ) );

// Call with a specific ExecutorService
Parallelization.runWithExecutor( executorService, () -> fillImageWithOnes( image ) );

// Call with the sepecified number of threads
Parallelization.withNumThreads( numThreads, () -> fillImageWithOnes( image ) );

// Example Algorithm, that fills an image with ones.
public void fillImageWithOnes( RandomAccessibleInterval< IntType > image )
{
    TaskExecutor taskExecutor = Parallelization.getTaskExecutor();
    int numTasks = taskExecutor.suggestNumberOfTasks();
    List< RandomAccessibleInterval< IntType > > chunks = splitImageIntoChunks( image, numTasks );

    // The TaskExecutor executes the forEach method in multi threads, if requested.
    taskExecutor.forEach( chunks, chunk -> {
        for ( IntType pixel : Views.iterable( chunk ) )
            pixel.setOne();
    } );
}

The class Parallelization associates a TaskExecutor with each thread. (ThreadLocal<TaskExecutor> is used internally.) Parallelization.getTaskExecutor() returns the task executor for the current task. Parallelization.runSingleThreaded( task ) first sets the task executor of the current thread to TaskExecutor.singleThreaded(), then executes the task, and finally restores the current threads task executor to it's original value. The other methods runMultiThreaded and runWithExecutor work similarly.

This approach has several advantage:

  1. The TaskExecutor doesn't need to be passed around as a parameter.
  2. There is a reasonable default value. Which will use the common ForkJoinPool.
  3. It makes sense to associate, a TaskExecutor setting with a thread. The threads in a fixed thread pool ('Executors.newFixedThreadPool()) shouldn't do nested parallelization. That's way these threads need their task executors be set to TaskExecutor.singleThreaded(). (This is done for you just use TaskExecutors.fixedThreadPool()`).

LoopBuilder & ImgLib2-Algorithm

I used this approach in LoopBuilder. Let's write the fillImageWithOnes method once more:

public void fillImageWithOnes( RandomAccessibleInterval< IntType > image )
{
    LoopBuilder.setImages( image ).multithreaded().forEachPixel( pixel -> pixel.setOne() );
}

It's possible to execute this method single, and multi-threaded with the Parallelization.runSingleThreaded(), etc.

I used the presented approach, in imglib2-algorithm. I will put a link below.

Backwards compatibility

There are a whole bunch of methods, that ensure compatibility with the other approaches:

executorService = taskExecutor.getExecutorService(); // TaskExecutor -> ExecutorService
taskExecutor = TaskExecutors.forExecutorService(executorService); // ExecutorService -> TaskExecutor
numThreads = taskExecutr.getParallelism(); // TaskExector -> numThreads
taskExecutor = TaskExecutors.numThreads(numThreads); // numTreads -> taskExecutor
taskExecutor = TaskExecutors.forExecutorServiceAndNumTasks(executorService, numTasks); // ExecutorService & numTasks -> TaskExecutor

TaskExecutor should be used in image processing algorithms to
realize parallization instead of ExecutorService.
TaskExecutor has a single threaded implementation. That runs
with no parallelization overheaded, and it is easier to use
than ExecutorService.

The Parallelization class allows to set one TaskExecutor per thread.
This means that algorithms don't need to have a TaskExecutor argument.
Instead an algorithm can be called like this:

Parallelization.runSingleThreaded( () -> myAlgorithm() );

or

Parallelization.runMultiThreaded( () -> myAlgoritm() );
@hanslovsky
Copy link
Member

@maarzt this looks good, great work! I added a few comments to the code. Additionally, I think that a method

Optional<int[]> TaskExecutor.suggestChunkSize(Dimensions dimensions)

would be super useful if a certain block/chunk size is required, e.g. when writing to N5 in myAlgorithm. We should probably discuss here, if chunk size should be a parameter of myAlgorithm or a method in the TaskExecutor interface.

@ctrueden
Copy link
Member

@maarzt Thanks a lot for working on this!
@gselzer You may be interested in this as well.

@maarzt
Copy link
Contributor Author

maarzt commented Sep 20, 2019

@hanslovsky Thank you for reviewing this so quickly! ❤️

Please elaborate a little more on this suggestChunkSize(...) method. How would it work?
Could this be a static method in another class Chunker.suggestChunkSize( Dimensions size, int numTasks )?

@gselzer Do I remember correctly, that you are working on Ops? I am really curios, if this PR works well together with Ops. Could it be used for parallelization there?

@tpietzsch
Copy link
Member

@maarzt very nice! Besides minor issues/questions (see inline comments) I like net.imglib2.parallel a lot. Because it is completely independent of other imglib2 stuff, we could consider moving it into a common scijava-parallel artefact (which would have no dependencies)? Opinions? (@ctrueden: Do you think this would be useful anywhere upstream of where imglib2 comes in?)

LoopBuilder changes are also fine.

@hanslovsky
Copy link
Member

Please elaborate a little more on this suggestChunkSize(...) method. How would it work?
Could this be a static method in another class Chunker.suggestChunkSize( Dimensions size, int numTasks )?

In many of our tasks, the way we parallelize is ultimately determined by the block/chunk/cell size of the data that we write out, eventually. In particular, that means that we need to provide a blockSize to the method and the data is processed in chunks of (multiples of) blockSize. This blockSize can now be either

  1. be passed as explicit parameter to the method
  2. be provided by the TaskExecutor object.

It may be preferable to always pass it as explicit parameter to the method anyway, so callers are aware that they need to set this parameter explicitly, instead of falling back to a default value (if any) that does make sense for the number of tasks, but not for the block/cell/chunk size.

This may also be more of a concern for distributed memory computations.

@hanslovsky
Copy link
Member

This probably makes the ParallelizeOverBlocks class in imglib2-algorihtm obsolete

@ctrueden
Copy link
Member

Do you think this would be useful anywhere upstream of where imglib2 comes in?

Yeah, maybe the scijava-ops framework should make use of it. Hopefully I'll have time to do some experiments before the hackathon is over.

@gselzer
Copy link
Contributor

gselzer commented Sep 23, 2019

@maarzt sorry for the delayed answer to your question, but yeah, I think this could be super useful in Ops. As scijava/scijava#31 suggests we have no good multithreading solution as of this new iteration of ops; I hacked together the thread.chunker op to work for the time being but that was designed as a temporary fix. I am hoping to get some time to check all of this out after you guys finish it all up. @ctrueden we could probably implement this across many (all?) of our lifting transformations, no?

@ctrueden
Copy link
Member

@maarzt @tpietzsch Can this PR please be resolved and merged soon? I need it for scijava-ops, as well as for pyimagej. This work hopefully solves #237.

maarzt added a commit that referenced this pull request Jan 28, 2020
@maarzt maarzt merged commit e27654f into master Jan 28, 2020
@ctrueden ctrueden deleted the parallel branch January 28, 2020 10:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants