Search This Blog

Wednesday 30 October 2013

Anybody just give me the result !!

I came across the invokeAny method of ExecutorService and liked the idea a lot. There could always be the case that you have a set of input of which you care to see any one processed. Once we have an output the remaining results don't matter.
Accordingly I decided to implement such a case with the invokeAny method of  ExecutorService.

public class InvokeAnyEg {
  
  private static Random random = new Random((long) Math.random() * 3);
 
  private static int getProblemInput() {
          int nxtNo = random.nextInt();
          nxtNo = nxtNo < 0 ? nxtNo * -1 : nxtNo;
          return nxtNo/1000; //just reducing the size of number
  }
 
  public static void main(String[] args) throws InterruptedException,
                  ExecutionException {
          final int SOLVER_COUNT = 5;
          Collection<Solver> solvers = new ArrayList<Solver>(SOLVER_COUNT);
         
          for (int i = 0; i < SOLVER_COUNT; i++) {
                  Solver solver = new Solver(getProblemInput());
                  solvers.add(solver);
          }
          System.out.println("Start solving ..");
          ExecutorService executorService = Executors.newFixedThreadPool(SOLVER_COUNT);
          Long solution = executorService.invokeAny(solvers);
          System.out.println("A solution has been found , result is " + solution);
          executorService.shutdown();
  }
}
As seen above I have a set of participants working on a problem. I just need the answer from any one of my participants. As seen here, I have distributed my problem among 5 'solvers'. The collection of solvers is passed to my executorService and the invokeAny method will return the successful result found by the first thread.
Hint: Dear Reader, Do not look to try and understand the problem input. Its simply a random number.
class Solver implements Callable<Long> {

  private int problem;

  public Solver(int problem) {
          this.problem = problem;
  }

  @Override
  public Long call() throws Exception {
          String threadName = "[" + Thread.currentThread().getName() + "]";
          //Start processing
          System.out.println(threadName + " working with " + problem);
          long result = 0;
          for (long i = 0; i < problem; i++) {
                  result = result + 1;
          }
          System.out.println("Result computed by " + threadName + " is - " + result);
          return Long.valueOf(result);
  }
}
The solution class does nothing but increment the result until the result is equal to the input (!!!).
If I run the code I would expect the first thread to complete to return the answer. The main method would display and the program will end. Right ?
Well... almost right. Let us look at the output:
Start solving ..
[pool-1-thread-1] working with 1155484
[pool-1-thread-2] working with 723955
[pool-1-thread-3] working with 1033096
Result computed by [pool-1-thread-2] is - 723955
[pool-1-thread-4] working with 1690734
[pool-1-thread-5] working with 1557280
Result computed by [pool-1-thread-5] is - 1557280
Result computed by [pool-1-thread-1] is - 1155484
Result computed by [pool-1-thread-3] is - 1033096
A solution has been found , result is 723955
Result computed by [pool-1-thread-4] is - 1690734
As seen 5 threads from the Thread pool started work on 5 tasks. The first thread to complete was Thread 2, which returned to main. Main displayed the result. As per the documentation for invokeAny API, the rest of the threads should have cancelled their tasks and returned. But surprise surprise - the remaining threads continue to run and run until they were done with their tasks too. Why did this happen? The documentation clearly sates
"Upon normal or exceptional return, tasks that have not completed are cancelled."
How would the Executor Service cancel a thread ?
took me some time before I realized - with an interrupt.
As I had no blocking method in my code, The interrupt only resulted in the interrupt status flag being set. This meant it was the job of my task to check the flag and react to it. Accordingly I changed my for loop:
for (long i = 0; i < problem; i++) {
  if (Thread.interrupted()) {
     System.out.println("Interrupt request received at " + threadName + " to cancel");
       throw new InterruptedException();
  }
  result = result + 1;
}
Now if the Thread receives an interrupt it immediately throws an InterruptedException which being unhandled cancels my execution.
Start solving ..
[pool-1-thread-1] working with 115548457
[pool-1-thread-3] working with 103309605
[pool-1-thread-5] working with 155728026
[pool-1-thread-4] working with 169073440
[pool-1-thread-2] working with 72395540
Result computed by [pool-1-thread-2] is - 72395540
Interrupt request received at [pool-1-thread-1] to cancel
A solution has been found , result is 72395540
Interrupt request received at [pool-1-thread-3] to cancel
Interrupt request received at [pool-1-thread-4] to cancel
Interrupt request received at [pool-1-thread-5] to cancel
As seen now, after output was received, the ThreadPool sent an interrupt on the remaining threads, which terminated immediately.
Throwing an InterruptedException was not a must. I could throw any exception for that matter. Or I could just return.
if (Thread.interrupted()) {
        System.out.println("Interrupt request received at " + threadName + " to cancel");
        // throw new InterruptedException();
        return -1L;
 }
The -1 value does not matter since nobody cares of the result from the remaining threads. There is one more overloaded version of invokeAny which accepts a time limit on the task to be performed. If a solution is not received within the specified time, the method throws a TimeoutException:
try {
    Long solution = executorService.invokeAny(solvers, 3l, TimeUnit.MICROSECONDS);
    System.out.println("A solution has been found , result is " + solution);
} catch (TimeoutException e) {
    e.printStackTrace();
} finally {
    executorService.shutdown();
}
The interrupt logic is applicable to the above method too.

No comments:

Post a Comment