Skip to content

Latest commit

 

History

History
264 lines (225 loc) · 15.1 KB

batch-processing-reference.adoc

File metadata and controls

264 lines (225 loc) · 15.1 KB

Batch Processing Reference

Enterprise, CloudHub

Terminology

Term Description Element

Batch

A group of records which Mule processes individually, by record.

n/a

Batch Commit

A scope which accumulates records into chunks to prepare bulk upserts to external source or service.

batch:commit

Batch Job

The top-level element in an application in which Mule processes a message payload as a batch of records. The term batch job is inclusive of all four phases of processing: Input, Load and Dispatch, Process, and On Complete.

batch:job

Batch Job Instance

An occurrence in a Mule application resulting from the execution of a batch job in a Mule flow. you can let Mule create the batch job instance in the Load and Dispatch phase for you, or you can specify a distinct job instance Id through a Mule Expression. This instance exists eternally.

job-instance-id

Batch Job Result

POJO which contains information about the processing results of a batch job instance. You can use it to get information about execution progress.

BatchJobResult

Batch Message Processor

An element in a Mule flow which triggers execution of a batch job.

batch:execute

Batch Phase

Sequentially ordered stages through which batches pass as Mule processes them.

batch:input
batch:process-records
batch:on-complete 

Batch Step

A child element of the batch job within which multiple message processors act upon records in a batch.

batch:step

Record

A small part of a large message’s payload; a single instance of the result of Mule’s action to split a serialized message payload (i.e. a collection or array) into pieces for processing.

n/a

Block size

Batch records are queued and scheduled in blocks. This element determines the size of the block that will be used for the job’s instances.

block-size

Batch Elements

Studio Palette XML Editor Use

Batch

batch:job

Defines a batch "flow".

Batch Commit

batch:commit

Accumulates records into chunks to prepare bulk upserts to external source or service.

Batch Reference

batch:execute

Set within a Mule flow, triggers start of batch job.

Batch Threading Profile

batch:threading-profile

Configures details regarding threads upon which Mule processes batch jobs.

Record Variable

batch:record-variable-transformer

Sets or removes recordVars on records. Valid only within batch steps inside the Process batch phase.

Elements and Attributes

Element Attribute Value Attribute Req’d Attribute Description

batch:commit

doc:name

unique name for message processor

x

Defines unique identifier for batch commit wrapper.

size

integer

x

Defines number of records to collect before initiating upsert chunk of records to external source or service.

Element Attribute Value Attribute Req’d Attribute Description

batch:execute

name

batch job name

x

Identifies the batch job to execute.

doc:name

unique name for message processor

x

Defines unique identifier for batch reference message processor; can be an expression.

Element Attribute Value Attribute Req’d Attribute Description

batch:input

none

n/a

 

n/a

Element Attribute Value Attribute Req’d Attribute Description

batch:job

name

unique name for job

x

Defines unique identifier for job.

 

max-failed-records

0
-1
other integer 

 

0 = tolerates no failures, stops processing batch immediately.
-1=  tolerates all failures, never stops processing because of failed record.
integer = defines maximum number of failed records a batch tolerates before stopping processing.

Element Attribute Value Attribute Req’d Attribute Description

batch:on-complete

none

n/a

 

n/a

Element Attribute Value Attribute Req’d Attribute Description

batch:process-records

none

n/a

 

n/a

Element Attribute Value Attribute Req’d Attribute Description

batch:remove-record-variable-transformer

doc:name

unique name for message processor

x

Defines unique identifier for batch reference message processor.

variableName

name for record-level variable

x

Identifies record-level variable for removal.

Element Attribute Value Attribute Req’d Attribute Description

batch:set-record-variable-transformer

doc:name

unique name for message processor

x

Defines unique identifier for batch reference message processor.

value

MEL expression

x

Defines value of named variable.

 

variableName

name for record-level variable

x

Defines unique name for record-level variable.

Element Attribute Value Attribute Req’d Attribute Description

batch:step

name

unique name for step

x

Defines unique identifier for step inside the batch job.

 

accept-policy

ALL
FAILURES_ONLY
NO_FAILURES 

 

ALL = step processes all records, failed and successful.
FAILURES_ONLY = step processes only records which failed in a preceding step.
NO_FAILURES = step processes only records which succeeded in all preceding steps.

accept-expression

MEL expression

 

Step processes only those records which, relative to the expression, evaluate to true (evaluate to false = skip record).

Element Attribute Value Attribute Req’d Attribute Description

batch:threading-profile

poolExhaustedAction

WAIT
WAIT
DISCARD
DISCARD_OLDEST
ABORT
RUN 

 

Defines what a batch job should do if all threads are active.
WAIT = (Default) wait until next thread is available
DISCARD = discard waiting batch job
DISCARD_OLDEST = discard the oldest waiting batch job
ABORT = abort processing the batch job
RUN = don’t wait for a thread to become available, run the batch job synchronously

 

maxThreadsActive

integer

 

Defines the maximum number of active threads upon which Mule processes batch jobs.

 

maxThreadsIdle

integer

 

Defines the minimum number of active threads upon which Mule processes batch jobs.

 

threadTTL

integer

 

Defines, in milliseconds, the time a thread should live and remain idle before becoming inactive.

 

threadWaitTimeout

integer

 

Defines how long a batch job should wait for a thread to become available before timing out.

maxBufferSize

integer

 

Defines the size of the "overflow" memory which holds batch jobs while waiting for a thread to become available.

Batch Commit Connectors

Several Anypoint Connectors have the ability to handle record-level errors without failing a whole batch commit (i.e. upsert). At runtime, these connectors keep track of which records were successfully accepted by the target resource, and which failed to upsert.  Thus, rather than failing a complete group of records during a commit activity, the connector simply upserts as many records as it can, and tracks any failures for notification. The short – but soon to grow – list of such connectors follows:

  • Salesforce

  • NetSuite

BatchJobResult Processing Statistics

Statistic Description

batchJobInstanceId

A String indicating the id of the executed job instance.

elapsedTimeInMillis

A long indicating the number of milliseconds the batch job spent in executing state.

failedOnCompletePhase

A boolean indicating whether an exception was found on the on the complete phase.

failedOnInputPhase

A boolean indicating whether an exception was found on the on the input phase.

failedOnLoadingPhase

A boolean indicating whether an exception was found on the on the input phase.

failedRecords

A long indicating the number of records that failed processing.

inputPhaseException

If an exception was found in the input phase, then that Exception is returned; otherwise null is returned. Notice that there’s a correlation between this statistic and failedOnInputPhase.

loadedRecords

A long indicating the number of records loaded so far. Once the loading phase is completed, it should be equal to totalRecords.

loadingPhaseException

If an exception was found in the loading phase, then that Exception is returned; otherwise null is returned. Notice that there’s a correlation between this statistic and failedOnLoadingPhase.

onCompletePhaseException

If an exception was found in the on complete phase, then that Exception is returned; otherwise null is returned. Notice that there’s a correlation between this statistic and failedOnCompletePhase.

processedRecords

A long indicating the number of records processed so far. It equals successfulRecords failedRecords, but it could be lower than totalRecords if the job is not finished.

successfulRecords

A long indicating the number of records processed so far.

totalRecords

Total number of records in the batch.

Example

Note
For a full description of the example and steps the batch job takes in each phase of processing, see Batch Processing.
[tab,title="Studio Visual Editor"]
....
image:example_batch.png[example_batch]
....
[tab,title="XML Editor"]
....
[NOTE]
====
If you copy-paste the code into your instance of Studio, be sure to enter your own values for the the *global Salesforce connector*:

* username
* password
* security token

How do I get a Salesforce security token?

. Log in to your Salesforce account. From your account menu (your account is labeled with your name), select *Setup*.

. In the left navigation bar, under the *My Settings* heading, click to expand the **Personal **folder. 

. Click *Reset My Security Token*. Salesforce resets the token and emails you the new one.

. Access the email that Salesforce sent and copy the new token onto your local clipboard.

. In the application in your instance of Anypoint Studio, click the *Global Elements* tab. 

. Double-click the Salesforce global element to open its *Global Element Properties* panel. In the *Security Token* field, paste the new Salesforce token you copied from the email. Alternatively, configure the global element in the XML Editor.
====

[source,xml, linenums]
----
<mule xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
 
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
 
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
 
http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd
 
http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd">
 
    <sfdc:config name="Salesforce" username="username" password="password" securityToken="SpBdsf98af9tTR3m3YVcm4Y5q0y0R" doc:name="Salesforce">
        <sfdc:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </sfdc:config>
 
    <data-mapper:config name="new_mapping_grf" transformationGraphPath="new_mapping.grf" doc:name="DataMapper"/>
 
    <data-mapper:config name="new_mapping_1_grf" transformationGraphPath="new_mapping_1.grf" doc:name="DataMapper"/>
 
    <data-mapper:config name="leads_grf" transformationGraphPath="leads.grf" doc:name="DataMapper"/>
 
    <data-mapper:config name="csv_to_lead_grf" transformationGraphPath="csv-to-lead.grf" doc:name="DataMapper"/>
 
    <batch:job max-failed-records="1000" name="Create Leads" doc:name="Create Leads">
        <batch:threading-profile poolExhaustedAction="WAIT"/>
        <batch:input>
            <file:inbound-endpoint path="src/test/resources/input" moveToDirectory="src/test/resources/output" responseTimeout="10000" doc:name="File"/>
            <data-mapper:transform config-ref="csv_to_lead_grf" doc:name="CSV to Lead"/>
        </batch:input>
 
        <batch:process-records>
            <batch:step name="lead-check" doc:name="Lead Check">
                <enricher source="#[payload.size() &gt; 0]" target="#[recordVars['exists']]" doc:name="Message Enricher">
                    <sfdc:query config-ref="Salesforce" query="dsql:SELECT Id FROM Lead WHERE Email = '#[payload[&quot;Email&quot;]]'" doc:name="Find Lead"/>
                </enricher>
            </batch:step>
            <batch:step name="insert-lead"  doc:name="Insert Lead" accept-expression="#[recordVars['exists']]">
                <logger message="Got Record #[payload], it exists #[recordVars['exists']]" level="INFO" doc:name="Logger"/>
                <batch:commit size="200" doc:name="Batch Commit">
                    <sfdc:create config-ref="Salesforce" type="Lead" doc:name="Insert Lead">
                        <sfdc:objects ref="#[payload]"/>
                    </sfdc:create>
                </batch:commit>
            </batch:step>
            <batch:step name="log-failures" accept-policy="ONLY_FAILURES" doc:name="Log Failures">
                <logger message="Got Failure #[payload]" level="INFO" doc:name="Log Failure"/>
            </batch:step>
        </batch:process-records>
 
        <batch:on-complete>
            <logger message="#[payload.loadedRecords] Loaded Records #[payload.failedRecords] Failed Records" level="INFO" doc:name="Log Results"/>
        </batch:on-complete>
    </batch:job>
</mule>
----
....

See Also