Class CPMEngine
java.lang.Object
  java.lang.Thread
    org.apache.uima.collection.impl.cpm.engine.CPMEngine
- All Implemented Interfaces:
Runnable
public class CPMEngine extends Thread
Responsible for creating and initializing processing threads. This instance manages the lifecycle of the CPE components and exposes an API for plugging in components programmatically instead of declaratively. Running in its own thread, this component creates separate processing pipelines for Analysis Engines and Cas Consumers, launches the configured CollectionReader, and attaches all of these components to form a pipeline from source to sink. The Collection Reader feeds Processing Threads containing Analysis Engines, and the Analysis Engines feed the results of analysis to Cas Consumers.
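The source-to-sink flow described above can be pictured with a minimal sketch built on plain java.util.concurrent queues. Everything in it (PipelineSketch, string "documents", upper-casing as a stand-in for analysis, the EOF sentinel string) is hypothetical and not part of the UIMA API; it only shows how a reader thread, a pool of processing threads, and a consumer thread can be wired together through two bounded queues.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of the CPM pipeline shape; plain java.util.concurrent, not the UIMA API. */
class PipelineSketch {
    // Sentinel marking the end of input (similar in spirit to the CPM's EOF token).
    static final String EOF = "__EOF__";

    static List<String> run(List<String> docs, int workers) {
        BlockingQueue<String> workQueue = new ArrayBlockingQueue<>(4);   // reader -> analysis
        BlockingQueue<String> outputQueue = new ArrayBlockingQueue<>(4); // analysis -> consumer
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();

        // "Collection reader": enqueue every document, then one EOF per processing thread.
        threads.add(new Thread(() -> {
            for (String d : docs) putQuietly(workQueue, d);
            for (int i = 0; i < workers; i++) putQuietly(workQueue, EOF);
        }));

        // "Processing threads": take from the work queue until EOF and forward results.
        for (int i = 0; i < workers; i++) {
            threads.add(new Thread(() -> {
                for (String d = takeQuietly(workQueue); !EOF.equals(d); d = takeQuietly(workQueue)) {
                    putQuietly(outputQueue, d.toUpperCase()); // stand-in for analysis
                }
                putQuietly(outputQueue, EOF);
            }));
        }

        // "Cas consumer": drain results until every processing thread has signalled EOF.
        threads.add(new Thread(() -> {
            int finished = 0;
            while (finished < workers) {
                String r = takeQuietly(outputQueue);
                if (EOF.equals(r)) finished++; else sink.add(r);
            }
        }));

        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return sink;
    }

    private static String takeQuietly(BlockingQueue<String> q) {
        try { return q.take(); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); return EOF; }
    }

    private static void putQuietly(BlockingQueue<String> q, String s) {
        try { q.put(s); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>(run(List.of("a", "b", "c"), 2));
        Collections.sort(out); // processing threads finish in any order
        System.out.println(out); // prints [A, B, C]
    }
}
```

The bounded queues (capacity 4 here) give natural backpressure: the reader blocks when analysis falls behind, which is the kind of decoupling that the input and output queue sizes govern.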
-
Nested Class Summary
-
Nested classes/interfaces inherited from class java.lang.Thread
Thread.State, Thread.UncaughtExceptionHandler
-
Field Summary
Modifier and Type            Field
CPECasPool                   casPool
protected boolean            isRunning
protected boolean            killed
Object                       lockForPause
protected BoundedWorkQueue   outputQueue
protected boolean            pause
protected ProcessingUnit[]   processingUnits
protected boolean            stopped
protected BoundedWorkQueue   workQueue
-
Fields inherited from class java.lang.Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
-
Constructor Summary
Constructor and Description
CPMEngine(CPMThreadGroup aThreadGroup, CPEFactory aCpeFactory, ProcessTrace aProcTr, CheckpointData aCheckpointData)
    Initializes the Collection Processing Engine.
-
Method Summary
Modifier and Type / Method and Description
void addCasProcessor(CasProcessor aCasProcessor)
    Adds a CASProcessor to the processing pipeline.
void addCasProcessor(CasProcessor aCasProcessor, int aIndex)
    Adds a CASProcessor to the processing pipeline at a given position.
void addStatusCallbackListener(BaseStatusCallbackListener aListener)
void asynchStop()
    Deprecated.
static void callEntityProcessCompleteWithCAS(StatusCallbackListener statCL, CAS cas, EntityProcessStatus eps)
    Internal use only; public for cross-package access.
void cleanup()
    Nulls out fields of this object.
void deployCasProcessors()
    Starts CASProcessor containers one at a time.
void disableCasProcessor(int aCasProcessorIndex)
    Disables a CASProcessor in the processing pipeline.
void disableCasProcessor(String aCasProcessorName)
    Disables a CASProcessor in the processing pipeline.
boolean dropCasOnException()
void enableCasProcessor(String aCasProcessorName)
    Enables a CASProcessor in the processing pipeline.
LinkedList getAllProcessingContainers()
    Returns a list of all processing containers.
ArrayList getCallbackListeners()
    Returns a list of all callback listeners currently registered with the CPM.
CasProcessor[] getCasProcessors()
    Returns all CASProcessors in the processing pipeline.
protected CpeConfiguration getCpeConfig()
String getLastDocRepository()
String getLastProcessedDocId()
    Returns the ID of the last document processed.
Properties getPerformanceTuningSettings()
int getPoolSize()
LinkedList getProcessingContainers()
    Returns a list of processing containers for Analysis Engines.
Progress[] getProgress()
    Returns the CollectionReader's progress.
Map getStats()
    Returns CPE stats.
int getThreadCount()
    Returns the number of processing threads.
void invalidateCASes(CAS[] aCASList)
boolean isHardKilled()
    Returns whether the CPE was killed hard.
boolean isKilled()
    Returns true if this engine has been killed.
boolean isParallizable(CasProcessor aProcessor, String aCpName)
    Determines whether a given Cas Processor is parallelizable.
boolean isPaused()
    Returns a global flag indicating whether this thread is in the paused state.
boolean isPauseOnException()
    Returns whether the CPM should pause when an exception occurs.
boolean isRunning()
    Returns a global flag indicating whether this thread is in the processing state.
void killIt()
    Kills the CPM the hard way.
void pauseIt()
    Pauses this thread.
void pipelineKilled(String aPipelineThreadName)
    Callback method used to notify the engine when a processing pipeline is killed due to excessive errors.
void redeployAnalysisEngine(ProcessingContainer aProcessingContainer)
    Deploys a CasProcessor and associates it with aProcessingContainer.
void releaseCASes(CAS[] aCASList)
    Releases the given CASes back to the pool.
void removeCasProcessor(int aCasProcessorIndex)
    Removes a CASProcessor from the processing pipeline.
void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
    Unregisters the given listener from the CPM.
void resumeIt()
    Resumes this thread.
void run()
    Using the given configuration, creates and starts the CPE processing pipeline.
void runSingleThreaded()
    Runs the CPE in a single thread without queues.
void setCollectionReader(BaseCollectionReader aCollectionReader)
    Sets the CollectionReader to use during processing.
void setConcurrentThreadSize(int aConcurrentThreadSize)
    Defines the number of threads executing the processing pipeline concurrently.
void setInputQueueSize(int aInputQueueSize)
    Defines the size of the input queue.
void setNumToProcess(long aNumToProcess)
    Defines the size of the batch.
void setOutputQueueSize(int aOutputQueueSize)
    Defines the size of the output queue.
void setPauseOnException(boolean aPause)
    Sets a global flag indicating that the CPM should pause whenever an exception occurs.
void setPerformanceTuningSettings(Properties aPerformanceTuningSettings)
    Overrides the default performance tuning settings for this CPE.
void setPoolSize(int aPoolSize)
    Defines the size of the CAS pool.
void setProcessControllerAdapter(ProcessControllerAdapter aPca)
void setStats(Map aMap)
    Plugs in a map where the engine stores performance info at runtime.
void stopCasProcessors(boolean kill)
    Stops all Cas Processors and optionally changes their status according to the kill flag.
void stopIt()
    Stops execution of the processing pipeline and this thread.
-
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, onSpinWait, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, suspend, toString, yield
-
Field Detail
-
casPool
public CPECasPool casPool
-
lockForPause
public final Object lockForPause
-
pause
protected boolean pause
-
isRunning
protected volatile boolean isRunning
-
stopped
protected volatile boolean stopped
-
killed
protected volatile boolean killed
-
processingUnits
protected ProcessingUnit[] processingUnits
-
outputQueue
protected BoundedWorkQueue outputQueue
-
workQueue
protected BoundedWorkQueue workQueue
-
Constructor Detail
-
CPMEngine
public CPMEngine(CPMThreadGroup aThreadGroup, CPEFactory aCpeFactory, ProcessTrace aProcTr, CheckpointData aCheckpointData) throws Exception
Initializes the Collection Processing Engine. Assigns this thread and all processing threads created by this component to a common ThreadGroup.
- Parameters:
aThreadGroup - contains all CPM-related threads
aCpeFactory - CPE factory object responsible for parsing the CPE descriptor and creating components
aProcTr - instance of ProcessTrace where the CPM accumulates stats
aCheckpointData - checkpoint object facilitating restart from the last known point
- Throws:
Exception
-
Method Detail
-
getProcessingContainers
public LinkedList getProcessingContainers()
Returns a list of Processing Containers for Analysis Engines. Each CasProcessor is managed by its own container.
-
getAllProcessingContainers
public LinkedList getAllProcessingContainers()
Returns a list of all processing containers. Each CasProcessor is managed by its own container.
-
getThreadCount
public int getThreadCount() throws ResourceConfigurationException
Returns the number of processing threads.
- Returns:
number of processing threads
- Throws:
ResourceConfigurationException
-
setStats
public void setStats(Map aMap)
Plugs in a map where the engine stores performance info at runtime.
- Parameters:
aMap - map for runtime stats and totals
-
getStats
public Map getStats()
Returns CPE stats.
- Returns:
Map containing CPE stats
-
setPauseOnException
public void setPauseOnException(boolean aPause)
Sets a global flag indicating that the CPM should pause whenever an exception occurs.
- Parameters:
aPause - true if pause is requested on exception, false otherwise
-
isPauseOnException
public boolean isPauseOnException()
Returns whether the CPM should pause when an exception occurs.
- Returns:
true if the CPM pauses when an exception occurs, false otherwise
-
setInputQueueSize
public void setInputQueueSize(int aInputQueueSize)
Defines the size of the input queue. The queue stores this many entities read from the CollectionReader. Every processing pipeline thread reads its entities from this input queue. The CollectionReader is decoupled from the consumers of entities and continuously replenishes the input queue.
- Parameters:
aInputQueueSize - the size of the input queue
-
setOutputQueueSize
public void setOutputQueueSize(int aOutputQueueSize)
Defines the size of the output queue. The queue stores this many entities enqueued by every processing pipeline thread. The results of analysis are placed in this queue for the consumer thread to consume.
- Parameters:
aOutputQueueSize - the size of the output queue
-
setPoolSize
public void setPoolSize(int aPoolSize)
Defines the size of the CAS pool.
- Parameters:
aPoolSize - the size of the CAS pool
-
getPoolSize
public int getPoolSize()
-
setConcurrentThreadSize
public void setConcurrentThreadSize(int aConcurrentThreadSize)
Defines the number of threads executing the processing pipeline concurrently.
- Parameters:
aConcurrentThreadSize - the number of concurrent processing threads
-
addStatusCallbackListener
public void addStatusCallbackListener(BaseStatusCallbackListener aListener)
-
getCallbackListeners
public ArrayList getCallbackListeners()
Returns a list of all callback listeners currently registered with the CPM.
- Returns:
list of registered callback listeners
-
removeStatusCallbackListener
public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
Unregisters the given listener from the CPM.
- Parameters:
aListener - instance of BaseStatusCallbackListener to unregister
-
isKilled
public boolean isKilled()
Returns true if this engine has been killed.
- Returns:
true if this engine has been killed
-
killIt
public void killIt()
Kills the CPM the hard way. None of the entities in the queues will be processed. This method simply empties all queues and then adds an EOFToken to the work queue so that all threads exit.
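A hard kill of this kind can be sketched with a plain BlockingQueue: drain everything, then enqueue an end-of-input sentinel. The names below (HardKillSketch, EOF_TOKEN, workerLoop) are hypothetical. Because only one token is added yet "all threads go away", the sketch assumes the common poison-pill convention where each worker puts the token back before exiting so its siblings see it too; the source does not confirm that detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration of a hard kill; names are not taken from UIMA. */
class HardKillSketch {
    // Sentinel telling processing threads to exit (stand-in for the CPM's EOFToken).
    static final Object EOF_TOKEN = new Object();

    /** Empties the work queue, then enqueues EOF. Returns the entities that will never be processed. */
    static List<Object> killIt(BlockingQueue<Object> workQueue) {
        List<Object> discarded = new ArrayList<>();
        workQueue.drainTo(discarded); // throw away every pending entity
        workQueue.offer(EOF_TOKEN);   // wake any worker blocked in take()
        return discarded;
    }

    /** Worker loop: counts entities until EOF, then re-enqueues EOF so sibling workers also exit. */
    static int workerLoop(BlockingQueue<Object> workQueue) {
        int processed = 0;
        try {
            while (true) {
                Object item = workQueue.take();
                if (item == EOF_TOKEN) {
                    workQueue.put(EOF_TOKEN); // pass the token on to the next worker
                    return processed;
                }
                processed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return processed;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        queue.add("doc1");
        queue.add("doc2");
        List<Object> dropped = killIt(queue);
        System.out.println(dropped.size() + " discarded, worker processed " + workerLoop(queue));
    }
}
```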
-
isHardKilled
public boolean isHardKilled()
Returns whether the CPE was killed hard. A soft kill allows the CPE to finish processing all in-transit CASes. A hard kill causes the CPM to stop processing and to throw away all unprocessed CASes from its queues.
- Returns:
true if the CPE was killed hard
-
asynchStop
@Deprecated public void asynchStop()
Deprecated.
-
stopIt
public void stopIt()
Stops execution of the Processing Pipeline and this thread.
-
isParallizable
public boolean isParallizable(CasProcessor aProcessor, String aCpName) throws Exception
Determines whether a given Cas Processor is parallelizable. Remote Cas Processors are parallelizable by default. For integrated and managed Cas Processors, the CPM consults the Cas Processor's descriptor to determine whether it is parallelizable.
- Parameters:
aProcessor - Cas Processor being checked
aCpName - name of the Cas Processor
- Returns:
true if the Cas Processor is parallelizable, false otherwise
- Throws:
Exception
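The decision rule above reduces to a small function. This is a hypothetical model, not UIMA's code: the Deployment enum and the descriptorAllowsParallel flag are invented for illustration. In UIMA descriptors the relevant setting is plausibly the multipleDeploymentAllowed operational property, but that mapping is an assumption here.

```java
/** Hypothetical model of the parallelizability rule; this is not UIMA's implementation. */
class ParallelizableSketch {
    enum Deployment { REMOTE, INTEGRATED, MANAGED }

    /**
     * Remote processors are parallelizable by default; integrated and managed ones
     * defer to a flag read from their descriptor (descriptorAllowsParallel, hypothetical).
     */
    static boolean isParallelizable(Deployment deployment, boolean descriptorAllowsParallel) {
        if (deployment == Deployment.REMOTE) {
            return true;
        }
        return descriptorAllowsParallel;
    }

    public static void main(String[] args) {
        for (Deployment d : Deployment.values()) {
            System.out.println(d + " (descriptor says no): " + isParallelizable(d, false));
        }
    }
}
```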
-
addCasProcessor
public void addCasProcessor(CasProcessor aCasProcessor) throws ResourceConfigurationException
Adds a CASProcessor to the processing pipeline. If the CasProcessor already exists and its status is DISABLED, this method re-enables it.
- Parameters:
aCasProcessor - CASProcessor to be added to the processing pipeline
- Throws:
ResourceConfigurationException
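The add-or-re-enable rule can be illustrated with a small ordered registry. This is a hypothetical sketch: processors are keyed by a name string and status is a two-value enum, neither of which is taken from UIMA. A LinkedHashMap keeps pipeline order, and re-putting an existing key keeps its original position, so a re-enabled processor stays where it was.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical registry illustrating addCasProcessor's re-enable rule; not UIMA code. */
class ProcessorRegistrySketch {
    enum Status { ENABLED, DISABLED }

    // Insertion order models pipeline order; re-putting an existing key keeps its position.
    private final Map<String, Status> pipeline = new LinkedHashMap<>();

    /** Appends a new processor, or re-enables an existing one whose status is DISABLED. */
    void add(String name) {
        Status current = pipeline.get(name);
        if (current == null || current == Status.DISABLED) {
            pipeline.put(name, Status.ENABLED);
        }
    }

    void disable(String name) {
        pipeline.computeIfPresent(name, (k, v) -> Status.DISABLED);
    }

    Status status(String name) {
        return pipeline.get(name);
    }

    int size() {
        return pipeline.size();
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch registry = new ProcessorRegistrySketch();
        registry.add("tokenizer");
        registry.disable("tokenizer");
        registry.add("tokenizer"); // re-enables instead of adding a duplicate
        System.out.println(registry.status("tokenizer") + ", size " + registry.size());
    }
}
```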
-
addCasProcessor
public void addCasProcessor(CasProcessor aCasProcessor, int aIndex) throws ResourceConfigurationException
Adds a CASProcessor to the processing pipeline at a given position.
- Parameters:
aCasProcessor - CASProcessor to be added to the processing pipeline
aIndex - insertion point for the given CasProcessor
- Throws:
ResourceConfigurationException
-
removeCasProcessor
public void removeCasProcessor(int aCasProcessorIndex)
Removes a CASProcessor from the processing pipeline.
- Parameters:
aCasProcessorIndex - CasProcessor position in the processing pipeline
-
disableCasProcessor
public void disableCasProcessor(int aCasProcessorIndex)
Disables a CASProcessor in the processing pipeline.
- Parameters:
aCasProcessorIndex - position of the CASProcessor to disable in the processing pipeline
-
disableCasProcessor
public void disableCasProcessor(String aCasProcessorName)
Disables a CASProcessor in the processing pipeline.
- Parameters:
aCasProcessorName - name of the CASProcessor to disable
-
enableCasProcessor
public void enableCasProcessor(String aCasProcessorName)
Enables a CASProcessor in the processing pipeline.
- Parameters:
aCasProcessorName - name of the CASProcessor to enable
-
getCasProcessors
public CasProcessor[] getCasProcessors()
Returns all CASProcessors in the processing pipeline.
-
redeployAnalysisEngine
public void redeployAnalysisEngine(ProcessingContainer aProcessingContainer) throws Exception
Deploys a CasProcessor and associates it with aProcessingContainer.
- Parameters:
aProcessingContainer - the container to associate with the deployed CasProcessor
- Throws:
Exception
-
deployCasProcessors
public void deployCasProcessors() throws AbortCPMException
Starts CASProcessor containers one at a time. During this phase each container deploys a TAE as a local, remote, or integrated CasProcessor.
- Throws:
AbortCPMException
-
isRunning
public boolean isRunning()
Returns a global flag indicating if this Thread is in processing state
-
isPaused
public boolean isPaused()
Returns a global flag indicating if this Thread is in pause state
-
pauseIt
public void pauseIt()
Pauses this thread
-
resumeIt
public void resumeIt()
Resumes this thread
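Pausing and resuming a processing thread is commonly done with a shared monitor, which the public lockForPause field suggests. The sketch below is a hypothetical model of that idea using Object.wait and notifyAll; it does not claim to mirror the engine's actual code. Workers call awaitIfPaused between entities, and the while loop re-checks the flag after every wakeup.

```java
/** Hypothetical pause gate around a shared monitor, modelled on lockForPause; not UIMA code. */
class PauseGateSketch {
    private final Object lockForPause = new Object();
    private boolean pause; // guarded by lockForPause

    void pauseIt() {
        synchronized (lockForPause) { pause = true; }
    }

    void resumeIt() {
        synchronized (lockForPause) {
            pause = false;
            lockForPause.notifyAll(); // wake every thread parked in awaitIfPaused()
        }
    }

    boolean isPaused() {
        synchronized (lockForPause) { return pause; }
    }

    /** A processing thread calls this between entities; it blocks while paused. */
    void awaitIfPaused() throws InterruptedException {
        synchronized (lockForPause) {
            while (pause) { // re-check after every wakeup, spurious or not
                lockForPause.wait();
            }
        }
    }

    /** Demo: a paused worker proceeds only after resumeIt(). Returns whether it got through. */
    static boolean demo() {
        PauseGateSketch gate = new PauseGateSketch();
        gate.pauseIt();
        boolean[] passed = {false};
        Thread worker = new Thread(() -> {
            try {
                gate.awaitIfPaused();
                passed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        gate.resumeIt();
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed[0];
    }

    public static void main(String[] args) {
        System.out.println("worker released after resume: " + demo());
    }
}
```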
-
setCollectionReader
public void setCollectionReader(BaseCollectionReader aCollectionReader)
Sets the CollectionReader to use during processing.
- Parameters:
aCollectionReader - the CollectionReader to use
-
setNumToProcess
public void setNumToProcess(long aNumToProcess)
Defines the size of the batch
-
getLastProcessedDocId
public String getLastProcessedDocId()
Returns Id of the last document processed
-
getLastDocRepository
public String getLastDocRepository()
-
pipelineKilled
public void pipelineKilled(String aPipelineThreadName)
Callback method used to notify the engine when a processing pipeline is killed due to excessive errors. This method is only called if the processing pipeline is unable to acquire a connection to a remote service and the configuration specifies 'kill-pipeline' as the action to take on excessive errors. When running with multiple pipelines, this routine decrements a global pipeline counter and tests whether any pipelines remain. When all pipelines have been killed as described above, the CPM needs to terminate. Because the pipelines were killed prematurely, artifacts (CASes) remain in the work queue. These must be removed from the work queue and released back to the CAS pool so that the Collection Reader thread exits properly.
- Parameters:
aPipelineThreadName - name of the pipeline thread exiting from its run() method
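The counter-and-drain bookkeeping described above can be sketched with an AtomicInteger and a BlockingQueue. All names are hypothetical, and a plain List stands in for the CAS pool. Only the call that drops the counter to zero drains the work queue and returns the stranded items to the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the pipelineKilled bookkeeping; names are illustrative only. */
class PipelineKilledSketch {
    private final AtomicInteger activePipelines;
    private final BlockingQueue<Object> workQueue;
    private final List<Object> casPool; // stand-in for the CAS pool

    PipelineKilledSketch(int pipelines, BlockingQueue<Object> workQueue, List<Object> casPool) {
        this.activePipelines = new AtomicInteger(pipelines);
        this.workQueue = workQueue;
        this.casPool = casPool;
    }

    /** Returns true when the last pipeline has died and the engine should terminate. */
    boolean pipelineKilled(String pipelineThreadName) {
        if (activePipelines.decrementAndGet() > 0) {
            return false; // other pipelines are still consuming the work queue
        }
        // No consumers left: pull the stranded CASes and hand them back to the pool,
        // so a reader blocked on the pool or the queue can finish.
        List<Object> stranded = new ArrayList<>();
        workQueue.drainTo(stranded);
        synchronized (casPool) {
            casPool.addAll(stranded);
        }
        return true;
    }

    public static void main(String[] args) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(List.of("cas1", "cas2"));
        List<Object> pool = new ArrayList<>();
        PipelineKilledSketch engine = new PipelineKilledSketch(2, queue, pool);
        System.out.println(engine.pipelineKilled("pipeline-1") + " " + pool.size());
        System.out.println(engine.pipelineKilled("pipeline-2") + " " + pool.size());
    }
}
```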
-
run
public void run()
Using the given configuration, creates and starts the CPE processing pipeline. The pipeline is either single-threaded or multi-threaded; which one is used depends on the configuration defined in the CPE descriptor. In multi-threaded mode, the CPE starts a number of threads:
1) ArtifactProducer thread - a thread containing the Collection Reader. It runs asynchronously and fills a WorkQueue with CASes.
2) CasConsumer thread - an optional thread, instantiated only if there are Cas Consumers in the pipeline.
3) Processing threads - one or more identically configured threads that perform the analysis.
How many threads are started depends on the configuration in the CPE descriptor.
All threads started here are placed in a ThreadGroup, which provides a catch-all mechanism for errors that may occur in the CPM. If an error is thrown, the ThreadGroup is notified, and it then notifies all registered listeners to give the application a chance to report the error and do the necessary cleanup.
This routine manages all the threads and makes sure that all of them are cleaned up before returning. The ThreadGroup must clean up all threads under its control, otherwise a memory leak occurs. Even threads that are never started must be cleaned up, since they are placed in the ThreadGroup when instantiated. The code uses a number of state variables to make decisions during cleanup.
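The catch-all ThreadGroup mechanism can be sketched with the standard java.lang.ThreadGroup.uncaughtException hook, which the JVM invokes for a group member that dies with an uncaught exception and has no handler of its own. CatchAllThreadGroup and its Consumer-based listener list are hypothetical; the real CPMThreadGroup's API may differ.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical catch-all group; CPMThreadGroup's real API may differ. */
class CatchAllThreadGroup extends ThreadGroup {
    private final List<Consumer<Throwable>> listeners = new CopyOnWriteArrayList<>();

    CatchAllThreadGroup(String name) {
        super(name);
    }

    void addListener(Consumer<Throwable> listener) {
        listeners.add(listener);
    }

    /** Called for a thread in this group that dies with an uncaught exception and has no handler of its own. */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        for (Consumer<Throwable> listener : listeners) {
            listener.accept(e); // let the application report the error and clean up
        }
    }

    /** Demo: a worker in the group throws, and the registered listener sees the message. */
    static List<String> demo() {
        CatchAllThreadGroup group = new CatchAllThreadGroup("cpm-sketch");
        List<String> reported = new CopyOnWriteArrayList<>();
        group.addListener(e -> reported.add(e.getMessage()));
        Thread worker = new Thread(group, () -> { throw new IllegalStateException("boom"); }, "worker-1");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reported;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [boom]
    }
}
```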
-
cleanup
public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.
-
stopCasProcessors
public void stopCasProcessors(boolean kill) throws CasProcessorDeploymentException
Stops all Cas Processors and optionally changes their status according to the kill flag.
- Parameters:
kill - true if the CPE has been stopped before completing normally
- Throws:
CasProcessorDeploymentException
-
getProgress
public Progress[] getProgress()
Returns collectionReader progress.
-
invalidateCASes
public void invalidateCASes(CAS[] aCASList)
-
releaseCASes
public void releaseCASes(CAS[] aCASList)
Releases the given CASes back to the pool.
- Parameters:
aCASList - list of CASes to release
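A fixed-size pool with batch release can be modelled on a bounded queue of free instances, sized the way setPoolSize describes. CasPoolSketch is hypothetical and generic over the pooled type. It skips null slots in the released array, and it does not reset or validate returned instances, which a production CAS pool would likely need to do.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

/** Hypothetical fixed-size pool; CPECasPool's actual behavior may differ. */
class CasPoolSketch<T> {
    private final BlockingQueue<T> free;

    CasPoolSketch(int poolSize, Supplier<T> factory) {
        free = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            free.add(factory.get());
        }
    }

    /** Blocks until an instance is free; an exhausted pool throttles whoever is producing work. */
    T acquire() throws InterruptedException {
        return free.take();
    }

    /** Non-blocking variant: returns null when the pool is exhausted. */
    T tryAcquire() {
        return free.poll();
    }

    /** Returns a batch to the pool, skipping null slots. */
    void release(T[] batch) {
        for (T item : batch) {
            if (item != null) {
                free.offer(item);
            }
        }
    }

    int available() {
        return free.size();
    }

    public static void main(String[] args) {
        CasPoolSketch<StringBuilder> pool = new CasPoolSketch<>(2, StringBuilder::new);
        StringBuilder a = pool.tryAcquire();
        StringBuilder b = pool.tryAcquire();
        System.out.println("exhausted: " + (pool.tryAcquire() == null));
        pool.release(new StringBuilder[] {a, b});
        System.out.println("available: " + pool.available());
    }
}
```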
-
setPerformanceTuningSettings
public void setPerformanceTuningSettings(Properties aPerformanceTuningSettings)
Overrides the default performance tuning settings for this CPE. This affects things such as CAS sizing parameters.
- Parameters:
aPerformanceTuningSettings - the new settings
- See Also:
UIMAFramework.getDefaultPerformanceTuningProperties()
-
getPerformanceTuningSettings
public Properties getPerformanceTuningSettings()
- Returns:
the performance tuning settings
-
setProcessControllerAdapter
public void setProcessControllerAdapter(ProcessControllerAdapter aPca)
- Parameters:
aPca
-
-
getCpeConfig
protected CpeConfiguration getCpeConfig() throws Exception
- Throws:
Exception
-
dropCasOnException
public boolean dropCasOnException()
-
runSingleThreaded
public void runSingleThreaded() throws Exception
Runs the CPE in a single thread without queues.
- Throws:
Exception
-
callEntityProcessCompleteWithCAS
public static void callEntityProcessCompleteWithCAS(StatusCallbackListener statCL, CAS cas, EntityProcessStatus eps)
Internal use only; public for cross-package access. Switches class loaders and locks the CAS.
- Parameters:
statCL - status callback listener
cas - CAS
eps - entity process status
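The "switch class loaders and lock" step follows a standard Java pattern: save the thread context class loader, set a new one while holding a monitor, and restore the old loader in a finally block. The sketch below is hypothetical. Which loader the real method installs and which object it locks are assumptions here, so the helper simply takes both as parameters.

```java
import java.util.function.Consumer;

/** Hypothetical helper; the real method's lock target and loader choice may differ. */
class ClassLoaderSwitchSketch {
    /**
     * Runs the callback while holding the monitor of shared and with the thread context
     * class loader set to loader; the previous loader is always restored.
     */
    static <T> void callWithLoader(ClassLoader loader, T shared, Consumer<T> callback) {
        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();
        synchronized (shared) {
            current.setContextClassLoader(loader);
            try {
                callback.accept(shared);
            } finally {
                current.setContextClassLoader(saved); // restore even if the callback throws
            }
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        ClassLoader custom = new ClassLoader(original) { };
        callWithLoader(custom, new Object(), o ->
                System.out.println("inside: " + (Thread.currentThread().getContextClassLoader() == custom)));
        System.out.println("restored: " + (Thread.currentThread().getContextClassLoader() == original));
    }
}
```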
-