The Right Protocol for Enterprise Applications – introducing RTMP/C – reliable and easy combination of RPC and RTMP

Enterprise (especially financial) applications are different from the Web ones. The formers are distributed beasts sprawling across multiple servers/systems to provide real-time, rich, multi-format data. Here you have two key words – distributed and data. Distributed means unpredictable response time. That is a showstopper for conventional RPC methodology – you need to consider messaging. Also, it means that the client is a part of distributed chain – not all the Web restrictions are applicable, but QoS (Quality of Service), reliability and performance are part of the design.The data means that it comes in variety of formats that might be foreign to the Web server platform or client – some massaging is required.What the developers need is something to absolve them from all the issues of Internet communications and Server infrastructure. Something that works like RemoteObject but is scalable, support server push, is reliable, provides automatic QoS, reconnection of the transport, has better performance then regular RPC. On the logical level, it should deliver results in strongly typed ActionScript objects and should be accomplishable with a couple of lines of the application code. This is just a tip of the iceberg of what an actual enterprise bus communication needs, but it should be good enough for the introduction of the concept.

The implementation shown below is based on FDS 2.0.1. I might have separate entry on implementation with MidnightCoders product or openAMF/Tomcat with non-blocking IO Connector later this Summer.

Step 1. Teach Messaging to understand RPC calls:

This code is a slight variation of the BatchGateway AMF “batch handler” from the book. Optimization is removed for brevity. You create new descendent of RTMPEndpoint class that would watch for message coming through RTMP channel to be “RemoteCall” :

private Object processCall(String destinationName, String methodName, List parameters) throws IllegalAccessException, IllegalArgumentException, Throwable { Object result = null;

initRemoting();

RemotingDestination remotingDestination = (RemotingDestination)srv.getDestinationByName(destinationName);

FactoryInstance factoryInstance = remotingDestination.getFactoryInstance();

Object instance = factoryInstance.lookup();

MethodMatcher methodMatcher = remotingDestination.getMethodMatcher();

Method method = methodMatcher.getMethod(instance.getClass(), methodName, parameters);

try {

result = method.invoke(instance, parameters.toArray());

}finally {

factoryInstance.operationComplete(instance);

}

return result;

}

public Message serviceMessage(Message message) {

if (message is AsyncMessage and message.body is RemoteCall)

RemoteCall bm = (RemoteCall)((AsyncMessage)message).getBody();

try {

Object o = processCall(bm.destinationName, bm.methodName, bm.parameters);

AsyncMessage am = asyncMessage(message);

am.setClientId(/*you need to pass rtmp consumer id here – will add later*/ “1”);

am.setBody(o);

rtmpServer.push(am, am.getClientId() ); }catch(Exception e) {…}else return super.serviceMessage(message);}private AsyncMessage asyncMessage(Message message) { AsyncMessage am = new AsyncMessage();

am.setClientId(message.getClientId());

am.setCorrelationId(message.getMessageId());

am.setMessageId(UUIDUtils.createUUID(false));

am.setDestination(message.getDestination());

return a;

} Now register new RTMPEndpoint in services-config.xml and you can execute POJO calls over RTMP.

Now register new RTMPEndpoint in services-config.xml and you can execute POJO calls over RTMP.

Step 2. Call to the server is implemented with single API in descendant of Producer:

public static function call(rc:RemoteCall, r:Responder = null, noneOneMany:String = “none”) : AsyncToken {

Messsage m = new AsyncMessage(rc);

var t : AsyncToken = new AsyncToken(m);

ConsumerForRPC.calls[m.messageId] = t;

send(m);

return t;

}

Step 3. On the client side, establish Consumer receiving point. You will need just descendent of Consumer object to be connected to the same destiantion so you can start receiving the messages. You will also need to provide Producer/Consumer association to the RTMPEndpoint (see commets in the code) so the responses are sent to the appropriate client.

Here is a piece of code in the consumer that allows you to do RPC from the server to the client:

private static function processMessage(message:IMessage) : void {

if (message.body is RemoteCall) {

var rc : RemoteCall = message.body as RemoteCall;

var obj : Object = destinations[bm.destinationName]; // you register live objects as targets for RPC, factories/safety missing for brevity obj[bm.methodName].apply(obj, [bm.parameters.source]);

}

else {

// standard processing – the data is most likely result from RPC call – see if it is and call the responders:

var corId : String = (message as AsyncMessage).correlationId; var t : AsyncToken = calls[ corId ];

if (t == null) return ;// regular push via messaging

for (var i:int = 0; i < t.responders.length; i++) {

var r:Responder = t.responders[ i ]; r.result(

new ResultEvent(“result”, false, true, message.body, t, message));

}

if (t.runOnce) delete calls[ corId ]; }}}}

}

}

That provides RPC like communications – you register either Responder for result/error or directly call methods on the objects. You also pass through regular messages pushed from server without processing.

Finally, if you decide to code a server RPC response via server side event model you just move the code with the return in the appropriate event handler.

That’s all, folks 😉

PS. I left out QoS and reliability information as every client has different requirements – but essentially you queue the messages and set up headers to provide out-of-band QoS and reliability info.