I am using Infinispan as a distributed Hibernate second-level (L2) cache, configured with JGroups on AWS. However, I am facing a problem under heavy load in the following scenario:
- There are initially 2 EC2 instances available
- Load increases drastically
- The load balancer starts 4 EC2 instances
- Load decreases partially
- The load balancer removes 2 EC2 instances
- All remaining instances start to hold database connections (Hikari pool) until none are available, so all subsequent requests fail with a timeout while waiting for a free connection.
The remaining instances try to communicate with the old ones and get no response, holding the database connection while waiting for a response.
All entities are using READ_WRITE strategy.
Infinispan configuration: org/infinispan/hibernate/cache/commons/builder/infinispan-configs.xml
region.factory_class: org.infinispan.hibernate.cache.commons.InfinispanRegionFactory
The following JGroups configuration was adapted from: org/infinispan/infinispan-core/9.2.0.Final/infinispan-core-9.2.0.Final.jar/default-configs/default-jgroups-tcp.xml
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups-4.0.xsd">
<!-- Transport: TCP bound to port 7800. The thread pool bounds fall back to 50/500
     unless the jgroups.thread_pool.* properties are set. -->
<TCP bind_port="7800"
enable_diagnostics="false"
thread_naming_pattern="pl"
send_buf_size="640k"
sock_conn_timeout="300"
bundler_type="no-bundler"
thread_pool.min_threads="${jgroups.thread_pool.min_threads:50}"
thread_pool.max_threads="${jgroups.thread_pool.max_threads:500}"
thread_pool.keep_alive_time="30000"
/>
<!-- Discovery: custom AWS_ELB_PING protocol, pointed at the load balancer(s) named in
     load_balancers_names (comma-separated) in the given AWS region. -->
<AWS_ELB_PING
region="sa-east-1"
load_balancers_names="elb-name"
/>
<!-- Merge handling (MERGE3), interval bounds 5000..30000 (presumably milliseconds). -->
<MERGE3 min_interval="5000"
max_interval="30000"
/>
<!-- Failure detection: FD_SOCK, FD_ALL and VERIFY_SUSPECT.
     NOTE(review): presumably a member that disappears without leaving (e.g. an instance
     terminated on scale-in) is only dropped after the FD_ALL timeout plus the VERIFY_SUSPECT
     timeout have elapsed; confirm against the JGroups documentation. -->
<FD_SOCK />
<FD_ALL timeout="9000"
interval="3000"
timeout_check_interval="1000"
/>
<VERIFY_SUSPECT timeout="5000" />
<!-- NAKACK2 / UNICAST3: both configured with a retransmission interval of 100 and
     identical retransmission-table sizing. -->
<pbcast.NAKACK2 use_mcast_xmit="false"
xmit_interval="100"
xmit_table_num_rows="50"
xmit_table_msgs_per_row="1024"
xmit_table_max_compaction_time="30000"
resend_last_seqno="true"
/>
<UNICAST3 xmit_interval="100"
xmit_table_num_rows="50"
xmit_table_msgs_per_row="1024"
xmit_table_max_compaction_time="30000"
conn_expiry_timeout="0"
/>
<pbcast.STABLE stability_delay="500"
desired_avg_gossip="5000"
max_bytes="1M"
/>
<!-- Group membership: join_timeout defaults to 5000 unless jgroups.join_timeout is set. -->
<pbcast.GMS print_local_addr="false"
install_view_locally_first="true"
join_timeout="${jgroups.join_timeout:5000}"
/>
<MFC max_credits="2m"
min_threshold="0.40"
/>
<FRAG3/>
</config>
AWS_ELB_PING: This class is an implementation of the Discovery class that uses the AWS ELB API to discover all available IPs.
I removed logging and some boilerplate code from the code below:
/**
 * JGroups discovery protocol that locates cluster members through AWS Elastic Load Balancing.
 *
 * <p>On every discovery round it asks the ELB API for the instances registered with the
 * configured load balancers, resolves their private IP addresses through the EC2 API and sends a
 * {@code GET_MBRS_REQ} discovery request to each of them on {@link #bind_port}.
 *
 * <p>Configuration (set from the protocol stack XML): {@code region} and
 * {@code load_balancers_names} (comma-separated list of load balancer names).
 */
public class AWS_ELB_PING extends Discovery {
    private static final String LIST_ELEMENT_SEPARATOR = ",";

    static {
        ClassConfigurator.addProtocol((short) 790, AWS_ELB_PING.class); // id must be unique
    }

    private String region;
    private String load_balancers_names;
    private int bind_port = 7800;
    private AmazonElasticLoadBalancing amazonELBClient;
    private AmazonEC2 amazonEC2Client;

    /**
     * Splits {@code load_balancers_names} into individual names.
     *
     * @return the configured names, trimmed and without blank entries; empty when the property
     *         is unset or blank (a plain {@code split} would yield a single empty name instead)
     */
    private List<String> getLoadBalancersNamesList() {
        return Arrays.stream(Optional.ofNullable(load_balancers_names).orElse("").split(LIST_ELEMENT_SEPARATOR))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    /** Initializes the protocol and builds the ELB and EC2 clients for the configured region. */
    @Override
    public void init() throws Exception {
        super.init();
        DefaultAWSCredentialsProviderChain awsCredentialsProviderChain = DefaultAWSCredentialsProviderChain.getInstance();
        amazonELBClient = AmazonElasticLoadBalancingClientBuilder.standard()
                .withRegion(region)
                .withCredentials(awsCredentialsProviderChain)
                .build();
        amazonEC2Client = AmazonEC2ClientBuilder.standard()
                .withRegion(region)
                .withCredentials(awsCredentialsProviderChain)
                .build();
    }

    /** Delegates unchanged to the base implementation. */
    @Override
    public void discoveryRequestReceived(final Address sender, final String logical_name,
                                         final PhysicalAddress physical_addr) {
        super.discoveryRequestReceived(sender, logical_name, physical_addr);
    }

    /**
     * Sends a discovery request to every instance currently registered with the load balancers.
     *
     * @param members          known members to piggyback on the request; may be {@code null}
     * @param initialDiscovery whether this is the initial discovery round
     * @param responses        response collector (not used directly by this implementation)
     */
    @Override
    public void findMembers(final List<Address> members, final boolean initialDiscovery, final Responses responses) {
        PhysicalAddress physicalAddress = null;
        PingData data = null;
        if (!use_ip_addrs || !initialDiscovery) {
            physicalAddress = (PhysicalAddress) super.down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
            data = new PingData(local_addr, false, NameCache.get(local_addr), physicalAddress);
            if (members != null && members.size() <= max_members_in_discovery_request) {
                data.mbrs(members);
            }
        }
        sendDiscoveryRequests(physicalAddress, data, initialDiscovery, getLoadBalancersInstances());
    }

    /**
     * Looks up the EC2 instances registered with the configured load balancers.
     *
     * @return the registered instances that have a private IP address; empty when no load
     *         balancer name is configured or no instance is registered
     */
    private Set<Instance> getLoadBalancersInstances() {
        final List<String> loadBalancerNames = getLoadBalancersNamesList();
        if (loadBalancerNames.isEmpty()) {
            // A DescribeLoadBalancers request without names is not restricted to "our" load
            // balancers, so never issue it.
            log.warn("load_balancers_names is empty; no discovery requests will be sent");
            return new HashSet<>();
        }

        final List<LoadBalancerDescription> loadBalancerDescriptions = amazonELBClient
                .describeLoadBalancers(new DescribeLoadBalancersRequest().withLoadBalancerNames(loadBalancerNames))
                .getLoadBalancerDescriptions();
        checkLoadBalancersExists(loadBalancerNames, loadBalancerDescriptions);

        final List<String> instanceIds = loadBalancerDescriptions.stream()
                .flatMap(loadBalancer -> loadBalancer.getInstances().stream())
                .map(instance -> instance.getInstanceId())
                .collect(toList());
        if (instanceIds.isEmpty()) {
            // A DescribeInstances request without instance ids is not restricted to the load
            // balancers' instances, which would make us ping unrelated machines.
            return new HashSet<>();
        }

        return amazonEC2Client.describeInstances(new DescribeInstancesRequest().withInstanceIds(instanceIds))
                .getReservations()
                .stream()
                .map(Reservation::getInstances)
                .flatMap(List::stream)
                // Instances without a private IP (e.g. already terminated) cannot be contacted.
                .filter(instance -> instance.getPrivateIpAddress() != null)
                .collect(Collectors.toSet());
    }

    /**
     * Warns about configured load balancer names for which the ELB API returned no description.
     *
     * @param loadBalancerNames        names taken from the configuration
     * @param loadBalancerDescriptions descriptions returned by the ELB API
     */
    private void checkLoadBalancersExists(final List<String> loadBalancerNames,
                                          final List<LoadBalancerDescription> loadBalancerDescriptions) {
        final Set<String> difference = Sets.difference(new HashSet<>(loadBalancerNames),
                loadBalancerDescriptions
                        .stream()
                        .map(LoadBalancerDescription::getLoadBalancerName)
                        .collect(Collectors.toSet()));
        if (!difference.isEmpty()) {
            log.warn("load balancers not found: " + difference);
        }
    }

    /**
     * Builds the JGroups address of an instance from its private IP and {@link #bind_port}.
     *
     * @throws RuntimeException if the address cannot be created
     */
    private PhysicalAddress toPhysicalAddress(final Instance instance) {
        try {
            return new IpAddress(instance.getPrivateIpAddress(), bind_port);
        } catch (final Exception e) {
            throw new RuntimeException(
                    "cannot create physical address for instance " + instance.getInstanceId(), e);
        }
    }

    /**
     * Sends one discovery request per instance, skipping the local address.
     *
     * @param localAddress     physical address of this node, or {@code null} when not resolved
     * @param data             payload to attach to each request, or {@code null} for none
     * @param initialDiscovery whether this is the initial discovery round
     * @param instances        target instances
     */
    private void sendDiscoveryRequests(@Nullable final PhysicalAddress localAddress, @Nullable final PingData data,
                                       final boolean initialDiscovery, final Set<Instance> instances) {
        final PingHeader header = new PingHeader(PingHeader.GET_MBRS_REQ)
                .clusterName(cluster_name)
                .initialDiscovery(initialDiscovery);
        instances.stream()
                .map(this::toPhysicalAddress)
                .filter(physicalAddress -> !physicalAddress.equals(localAddress))
                .forEach(physicalAddress -> sendDiscoveryRequest(data, header, physicalAddress));
    }

    /**
     * Builds the discovery message for one destination and sends it, on a separate thread when
     * {@code async_discovery_use_separate_thread_per_request} is set.
     */
    private void sendDiscoveryRequest(@Nullable final PingData data, final PingHeader header,
                                      final PhysicalAddress destinationAddress) {
        final Message message = new Message(destinationAddress)
                .setFlag(Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE, Message.Flag.OOB)
                .putHeader(this.id, header);
        if (data != null) {
            message.setBuffer(marshal(data));
        }
        if (async_discovery_use_separate_thread_per_request) {
            timer.execute(() -> sendDiscoveryRequest(message), sends_can_block);
        } else {
            sendDiscoveryRequest(message);
        }
    }

    /**
     * Passes the message down the stack. Sending is best effort: a failure for one destination
     * is logged and must not prevent requests to the remaining destinations.
     */
    protected void sendDiscoveryRequest(final Message message) {
        try {
            super.down(message);
        } catch (final Throwable t) {
            log.warn("failed sending discovery request to " + message.getDest(), t);
        }
    }

    /** @return {@code true}; this override always reports the protocol as dynamic */
    @Override
    public boolean isDynamic() {
        return true;
    }

    /** Shuts down both AWS clients (best effort) and then stops the protocol. */
    @Override
    public void stop() {
        try {
            if (amazonEC2Client != null) {
                amazonEC2Client.shutdown();
            }
            if (amazonELBClient != null) {
                amazonELBClient.shutdown();
            }
        } catch (final Exception e) {
            // Do not let a client shutdown problem prevent the protocol from stopping.
            log.warn("failed shutting down AWS clients", e);
        } finally {
            super.stop();
        }
    }
}
Has anyone already faced this kind of problem?