0
votes

I want to use write behind strategy of Apache Ignite with my Spring boot Application with mysql database. I want to write cache data to mysql database with write behind asynchronously. I am new with Apache Ignite and I am referring docs from https://apacheignite.readme.io/docs/.

Below is my Spring boot configuration for using write behind strategy of Apache Ignite.

    @Bean
    public Ignite igniteInstance() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setIgniteInstanceName("ignite-1");
        cfg.setPeerClassLoadingEnabled(true);

        CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache");
        ccfg2.setIndexedTypes(Long.class, Contact.class);
        ccfg2.setWriteBehindEnabled(true);
        ccfg2.setWriteBehindFlushFrequency(1000);
        ccfg2.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));
        ccfg2.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).setBackups(1);
        ccfg2.setWriteBehindFlushSize(0);
        ccfg2.setWriteBehindFlushThreadCount(1);
        ccfg2.setWriteBehindBatchSize(1);
        //ccfg2.setReadThrough(true);
        //ccfg2.setWriteThrough(true);

        // Memory Configuration
        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
        DataRegionConfiguration defaultDataRegionCfg = storageCfg.getDefaultDataRegionConfiguration();
        defaultDataRegionCfg.setPersistenceEnabled(false);   // Only Memory
        defaultDataRegionCfg.setMaxSize(1024 * 1024 * 256);  // 256MB
        defaultDataRegionCfg.setMetricsEnabled(true);
        cfg.setDataStorageConfiguration(storageCfg);


        CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>();
        f2.setDataSource(datasource);
        f2.setDialect(new MySQLDialect());
        JdbcType jdbcContactType = new JdbcType();
        jdbcContactType.setCacheName("ContactCache");
        jdbcContactType.setKeyType(Long.class);
        jdbcContactType.setValueType(Contact.class);
        jdbcContactType.setDatabaseTable("contact");
        jdbcContactType.setDatabaseSchema("demo");
        jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
        jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"),
                new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"),
                new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
        f2.setTypes(jdbcContactType);
        ccfg2.setCacheStoreFactory(f2);

         TcpCommunicationSpi spi = new  TcpCommunicationSpi();
         spi.setMessageQueueLimit(1000);

        CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
        ccfg.setIndexedTypes(Long.class, Person.class);
        ccfg.setWriteBehindEnabled(true);
        ccfg.setWriteBehindFlushFrequency(2000);
        ccfg.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));
        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC);
        ccfg.setWriteBehindFlushSize(0);
        ccfg.setWriteBehindFlushThreadCount(10);
        ccfg.setWriteBehindBatchSize(10);
        ccfg.setBackups(1);
        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
//      ccfg.setDataRegionName(storageCfg);
//      ccfg.setReadThrough(true);
//      ccfg.setWriteThrough(true);
        CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
        f.setDataSource(datasource);
        f.setDialect(new MySQLDialect());
        JdbcType jdbcType = new JdbcType();
        jdbcType.setCacheName("PersonCache");
        jdbcType.setKeyType(Long.class);
        jdbcType.setValueType(Person.class);
        jdbcType.setDatabaseTable("person");
        jdbcType.setDatabaseSchema("demo");
        jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
        jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"),
                new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"),
                new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"),
                new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"),
                new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"),
                new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"),
                new JdbcTypeField(Types.VARCHAR, "birth_date", String.class, "birthDate"));
        f.setTypes(jdbcType);
        ccfg.setCacheStoreFactory(f);

        Ignite ignite = Ignition.start(cfg);
        ignite.cluster().active(true);
        return ignite;
    }

With this configuration, If I set write through to true then data will successfully inserted to database. But write behind is not working with this configuration. I have refer https://apacheignite.readme.io/v2.7/docs/3rd-party-store page for write-behind configuration but I think it is not working. Please let me know if I miss something as I am newbie with Ignite.

My Repostiary code -

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {

    List<Person> findByFirstNameAndLastName(String firstName, String lastName);

    @Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<Contact> selectContacts(String firstName, String lastName);

    @Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<List<?>> selectContacts2(String firstName, String lastName);
}

Model class -

@QueryGroupIndex.List(
        @QueryGroupIndex(name="idx1")
) 
public class Person implements Serializable {

    private static final long serialVersionUID = -1271194616130404625L;
    private static final AtomicLong ID_GEN = new AtomicLong();

    @QuerySqlField(index = true)
    private Long id;
    @QuerySqlField(index = true)
    @QuerySqlField.Group(name = "idx1", order = 0) 
    private String firstName;
    @QuerySqlField(index = true)
    @QuerySqlField.Group(name = "idx1", order = 1) 
    private String lastName;
    private Gender gender;
    private Date birthDate;
    private String country;
    private String city;
    private String address;
    @Transient
    private List<Contact> contacts = new ArrayList<>();

    public void init() {
        this.id = ID_GEN.incrementAndGet();
    }

Controller class -

@RestController
@RequestMapping("/person")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @Autowired
    PersonRepository repository;

    @PostMapping
    public Person add(@RequestBody Person person) {
        person.init();
        return repository.save(person.getId(), person);
    }

    @PutMapping
    public Person update(@RequestBody Person person) {
        return repository.save(person.getId(), person);
    }

    @DeleteMapping("/{id}")
    public void delete(Long id) {
        repository.delete(id);
    }

    @GetMapping("/{id}")
    public Person findById(@PathVariable("id") Long id) {
        return repository.findOne(id);
    }

If anyone want to access full code base then It is available on - https://github.com/gotidhavalh/ignite-rest-service

1
What do you mean by "not working"? Entries never get inserted if you enable write behind? Please note you have to enable both write through and write behind for this to work. - alamar
Thanks @alamar for response. By "not working", I mean that entries are not inserted/updated in database after certain time. I have set this time 1minutes. I have tried with enabling write through and write behind both but entries are not updated in database with this configuration. - Dhaval Goti
Reproducer does not start with the following cryptic exception: org.springframework.data.mapping.PropertyReferenceException: No property deleteAll found for type Flight! Either way, you definitely need to have writeThrough enabled. - alamar
Ok. Thanks for this. - Dhaval Goti

1 Answers

1
votes

You need to enable both

    ccfg.setWriteThrough(true);

and

    ccfg.setWriteBehindEnabled(true);

for the Write Behind to work. Please also make sure to check its flush timeout (5s by default).