I am working on part of a Mule ESB application, and the problem I am facing is closely analogous to a chat room.

The chat room (server, Mule ESB) can listen for many chatters (clients, external applications). When a single chatter (c1) inputs a message, the server echoes it to all the other chatters, except for c1. In my case, though, I also need to echo it back to c1.

I am having a hard time wrapping my mind around some detailed aspects of how the message processing works in Mule ESB. I read the documentation about transports and believe I may need to work with Message Receivers and Message Dispatchers.

Consider the following configuration:

<mule ...>
    <tcp:connector name="tcpConn" doc:name="TCP connector" clientSoTimeout="0" keepAlive="true" keepSendSocketOpen="true" receiveBacklog="0" receiveBufferSize="0" reuseAddress="true" sendBufferSize="0" serverSoTimeout="0" socketSoLinger="0" validateConnections="true">
        <reconnect-forever />
        <tcp:direct-protocol payloadOnly="true"/>
    </tcp:connector>
    <tcp:endpoint exchange-pattern="request-response" host="localhost" port="8090" name="ListenEndpoint" responseTimeout="10000" doc:name="TCP"/>
    <flow name="ChatroomExampleFlow1" doc:name="ChatroomExampleFlow1">
        <tcp:inbound-endpoint exchange-pattern="request-response" ref="ListenEndpoint" responseTimeout="10000" doc:name="Client Message"/>
        <logger level="INFO" doc:name="Logger"/>
        <all doc:name="All">
            <tcp:outbound-endpoint host="${client1.host}" port="${client1.port}" responseTimeout="10000" doc:name="TCP"/>
            <tcp:outbound-endpoint host="${client2.host}" port="${client2.port}" responseTimeout="10000" doc:name="TCP"/>
            <tcp:outbound-endpoint host="${client3.host}" port="${client3.port}" responseTimeout="10000" doc:name="TCP"/>
        </all>
    </flow>
</mule>

If I know the host and port of every client at configuration time, this configuration works. However, my goal is to send the messages to whichever clients have established a connection, which could be up to n clients. As clients connect and disconnect, I would need to add them to or remove them from the list.

There is also a <recipient-list expression="#[app.registry.remoteEndpoints]"/> router, whose expression should evaluate to a List of endpoints; the flow then sends the message to every endpoint in that collection. However, this does not work for my purposes (see the answer below). See the documentation for recipient list. This is the configuration I tried:

<mule ...>
    <tcp:connector name="tcpConn" doc:name="TCP connector" clientSoTimeout="0" keepAlive="true" keepSendSocketOpen="true" receiveBacklog="0" receiveBufferSize="0" reuseAddress="true" sendBufferSize="0" serverSoTimeout="0" socketSoLinger="0" validateConnections="true">
        <reconnect-forever />
        <tcp:direct-protocol payloadOnly="true"/>
    </tcp:connector>
    <spring:beans>
        <spring:bean name="remoteEndpoints" scope="singleton" class="java.util.ArrayList"/>
    </spring:beans>
    <tcp:endpoint  host="localhost" exchange-pattern="one-way" port="8090" name="ListenEndpoint" responseTimeout="10000" doc:name="TCP"/>
    <flow name="ChatroomExampleFlow1" doc:name="ChatroomExampleFlow1">
        <tcp:inbound-endpoint  ref="ListenEndpoint" responseTimeout="10000" doc:name="Client Message"/>
        <echo-component doc:name="Echo"/>
        <component class="EndpointStorer" doc:name="Java"/>
        <recipient-list expression="#[app.registry.remoteEndpoints]"/>
    </flow>
</mule>

And with EndpointStorer looking like this:

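import java.util.List;

import org.mule.api.MuleEventContext;
import org.mule.api.MuleMessage;
import org.mule.api.lifecycle.Callable;
import org.mule.api.registry.Registry;

public class EndpointStorer implements Callable
{
    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception
    {
        Registry registry = eventContext.getMuleContext().getRegistry();
        MuleMessage message = eventContext.getMessage();

        // Singleton list declared as the "remoteEndpoints" bean in the Mule configuration.
        List<String> endpoints = registry.get("remoteEndpoints");
        Object remoteAddress = message.getOutboundProperty("MULE_REMOTE_CLIENT_ADDRESS");

        // The remote client address looks like "/127.0.0.1:4943", so drop the leading slash.
        String tcpAddr = "tcp://" + remoteAddress.toString().substring(1);
        if (!endpoints.contains(tcpAddr))
        {
            endpoints.add(tcpAddr);
        }

        return message;
    }
}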
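
The address handling in EndpointStorer can be pulled out into a small helper, which makes it easier to check on its own. This is only a sketch: the class name RemoteAddresses and the method toTcpUri are made up for illustration, and it assumes the property value has the "/ip:port" or "host/ip:port" shape that a socket address usually prints as. It does not make the clients reachable; it only builds the URI string.

```java
// Hypothetical helper, not part of Mule: normalizes a remote client address
// such as "/127.0.0.1:4943" into a "tcp://host:port" string.
class RemoteAddresses {

    // Returns null when no address is available, instead of throwing.
    static String toTcpUri(Object remoteAddress) {
        if (remoteAddress == null) {
            return null;
        }
        String text = remoteAddress.toString();
        // Keep only what follows the last '/', which covers both
        // "/127.0.0.1:4943" and "localhost/127.0.0.1:4943".
        String hostAndPort = text.substring(text.lastIndexOf('/') + 1);
        return "tcp://" + hostAndPort;
    }
}
```

With this in place, the component would only need to call RemoteAddresses.toTcpUri(remoteAddress) and skip the entry when the result is null.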
Why does the connection to tcp://127.0.0.1:3866 fail? Is there a server listening there? - David Dossot
There is no server listening there. The external clients connect to the Mule instance using an OS-assigned port (I think they're called ephemeral ports). My goal is to send out the messages via the very same pipe that was created originally. In essence, I need to have a single socket to both send and receive messages to/from Mule. - Randy
Kind of like a request-response scenario, but we "respond" to all the connected clients on the TCP inbound endpoint! - Randy
I understand now. I will delete my response because it is incorrect. You can't reach established inbound TCP connections, at least not using TCP outbound endpoints. It may be possible to extend the TCP connector to expose the Socket in github.com/mulesoft/mule/blob/mule-3.x/transports/tcp/src/main/… but it's no small feat. - David Dossot
I was hoping to avoid having to extend Mule :( If you'd like to put that as an answer, I can accept it. Thanks a lot for your time! - Randy

1 Answer

You can't reach established inbound TCP connections, at least not using TCP outbound endpoints. It may be possible to extend the TCP connector to expose the Socket in http://github.com/mulesoft/mule/blob/mule-3.x/transports/tcp/src/main/… but it's no small feat.
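
To give a rough idea of what such an extension would have to do once it can get at the accepted sockets, here is a minimal sketch in plain Java, independent of Mule. Everything in it is an assumption for illustration: ClientRegistry and its methods are hypothetical names, not part of the TCP transport, and wiring it into the connector is the hard part that this sketch leaves out.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Mule: remembers the output stream of every
// connected client so a message can be written back over the same connection.
class ClientRegistry {
    private final Map<String, OutputStream> clients = new ConcurrentHashMap<>();

    // Call when a client connects, e.g. with socket.getOutputStream().
    void register(String clientId, OutputStream out) {
        clients.put(clientId, out);
    }

    // Call when a client disconnects.
    void unregister(String clientId) {
        clients.remove(clientId);
    }

    int size() {
        return clients.size();
    }

    // Writes the message to every registered client, the sender included.
    // A client whose stream fails is dropped from the registry.
    void broadcast(byte[] message) {
        for (Map.Entry<String, OutputStream> entry : clients.entrySet()) {
            OutputStream out = entry.getValue();
            try {
                synchronized (out) { // keep concurrent broadcasts from interleaving bytes
                    out.write(message);
                    out.flush();
                }
            } catch (IOException e) {
                clients.remove(entry.getKey());
            }
        }
    }
}
```

The receiving side would call register(...) when a connection is accepted and broadcast(...) for each incoming message, so the response travels back over the socket the client opened rather than through a new outbound connection.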

You could consider using the AJAX transport.