I have a Spring Batch job which triggers a Hadoop job. Please see my job configuration below. When I launch the job, I get the following exception.

Configuration:

<hdp:job id="mr-my-job" 
        input-path="/data/input/"
        output-path="/data/output/"
        jar-by-class="org.test.Main" 
        mapper="org.test.Test1$Map"
        combiner="org.test.Test1$Combiner"
        reducer="org.test.Test1$ReduceFromCombiner"
        number-reducers="7"
        scope="step" />

Exception:

org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'mr-my-job' must be of type [org.apache.hadoop.mapreduce.Job], but was actually of type [$Proxy5]
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:375)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
        at org.springframework.data.hadoop.mapreduce.JobExecutor.findJobs(JobExecutor.java:254)
        at org.springframework.data.hadoop.mapreduce.JobExecutor.startJobs(JobExecutor.java:166)
        at org.springframework.data.hadoop.batch.mapreduce.JobTasklet.execute(JobTasklet.java:90)
        at org.springframework.data.hadoop.batch.mapreduce.JobTasklet$$FastClassBySpringCGLIB$$4805a065.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:717)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:653)
        at org.springframework.data.hadoop.batch.mapreduce.JobTasklet$$EnhancerBySpringCGLIB$$3be21557.execute(<generated>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at $Proxy2.execute(Unknown Source)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)

I added CGLIB 3.1 to the classpath and added <bean class="org.springframework.batch.core.scope.StepScope" p:proxyTargetClass="true" /> to my XML, but I still get the same error.


I debugged and found that the exception is thrown at the line below in the JobExecutor class, while converting a bean of type org.springframework.aop.framework.JdkDynamicAopProxy@b57ceeff to the required type org.apache.hadoop.mapreduce.Job. I think CGLIB proxying is not being applied here, because the bean is still wrapped in a JDK proxy.

JobExecutor class:

for (String name : jobNames) {
    js.add(beanFactory.getBean(name, Job.class));
}

AbstractBeanFactory class:

return getTypeConverter().convertIfNecessary(bean, requiredType);

Please advise.

Probably somewhere (in the Tasklet, I suppose) you are wiring a concrete Hadoop job class and not an interface. Check your code. - Luca Basso Ricci
Thanks @LucaBassoRicci for the response. I have added my job configuration to the original post. Can you please check where I am going wrong? - user3222372
Remove scope="step" from "mr-my-job". - Luca Basso Ricci

1 Answer

Thanks, Luca Basso Ricci. Removing scope="step" from "mr-my-job" worked.
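
For anyone who lands here with the same error, my understanding of why this helps (an interpretation, not something confirmed in the Spring for Apache Hadoop source): the [$Proxy5] in the exception message suggests that with scope="step" the job bean was handed out as a JDK dynamic proxy. A JDK proxy only implements the target's interfaces, so it is never an instance of the concrete class, and a lookup like beanFactory.getBean(name, Job.class) is rejected. The sketch below reproduces just that type mismatch in plain Java. Task and ConcreteJob are hypothetical stand-ins for the bean's interfaces and for org.apache.hadoop.mapreduce.Job; no Spring or Hadoop classes are involved.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyTypeDemo {

    /** Hypothetical interface, standing in for the interfaces the real bean exposes. */
    public interface Task {
        String run();
    }

    /** Hypothetical concrete class, standing in for org.apache.hadoop.mapreduce.Job. */
    public static class ConcreteJob implements Task {
        @Override
        public String run() {
            return "done";
        }
    }

    /** Wraps the target in a JDK dynamic proxy that exposes only the Task interface. */
    public static Task jdkProxyFor(ConcreteJob target) {
        // Every call on the proxy is forwarded to the real object.
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return (Task) Proxy.newProxyInstance(
                ProxyTypeDemo.class.getClassLoader(),
                new Class<?>[] { Task.class },
                handler);
    }

    public static void main(String[] args) {
        Task proxied = jdkProxyFor(new ConcreteJob());

        // Calls are delegated, so the proxy behaves like the target...
        System.out.println(proxied.run());                    // prints "done"
        // ...and it satisfies the interface type...
        System.out.println(proxied instanceof Task);          // prints "true"
        // ...but it is NOT an instance of the concrete class.
        System.out.println(proxied instanceof ConcreteJob);   // prints "false"
    }
}
```

The last check is the plain-Java analogue of the BeanNotOfRequiredTypeException above. Once the bean is no longer step-scoped, it is presumably the Job instance itself rather than a proxy, so the type check in JobExecutor passes.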