2
votes

I want to serialize 'Message' object, I can successfully transfer it as bytes array through socketChannel. After that, I change the object's properties (so that it may have larger size), and then there's a problem in sending object back to the client. Once I try to obtain the object on the client side, I get an exception, it occurs when I deserealize Message obj in getResponse() method:

org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 00000000

But, somehow, this applies only for the first client (After the exception is thrown, connection with the first client is over) and when I start a new client (not closing server) I can successfully transfer the object back and forth, furthermore, it works for any new clients.

This is my minimal debuggable version:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class Client {

    private SocketChannel server;

    public void start() throws IOException {
        try {
            server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            server.configureBlocking(false);
        } catch (IOException e) {
            System.err.println("Server isn't responding");
            System.exit(0);
        }

        Scanner scRequest = new Scanner(System.in);
        Scanner scState = new Scanner(System.in);


        System.out.println("Enter request:");
        String request = scRequest.nextLine();

        while (!request.equals("exit")) {
            try {
                // In my actual project class Person is a way different (But it's still a POJO)
                // I included it here to make sure I can get it back after sending to the server
                System.out.println("Enter a number:");
                Person person = new Person(scState.nextInt());
                sendRequest(request, person);

                System.out.println("\nEnter request:");
                request = scRequest.nextLine();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        stop();
    }

    public void sendRequest(String sMessage, Person person) {
        Message message = new Message(sMessage, person);
        ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        try {
            server.write(requestBuffer);
            requestBuffer.clear();
            getResponse();
        } catch (Exception e) {
            System.out.println(e.getMessage());
            System.err.println("Connection lost");
            System.exit(0);
        }
    }

    public void getResponse() throws Exception {
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);

        int read = server.read(responseBuffer);
        responseBuffer.clear();
        if(read == -1) {
            throw new Exception();
        }

        byte[] bytes = new byte[responseBuffer.limit()];
        responseBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        System.out.println(message);
    }

    public void stop() throws IOException {
        server.close();
    }

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.*;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {

    public void start() throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server started");

        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }
                if (key.isReadable()) {
                    try {
                        getRequest(key);
                    } catch (Exception e) {
                        System.err.println(e.getMessage());
                    }
                }
                iter.remove();
            }
        }
    }

    private void getRequest(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();

        ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
        int read = client.read(requestBuffer);
        requestBuffer.clear();

        if(read == -1) {
            key.cancel();
            throw new Exception("Client disconnected at: " +
                    ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
        }

        byte[] bytes = new byte[requestBuffer.limit()];
        requestBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        sendResponse(client, message);
    }

    private void sendResponse(SocketChannel client, Message message) throws IOException {

        message.setResult("Some result");

        ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        while (responseBuffer.hasRemaining()) {
            client.write(responseBuffer);
        }
        responseBuffer.clear();
    }

    private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
    }

    public static void main(String[] args) throws Exception {
        new Server().start();
    }
}

I try to send this object as a bytes array:

import java.io.Serializable;
import java.util.Formatter;

public class Message implements Serializable {

    private String command;
    private Person person;
    private String result;

    public Message(String command, Person person) {
        this.command = command;
        this.person = person;
    }

    public String getCommand() {
        return command;
    }
    public void setCommand(String executedCommand) {
        this.command = executedCommand;
    }
    public Person getPerson() {
        return person;
    }
    public void setPerson(Person person) {
        this.person = person;
    }
    public String getResult() {
        return result;
    }
    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return new Formatter()
                .format("Command: %s\nAttached object: %s\nResult: %s",
                        command, person, result)
                .toString();
    }
}

I include instance of this class inside Message obj:

public class Person implements Serializable {
    private final int state;

    public Person(int state) {
        this.state = state;
    }

    @Override
    public String toString() {
        return "Person state: " + state;
    }
}

I have no idea what is going wrong, hope for your help.

UPD: I used 'org.apache.commons:commons-lang3:3.5' dependency to serialize an object into bytes array

1
I see no minimal debuggable version here: Person and Message are missing, field server in class Client is missing, field controller in class ServerTerminal is missing. I also cannot see any main method(s) showing how you start client and server. This is why you did not receive any answers so far, despite the bounty.kriegaex
They matter because without them your sample code does not compile, without compilation I cannot run it and without running it I cannot help you debug it. You called your code a minimal debuggable sample, but unfortunately that is a false statement. The sample is smaller than minimal.kriegaex
Look, I want to help you here. So please take my advice and learn how to ask a good question by means of an MCVE. I am sure if your problem was reproducible, you would get the answer you are looking for quickly. But for now your sample classes are just pseudo code. Even without regard of the missing classes, the code uses two fields which are not present in the class. You are deliberately hiding information because you think you know where the problem is. How can you be so sure? And why make it more difficult to reproduce the problem?kriegaex
Ah okay, in your sample code the buffer sizes for client and server are different. If I use 64M or 1M on both sides, I can reproduce the problem. I will be busy for the rest of the evening, but I guess tomorrow I can take a look.kriegaex
You would be far better off losing the SocketChannel and the Apache stuff and just using Sockets and ObjectInput/OutputStreams directly. Then at least you can see exacftly what's going on. It's near enough to impossible to combine non-blocking mode with Serialization that it doesn't matter. I wouldn't attempt it and I'm a 24-year Java programmer.user207421

1 Answers

1
votes

I have never used Java NIO channels before, so I am not an expert. But I found out several things:

General:

  • In order to debug your code, it is helpful to use e.printStackTrace() instead of just System.out.println(e.getMessage()).

Client:

  • SocketChannel server in the client should be configured as blocking, otherwise it might read 0 bytes because there is no server response yet, which causes your problem.
  • You should always call ByteBuffer.clear() before reading something, not afterwards.
  • After reading, the position in the byte buffer has to be reset to 0 via responseBuffer.position(0) before calling get(byte[]), otherwise it will read undefined bytes after the ones just read.
  • You should size your byte arrays according to the number of bytes read, not the byte buffer size. It might work the other way around, but it is inefficient.

Server:

  • You should always call ByteBuffer.clear() before reading something, not afterwards.
  • After reading, the position in the byte buffer has to be reset to 0 via responseBuffer.position(0) before calling get(byte[]), otherwise it will read undefined bytes after the ones just read.
  • When catching exceptions during getRequest(key) calls, you should close the corresponding channel, otherwise after a client disconnects the server will indefinitely try to read from it, spamming your console log with error messages. My modification handles that case and also prints a nice log message telling which client (remote socket address) was closed.

Caveat: There is nothing in your code dealing with the situation that a request or response written into the channel on the one side is bigger than the maximum ByteBuffer size on the other side. Similarly, in theory a (de)serialised byte[] could also end up being bigger than the byte buffer.

Here are my diffs:

Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (date 1612321383172)
@@ -15,7 +15,7 @@
   public void start() throws IOException {
     try {
       server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
-      server.configureBlocking(false);
+      server.configureBlocking(true);
     }
     catch (IOException e) {
       System.err.println("Server isn't responding");
@@ -56,22 +56,24 @@
       getResponse();
     }
     catch (Exception e) {
-      System.out.println(e.getMessage());
+      e.printStackTrace();
+//      System.out.println(e.getMessage());
       System.err.println("Connection lost");
       System.exit(0);
     }
   }
 
   public void getResponse() throws Exception {
-    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
+    responseBuffer.clear();
 
     int read = server.read(responseBuffer);
-    responseBuffer.clear();
     if (read == -1) {
-      throw new Exception();
+      throw new Exception("EOF, cannot read server response");
     }
 
-    byte[] bytes = new byte[responseBuffer.limit()];
+    byte[] bytes = new byte[read];
+    responseBuffer.position(0);
     responseBuffer.get(bytes);
 
     Message message = SerializationUtils.deserialize(bytes);
Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (date 1612323386278)
@@ -35,7 +35,11 @@
             getRequest(key);
           }
           catch (Exception e) {
-            System.err.println(e.getMessage());
+            e.printStackTrace();
+//            System.err.println(e.getMessage());
+            SocketChannel client = (SocketChannel) key.channel();
+            System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
+            client.close();
           }
         }
         iter.remove();
@@ -45,15 +49,16 @@
 
   private void getRequest(SelectionKey key) throws Exception {
     SocketChannel client = (SocketChannel) key.channel();
-    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
+    requestBuffer.clear();
     int read = client.read(requestBuffer);
-    requestBuffer.clear();
     if (read == -1) {
       key.cancel();
       throw new Exception("Client disconnected at: " +
         ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
     }
-    byte[] bytes = new byte[requestBuffer.limit()];
+    byte[] bytes = new byte[read];
+    requestBuffer.position(0);
     requestBuffer.get(bytes);
     Message message = SerializationUtils.deserialize(bytes);
     sendResponse(client, message);

Just for completeness' sake, here are the full classes after I changed them:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class Client {

  private SocketChannel server;

  public void start() throws IOException {
    try {
      server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
      server.configureBlocking(true);
    }
    catch (IOException e) {
      System.err.println("Server isn't responding");
      System.exit(0);
    }

    Scanner scRequest = new Scanner(System.in);
    Scanner scState = new Scanner(System.in);

    System.out.println("Enter request:");
    String request = scRequest.nextLine();

    while (!request.equals("exit")) {
      try {
        // In my actual project class Person is a way different (But it's still a POJO)
        // I included it here to make sure I can get it back after sending to the server
        System.out.println("Enter a number:");
        Person person = new Person(scState.nextInt());
        sendRequest(request, person);

        System.out.println("\nEnter request:");
        request = scRequest.nextLine();
      }
      catch (Exception e) {
        e.printStackTrace();
      }
    }

    stop();
  }

  public void sendRequest(String sMessage, Person person) {
    Message message = new Message(sMessage, person);
    ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    try {
      server.write(requestBuffer);
      requestBuffer.clear();
      getResponse();
    }
    catch (Exception e) {
      e.printStackTrace();
//      System.out.println(e.getMessage());
      System.err.println("Connection lost");
      System.exit(0);
    }
  }

  public void getResponse() throws Exception {
    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
    responseBuffer.clear();

    int read = server.read(responseBuffer);
    if (read == -1) {
      throw new Exception("EOF, cannot read server response");
    }

    byte[] bytes = new byte[read];
    responseBuffer.position(0);
    responseBuffer.get(bytes);

    Message message = SerializationUtils.deserialize(bytes);
    System.out.println(message);
  }

  public void stop() throws IOException {
    server.close();
  }

  public static void main(String[] args) throws IOException {
    Client client = new Client();
    client.start();
  }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {
  public void start() throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocket = ServerSocketChannel.open();
    serverSocket.bind(new InetSocketAddress("localhost", 5454));
    serverSocket.configureBlocking(false);
    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("Server started");

    while (true) {
      selector.select();
      Set<SelectionKey> selectedKeys = selector.selectedKeys();
      Iterator<SelectionKey> iter = selectedKeys.iterator();
      while (iter.hasNext()) {
        SelectionKey key = iter.next();
        if (key.isAcceptable()) {
          register(selector, serverSocket);
        }
        if (key.isReadable()) {
          try {
            getRequest(key);
          }
          catch (Exception e) {
            e.printStackTrace();
//            System.err.println(e.getMessage());
            SocketChannel client = (SocketChannel) key.channel();
            System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
            client.close();
          }
        }
        iter.remove();
      }
    }
  }

  private void getRequest(SelectionKey key) throws Exception {
    SocketChannel client = (SocketChannel) key.channel();
    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
    requestBuffer.clear();
    int read = client.read(requestBuffer);
    if (read == -1) {
      key.cancel();
      throw new Exception("Client disconnected at: " +
        ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
    }
    byte[] bytes = new byte[read];
    requestBuffer.position(0);
    requestBuffer.get(bytes);
    Message message = SerializationUtils.deserialize(bytes);
    sendResponse(client, message);
  }

  private void sendResponse(SocketChannel client, Message message) throws IOException {
    message.setResult("Some result");
    ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    while (responseBuffer.hasRemaining()) {
      client.write(responseBuffer);
    }
    responseBuffer.clear();
  }

  private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
    SocketChannel client = serverSocket.accept();
    client.configureBlocking(false);
    client.register(selector, SelectionKey.OP_READ);
    System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
  }

  public static void main(String[] args) throws Exception {
    new Server().start();
  }
}