I need help in designing multi thread based application which include dynamic url creation and its processing via thread.
I am using a spring scheduler in my application, which is scheduled every 30 seconds. And from that scheduled method I am invoking some service based apis which is in the loop and also in that I need 1 thread pool executor per API with 1 thread processing on it.
As this process is happening from scheduled method, every time new thread pool created, which is the problem here. You can see in the code.
What I want is, if for any of the API, if thread pool is already exists, then that thread should be able to recognize by its own by not creating the new thread pool and start processing.
Let me know, if more info needed.
Any suggestions are welcome and appreciated. Thanks
#API Properties
service.url=http://{0}.abc.net/xyz.php?
service.urls = abc1, abc2, abc3, abc4, abc5
@Service
public class APIServiceImpl implements APIService {
@Autowired
MsgService msgService;
private static final Logger LOGGER = Logger.getLogger(APIServiceImpl.class);
private static Properties fileProperties = PropertyUtility.getfileProperties();
@Scheduled(fixedDelayString = "30000")
public void getServiceMessage() throws ServiceException {
try {
long startTime = System.currentTimeMillis() / 1000L;
long endTime = startTime + 30;
String urlStr = fileProperties.getProperty("service.urls");
String[] urls = urlStr.split(",");
foreach(String url : urls){
serviceApi(url.trim(), startTime, endTime);
}
} catch (Exception e) {
throw new ServiceException(e);
}
}
private void serviceApi(String url, long startTime, long endTime) {
StringBuffer buffer = new StringBuffer();
buffer.append(java.text.MessageFormat.format(fileProperties.getProperty("service.url"), url));
buffer.append("starttime=" + startTime);
buffer.append("&endtime=" + endTime);
ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
executor.submit(new APIThreadHandler(buffer.toString(), messageService));
}
public class APIThreadHandler implements Runnable {
private static final Logger LOGGER = Logger.getLogger(APIThreadHandler.class);
private String url;
MsgService msgService;
public APIThreadHandler(String url, MsgService msgService) {
this.url = url;
this.msgService = msgService;
}
public void run() {
System.out.println("ThreadID: " + Thread.currentThread().getId());
try {
URL srcUrl = new URL(url);
List<Map<?, ?>> data = readObjectsFromCsv(srcUrl);
JSONArray jsonArray = new JSONArray(data);
for (int i = 0; i < jsonArray.length(); i++) {
msgService.sendMessage(jsonArray.getJSONObject(i));
}
} catch (Exception e) {
LOGGER.error("Exception occured - ", e);
}
}
private static List<Map<?, ?>> readObjectsFromCsv(URL url) {
List<Map<?, ?>> listMap = null;
try {
CsvSchema bootstrap = CsvSchema.emptySchema().withHeader();
CsvMapper csvMapper = new CsvMapper();
MappingIterator<Map<?, ?>> mappingIterator = csvMapper.readerFor(Map.class).with(bootstrap).readValues(url);
listMap = mappingIterator.readAll();
} catch (Exception e) {
LOGGER.error("Exception occured - ", e);
}
return listMap;
}