Recently, I came across some code that iterates through a list of configs and starts each job on an executor pool. Its style is quite different from how I usually write, so I walked through it and wrote this post to see what the advantages of this style are.
The class is named `SimpleSparkEtlProcessor`. It has the following attributes and methods:
- a list of config objects and an executor service to run the jobs on:

```java
private List<SimpleSparkEtlJobConfig.SourceConfig> extractConfigDirectoryList;
private ExecutorService executorService;
```
The list of configs is initialized in the constructor:
```java
public SimpleSparkEtlProcessor(SimpleSparkEtlJobConfig config) {
    this.simpleSparkEtlJobConfig = config;
    this.lock = new ReentrantLock();
    this.running = new AtomicBoolean(true);
    this.extractConfigDirectoryList = config.getSourceConfigs();
}
```
- the `run()` method, which is the entry point:
```java
public void run() {
    running.set(true);
    // Lock lock = locks.apply(simpleSparkEtlJobConfig);
    try {
        log.info("start the processors");
        while (running.get()) {
            if (acquireLock(lock)) {
                List<CompletableFuture<Void>> futures = extractConfigDirectoryList.stream()
                        .map(sourceConfig -> CompletableFuture.runAsync(() -> {
                            runJob(sourceConfig);
                        }, executorService))
                        .collect(Collectors.toList());
                // wait for all the jobs to complete
                CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
                lock.unlock();
            }
        }
    } finally {
        log.info("run into finally. we are exiting the processor.");
        running.set(false);
        lock.unlock();
    }
}
```
```java
@SneakyThrows
public void runJob(SimpleSparkEtlJobConfig.SourceConfig sourceConfig) {
    log.info("start directory monitoring job, directory: {}, Thread name: {}",
            sourceConfig.directory, Thread.currentThread().getName());
    Function<String, CompletableFuture<String>> sparkSubmitter = createSparkProcessor();
    FileProcessor fileProcessor = new FileProcessor(sourceConfig, sparkSubmitter);
    fileProcessor.process();
    log.info("complete directory monitoring job, directory: {}, Thread name: {}",
            sourceConfig.directory, Thread.currentThread().getName());
}
```
```java
public Boolean acquireLock(Lock lock) {
    try {
        return lock.tryLock();
    } catch (Exception e) {
        log.info("{}", e);
        return false;
    }
}
```
- The whole body is wrapped in a while loop guarded by an atomic boolean, so the processor keeps looping as long as `running` is true:

```java
while (running.get()) {
    ...
}
```
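To see that pattern in isolation, here is a minimal, self-contained sketch (the class and method names are mine, not from the original code): an `AtomicBoolean` flag checked at the top of each iteration lets the loop body, or any other thread, request a stop.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StoppableLoop {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private int iterations = 0;

    // Loops until the flag is flipped; here the body flips it itself
    // after maxIterations, but another thread could do the same.
    public int loop(int maxIterations) {
        while (running.get()) {
            iterations++;
            if (iterations >= maxIterations) {
                running.set(false); // request a stop; checked at the top of the loop
            }
        }
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println(new StoppableLoop().loop(5)); // prints 5
    }
}
```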
- Every thread has to acquire the lock on this object before going further; this prevents a second thread from running the same Processor instance concurrently:

```java
if (acquireLock(lock)) {
    ...
}
```
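A small standalone demonstration of the `tryLock` guard (all names here are illustrative): while one thread holds a `ReentrantLock`, a second thread's `tryLock()` returns false immediately instead of blocking, so the loser simply skips the work.

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockGuardDemo {
    // Simulates a second thread trying to enter while the first still holds the lock.
    static boolean secondThreadAcquires() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the current thread plays the role of the running processor
        boolean[] acquired = new boolean[1];
        Thread other = new Thread(() -> acquired[0] = lock.tryLock());
        other.start();
        other.join();
        lock.unlock();
        return acquired[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(secondThreadAcquires()); // prints false
    }
}
```

Note that a fresh thread is needed for the demonstration: `ReentrantLock` is reentrant, so a `tryLock()` from the thread that already owns it would succeed.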
- A stream iterates over `extractConfigDirectoryList`, and for each config calls `CompletableFuture.runAsync` to run the job asynchronously. This yields a list of `CompletableFuture<Void>` objects:

```java
List<CompletableFuture<Void>> futures = extractConfigDirectoryList.stream()
        .map(sourceConfig -> CompletableFuture.runAsync(() -> runJob(sourceConfig), executorService))
        .collect(Collectors.toList());
```
Here it uses the `CompletableFuture.runAsync(Runnable, Executor)` overload, which runs the task on the supplied pool rather than the common `ForkJoinPool` and returns a `CompletableFuture<Void>`.
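The fan-out-and-wait pattern can be sketched end to end in a runnable example, with made-up directory names standing in for the real source configs:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class FanOutDemo {
    // Fans out one runAsync job per config onto a shared pool, then waits for all of them.
    static int runAll(List<String> configs) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        AtomicInteger done = new AtomicInteger();

        // One CompletableFuture<Void> per config, all submitted to the same executor.
        List<CompletableFuture<Void>> futures = configs.stream()
                .map(cfg -> CompletableFuture.runAsync(done::incrementAndGet, pool))
                .collect(Collectors.toList());

        // allOf(...).join() blocks until every job has completed.
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
        pool.shutdown();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(List.of("dir-a", "dir-b", "dir-c"))); // prints 3
    }
}
```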
- After all the jobs are done, the lock is released and control returns to the top of the while loop:

```java
// wait for all the jobs to complete
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
lock.unlock();
```
- Outside the while loop there is a `try...finally`. If anything goes wrong, the `finally` block sets `running` to false and releases the lock, so the processor stops cleanly instead of spinning forever:

```java
try {
    ...
} finally {
    running.set(false);
    lock.unlock();
}
```
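As a standalone illustration of that guarantee (the names are mine, not from the original class): even when the loop body throws, the `finally` block still flips the flag before the exception propagates to the caller.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class FinallyStopDemo {
    // Returns the flag's value after the loop body throws: finally still runs.
    static boolean flagAfterCrash() {
        AtomicBoolean running = new AtomicBoolean(true);
        try {
            try {
                while (running.get()) {
                    throw new IllegalStateException("job failed"); // simulate a crash mid-loop
                }
            } finally {
                running.set(false); // runs even while the exception is still in flight
            }
        } catch (IllegalStateException expected) {
            // the exception still propagated; only the cleanup happened in finally
        }
        return running.get();
    }

    public static void main(String[] args) {
        System.out.println(flagAfterCrash()); // prints false
    }
}
```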