Need help with Java multithreading
I have the following case:
There are many records. Each record has about 250 fields. Each field needs to be validated against a predefined rule.
So I defined a class, FieldInfo, to represent each field:
public class FieldInfo {
private String name;
private String value;
private String error_code;
private String error_message;
// ignore getters and setters
}
A class, Record, to represent a record:
public class Record {
List<FieldInfo> fields;
// omit getter and setter here
}
And the rule interface and an implementing class:
public interface BusinessRule {
// Validating one field needs the values of some other fields in the same record, so the list of all fields of that record is passed in as the parameter.
public FieldInfo validate(List<FieldInfo> fields);
}
public class FieldName_Rule implements BusinessRule {
public FieldInfo validate(List<FieldInfo> fields) {
// This will:
// 1. pick up the fields required for validating the target field, including the target field itself
// 2. perform validation logic A, B, C...
// Note: all rules only read data from a database; there are no update/insert operations.
}
}
A user can submit 5000 or more records at a time for processing, and the performance requirements are high. I was thinking of using multiple threads for the submitted records (for example 5000, so that one thread handles several records) and, within each of those threads, forking further threads to run the rules on each record.
Unfortunately, this nested multithreading always dies in my case.
Here are some key parts of the above solution:
public class BusinessRuleService {
@Autowired
private ValidationHandler handler;
public String process(String xmlRequest) {
List<Record> records = XmlConverter.unmarshall(xmlRequest).toList();
ExecutorService es = Executors.newFixedThreadPool(100);
List<CompletableFuture<Integer>> futures =
records.stream().map(r->CompletableFuture.supplyAsync(()-> handler.invoke(r), es)).collect(Collectors.toList());
List<Integer> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.printf("total records %d processed.%n", result.size());
es.shutdown();
return XmlConverter.marshallObject(records);
}
}
@Component
public class ValidationHandlerImpl implements ValidationHandler {
@Autowired
private List<BusinessRule> rules;
@Override
public int invoke(Record record) {
ExecutorService es = Executors.newFixedThreadPool(250);
List<CompletableFuture<FieldInfo>> futures =
rules.stream().map(r->CompletableFuture.supplyAsync(()-> r.validate(record.getFields()), es)).collect(Collectors.toList());
List<FieldInfo> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.printf("total records %d processed.%n", result.size());
es.shutdown();
return 0;
}
}
The workflow is: the user submits a list of records as an XML string. One of the application's endpoints calls the process method of a BusinessRuleService object. The process method uses CompletableFuture to compose tasks and submits them to an ExecutorService with a thread pool of size 100. Each task in the CompletableFuture list then invokes the ValidationHandler object, which composes another list of CompletableFuture tasks and submits them to another ExecutorService whose pool size equals the size of the rule list.
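For comparison with the workflow described above, here is a minimal sketch of one possible variation: a single shared, bounded pool that receives one task per (record, rule) pair, so that no pool is created per record and no pool thread blocks waiting on another pool. This is only an illustration under assumptions, not the code from the question: SharedPoolSketch, Rule, and validateAll are hypothetical names, and records are simplified to lists of strings instead of List<FieldInfo>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: one shared pool for all (record, rule) tasks.
class SharedPoolSketch {

    // Simplified stand-in for BusinessRule: reads a record's field values
    // and returns a validation result as a string (an assumption).
    interface Rule extends Function<List<String>, String> {}

    static List<List<String>> validateAll(List<List<String>> records,
                                          List<Rule> rules,
                                          int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Submit every task first; no task waits on another task.
            List<List<CompletableFuture<String>>> pending = new ArrayList<>();
            for (List<String> record : records) {
                List<CompletableFuture<String>> perRecord = new ArrayList<>();
                for (Rule rule : rules) {
                    perRecord.add(
                        CompletableFuture.supplyAsync(() -> rule.apply(record), pool));
                }
                pending.add(perRecord);
            }
            // Join only on the calling thread, in submission order.
            return pending.stream()
                .map(fs -> fs.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()))
                .collect(Collectors.toList());
        } finally {
            pool.shutdown(); // always release the threads
        }
    }

    public static void main(String[] args) {
        List<List<String>> records = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c", "d"));
        Rule first = fields -> "first=" + fields.get(0);
        Rule count = fields -> "count=" + fields.size();
        // prints [[first=a, count=2], [first=c, count=2]]
        System.out.println(validateAll(records, Arrays.asList(first, count), 4));
    }
}
```

The pool size of 4 is arbitrary. Since the rules read from a database, a sensible size probably depends more on the database connection pool than on the number of rules, but that would need to be measured.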
Is the above solution appropriate?
Note: in my current solution, the submitted records are processed sequentially, and the 250 rules are processed in parallel for each record. With this solution, it takes more than 2 hours to process 5000 records. Such poor performance is not acceptable to the business.
I am very new to concurrent/multithreaded programming. Any help is much appreciated!
parallel() - tevemadar
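The comment presumably refers to parallel streams (Stream.parallel() or Collection.parallelStream()). A minimal sketch of what that might look like follows, assuming a hypothetical validate method in place of handler.invoke(record) and plain strings in place of Record objects. By default, parallel streams run on the common ForkJoinPool, which is sized to the number of CPU cores, so this may or may not suit rules that block on database reads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the parallel-stream approach.
class ParallelStreamSketch {

    // Stand-in for handler.invoke(record); returns the record's length
    // purely for illustration (an assumption, not the real validation).
    static int validate(String record) {
        return record.length();
    }

    static List<Integer> validateAll(List<String> records) {
        // parallelStream() splits the work across the common ForkJoinPool;
        // collecting to a list preserves the original record order.
        return records.parallelStream()
            .map(ParallelStreamSketch::validate)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [5, 3, 7]
        System.out.println(validateAll(Arrays.asList("alpha", "bob", "charlie")));
    }
}
```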