Class ProcessingUnit

  • All Implemented Interfaces:
    Runnable

    public class ProcessingUnit
    extends Thread
    This component executes the processing pipeline. Running in a separate thread, it continuously reads bundles of Cas from the Work Queue filled by the ArtifactProducer and sends them through the configured CasProcessors. The sequence in which CasProcessors are invoked is defined by the order in which the Cas Processors are listed in the CPE descriptor. The analysis results produced by the Cas Processors are enqueued onto an output queue that is shared with the Cas Consumers.
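    The read, process, enqueue loop described above can be sketched with plain java.util.concurrent types. This is only an illustration of the pattern, not the UIMA implementation: PipelineWorkerSketch, the END_OF_INPUT sentinel, and the use of strings in place of CASes are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Illustrative stand-in only: none of these types are UIMA classes.
class PipelineWorkerSketch extends Thread {
    // Sentinel bundle marking the end of input (an assumption of this sketch).
    static final List<String> END_OF_INPUT = new ArrayList<>();

    private final BlockingQueue<List<String>> inputQueue;
    private final BlockingQueue<List<String>> outputQueue;
    private final List<UnaryOperator<String>> processors;

    PipelineWorkerSketch(BlockingQueue<List<String>> inputQueue,
                         BlockingQueue<List<String>> outputQueue,
                         List<UnaryOperator<String>> processors) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
        this.processors = processors;
    }

    @Override
    public void run() {
        try {
            while (true) {
                List<String> bundle = inputQueue.take(); // blocks until work arrives
                if (bundle == END_OF_INPUT) {
                    outputQueue.put(END_OF_INPUT); // pass the sentinel downstream
                    return;
                }
                List<String> results = new ArrayList<>();
                for (String item : bundle) {
                    String current = item;
                    // Processors run in list order, mirroring descriptor order.
                    for (UnaryOperator<String> processor : processors) {
                        current = processor.apply(current);
                    }
                    results.add(current);
                }
                outputQueue.put(results); // hand results to the consumer side
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep interrupt status and exit
        }
    }

    // Runs one bundle through a two-stage pipeline and returns the output bundle.
    static List<String> demo(List<String> bundle) {
        BlockingQueue<List<String>> in = new ArrayBlockingQueue<>(4);
        BlockingQueue<List<String>> out = new ArrayBlockingQueue<>(4);
        List<UnaryOperator<String>> stages = new ArrayList<>();
        stages.add(s -> s.trim());
        stages.add(s -> s.toUpperCase());
        PipelineWorkerSketch worker = new PipelineWorkerSketch(in, out, stages);
        worker.start();
        try {
            in.put(bundle);
            in.put(END_OF_INPUT);
            List<String> result = out.take();
            worker.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of(" alpha ", "beta"))); // prints [ALPHA, BETA]
    }
}
```

    Bounded queues give back-pressure in both directions: the worker blocks when no input is available, and it also blocks when the downstream consumer falls behind.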
    • Field Detail

      • threadState

        public int threadState
      • releaseCAS

        protected boolean releaseCAS
      • processingUnitProcessTrace

        protected ProcessTrace processingUnitProcessTrace
      • processContainers

        protected LinkedList processContainers
      • numToProcess

        protected long numToProcess
      • casList

        protected CAS[] casList
      • notifyListeners

        protected boolean notifyListeners
      • conversionCas

        protected CAS conversionCas
      • artifact

        protected Object[] artifact
      • conversionCasArray

        protected CAS[] conversionCasArray
      • threadId

        protected String threadId
      • timer01

        public long timer01
      • timer02

        public long timer02
      • timer03

        public long timer03
      • timer04

        public long timer04
      • timer05

        public long timer05
      • timer06

        public long timer06
    • Constructor Detail

      • ProcessingUnit

        public ProcessingUnit()
      • ProcessingUnit

        public ProcessingUnit​(CPMEngine acpm,
                              BoundedWorkQueue aInputQueue,
                              BoundedWorkQueue aOutputQueue)
        Initialize the PU
        Parameters:
        acpm - - component managing life cycle of the CPE
        aInputQueue - - queue to read from
        aOutputQueue - - queue to write to
      • ProcessingUnit

        public ProcessingUnit​(CPMEngine acpm)
    • Method Detail

      • isRunning

        public boolean isRunning()
        Returns true if this component is in running state.
        Returns:
        - true if running, false otherwise
      • setCasConsumerPipelineIdentity

        public void setCasConsumerPipelineIdentity()
        Define a CasConsumer Pipeline identity for this instance
      • isCasConsumerPipeline

        public boolean isCasConsumerPipeline()
      • setInputQueue

        public void setInputQueue​(BoundedWorkQueue aInputQueue)
        Alternative method of providing a queue from which this PU will read bundles of Cas
        Parameters:
        aInputQueue - - read queue
      • setOutputQueue

        public void setOutputQueue​(BoundedWorkQueue aOutputQueue)
        Alternative method of providing a queue where this PU will deposit results of analysis
        Parameters:
        aOutputQueue - - queue to write to
      • setCPMEngine

        public void setCPMEngine​(CPMEngine acpm)
        Alternative method of providing the reference to the component managing the lifecycle of the CPE
        Parameters:
        acpm - - reference to the controlling engine
      • cleanup

        public void cleanup()
        Null out fields of this object. Call this only when this object is no longer needed.
      • setNotifyListeners

        public void setNotifyListeners​(boolean aDoNotify)
        Set a flag indicating if notifications should be made via configured Listeners
        Parameters:
        aDoNotify - - true if notification is required, false otherwise
      • getCallbackListeners

        public ArrayList getCallbackListeners()
        Returns list of listeners used by this PU for callbacks.
        Returns:
        - list of BaseStatusCallbackListener instances
      • removeStatusCallbackListener

        public void removeStatusCallbackListener​(BaseStatusCallbackListener aListener)
        Removes given listener from the list of listeners
        Parameters:
        aListener - - object to remove from the list
      • setProcessingUnitProcessTrace

        public void setProcessingUnitProcessTrace​(ProcessTrace aProcessingUnitProcessTrace)
        Plugs in ProcessTrace object used to collect statistics
        Parameters:
        aProcessingUnitProcessTrace - - object to compile stats
      • setUimaTimer

        public void setUimaTimer​(UimaTimer aTimer)
        Plugs in custom timer used by the PU for getting time
        Parameters:
        aTimer - - custom timer to use
      • setContainers

        public void setContainers​(LinkedList processorList)
        Plugs in a list of Cas Processor containers. During processing, the Cas Processors in this list are called sequentially. Each Cas Processor is wrapped in a container that manages errors, counts and totals, and restarts.
        Parameters:
        processorList - list of Cas Processor containers to be added to the processing pipeline
      • disableCasProcessor

        public void disableCasProcessor​(int aCasProcessorIndex)
        Disables a CASProcessor in the processing pipeline. Locates it by the provided index. The disabled Cas Processor remains in the Processing Pipeline; however, it is not used during processing.
        Parameters:
        aCasProcessorIndex - - location in the pipeline of the Cas Processor to disable
      • disableCasProcessor

        public void disableCasProcessor​(String aCasProcessorName)
        Alternative method to disable Cas Processor. Uses a name to locate it.
        Parameters:
        aCasProcessorName - - a name of the Cas Processor to disable
      • enableCasProcessor

        public void enableCasProcessor​(String aCasProcessorName)
        Enables the Cas Processor with the given name. Once enabled, the Cas Processor immediately begins to receive bundles of Cas.
        Parameters:
        aCasProcessorName - - name of the Cas Processor to enable
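        The disable and enable semantics above, where a disabled Cas Processor stays in the pipeline but is skipped, can be sketched as follows. ToggleablePipelineSketch and its Stage class are hypothetical names for this example; the real containers also track errors, counts, and restarts, which are omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Illustrative stand-in only: these are not UIMA types.
class ToggleablePipelineSketch {
    // One pipeline slot: a named step plus an enabled flag.
    private static final class Stage {
        final String name;
        final UnaryOperator<String> step;
        boolean enabled = true;

        Stage(String name, UnaryOperator<String> step) {
            this.name = name;
            this.step = step;
        }
    }

    private final List<Stage> stages = new ArrayList<>();

    synchronized void add(String name, UnaryOperator<String> step) {
        stages.add(new Stage(name, step));
    }

    // Disables by position; the stage keeps its slot in the list.
    synchronized void disable(int index) {
        stages.get(index).enabled = false;
    }

    // Disables by name.
    synchronized void disable(String name) {
        setEnabled(name, false);
    }

    // Re-enables by name; the next call to process() uses the stage again.
    synchronized void enable(String name) {
        setEnabled(name, true);
    }

    synchronized int size() {
        return stages.size();
    }

    // Applies every enabled stage in list order and skips disabled ones.
    synchronized String process(String input) {
        String current = input;
        for (Stage stage : stages) {
            if (stage.enabled) {
                current = stage.step.apply(current);
            }
        }
        return current;
    }

    private void setEnabled(String name, boolean enabled) {
        for (Stage stage : stages) {
            if (stage.name.equals(name)) {
                stage.enabled = enabled;
            }
        }
    }

    public static void main(String[] args) {
        ToggleablePipelineSketch pipeline = new ToggleablePipelineSketch();
        pipeline.add("upper", s -> s.toUpperCase());
        pipeline.add("exclaim", s -> s + "!");
        pipeline.disable(0);
        System.out.println(pipeline.process("hi")); // prints hi!
        pipeline.enable("upper");
        System.out.println(pipeline.process("hi")); // prints HI!
    }
}
```

        Keeping a disabled stage in the list means the indices of the remaining stages do not shift, so index-based and name-based lookups stay valid after a disable.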
      • run

        public void run()
        Starts the Processing Pipeline thread. This thread waits for an artifact to arrive on the configured Work Queue. Once the CAS arrives, it is removed from the queue and sent through the analysis pipeline.
        Specified by:
        run in interface Runnable
        Overrides:
        run in class Thread
      • consumeQueue

        public boolean consumeQueue()
        Consumes the input queue to make sure all bundles still there get processed before the CPE terminates.
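        Draining a queue before shutdown, as described above, might look like the sketch below. DrainQueueSketch and its drain helper are hypothetical; the meaning of the boolean returned by consumeQueue is not documented here, so this sketch returns the number of items handled instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Illustrative stand-in only: not the UIMA implementation.
class DrainQueueSketch {
    // Handles every item still queued, without blocking, and returns the count.
    static <T> int drain(BlockingQueue<T> queue, Consumer<? super T> handler) {
        int handled = 0;
        T item;
        while ((item = queue.poll()) != null) { // poll() returns null when empty
            handler.accept(item);
            handled++;
        }
        return handled;
    }

    // Queues three bundles, drains them, and returns them in processing order.
    static List<String> demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
        queue.add("bundle-1");
        queue.add("bundle-2");
        queue.add("bundle-3");
        List<String> processed = new ArrayList<>();
        drain(queue, processed::add);
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [bundle-1, bundle-2, bundle-3]
    }
}
```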
      • notifyListeners

        protected void notifyListeners​(Object aCas,
                                       boolean isCasObject,
                                       EntityProcessStatus aEntityProcStatus)
        Notifies Listeners of the fact that the pipeline has finished processing the current set of Cas'es
        Parameters:
        aCas - - object containing either an array of Cas or a single instance of Cas
        isCasObject - - true if aCas is of type CAS, false otherwise
        aEntityProcStatus - - status object that may contain exceptions and trace
      • doNotifyListeners

        protected void doNotifyListeners​(Object aCas,
                                         boolean isCasObject,
                                         EntityProcessStatus aEntityProcStatus)
        Notifies all configured listeners. Makes sure that the appropriate type of Cas is sent to the listener. Conversions take place to ensure compatibility.
        Parameters:
        aCas - - Cas to pass to listener
        isCasObject - - true if Cas is of type CAS
        aEntityProcStatus - - status object containing exceptions and trace info
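        The listener handling described above (a notification flag, a list of listeners, and removal of a listener) can be sketched as follows. ListenerNotifierSketch, StatusListener, and onBundleProcessed are hypothetical names for this example, and the conversion between Cas types is deliberately left out.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative stand-in only: these are not UIMA types.
class ListenerNotifierSketch {
    // Hypothetical callback interface, not the UIMA listener API.
    interface StatusListener {
        void onBundleProcessed(String result, boolean failed);
    }

    // Copy-on-write list allows listeners to be removed during notification.
    private final List<StatusListener> listeners = new CopyOnWriteArrayList<>();
    private volatile boolean notifyEnabled = true;

    void addListener(StatusListener listener) {
        listeners.add(listener);
    }

    void removeListener(StatusListener listener) {
        listeners.remove(listener);
    }

    void setNotifyEnabled(boolean enabled) {
        notifyEnabled = enabled;
    }

    // Calls every registered listener and returns how many were notified.
    int publish(String result, boolean failed) {
        if (!notifyEnabled) {
            return 0; // notifications are switched off
        }
        int notified = 0;
        for (StatusListener listener : listeners) {
            listener.onBundleProcessed(result, failed);
            notified++;
        }
        return notified;
    }

    public static void main(String[] args) {
        ListenerNotifierSketch notifier = new ListenerNotifierSketch();
        notifier.addListener((result, failed) -> System.out.println(result + " failed=" + failed));
        notifier.publish("doc-1", false); // prints doc-1 failed=false
        notifier.setNotifyEnabled(false);
        notifier.publish("doc-2", false); // prints nothing
    }
}
```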
      • setReleaseCASFlag

        public void setReleaseCASFlag​(boolean aFlag)
        Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing. This is typically done for the Cas Consumer thread, but in configurations not using Cas Consumers, the processing pipeline may also release the CAS.
        Parameters:
        aFlag - - true if this thread should release a CAS when analysis is complete
      • stopCasProcessors

        public void stopCasProcessors​(boolean kill)
        Stops all Cas Processors that are part of this PU.
        Parameters:
        kill - - true if CPE has been stopped before finishing processing during external stop
      • endOfProcessingReached

        protected boolean endOfProcessingReached​(long aCount)
        Returns true if the CPM has finished analyzing the collection.
        Parameters:
        aCount - - running total of documents processed so far
        Returns:
        - true if CPM has processed all docs, false otherwise
      • process

        protected void process​(Object anArtifact)
        Parameters:
        anArtifact -
      • showMetadata

        protected void showMetadata​(Object[] aCasList)
        Parameters:
        aCasList -
      • isProcessorReady

        protected boolean isProcessorReady​(int aStatus)
        Checks whether the given CASProcessor status indicates that the processor is available for processing
      • getBytes

        protected long getBytes​(Object aCas)
        Returns the size of the CAS object. Currently only CASData is supported.
        Parameters:
        aCas - - Cas to get the size for
        Returns:
        the size of the CAS object. Currently only CASData is supported.
      • setCasPool

        public void setCasPool​(CPECasPool aPool)
        Parameters:
        aPool -
      • analyze

        protected boolean analyze​(Object[] aCasObjectList,
                                  ProcessTrace pTrTemp)
                           throws Exception
        An alternate processing loop designed for the single-threaded CPM.
        Parameters:
        aCasObjectList - - a list of CASes to analyze
        pTrTemp - - process trace where statistics are added during analysis
        Throws:
        Exception