By Lynn Walton – Sr. Software Engineer – ADP Cobalt SEO Team
Introduction
Recently the SEO team configured a new job using Spring Integration. The job includes several phases where steps need to be done for each item in a collection. While I could have used the same single-integration approach we have used before, there were disadvantages with the previous approach that I was hoping to eliminate by using new techniques.
First I’ll describe two features used in the new approach.
Asynchronous Gateways
Spring Integration’s @Gateway annotation lets you define a plain Java interface which, when wired up with <int:gateway> configuration, turns a call to an interface method into a message whose payload is the passed-in argument, sent on the configured channel. It’s convenient for “kicking off” an integration flow with a given set of data. With configuration like the following, Spring Integration creates a GatewayProxyFactoryBean that implements your interface.
<int:gateway id="myJobGateway"
service-interface="com.cobalt.services.seo.integration.support.MyJobGateway"
default-request-channel="startJob" error-channel="jobErrorChannel" />
public interface MyJobGateway {

    @Gateway
    void startJob(List<Account> accounts);
}
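For example, a caller only needs the interface injected and can invoke it like any ordinary bean; the proxy converts the call into a message on the startJob channel. (The surrounding class and the way the account list is obtained here are hypothetical, shown only to illustrate the calling pattern.)

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class JobTrigger {

    // Spring injects the proxy created by the GatewayProxyFactoryBean
    @Autowired
    private MyJobGateway myJobGateway;

    public void trigger(final List<Account> accounts) {
        // Sends the list as the payload of a message on the "startJob" channel
        myJobGateway.startJob(accounts);
    }
}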
In the single-integration approach we typically have our interface return void and do not configure a default-reply-channel. By not having a reply channel, and by having the integration use an ExecutorChannel early in the flow to break the single-thread context between sender and receiver, we end up with behavior similar to making an asynchronous call to the gateway. (A channel becomes an ExecutorChannel when you add <int:dispatcher task-executor="..."/> to it.) In the new approach we use a true asynchronous gateway.
Spring Integration has supported asynchronous gateways since version 2.0. You get an asynchronous gateway automatically just by defining your interface method to return a Future<T>.
public interface MyAsyncJobGateway {

    @Gateway
    Future<MyResultClass> startJob(List<Account> accounts);
}
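Callers that need the result can block on the returned Future, typically with a timeout; callers that don't can simply ignore it. A minimal sketch, assuming a hypothetical caller class and an illustrative 60-second timeout:

import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class AsyncGatewayCaller {

    private final MyAsyncJobGateway myAsyncJobGateway;

    public AsyncGatewayCaller(final MyAsyncJobGateway myAsyncJobGateway) {
        this.myAsyncJobGateway = myAsyncJobGateway;
    }

    public MyResultClass runAndWait(final List<Account> accounts) throws Exception {
        // The gateway call returns immediately; the flow runs on the gateway's async executor
        final Future<MyResultClass> future = myAsyncJobGateway.startJob(accounts);
        // Block (with a timeout) only if and when the result is actually needed
        return future.get(60, TimeUnit.SECONDS);
    }
}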
Spring’s @Async annotation
Since Spring 3.0, annotating a method with @Async causes Spring to wrap your service in a proxy. When your method is called, the caller will get an immediate return, while the actual execution occurs in a task submitted to a Spring TaskExecutor.
@Service
public class MyServiceImpl implements MyService {

    @Async
    public void myMethod() {
        // do work
    }
}
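Note that @Async only takes effect when asynchronous method processing has been enabled, for example with <task:annotation-driven/> in XML configuration or, in later Spring versions, with @EnableAsync in Java config. A minimal Java-config sketch (the pool sizes and bean name are illustrative assumptions, and the exact rules for which executor @Async picks up vary by Spring version):

import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig {

    // Executor available for @Async methods; pool sizes here are placeholders
    @Bean
    public Executor taskExecutor() {
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.initialize();
        return executor;
    }
}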
For a method where you wish to return something, so callers have the option of waiting for a result, you return the result by passing it to the constructor of Spring's AsyncResult<T> class (which implements Future<T>).
@Service
public class MyServiceImpl implements MyService {

    @Async
    public Future<MyResultClass> myMethod() {
        // do work
        return new AsyncResult<MyResultClass>(instanceOfMyResultClass);
    }
}
In our normal use case, our service is called by a REST endpoint (which is invoked from cron). We know the job will be long-running, so the REST endpoint does not wait on the Future. But defining the service to return a Future allows us to wait for it in our integration tests. Those tests are configured to work with a small dataset, so waiting is feasible, and making assertions on the returned object simplifies the tests.
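For illustration, an integration test built on this idea might look roughly like the following; the context configuration, wait time, and assertions are hypothetical, and only the wait-on-the-Future pattern is the point:

import static org.junit.Assert.assertNotNull;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration // location of the test context omitted here
public class MyServiceIntegrationTest {

    @Autowired
    private MyService myService;

    @Test
    public void jobCompletesForSmallTestDataset() throws Exception {
        final Future<MyResultClass> future = myService.myMethod();
        // Small test dataset, so waiting a bounded amount of time is feasible
        final MyResultClass result = future.get(30, TimeUnit.SECONDS);
        assertNotNull(result);
    }
}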
A High-level Overview of the Two Approaches
Note: the integration flow pseudo-code shown below attempts to provide clarity by leaving out all components other than those which show the flow of the work.
Previous Approach
(Note: processing for each item in a collection is handled by splitters and aggregators.)
1 integration flow: GetDataForAllSitesInAllAccounts – a synchronous gateway (with null return and no reply-channel) which sends a list of accounts into the flow:
    splitter (each account)
        get list of sites
        splitter (each site)
            convert to list of urls to call
            splitter (each url)
                make call and store
            aggregator
        aggregator
    aggregator
1 service method:
- obtain the list of accounts
- call the gateway
- returns null almost immediately after the gateway call, because the gateway has a null return and no reply channel, and the integration has an ExecutorChannel early in the flow

New Approach
2 integration flows:
GetSitesForAccount – an asynchronous gateway which sends a single account into the flow:
    get list of sites
GetSiteData – an asynchronous gateway which sends each site into a second flow:
    convert to list of urls to call
    splitter (each url)
        make call and store
    aggregator
1 service method marked with Spring's @Async:
- obtain the list of accounts
- logic to control flow and concurrency for calling both gateways (see "Logic in the Service Method" below)
- returns a Future so callers have the option of waiting for the result (great for testing)
Comparisons of Advantages and Disadvantages (PRO/CON)
Previous Approach vs. New Approach (multiple asynchronous gateways and an @Async service method):

Previous Approach – CON: The integration flow is large and complex enough to make it harder to understand. (Note: the pseudo-code above doesn't show the real difference in complexity.)
New Approach – PRO: The separate integrations are much simpler to understand. CON: The looping that would be done by splitters/aggregators is now done with custom logic in the service method.

Previous Approach – CON: Getting a summary report is critical but quite difficult with this approach, because you must code separate integration components that can handle 1) failures that might have occurred at different stages and 2) summarization of both error and non-error results. The error-handling code is complicated because the available payload and header information is different in every stage. Additionally, for the summary (in our logs) to be easily understood, you need a good estimate of how long each aggregator should wait for the typical case to finish. (This estimated time is set as the MessageGroupStoreReaper's timeout value.) Estimating gets more difficult when there are nested splitters and aggregators, since each estimate has to take into account the timeouts of the preceding aggregators. If you set these too short, or if unusual circumstances make the job take longer than estimated, the summary logging becomes difficult to interpret because it relies on counts of items released to the aggregator – which might have happened more than once. Finally, the "best estimate" for production is hard to guess in advance when pre-production environments differ significantly from production.
New Approach – PRO: Summary reporting is easier and more accurate because you catch any errors in the sections of code where you already know the stage in which the error occurred. Because of this there is no need to complicate the integration by storing metadata in headers for retrieval from an error-handling component. Also, only one aggregator needs a time estimate.

Previous Approach – CON: Integration tests need to use Thread.sleep() with a value long enough to give the integration time to run before verifying results. This time varies across computer environments and loads, so to prevent build failures you are forced to choose a pessimistically high value, and the tests then take longer than they need to.
New Approach – PRO: Integration tests can wait with Future.get(estimatedTime, TimeUnit.SECONDS). You can set estimatedTime on the high side to avoid build failures without suffering the penalty of waiting longer than necessary.

Previous Approach – CON: Integration tests have to perform before-and-after state querying to make meaningful assertions, since the gateway returns null to simulate asynchronous behavior.
New Approach – PRO: Integration tests can easily make meaningful assertions on the returned Future rather than querying before-and-after state.

Previous Approach – PRO: Doing all of the work in one integration allows easy configuration of the number of concurrently executing tasks in each phase, by setting <int:dispatcher task-executor="myExecutor"/> and configuring the desired pool-size on the executor.
New Approach – CON: We have to write logic in Java to control a "quasi" level of concurrency for a particular stage. I say "quasi" because it behaves differently from a dispatcher with a task executor on a channel: the coded logic allows a degree of concurrency, but only in batches of a set size, where the next batch is delayed until every task in the current batch has finished. So there are pauses in throughput that wouldn't happen with a normal task executor on a channel, which can always start the next task as soon as a thread in the pool becomes available. PRO: Despite the CON, coding the concurrency logic can also be seen as an advantage, because we can dynamically pass the desired batch size to our service method. This allows the level of "quasi-concurrency" to be changed without deploying a new build and makes it easier to experiment to find the best value.
Logic in the Service Method
The relevant portions of the service method are listed below to show the extra amount of logic we implement to get the other benefits described above. We do our own looping in place of the nested splitters/aggregators. We code the logic for performing some work concurrently in batches, but in exchange for this we get the flexibility of being able to dynamically change the batch size. Finally, the code below for creating and updating a JobStats (summary) object is not really extra. In our previous approach, it would still need to be coded in a separate summarization component used by the integration.
@Override
@Async
public Future<JobStats> startJobForAccounts(final int numConcurrentSites, final String... acctLogins) {
    try {
        final List<Account> accounts = accountsUtil.createAccountsListForAcctEmail(acctLogins);
        final JobStats stats = new JobStats();
        // [1] LOOP REPLACING FIRST SPLITTER/AGGREGATOR
        for (Account account : accounts) {
            startJobForSingleAccount(numConcurrentSites, account, stats);
        }
        LOGGER.info(stats.createStatsLogStr());
        return new AsyncResult<JobStats>(stats);
    } catch (Exception exc) {
        LOGGER.error("Unexpected error", exc);
        final JobStats stats = new JobStats();
        // set properties on JobStats appropriate for indicating the error
        return new AsyncResult<JobStats>(stats);
    }
}
private void startJobForSingleAccount(final int numConcurrentSites, final Account account, final JobStats stats) {
    Assert.isTrue(numConcurrentSites > 0, NUM_CONCURRENT_SITES_MUST_BE_GT_ZERO_MSG);
    final List<Site> sites = getSitesForAccount(account, stats);
    final Map<String, Future<SiteDataSummary>> futuresBatchMap = new LinkedHashMap<String, Future<SiteDataSummary>>();
    final Iterator<Site> siteIter = sites.iterator();
    int idx = 0;
    // [2] LOOP REPLACING SECOND SPLITTER/AGGREGATOR
    while (siteIter.hasNext()) {
        final Site site = siteIter.next();
        idx++;
        // [3] 1st Gateway call
        futuresBatchMap.put(site.toString(), getSiteDataGateway.startGetSiteData(site));
        /*
         * sites.size() - idx < numConcurrentSites makes sure that any final partial batch gets processed.
         * It also means some of the last entries are processed in smaller batches or even one at a time.
         */
        if (idx % numConcurrentSites == 0 || sites.size() - idx < numConcurrentSites) {
            blockToProcessBatch(futuresBatchMap, stats);
        }
    }
}
private List<Site> getSitesForAccount(final Account account, final JobStats stats) {
    // [4] 2nd Gateway call
    final Future<List<Site>> future = getSitesForAccountGateway.getSitesForAccount(account);
    try {
        final List<Site> sites = future.get(secondsToWaitForSitesListFuture, TimeUnit.SECONDS);
        stats.getAccountsSucceeded().add(account.getLogin());
        return sites;
    } catch (Exception exc) {
        stats.getAccountsFailed().add(account.getLogin());
        LOGGER.error("Failed to process Account {}", account.getLogin(), exc);
        return new ArrayList<Site>();
    }
}
private void blockToProcessBatch(final Map<String, Future<SiteDataSummary>> futuresBatchMap, final JobStats stats) {
    for (Map.Entry<String, Future<SiteDataSummary>> futureEntry : futuresBatchMap.entrySet()) {
        try {
            final SiteDataSummary summary = futureEntry.getValue()
                    .get(secondsToWaitForSiteDataFuture, TimeUnit.SECONDS);
            summary.setSuccessful(true);
            stats.incrementSitesSucceeded();
        } catch (Exception exc) {
            stats.getSitesFailed().add(futureEntry.getKey());
            LOGGER.error("Failed to process {}", futureEntry.getKey(), exc);
        }
    }
    futuresBatchMap.clear();
}
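The JobStats class itself is not shown here; a minimal sketch of what it might look like, inferred purely from the calls made on it above (the log format and any details beyond those calls are assumptions):

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the summary object, based only on its usage in the service method
public class JobStats {

    private final List<String> accountsSucceeded = new ArrayList<String>();
    private final List<String> accountsFailed = new ArrayList<String>();
    private final List<String> sitesFailed = new ArrayList<String>();
    private int sitesSucceeded;

    public List<String> getAccountsSucceeded() {
        return accountsSucceeded;
    }

    public List<String> getAccountsFailed() {
        return accountsFailed;
    }

    public List<String> getSitesFailed() {
        return sitesFailed;
    }

    public void incrementSitesSucceeded() {
        sitesSucceeded++;
    }

    public String createStatsLogStr() {
        return "Accounts succeeded: " + accountsSucceeded.size()
                + ", accounts failed: " + accountsFailed.size()
                + ", sites succeeded: " + sitesSucceeded
                + ", sites failed: " + sitesFailed.size();
    }
}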
Conclusion
With a complex framework such as Spring Integration, there are often many ways to implement a desired task. We generally lean toward solutions that require less of our own business logic code and many times this approach serves us well. But it is worthwhile to think about alternative approaches. Sometimes writing a little more code to increase control and improve testability can be better than taking advantage of the features within a complex framework.
I’m glad I tried the approach: I learned a lot, and I believe the benefits listed above outweigh the main disadvantage – having slightly more service code to implement.
Reference Links:
http://docs.spring.io/spring/docs/3.0.x/reference/scheduling.html
http://docs.spring.io/spring-integration/docs/2.0.0.RC1/reference/html/gateway.html