Need help with Java multithreading

I have a case as below:

There are many records. Each record has about 250 fields. Each field needs to be validated against a predefined rule.

So I defined a class, FieldInfo, to represent each field:

public class FieldInfo {
    private String name;
    private String value;
    private String error_code;
    private String error_message;

    // ignore getters and setters
} 

a class Record to represent a record:

public class Record {
    List<FieldInfo> fields;

    // omit getter and setter here
}

and the rule interface and class:

public interface BusinessRule {
    // validating one field needs some other fields' value in the same record. So the list of all fields for a certain record passed in as parameter 
    public FieldInfo validate(List<FieldInfo> fields);
}

public class FieldName_Rule implements BusinessRule {

    public FieldInfo validate(List<FieldInfo> fields) { 
    // will do 
    // 1. pickup those fields required for validating this target field, including this target field
    // 2. performs validation logics A, B, C... 

    // note: all rules only read data from a database, no update/insert operations. 
    }
}

Users can submit 5000 or more records at a time for processing, and the performance requirements are strict. I was thinking of using multiple threads for the submitted records (so that each thread handles several records) and, within each of those threads, forking additional threads to run the rules on each record.

Unfortunately, this kind of embedded multi-threading always dies in my case.

Here are some key parts from the above solution:

public class BusinessRuleService {

    @Autowired
    private ValidationHandler handler;

    public String process(String xmlRequest) {
        List<Record> records = XmlConverter.unmarshall(xmlRequest).toList();
        ExecutorService es = Executors.newFixedThreadPool(100);
        List<CompletableFuture<Integer>> futures =
                records.stream().map(r -> CompletableFuture.supplyAsync(() -> handler.invoke(r), es)).collect(Collectors.toList());
        List<Integer> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        System.out.printf("total records %d processed.%n", result.size());
        es.shutdown();
        return XmlConverter.marshallObject(records);
    }
}

@Component
public class ValidationHandlerImpl implements ValidationHandler {

    @Autowired
    private List<BusinessRule> rules;

    @Override
    public int invoke(Record record) {

        ExecutorService es = Executors.newFixedThreadPool(250);
        List<CompletableFuture<FieldInfo>> futures =
                rules.stream().map(r -> CompletableFuture.supplyAsync(() -> r.validate(record.getFields()), es)).collect(Collectors.toList());
        List<FieldInfo> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        System.out.printf("total records %d processed.%n", result.size());
        es.shutdown();
        return 0;
    }
}

The workflow is: the user submits a list of records as an XML string. One of the application's endpoints calls the process method of a BusinessRuleService object. The process method uses CompletableFuture to compose tasks and submits them to an ExecutorService that has a thread pool of size 100. Each task in the CompletableFuture list then invokes the ValidationHandler object. The ValidationHandler object composes another set of CompletableFuture tasks and submits them to another ExecutorService whose pool size is the same as the size of the rule list.

Is the above solution appropriate?

Note: in my current solution, the submitted records are processed sequentially, and the 250 rules are processed in parallel for each record. With this solution, it takes more than 2 hours to process 5000 records. The business cannot accept such poor performance.

I am very new to concurrent/multi-threaded programming. Any help is much appreciated!

So since you have a performance problem, you've surely profiled your application to determine where the performance hotspots are? You're not assuming things, and trying to blindly optimize your software without knowing whether it's helpful or not, are you? - Kayaman
what do you mean by "embedded multi-threading"? - Alexei Kaigorodov
If you need on-the-spot parallel processing of independent things, have a look at streams (like streaming lists), and then parallel() - tevemadar
We do not know what kind of validation is performed, but from what you tell us, your problem may very well be the DB access time for each validation, and not necessarily the CPU consumed. In which case you MUST profile to see where time is spent. @Kayaman 's advice is the best. Measure. Then you'll find that maybe instead of parallelizing, you should pre-fetch DB data once and for all. Maybe you should index your FieldInfo by name in a map instead of iterating over a huge list. Maybe you should cache some stuff you compute often. And only then maybe you should parallelize. - GPI
@Kayaman You are correct. I am kinda blindly optimizing the program. Thanks for your comment. Will try to gain knowledge on profiling for performance. I updated my post. Please provide your comments and suggestions. Much appreciated! - Min Jiang
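To illustrate the suggestion above about indexing fields by name, here is a minimal sketch. The Field class and the indexByName helper are hypothetical stand-ins (not part of the original code); they assume each field has a unique name, as FieldInfo appears to.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FieldIndexSketch {
    // Hypothetical, simplified stand-in for the question's FieldInfo.
    static final class Field {
        final String name;
        final String value;

        Field(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    // Builds the name -> field index once per record, so each rule can look up
    // the fields it needs in constant time instead of scanning ~250 entries.
    static Map<String, Field> indexByName(List<Field> fields) {
        Map<String, Field> byName = new HashMap<>();
        for (Field f : fields) {
            byName.put(f.name, f); // assumes field names are unique within a record
        }
        return byName;
    }
}
```

Whether this helps noticeably depends on what profiling shows; if most of the time goes to database reads, the lookup cost is likely minor by comparison.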

1 Answer


This is the well-known "single producer - multiple consumers" pattern. The classic solution is to create a BlockingQueue<Record> queue and put records into it as they are read. On the other end of the queue, a number of worker threads take records from the queue and process them (in our case, validate the fields):

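import java.util.concurrent.BlockingQueue;

class ValidatingThread extends Thread {
   private final BlockingQueue<Record> queue;
   private final FieldName_Rule validator = new FieldName_Rule();

   public ValidatingThread(BlockingQueue<Record> queue) {
      this.queue = queue;
   }

   @Override
   public void run() {
      try {
         // keep taking records until this thread is interrupted
         while (true) {
            Record record = queue.take();   // blocks until a record is available
            validator.validate(record.getFields());
         }
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();   // restore the interrupt flag and exit
      }
   }
}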

A good starting point for the number of threads is Runtime.getRuntime().availableProcessors(); if the rules spend most of their time waiting on the database, a somewhat larger number may work better, so measure. Start all the threads at the beginning, and do not use "embedded multi-threading". How to stop the threads after all the records have been processed is left as a learning assignment.
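For readers who want a hint on that last point, one common approach is a "poison pill": after the last record, the producer puts one sentinel object per worker into the queue, and each worker exits when it takes one. Below is a minimal, self-contained sketch of that idea. SketchRecord, QueueValidationSketch and validateAll are hypothetical names used only for illustration, and the actual rule execution is replaced by a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the question's Record class.
class SketchRecord {
    final int id;
    volatile boolean validated;

    SketchRecord(int id) {
        this.id = id;
    }
}

class QueueValidationSketch {
    // Sentinel ("poison pill") that tells a worker no more records will arrive.
    private static final SketchRecord POISON = new SketchRecord(-1);

    // Validates all records using workerCount threads; returns how many were processed.
    static int validateAll(List<SketchRecord> records, int workerCount) {
        BlockingQueue<SketchRecord> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();

        // Start all workers up front; no pools are created inside the workers.
        for (int i = 0; i < workerCount; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        SketchRecord r = queue.take();
                        if (r == POISON) {
                            return; // this worker is done
                        }
                        r.validated = true; // placeholder for running the rules
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }

        try {
            for (SketchRecord r : records) {
                queue.put(r);
            }
            // One pill per worker, so every worker sees exactly one.
            for (int i = 0; i < workerCount; i++) {
                queue.put(POISON);
            }
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while validating", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<SketchRecord> records = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            records.add(new SketchRecord(i));
        }
        int n = validateAll(records, Runtime.getRuntime().availableProcessors());
        System.out.println("Validated " + n + " records");
    }
}
```

An ExecutorService with a single fixed-size pool plus invokeAll would likely achieve the same effect with less code; the explicit queue is used here only because it matches the pattern described above.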