Goal: I want to code a microservice exposing an endpoint receiving and responding a message with repeated. I tried apply what I learned from Proto official guide and I coded this proto:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.mybank.endpoint";
option java_outer_classname = "TransactionsProto";
option objc_class_prefix = "HLW";
package com.mybank.endpoint;
import "google/protobuf/wrappers.proto";
service TransactionsService {
rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}
message TransactionsRequest {
string transactionDesc = 1;
repeated Transaction transactions = 2;
}
message Transaction {
string id = 1;
string name = 2;
string description = 3;
}
message TransactionsReply {
string message = 1;
}
I could gradle build and I got this TransactionsServiceGrpcKt autogenerated
package com.mybank.endpoint
import com.mybank.endpoint.TransactionsServiceGrpc.getServiceDescriptor
import io.grpc.CallOptions
import io.grpc.CallOptions.DEFAULT
import io.grpc.Channel
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.ServerServiceDefinition
import io.grpc.ServerServiceDefinition.builder
import io.grpc.ServiceDescriptor
import io.grpc.Status.UNIMPLEMENTED
import io.grpc.StatusException
import io.grpc.kotlin.AbstractCoroutineServerImpl
import io.grpc.kotlin.AbstractCoroutineStub
import io.grpc.kotlin.ClientCalls.unaryRpc
import io.grpc.kotlin.ServerCalls.unaryServerMethodDefinition
import io.grpc.kotlin.StubFor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmOverloads
import kotlin.jvm.JvmStatic
/**
* Holder for Kotlin coroutine-based client and server APIs for
* com.mybank.endpoint.TransactionsService.
*/
object TransactionsServiceGrpcKt {
@JvmStatic
val serviceDescriptor: ServiceDescriptor
get() = TransactionsServiceGrpc.getServiceDescriptor()
val postTransactionsMethod: MethodDescriptor<TransactionsRequest, TransactionsReply>
@JvmStatic
get() = TransactionsServiceGrpc.getPostTransactionsMethod()
/**
* A stub for issuing RPCs to a(n) com.mybank.endpoint.TransactionsService service as suspending
* coroutines.
*/
@StubFor(TransactionsServiceGrpc::class)
class TransactionsServiceCoroutineStub @JvmOverloads constructor(
channel: Channel,
callOptions: CallOptions = DEFAULT
) : AbstractCoroutineStub<TransactionsServiceCoroutineStub>(channel, callOptions) {
override fun build(channel: Channel, callOptions: CallOptions): TransactionsServiceCoroutineStub
= TransactionsServiceCoroutineStub(channel, callOptions)
/**
* Executes this RPC and returns the response message, suspending until the RPC completes
* with [`Status.OK`][io.grpc.Status]. If the RPC completes with another status, a
* corresponding
* [StatusException] is thrown. If this coroutine is cancelled, the RPC is also cancelled
* with the corresponding exception as a cause.
*
* @param request The request message to send to the server.
*
* @return The single response from the server.
*/
suspend fun postTransactions(request: TransactionsRequest): TransactionsReply = unaryRpc(
channel,
TransactionsServiceGrpc.getPostTransactionsMethod(),
request,
callOptions,
Metadata()
)}
/**
* Skeletal implementation of the com.mybank.endpoint.TransactionsService service based on Kotlin
* coroutines.
*/
abstract class TransactionsServiceCoroutineImplBase(
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : AbstractCoroutineServerImpl(coroutineContext) {
/**
* Returns the response to an RPC for com.mybank.endpoint.TransactionsService.PostTransactions.
*
* If this method fails with a [StatusException], the RPC will fail with the corresponding
* [io.grpc.Status]. If this method fails with a [java.util.concurrent.CancellationException],
* the RPC will fail
* with status `Status.CANCELLED`. If this method fails for any other reason, the RPC will
* fail with `Status.UNKNOWN` with the exception as a cause.
*
* @param request The request from the client.
*/
open suspend fun postTransactions(request: TransactionsRequest): TransactionsReply = throw
StatusException(UNIMPLEMENTED.withDescription("Method com.mybank.endpoint.TransactionsService.PostTransactions is unimplemented"))
final override fun bindService(): ServerServiceDefinition = builder(getServiceDescriptor())
.addMethod(unaryServerMethodDefinition(
context = this.context,
descriptor = TransactionsServiceGrpc.getPostTransactionsMethod(),
implementation = ::postTransactions
)).build()
}
}
So far so good. Now I want to implement it and I am completed stuck.
Here is all three tentatives and its errors
package com.mybank.endpoint
import io.grpc.stub.StreamObserver
import javax.inject.Singleton
@Singleton
class TransactionsEndpoint : TransactionsServiceGrpc.TransactionsServiceImplBase(){
//First tentative
//This complains "'postTransactions' overrides nothing" and IntelliJ suggest second next approach
//override fun postTransactions(request: TransactionsRequest?) : TransactionsReply {
//Second Tentative
// override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
// //it complains Type mismatch... Found: TransactionsReply
// return TransactionsReply.newBuilder().setMessage("teste").build()
// }
//Third Tentative
//This causes:
//Return type is 'TransactionsReply', which is not a subtype of overridden public open
// fun postTransactions(request: TransactionsRequest!, responseObserver: StreamObserver<TransactionsReply!>!):
// Unit defined in com.mybank.endpoint.TransactionsServiceGrpc.TransactionsServiceImplBase
//override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) : TransactionsReply {
// return TransactionsReply.newBuilder().setMessage("teste").build()
// }
}
gradle.build
plugins {
id "org.jetbrains.kotlin.jvm" version "1.3.72"
id "org.jetbrains.kotlin.kapt" version "1.3.72"
id "org.jetbrains.kotlin.plugin.allopen" version "1.3.72"
id "application"
id 'com.google.protobuf' version '0.8.13'
}
version "0.2"
group "account-control"
repositories {
mavenLocal()
jcenter()
}
configurations {
// for dependencies that are needed for development only
developmentOnly
}
dependencies {
kapt(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
kapt("io.micronaut:micronaut-inject-java")
kapt("io.micronaut:micronaut-validation")
implementation(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlinVersion}")
implementation("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion")
implementation("io.micronaut:micronaut-runtime")
// implementation("io.micronaut.grpc:micronaut-grpc-runtime")
implementation("io.micronaut.grpc:micronaut-grpc-server-runtime:$micronautGrpcVersion")
implementation("io.micronaut.grpc:micronaut-grpc-client-runtime:$micronautGrpcVersion")
implementation("io.grpc:grpc-kotlin-stub:${grpcKotlinVersion}")
//Kafka
implementation("io.micronaut.kafka:micronaut-kafka")
runtimeOnly("ch.qos.logback:logback-classic:1.2.3")
runtimeOnly("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")
kaptTest("io.micronaut:micronaut-inject-java")
testImplementation enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.3.0")
testImplementation("io.micronaut.test:micronaut-test-junit5")
testImplementation("org.mockito:mockito-junit-jupiter:2.22.0")
testRuntime("org.junit.jupiter:junit-jupiter-engine:5.3.0")
testRuntime("org.jetbrains.spek:spek-junit-platform-engine:1.1.5")
}
test.classpath += configurations.developmentOnly
mainClassName = "account-control.Application"
test {
useJUnitPlatform()
}
allOpen {
annotation("io.micronaut.aop.Around")
}
compileKotlin {
kotlinOptions {
jvmTarget = '11'
//Will retain parameter names for Java reflection
javaParameters = true
}
}
//compileKotlin.dependsOn(generateProto)
compileTestKotlin {
kotlinOptions {
jvmTarget = '11'
javaParameters = true
}
}
tasks.withType(JavaExec) {
classpath += configurations.developmentOnly
jvmArgs('-XX:TieredStopAtLevel=1', '-Dcom.sun.management.jmxremote')
}
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/grpckt'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
grpckt { artifact = "io.grpc:protoc-gen-grpc-kotlin:${grpcKotlinVersion}" }
}
generateProtoTasks {
all()*.plugins {
grpc {}
grpckt {}
}
}
}
I have successfully created my first grpc endpoint with very basic request/reply based on String and now I want to move forward by creating a list of a message. As an analogy, let's say I want a DTO/Pojo which contains a list of an entity.
Honestly, I am totally stuck. So, my main question is: how implement a proto service with repeated message?
An useful comment that can give a north is, why I see in autogenerated stub a method to be implemented with "..., responseObserver: StreamObserver?" instead of a simple "TransactionsReply" as I clearly specify in my proto? What relationship between StreamObserver and repeated message?
Here is the whole project in my GitHub develop branch
You will find two protos: one well successful implement with simple request/reply and other failing as explained above.
*** edited after first answer from Louis
I am quite confused. With a simple proto as
...
service Account {
rpc SendDebit (DebitRequest) returns (DebitReply) {}
}
message DebitRequest {
string name = 1;
}
message DebitReply {
string message = 1;
}
I can implemented with
override suspend fun sendDebit(request: DebitRequest): DebitReply {
return DebitReply.newBuilder().setMessage("teste").build()
}
Nevertheless, with
...
service TransactionsService {
rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}
message TransactionsRequest {
string transactionDesc = 1;
repeated Transaction transactions = 2;
}
message Transaction {
string id = 1;
string name = 2;
string description = 3;
}
message TransactionsReply {
string message = 1;
}
I can't override with same type of response (note that the reply is exactly the same)
neither this implementation proposed from IntelliJ
override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
super.postTransactions(request, responseObserver)
return TransactionsReply.newBuilder().setMessage("testReply").build()
}
nor this with similar response approach
override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) :TransactionsReply {
super.postTransactions(request, responseObserver)
return TransactionsReply.newBuilder().setMessage("testReply").build()
}
On top of that, why if the reply is exactly the same, in my first approach I didn't get StreamObserver proposed from IntelliJ?
*** Final solution thanks to Louis' help. I extended wrong abstract class
override suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
return TransactionsReply.newBuilder().setMessage("testReply").build()
}