I need to test pubsub subscriber with callback function using Pub/Sub Emulator. There are several examples with
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(1)
.setReturnImmediately(true) // return immediately if messages are not available
.setSubscription(subscription.getName())
.build();
But I need to implement test for MessageReceiver
I try with following code
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.*;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.junit.*;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Executors;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TTest {
private static final Logger log = LoggerFactory.getLogger(TTest.class);
private static String projectId = "test-project";
private static String topicId = "demo";
private static String subscriptionId = "demo";
private static String testSubscriptionId = "test";
private static final String hostPort = "127.0.0.1:8085";
private static ManagedChannel channel;
private static TransportChannelProvider channelProvider;
private static TopicAdminClient topicAdmin;
private static CredentialsProvider credentialsProvider;
private static Publisher publisher;
private static SubscriptionAdminClient subscriptionAdminClient;
private static ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
private static ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
private static ProjectSubscriptionName testSubscriptionName = ProjectSubscriptionName.of(projectId, testSubscriptionId);
private static SubscriberStub subscriberStub;
@BeforeClass
public static void create() throws Exception {
channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
credentialsProvider = NoCredentialsProvider.create();
topicAdmin = TopicAdminClient.create(
TopicAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build()
);
topicAdmin.createTopic(topicName);
publisher = Publisher.newBuilder(topicName)
.setChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
subscriptionAdminClient = SubscriptionAdminClient.create(SubscriptionAdminSettings.newBuilder()
.setCredentialsProvider(credentialsProvider)
.setTransportChannelProvider(channelProvider)
.build());
subscriberStub = GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build());
subscriptionAdminClient.createSubscription(subscriptionName, topicName,
PushConfig.getDefaultInstance(), 120);
subscriptionAdminClient.createSubscription(ProjectSubscriptionName.of(projectId, "test"), topicName,
PushConfig.getDefaultInstance(), 120);
}
@Test
public void test() {
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("Message")).build();
publisher.publish(pubsubMessage);
StringBuilder receivedMessage = new StringBuilder("");
TestPubSubMessageReceiver receiver = new TestPubSubMessageReceiver(receivedMessage);
Thread t = new Thread() {
@Override
public void run() {
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setChannelProvider(channelProvider).
setCredentialsProvider(credentialsProvider).build();
subscriber.awaitRunning();
}
};
Executors.newSingleThreadExecutor().execute(t);
PullResponse pullResponse = null;
int messageCount = 0;
while (messageCount == 0) {
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(1)
.setReturnImmediately(true) // return immediately if messages are not available
.setSubscription(testSubscriptionName.toString())
.build();
pullResponse = subscriberStub.pullCallable().call(pullRequest);
messageCount = pullResponse.getReceivedMessagesCount();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String testMessage = pullResponse.getReceivedMessages(0).getMessage().getData().toStringUtf8();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Assert.assertEquals(testMessage, receivedMessage);
}
@AfterClass
public static void delete() throws IOException {
topicAdmin.deleteTopic(topicName);
subscriptionAdminClient.deleteSubscription(subscriptionName);
subscriptionAdminClient.deleteSubscription(testSubscriptionName);
channel.shutdownNow();
}
private class TestPubSubMessageReceiver implements MessageReceiver {
StringBuilder receivedMessage;
public TestPubSubMessageReceiver(StringBuilder receivedMessage) {
this.receivedMessage = receivedMessage;
}
@Override
public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
String message = pubsubMessage.getData().toStringUtf8();
log.info("Received : " + message);
receivedMessage.append(message);
}
}
}
Messages publish to topic properly but no any message receive to receive Message method. What I need to do.
Thank you.
EDIT 1
I ran the program with debug mode. Then I had following logs.
2020-07-08 21:15:29 DEBUG [grpc-nio-worker-ELG-1-3] i.g.n.s.i.g.netty.NettyClientHandler.log():214 - [id: 0x20545e3a, L:/127.0.0.1:10718 - R:/127.0.0.1:8085] OUTBOUND HEADERS: streamId=9 headers=GrpcHttp2OutboundHeaders[:authority: 127.0.0.1:8085, :path: /google.pubsub.v1.Subscriber/Pull, :method: POST, :scheme: http, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.28.1, grpc-accept-encoding: gzip, grpc-timeout: 24999280u] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
2020-07-08 21:15:29 DEBUG [grpc-nio-worker-ELG-1-3] i.g.n.s.i.g.netty.NettyClientHandler.log():214 - [id: 0x20545e3a, L:/127.0.0.1:10718 - R:/127.0.0.1:8085] OUTBOUND DATA: streamId=9 padding=0 endStream=true length=53 bytes=00000000300a2a70726f6a656374732f73746c2d63617264696f2d6465762f737562736372697074696f6e732f7465737410011801
2020-07-08 21:15:29 DEBUG [grpc-nio-worker-ELG-1-3] i.g.n.s.i.g.netty.NettyClientHandler.log():214 - [id: 0x20545e3a, L:/127.0.0.1:10718 - R:/127.0.0.1:8085] OUTBOUND HEADERS: streamId=11 headers=GrpcHttp2OutboundHeaders[:authority: 127.0.0.1:8085, :path: /google.pubsub.v1.Publisher/Publish, :method: POST, :scheme: http, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.28.1, grpc-accept-encoding: gzip, grpc-timeout: 4999823u] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
2020-07-08 21:15:29 DEBUG [grpc-nio-worker-ELG-1-3] i.g.n.s.i.g.netty.NettyClientHandler.log():214 - [id: 0x20545e3a, L:/127.0.0.1:10718 - R:/127.0.0.1:8085] OUTBOUND DATA: streamId=11 padding=0 endStream=true length=53
Also I connected asynchronous subscriber and PullRequest pullRequest = PullRequest.newBuilder() to same subscription Id PullRequest didn't receive any message. But when they connected to different subscription Ids related to same topic PullRequest receive message. Is asynchronous subscriber listen message but don't receive to call back or any other thing ?
EDIT 2
After lots of debugging I found this is a OS dependent issue. I ran test with following OS configurations
After lots of debugging I found this issue OS dependent. I try following OS combinations.
- Emulator - Windows 10 and Test Program - Windows 10 => Not working
- Emulator - Linux and Test Program - Windows 10 => Not working
- Emulator - Linux and Test Program - Linux => Working
- Emulator - Windows and Test Program - Linux => Working
That means Test program not working in windows 10. But working with GCP using client credentials. Is this client library issue? Pubsub client version : 1.104.1
EDIT 3
Emulator logs in windows terminal
pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Jul 09, 2020 11:50:31 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] Jul 09, 2020 11:50:32 PM io.gapi.emulators.netty.NettyUtil applyJava7LongHostnameWorkaround
[pubsub] INFO: Unable to apply Java 7 long hostname workaround.
[pubsub] Jul 09, 2020 11:50:33 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
[pubsub] Jul 09, 2020 11:53:03 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Jul 09, 2020 11:53:03 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
[pubsub] Jul 09, 2020 11:53:05 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Jul 09, 2020 11:53:05 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
Linux terminal logs also same to this.