Class IterativeActivityRun<I,WD extends WorkDefinition,AH extends ActivityHandler<WD,AH>,WS extends AbstractActivityWorkStateType>

java.lang.Object
com.evolveum.midpoint.repo.common.activity.run.AbstractActivityRun<WD,AH,BS>
com.evolveum.midpoint.repo.common.activity.run.LocalActivityRun<WD,AH,WS>
com.evolveum.midpoint.repo.common.activity.run.IterativeActivityRun<I,WD,AH,WS>
All Implemented Interfaces:
ImplicitSegmentationResolver, IterativeActivityRunSpecifics, ExecutionSupport, DebugDumpable
Direct Known Subclasses:
PlainIterativeActivityRun, SearchBasedActivityRun

public abstract class IterativeActivityRun<I,WD extends WorkDefinition,AH extends ActivityHandler<WD,AH>,WS extends AbstractActivityWorkStateType> extends LocalActivityRun<WD,AH,WS> implements ExecutionSupport, IterativeActivityRunSpecifics
Represents a run of an iterative activity: either plain iterative one or search-based one.

Responsibilities at this level of abstraction:

1. Orchestrates the basic run cycle - see runLocally(OperationResult):
   a. calls before/after execution "hook" methods + the main execution routine,
   b. generates the execution result based on kind(s) of error(s) encountered.
2. Orchestrates the basic bucket execution cycle - see processOrAnalyzeOrSkipSingleBucket(OperationResult):
   a. item source preparation,
   b. before/after bucket execution "hook" methods, along with main iterateOverItemsInBucket(OperationResult) method,
   c. sets up and winds down the coordinator.
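
The two cycles described in the class comment follow a template-method shape. The sketch below is only an illustration of that shape under stated assumptions: `SketchIterativeRun`, `SketchNonNegativeRun`, the trace strings, and the `"SUCCESS"` / `"PARTIAL_ERROR"` results are all hypothetical names, not midPoint API, and the exact ordering of hooks in the real class may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an iterative run; not the midPoint class. */
abstract class SketchIterativeRun<I> {
    /** Records the order of calls so the cycle can be inspected. */
    final List<String> trace = new ArrayList<>();
    private boolean errorSeen;

    /** Run cycle: "before" hook, main routine over all buckets, "after" hook, result. */
    final String runLocally(List<List<I>> buckets) {
        trace.add("beforeRun");
        for (List<I> bucket : buckets) {
            processSingleBucket(bucket);
        }
        trace.add("afterRun");
        // The result depends on the errors encountered (simplified here to one flag).
        return errorSeen ? "PARTIAL_ERROR" : "SUCCESS";
    }

    /** Bucket cycle: prepare source, set up coordinator, hooks plus iteration, wind down. */
    private void processSingleBucket(List<I> bucket) {
        trace.add("prepareItemSource");
        trace.add("coordinatorUp");
        trace.add("beforeBucket");
        try {
            for (I item : bucket) {
                if (!processItem(item)) {
                    errorSeen = true;
                }
            }
        } finally {
            // Always wind down, even if item processing threw an exception.
            trace.add("afterBucket");
            trace.add("coordinatorDown");
        }
    }

    /** Supplied by the concrete activity; in this sketch, false means the item failed. */
    abstract boolean processItem(I item);
}

/** Example subclass: an item "fails" when it is negative. */
class SketchNonNegativeRun extends SketchIterativeRun<Integer> {
    @Override
    boolean processItem(Integer item) {
        return item >= 0;
    }
}
```

A concrete subclass only supplies `processItem`; the fixed order of hooks and cleanup stays in the final `runLocally` method, which is the reason the real `runLocally` is declared `final` as well.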
  • Field Details

    • bucket

      protected WorkBucketType bucket
      Current bucket that is being processed. It is used to narrow the search query for search-based activities.
    • coordinator

      protected ProcessingCoordinator<I> coordinator
      Schedules individual items for processing by worker threads (if running in multiple threads). Re-created for each individual bucket.
    • errorState

      @NotNull protected final @NotNull ErrorState errorState
      Error state. In particular, should we stop immediately because of a fatal exception? TODO rethink this
    • transientRunStatistics

      @NotNull protected final @NotNull TransientActivityRunStatistics transientRunStatistics
      Maintains selected statistical information related to processing items in the current run. It is like a simplified version of ActivityItemProcessingStatistics, which covers all the runs (and sometimes all the realizations) of an activity.
    • beans

      @NotNull protected final @NotNull CommonTaskBeans beans
      Useful Spring beans.
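
The `coordinator` field above is described as scheduling items for worker threads and being re-created for each bucket. A minimal sketch of that idea, using only `java.util.concurrent`, might look as follows. `SketchBucketCoordinator` and its methods are hypothetical and say nothing about how `ProcessingCoordinator` is actually implemented; the one-minute timeout is an arbitrary choice for the example.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical per-bucket scheduler; not midPoint's ProcessingCoordinator. */
final class SketchBucketCoordinator<I> {
    private final ExecutorService pool; // null means "process in the caller's thread"
    private final Consumer<I> processor;

    SketchBucketCoordinator(int workerThreads, Consumer<I> processor) {
        this.pool = workerThreads > 0 ? Executors.newFixedThreadPool(workerThreads) : null;
        this.processor = processor;
    }

    /** Hands one item to a worker thread, or processes it directly if there are none. */
    void submit(I item) {
        if (pool != null) {
            pool.execute(() -> processor.accept(item));
        } else {
            processor.accept(item);
        }
    }

    /** Winds the coordinator down and waits for queued items to finish. */
    void finish() {
        if (pool == null) {
            return;
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("Workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
    }

    /** Processes all buckets, creating a fresh coordinator for each one. */
    static int processAll(List<List<Integer>> buckets, int workerThreads) {
        AtomicInteger processed = new AtomicInteger();
        for (List<Integer> bucket : buckets) {
            SketchBucketCoordinator<Integer> coordinator =
                    new SketchBucketCoordinator<>(workerThreads, item -> processed.incrementAndGet());
            for (Integer item : bucket) {
                coordinator.submit(item);
            }
            coordinator.finish();
        }
        return processed.get();
    }
}
```

Creating the coordinator inside the bucket loop means no worker from one bucket can still be running when the next bucket starts.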
  • Constructor Details

  • Method Details

    • runLocally

      @NotNull protected final @NotNull ActivityRunResult runLocally(OperationResult result) throws ActivityRunException, CommonException
      Specified by:
      runLocally in class LocalActivityRun<WD extends WorkDefinition,AH extends ActivityHandler<WD,AH>,WS extends AbstractActivityWorkStateType>
      Throws:
      ActivityRunException
      CommonException
    • getChannelOverride

      @Nullable protected @Nullable String getChannelOverride()
      Channel URI that should be set into the task during this activity run. (If not null.)
    • prepareItemSourceForCurrentBucket

      protected abstract void prepareItemSourceForCurrentBucket(OperationResult result) throws ActivityRunException, CommonException
      Prepares the item source. E.g. for search-iterative tasks we prepare object type, query, and options here. Iterative activities delegate this method fully to the plugin. However, search-based activities provide their own default implementation.
      Throws:
      ActivityRunException
      CommonException
    • isInRepository

      protected abstract boolean isInRepository(OperationResult result) throws ActivityRunException, CommonException
      Do we execute over items in the repository? (Maybe the name should be changed.)
      Throws:
      ActivityRunException
      CommonException
    • determineOverallSize

      @Nullable public @Nullable Integer determineOverallSize(OperationResult result) throws CommonException, ActivityRunException
      Determines expected progress (overall size) for the activity. E.g. for search-based activities we count the objects here (overall).
      Returns:
      null if no value could be determined or is not applicable
      Throws:
      CommonException
      ActivityRunException
    • determineCurrentBucketSize

      @Nullable public @Nullable Integer determineCurrentBucketSize(OperationResult result) throws CommonException
      Determines the current bucket size. E.g. for search-based activities we count the objects here (in current bucket).
      Returns:
      null if no value could be determined or is not applicable
      Throws:
      CommonException
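
Both size methods may return null, so callers have to handle the "unknown size" case explicitly. The helper below is a hypothetical illustration of one way to do that; `SketchProgress` and its output format are assumptions for this example and are not part of midPoint.

```java
/** Hypothetical helper showing how a caller might treat a null expected size. */
final class SketchProgress {
    private SketchProgress() {
    }

    /**
     * Describes progress; the expected total may be null when it could not be
     * determined or is not applicable.
     */
    static String describe(int processed, Integer expectedTotal) {
        if (expectedTotal == null || expectedTotal <= 0) {
            // No usable total: report only the count.
            return processed + " processed";
        }
        long percent = Math.round(100.0 * processed / expectedTotal);
        return processed + " of " + expectedTotal + " (" + percent + "%)";
    }
}
```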
    • iterateOverItemsInBucket

      protected abstract void iterateOverItemsInBucket(OperationResult result) throws CommonException
      Starts the item source (e.g. `searchObjectsIterative` call or `synchronize` call) and begins processing items generated by it. Returns when the source finishes. For example:
      - for search-based tasks, this call returns immediately after the iterative search is over;
      - for live sync task, this returns after all changes were fetched and acknowledged, and the resulting token was written;
      - for async update task, this returns also after all changes were fetched and acknowledged and confirmed to the source.
      Throws:
      CommonException
    • isMultithreaded

      public final boolean isMultithreaded()
    • ensureNoWorkerThreads

      protected final void ensureNoWorkerThreads()
      Fails if worker threads are defined. To be used in tasks that do not support multithreading.
    • ensureNoParallelism

      protected final void ensureNoParallelism()
      Fails if there is any parallelism within this activity: worker threads or worker tasks. It is to avoid unintended parallel execution like the one in MID-7861. Note that this method does not preclude parallel execution with a different activity. But that is not enabled by default, so it's safe to assume it will not be configured by mistake.
    • getShortName

      @NotNull public final @NotNull String getShortName()
    • getContextDescription

      @NotNull public final @NotNull String getContextDescription()
    • getContextDescriptionSpaced

      @NotNull public final @NotNull String getContextDescriptionSpaced()
      Inserts a space before context description if it's not empty.
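
The behavior described for getContextDescriptionSpaced can be sketched in a few lines. This is a guess at the logic based only on the one-sentence description above; `SketchContextDescription` is a hypothetical class, and the null check is purely defensive, since the real getter is declared `@NotNull`.

```java
/** Hypothetical illustration of "insert a space before the description if it's not empty". */
final class SketchContextDescription {
    private SketchContextDescription() {
    }

    static String spaced(String contextDescription) {
        if (contextDescription == null || contextDescription.isEmpty()) {
            return "";
        }
        return " " + contextDescription;
    }
}
```

With such a helper, a message can be built as `"Processing" + spaced(description)` without leaving a trailing space when the description is empty.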
    • setContextDescription

      public final void setContextDescription(String value)
    • handleError

      public final ErrorHandlingStrategyExecutor.FollowUpAction handleError(@NotNull @NotNull OperationResultStatus status, @NotNull @NotNull Throwable exception, ItemProcessingRequest<?> request, OperationResult result)
    • getDefaultErrorAction

      @NotNull protected abstract ErrorHandlingStrategyExecutor.FollowUpAction getDefaultErrorAction()
      Returns:
      Default error action if no policy is specified or if no policy entry matches.
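
The "default if no policy entry matches" rule can be pictured as a lookup with a fallback. The following sketch assumes a policy keyed by exception type; `SketchErrorPolicy`, `SketchFollowUpAction`, and its two constants are hypothetical and do not reflect the actual `ErrorHandlingStrategyExecutor.FollowUpAction` values or the real policy model.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical follow-up actions; the real enum constants may differ. */
enum SketchFollowUpAction { CONTINUE, STOP }

/** Hypothetical policy: the first matching entry wins, otherwise the default applies. */
final class SketchErrorPolicy {
    private final Map<Class<? extends Throwable>, SketchFollowUpAction> entries = new LinkedHashMap<>();
    private final SketchFollowUpAction defaultAction;

    SketchErrorPolicy(SketchFollowUpAction defaultAction) {
        this.defaultAction = defaultAction;
    }

    /** Adds an entry; entries are checked in insertion order. */
    SketchErrorPolicy on(Class<? extends Throwable> type, SketchFollowUpAction action) {
        entries.put(type, action);
        return this;
    }

    /** Returns the action of the first matching entry, or the default action. */
    SketchFollowUpAction decide(Throwable exception) {
        for (Map.Entry<Class<? extends Throwable>, SketchFollowUpAction> entry : entries.entrySet()) {
            if (entry.getKey().isInstance(exception)) {
                return entry.getValue();
            }
        }
        return defaultAction;
    }
}
```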
    • getRootTaskOid

      @NotNull public final @NotNull String getRootTaskOid()
    • updateStatistics

      public void updateStatistics(boolean updateThreadLocalStatistics, OperationResult result) throws SchemaException, ObjectNotFoundException
      Updates statistics in the coordinator task (including thread-local (TL) statistics if it's safe to do so). If needed, also updates the statistics in the repository.
      Statistics updated in the task:
      - task.operationStats,
      - progress (both activity-based and legacy),
      - activity statistics: items, synchronization, actions executed, bucketing operations.
      Note that using modifyObjectDynamically would perhaps be better, but the current use of last update timestamp ensures that there will not be concurrent updates of the coordinator coming from its worker threads.
      Throws:
      SchemaException
      ObjectNotFoundException
    • canUpdateThreadLocalStatistics

      protected boolean canUpdateThreadLocalStatistics()
      Returns true if it's safe to update thread-local (TL) statistics in the coordinator. This is normally the case. A notable exception is asynchronous update using AMQP (an experimental feature for now). The reason is that the message handling occurs in a thread different from the task thread. See MID-7464.
    • getTransientRunStatistics

      @NotNull public final @NotNull TransientActivityRunStatistics getTransientRunStatistics()
    • processItem

      public abstract boolean processItem(@NotNull @NotNull ItemProcessingRequest<I> request, @NotNull @NotNull RunningTask workerTask, OperationResult result) throws ActivityRunException, CommonException
      Throws:
      ActivityRunException
      CommonException
    • determineActivityStateForCounters

      @NotNull protected final @NotNull ActivityState determineActivityStateForCounters(@NotNull @NotNull OperationResult result) throws SchemaException, ObjectNotFoundException
      Overrides:
      determineActivityStateForCounters in class AbstractActivityRun<WD extends WorkDefinition,AH extends ActivityHandler<WD,AH>,WS extends AbstractActivityWorkStateType>
      Throws:
      SchemaException
      ObjectNotFoundException
    • getItemsReport

      @NotNull public final @NotNull ItemsReport getItemsReport()
    • getConnIdOperationsReport

      @NotNull public final @NotNull ConnIdOperationsReport getConnIdOperationsReport()
    • shouldReportItems

      public final boolean shouldReportItems()
    • shouldReportConnIdOperations

      public final boolean shouldReportConnIdOperations()
    • shouldReportInternalOperations

      public final boolean shouldReportInternalOperations()
    • getBucket

      public final WorkBucketType getBucket()
    • enableGlobalConnIdOperationsListener

      public final void enableGlobalConnIdOperationsListener()
    • disableGlobalConnIdOperationsListener

      public final void disableGlobalConnIdOperationsListener()
    • getErrorState

      @NotNull public @NotNull ErrorState getErrorState()