NAME

Parallel::TaskExecutor

SYNOPSIS

Cross-platform executor for parallel tasks executed in forked processes.

  my $executor = Parallel::TaskExecutor->new();
  my $task = $executor->run(sub { return 'foo' });
  $task->wait();
  is($task->data(), 'foo');

DESCRIPTION

This module provides a simple interface to run Perl code in forked processes and receive the result of their processing. This is quite similar to Parallel::ForkManager with a different OO approach, more centered on the task object that can be seen as a very lightweight promise.

This module logs through Log::Any::Simple, so any Log::Any::Adapter can be used to consume its log output. For example, put the following in your main application file:

  use Log::Any::Adapter ('Stderr');

In addition, when testing a module that uses Parallel::TaskExecutor, if you’re using Test2, you should add the following line at the beginning of each of your tests to initialize the multi-process feature of the test framework:

  use Test2::IPC;
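As an illustration, a complete test file could look like the minimal sketch below. It assumes Test2::V0 as the test bundle and only reuses the calls shown in the SYNOPSIS; the test description is made up for the example.

```perl
use strict;
use warnings;

# Enable the multi-process support of Test2 before anything forks.
use Test2::IPC;
use Test2::V0;

use Parallel::TaskExecutor;

my $executor = Parallel::TaskExecutor->new();

# The sub runs in a forked child; its return value is sent back to the parent.
my $task = $executor->run(sub { return 'foo' });
$task->wait();

# List assignment mirrors the list context used in the SYNOPSIS.
my ($data) = $task->data();
is($data, 'foo', 'the result of the child is available in the parent');

done_testing;
```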

METHODS

constructor

  my $executor = Parallel::TaskExecutor->new(%options);

Create a new executor. The main possible option is:

  • max_parallel_tasks (default = 4): how many sub-processes this object instance can have running at the same time.

All the options accepted by run() can also be passed to new(), in which case they apply to every call to run() made on this object.
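The following sketch shows a constructor call that combines max_parallel_tasks with a run() option used as a default. The probe sub based on wantarray is only an illustration device, not part of the module.

```perl
use strict;
use warnings;
use Parallel::TaskExecutor;

# At most 2 children at a time; every run() call defaults to scalar context.
my $executor = Parallel::TaskExecutor->new(
    max_parallel_tasks => 2,
    scalar             => 1,
);

# wantarray reports the context in which the sub is called in the child.
my $task = $executor->run(sub { return wantarray ? 'list' : 'scalar' });
$task->wait();

my ($context) = $task->data();
print "The sub was called in $context context\n";
```

Passing scalar => 1 to an individual run() call instead would affect that call only.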

destructor

When a Parallel::TaskExecutor goes out of scope, its destructor will wait for all the tasks that it started and for which the returned task object is not live. This is a complement to the destructor of Parallel::TaskExecutor::Task which waits for a task to be done if its parent executor is no longer live.

default_executor()

  my $executor = default_executor();

Returns a default Parallel::TaskExecutor object with an unspecified parallelism (guaranteed to allow more than one parallel task).

run()

  my $task = $executor->run($sub, %options);

Fork a new child process and use it to execute the given $sub. The execution can be tracked using the returned $task object of type Parallel::TaskExecutor::Task.

If there are already max_parallel_tasks tasks running, then the call will block until the count of running tasks goes below that limit.

The possible options are the following:

  • SIG (hash-reference): if provided, this specifies a set of signal handlers to be set in the child process. These signal handlers are installed before the provided $sub is called and before the call to run() returns.

  • wait: if set to a true value, the call to run() will wait for the task to be complete before returning (this means that $task->done() will always be true when you get the task).

  • catch_error: by default, a failure of a child task will abort the parent process. If this option is set to true, the failure will be reported by the task instead.

  • scalar: when set to true, the $sub is called in scalar context. Otherwise it is called in list context.

  • forced: if set to true, the task will be run immediately, even if this means exceeding the value for the max_parallel_tasks passed to the constructor. Note however that the task will still increase by one the number of running tasks tracked by the executor (unless untracked is also set to true).

  • untracked: if set to true, the task will not increase the number of running tasks counted by the executor. However, the call to run() might still be blocked if the number of outstanding tasks exceeds max_parallel_tasks (unless forced is set to true too).
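A short sketch of how some of these options interact with the max_parallel_tasks limit. The one-second sleep and the returned values are arbitrary and only make the ordering visible; the variable names are chosen for the example.

```perl
use strict;
use warnings;
use Parallel::TaskExecutor;

# A single slot, so that the effect of the options is easy to follow.
my $executor = Parallel::TaskExecutor->new(max_parallel_tasks => 1);

# Occupies the only slot of the executor for about one second.
my $slow = $executor->run(sub { sleep 1; return 'slow' });

# forced: started right away although the single slot is already taken.
my $urgent = $executor->run(sub { return 'urgent' }, forced => 1);

# No forced option here: this call blocks until the count of running tasks
# goes below the limit. With wait set, it also only returns once the new
# task is complete, so done() is true as soon as we get the task.
my $sync = $executor->run(sub { return 6 * 7 }, wait => 1);
print "sync task is done\n" if $sync->done();

$_->wait() for $slow, $urgent;
my ($slow_data)   = $slow->data();
my ($urgent_data) = $urgent->data();
my ($sync_data)   = $sync->data();
print "$slow_data, $urgent_data, $sync_data\n";
```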

run_now()

  my $data = $executor->run_now($sub, %options);

Runs the given $sub in a forked process and waits for its result. This never blocks before starting the task (the $sub is run even if the executor's maximum parallelism is already reached) and it does not increase the counted parallelism of the executor either (in effect, the untracked, forced, and wait options are set to true).

In addition, the scalar option is set to true if this method is called in scalar context, unless that option was explicitly passed to the run_now() call.
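The context rule can be observed with a small probe, as in the sketch below. The probe sub is hypothetical and only reports the context it was called in.

```perl
use strict;
use warnings;
use Parallel::TaskExecutor;

my $executor = Parallel::TaskExecutor->new();

# Reports the context in which the child process called the sub.
my $probe = sub { return wantarray ? 'list' : 'scalar' };

# Scalar assignment: run_now() is called in scalar context, so the scalar
# option is implied for the sub.
my $in_scalar = $executor->run_now($probe);

# List assignment: run_now() is called in list context, and so is the sub.
my ($in_list) = $executor->run_now($probe);

print "scalar call: $in_scalar, list call: $in_list\n";
```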

wait()

  $executor->wait();

Waits for all the outstanding tasks to terminate. This waits for all the tasks independently of whether their Parallel::TaskExecutor::Task object is still live.
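For instance, tasks that are only run for their side effects can be started without keeping their task objects, as in this sketch. The temporary directory and the file names are assumptions made for the example.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use Parallel::TaskExecutor;

my $dir      = tempdir(CLEANUP => 1);
my $executor = Parallel::TaskExecutor->new();

for my $i (1 .. 3) {
    # The returned task object is discarded on purpose.
    $executor->run(sub {
        open(my $fh, '>', "$dir/$i.txt") or die "Cannot write $dir/$i.txt: $!";
        print {$fh} "written by task $i\n";
        close($fh) or die "Cannot close $dir/$i.txt: $!";
        return $i;
    });
}

# Blocks until all three children have terminated.
$executor->wait();

my @written = grep { -e "$dir/$_.txt" } 1 .. 3;
print scalar(@written), " files written\n";
```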

set_max_parallel_tasks()

  $executor->set_max_parallel_tasks(N);

Sets the max_parallel_tasks option of the executor.

CAVEATS AND TODOS

  • The data returned by a child task can only have a limited size (4 kB at the time of writing). In a future release, we may switch to using temporary files to pass the result when this limit is reached.

  • There is currently no support for setting up a uni- or bi-directional communication channel with the child task. This must be done manually by the user.
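One possible way to set up such a channel manually is a plain pipe created before the call to run(), so that the forked child inherits it. This is only a sketch using core Perl, not a feature of this module, and it assumes that the executor leaves the inherited file handles untouched in the child.

```perl
use strict;
use warnings;
use IO::Handle;
use Parallel::TaskExecutor;

# The pipe must exist before the fork so that both processes share it.
pipe(my $reader, my $writer) or die "Cannot create pipe: $!";
$writer->autoflush(1);

my $executor = Parallel::TaskExecutor->new();
my $task = $executor->run(sub {
    # In the child: only the writing end is needed.
    close($reader);
    print {$writer} "progress: $_\n" for 1 .. 3;
    close($writer);
    return 'done';
});

# In the parent: close the writing end, otherwise the read never sees EOF.
close($writer);
my @lines = <$reader>;
close($reader);

$task->wait();
my ($result) = $task->data();
print @lines, "task result: $result\n";
```

Since the pipe bypasses the return value of the task, the data sent through it is not subject to the size limit mentioned above.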

AUTHOR

This program has been written by Mathias Kende.

LICENSE

Copyright 2024 Mathias Kende

This program is distributed under the MIT (X11) License: http://www.opensource.org/licenses/mit-license.php

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

SEE ALSO

AnyEvent
IPC::Run
Parallel::ForkManager
Promise::XS