
NAME

POEx::WorkerPool::Role::WorkerPool::Worker - A role that provides common semantics for Workers

VERSION

version 0.092800

ATTRIBUTES

job_classes is: ro, isa: ArrayRef[ClassName], required: 1

For the serializer on the other side of the process boundary to rebless jobs, it needs to make sure the classes of those jobs are loaded.

This attribute is used to indicate which classes need to be loaded.
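
As a rough illustration of why class names are passed around, the following minimal sketch loads a list of classes at runtime in plain Perl. The helper name sketch_load_classes is hypothetical, and core modules stand in for real job classes; the actual loading mechanism used by the Worker is not shown in this document.

```perl
use strict;
use warnings;

# Hypothetical helper: load each named class so that objects blessed
# into it are usable after deserialization. Not the module's own code.
sub sketch_load_classes {
    my (@classes) = @_;
    for my $class (@classes) {
        # Turn Foo::Bar into Foo/Bar.pm, the form require() expects
        # when the module name is only known at runtime.
        (my $file = "$class.pm") =~ s{::}{/}g;
        require $file;
    }
    return scalar @classes;
}

# Core modules stand in for real job classes here.
my $loaded = sketch_load_classes('File::Spec', 'List::Util');
print "loaded $loaded classes\n";
```

If a class cannot be found, require dies, which is usually the desired behaviour: a job whose class is missing could not be reblessed anyway.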

uuid is: ro, isa: Str

This attribute holds the basis for the aliases of the Worker and its PubSub component. It defaults to Data::UUID->new()->create_str().

pubsub_alias is: ro, isa: Str

This holds the alias of the PubSub component associated with this Worker.

status is: rw, isa: Bool

status indicates whether the Worker is currently processing its queue.

_in_process is: rw, isa: DoesJob

This private attribute holds the currently processing Job

_completed_jobs is: rw, isa: ScalarRef

This private attribute counts the jobs completed during the current processing cycle.

_failed_jobs is: rw, isa: ScalarRef

This private attribute counts the jobs that failed during the current processing cycle.

metaclass: Collection::Array, is: ro, isa: ArrayRef[DoesJob]

This is the FIFO queue of jobs this worker is responsible for processing.

The following provides are defined:

    {
        push    => '_enqueue_job',
        shift   => '_dequeue_job',
        count   => 'count_jobs',
    }
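
The delegation above can be pictured with a minimal sketch in plain Perl. It deliberately avoids the attribute-helper metaclass; the Sketch::Queue package and its hash-based storage are hypothetical, and only the three method names are taken from the provides map.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the queue attribute: a plain array wrapped
# in an object, exposing the three delegated method names listed above.
package Sketch::Queue;

sub new { my ($class) = @_; return bless { jobs => [] }, $class }

# push => '_enqueue_job': append to the tail of the queue
sub _enqueue_job { my ($self, @jobs) = @_; push @{ $self->{jobs} }, @jobs; return }

# shift => '_dequeue_job': remove and return the head of the queue
sub _dequeue_job { my ($self) = @_; return shift @{ $self->{jobs} } }

# count => 'count_jobs': number of jobs still queued
sub count_jobs { my ($self) = @_; return scalar @{ $self->{jobs} } }

package main;

my $queue = Sketch::Queue->new;
$queue->_enqueue_job($_) for qw(first second third);

# Jobs come back out in the order they went in (FIFO).
my $head = $queue->_dequeue_job;
print "dequeued: $head, remaining: ", $queue->count_jobs, "\n";
```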

max_jobs is: ro, isa: Int

This determines the fill mark for the job queue.

child_wheel is: ro, isa: Wheel

child_wheel holds this Worker's POE::Wheel::Run instance

METHODS

is_[not_]active

These are convenience methods for checking the status of the Worker.
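
A minimal sketch of how such a pair could look, assuming both methods are thin wrappers over the boolean status attribute. The Sketch::Worker package is hypothetical and is not the role's actual implementation.

```perl
use strict;
use warnings;

# Hypothetical class holding only a boolean status.
package Sketch::Worker;

sub new {
    my ($class, %args) = @_;
    return bless { status => $args{status} ? 1 : 0 }, $class;
}

# Assumed shape of the convenience pair: each simply reads the status.
sub is_active     { my ($self) = @_; return $self->{status} ? 1 : 0 }
sub is_not_active { my ($self) = @_; return $self->{status} ? 0 : 1 }

package main;

my $worker = Sketch::Worker->new(status => 1);
print "worker is busy, try again later\n" if $worker->is_active;
```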

guts_error_handler(Str $op, Int $error_num, Str $error_str, WheelID $id, Str $handle_name) is Event

guts_error_handler is the handler given to POE::Wheel::Run to handle errors that may crop up during operation of the wheel. It will post the arguments via +PXWP_WORKER_CHILD_ERROR using PubSub.

Subscribers will need the following signature:

    method handler
    (
        SessionID :$worker_id, 
        Str :$operation, 
        Int :$error_number, 
        Str :$error_string, 
        WheelID :$wheel_id, 
        Str :$handle_name
    ) is Event

This method will then issue a SIGTERM signal to the child process, forcing it to rebuild after exiting.

guts_exited(Str $chld, Int $pid, Int $exit_val) is Event

This is the SIGCHLD handler for the child process. It will post the arguments via +PXWP_WORKER_CHILD_EXIT using PubSub.

Subscribers will need to have the following signature:

    method handler
    (
        SessionID :$worker_id, 
        Int :$process_id, 
        Int :$exit_value
    ) is Event

The wheel will then be cleared.

after _start is Event

_start is advised to create a new PubSub component specific for this Worker and publish all of the various events that the Worker can fire.

after _stop is Event

_stop is advised to terminate the associated PubSub component by calling its destroy event.

enqueue_job(DoesJob $job)

enqueue_job takes an object with the Job role and places it into the queue after a few basic checks, such as whether the Worker is currently processing or whether the job queue has reached the max_jobs limit. If either is true, an EnqueueError is thrown.

This method fires +PXWP_JOB_ENQUEUED to the associated PubSub component on success.

Subscribers will need to have the following signature:

    method handler (SessionID :$worker_id, DoesJob :$job) is Event
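
The two checks described for enqueue_job can be sketched in plain Perl as follows. The hash-based worker, the function name sketch_enqueue_job, and the use of die strings in place of the real exception class are all assumptions made for illustration, as is the reading of "reached max_jobs" as a greater-or-equal comparison.

```perl
use strict;
use warnings;

# Hypothetical worker state; only max_jobs and the notion of an
# "active" status come from the documentation above.
my %worker = (active => 0, max_jobs => 2, jobs => []);

# Sketch of the two guards: refuse while processing, refuse when full.
# Plain die() strings stand in for the real EnqueueError exception.
sub sketch_enqueue_job {
    my ($worker, $job) = @_;
    die "EnqueueError: worker is currently processing\n"
        if $worker->{active};
    die "EnqueueError: queue has reached max_jobs\n"
        if @{ $worker->{jobs} } >= $worker->{max_jobs};
    push @{ $worker->{jobs} }, $job;
    return scalar @{ $worker->{jobs} };
}

sketch_enqueue_job(\%worker, 'job-1');
sketch_enqueue_job(\%worker, 'job-2');

# A third job exceeds max_jobs, so the guard throws.
my $ok = eval { sketch_enqueue_job(\%worker, 'job-3'); 1 };
print "third enqueue rejected: $@" unless $ok;
```

A caller that enqueues many jobs would typically wrap each call in eval (or a try/catch module) and hand rejected jobs to another worker.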

enqueue_jobs(ArrayRef[DoesJob] $jobs)

enqueue_jobs does the same thing as enqueue_job, but it acts on an array of jobs. Each job successfully enqueued means the worker will fire the +PXWP_JOB_ENQUEUED event via PubSub.

start_processing

start_processing kicks the Worker into gear and prevents adding jobs until the current queue has been processed. If there are no jobs in the queue, a StartError will be thrown. This method fires the +PXWP_START_PROCESSING event via PubSub.

Subscribers should have the following signature:

    method handler (SessionID :$worker_id, Int :$count_jobs)

_process_queue is Event

This private event is the queue processor. As jobs are dequeued for processing, +PXWP_JOB_DEQUEUED will be fired via PubSub. Subscribers will need the following signature:

    method handler(SessionID :$worker_id, DoesJob :$job) is Event

Once the queue has been depleted +PXWP_STOP_PROCESSING will be fired via PubSub. Subscribers will need the following signature:

    method handler
    (
        SessionID :$worker_id,
        Int :$completed_jobs,
        Int :$failed_jobs
    ) is Event

Then the run stats will be cleared, and the status will be toggled so that the Worker may again accept jobs.
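
The overall cycle (start, drain the queue, report, reset) can be sketched synchronously in plain Perl. This is only an illustration under stated assumptions: the real Worker is event driven and hands jobs to a child process, whereas here each job is a code reference run in place, and sketch_run_cycle and the hash keys are hypothetical names.

```perl
use strict;
use warnings;

# Hypothetical, synchronous stand-in for one processing cycle.
sub sketch_run_cycle {
    my ($worker) = @_;
    # Mirrors the StartError case: nothing to process.
    die "StartError: no jobs in the queue\n" unless @{ $worker->{jobs} };

    $worker->{active} = 1;    # no enqueueing while the cycle runs
    my ($completed, $failed) = (0, 0);

    while (defined(my $job = shift @{ $worker->{jobs} })) {
        # A job that dies counts as failed, otherwise as completed.
        if (eval { $job->(); 1 }) { $completed++ } else { $failed++ }
    }

    # Queue depleted: capture the run stats, then reset for the next cycle.
    my %stats = (completed_jobs => $completed, failed_jobs => $failed);
    $worker->{active} = 0;
    return \%stats;
}

my %worker = (
    active => 0,
    jobs   => [ sub { 1 }, sub { die "boom\n" }, sub { 1 } ],
);
my $stats = sketch_run_cycle(\%worker);
print "completed=$stats->{completed_jobs} failed=$stats->{failed_jobs}\n";
```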

_process_job(DoesJob $job) is Event

This private event takes the given job and feeds it to the child process to be processed. If the child process doesn't exist for whatever reason, +PXWP_WORKER_INTERNAL_ERROR will be fired via PubSub. Subscribers need the following signature:

    method handler(SessionID :$worker_id, Ref :$msg)

This event also places the given job into the _in_process attribute.

guts_output(JobStatus $job_status) is Event

This is the StdoutEvent for the child POE::Wheel::Run. It handles all of the child output, which is in the form of JobStatus hashrefs. The following describes the potential events from the child and the actions taken:

    Type: 
        +PXWP_JOB_COMPLETE

    Action: 
        _in_process is cleared and _completed_jobs for this session is
        incremented. yields() to _process_queue.

    PubSub Event:
        +PXWP_JOB_COMPLETE

    PubSub Signature:
        method handler(SessionID :$worker_id, DoesJob :$job, Ref :$msg)

    Notes:
        The :$msg argument will contain the output from the Job's execution

    Type: 
        +PXWP_JOB_PROGRESS

    Action: 
        PubSub event posted.

    PubSub Event:
        +PXWP_JOB_PROGRESS

    PubSub Signature:
        method handler
        (
            SessionID :$worker_id, 
            DoesJob :$job, 
            Int :$percent_complete,
            Ref :$msg,
        )

    Notes:
        The :$msg argument will contain the output from the last step executed
        for multi-step jobs

    Type: 
        +PXWP_JOB_FAILED

    Action: 
        _in_process is cleared and _failed_jobs for this session is
        incremented. yields() to _process_queue.

    PubSub Event:
        +PXWP_JOB_FAILED

    PubSub Signature:
        method handler(SessionID :$worker_id, DoesJob :$job, Ref :$msg)

    Notes:
        The :$msg argument will contain the exception generated from the Job

    Type: 
        +PXWP_JOB_START

    Action: 
        PubSub event posted.

    PubSub Event:
        +PXWP_JOB_START

    PubSub Signature:
        method handler
        (
            SessionID :$worker_id, 
            DoesJob :$job, 
        )

    Notes:
        This is an indication that the child process received the Job and is
        beginning execution.
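
One way to picture the handling described above is a dispatch table keyed on the status type. The sketch below is plain Perl under stated assumptions: string names stand in for the +PXWP_* constants, the 'type' key of the status hashref and the function name sketch_guts_output are hypothetical, and a plain array records what would be posted via PubSub.

```perl
use strict;
use warnings;

# Hypothetical worker state and a log of would-be PubSub posts.
my %worker = (in_process => 'job-1', completed => 0, failed => 0);
my @published;

# Terminal types clear the in-flight job and bump a counter;
# informational types are only relayed.
my %on_status = (
    PXWP_JOB_START    => sub { },
    PXWP_JOB_PROGRESS => sub { },
    PXWP_JOB_COMPLETE => sub { my $w = shift; $w->{in_process} = undef; $w->{completed}++ },
    PXWP_JOB_FAILED   => sub { my $w = shift; $w->{in_process} = undef; $w->{failed}++ },
);

sub sketch_guts_output {
    my ($worker, $status) = @_;
    my $action = $on_status{ $status->{type} }
        or die "unknown status type: $status->{type}\n";
    $action->($worker);
    push @published, $status->{type};    # every known type is published
    return;
}

sketch_guts_output(\%worker, { type => $_ })
    for qw(PXWP_JOB_START PXWP_JOB_PROGRESS PXWP_JOB_COMPLETE);
print "completed=$worker{completed} published=@published\n";
```

The sketch omits the yield back to _process_queue that follows a completed or failed job.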

    

halt is Event

halt will destroy the child process and unset the associated alias, ensuring that the Session will stop.

die_signal_handler(Str $sig, HashRef) is Event

This is the handler for any exceptions that are thrown. All exceptions will be relayed to the consumer via a PubSub published event: +PXWP_WORKER_ERROR.

Subscribers will need the following signature:

    method handler(SessionID :$worker_id, IsaError :$exception, HashRef :$original) is Event

Note that $original will be what was given to us by POE.

AUTHOR

  Nicholas Perez <nperez@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2009 by Infinity Interactive.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.