Daily Archives: June 15, 2008

Distributed Agents using DSS/VPL

In this post, I explore some ideas to implement distributed agents, leveraging the features from Decentrilized Software Services (DSS) and Visual Programming Language (VPL), included in Microsoft Robotics Developer Studio (I’m working with CTP 2.0 version). You can download the source code from my Skydrive:

AjDssAgents-0.1.zip

In my post:

Web Crawler example using DSS (Decentralized Software Services)

I wrote DSS service components orchestrated from VPL. In that example, there are a Dispatcher, a Resolver, a Downloader, and a Harvester components.

Let’s suppose we have many machines to run the web crawling process. We want to deploy and run MANY downloaders and harvesters, in a grid of machines, using automatic load balancing. The problem with VPL orchestration is that it doesn’t support load balancing out of the box. Then, I wrote an example where the components communicate each other, as agents, using special messages.

An agent, in this example, is a DSS service component, capable of receiving and processes appropiate incoming messages. It can send outgoing messages to other components. Instead of sending a message to one of the other components, an agent specify in the message the type of agent to which the message is forwarded.

Another specialized component, AgentHost, is in charge of receive such outgoing messages, and it forward them to a local or remote agent, according to its type.

The solution

The solution has three projects:

AjDssAgents contains the generic agent contract and types, and the concrete AgentHost implementation.

DecrementAgent and WebCrawler contain simple agents to use in the example. The web crawler code is similar to the implementation described in the post mentioned above.

The message

Agents interchanges messages, objects of type AgentMessage:

[DataContract] public class AgentMessage { [DataMember] public string From { get; set; } [DataMember] public string To { get; set; } [DataMember] public string Action { get; set; } [DataMember] public object Payload { get; set; } }

The From field indicates the origin of the message (I’m not using this field yet). The To field is the physical address (DSS address) of the target agents, or its logical type. In the current example, I’m using only logical types. Why a logical type? If a message has a To with value “WebCrawler/Dispatcher”, it will be forwarder to one agent that has that logical type.

How an agent knows what other agents are running and what are their logical types? It doesn’t. The component that keeps that information is the local singleton AgentHost. Each agent post their outgoing message to the local AgentHost, so, this components selects a target agent and forwards the message to it.

The Agents

Each agent is a DSS service component, with an address assigned when it is created. During the start of the agent, it sends to its local AgentHost a DSS message, indicating its address and its logical type (i.e., WebCrawler/Dispatcher). This is the way an AgentHost knows the agents that are running in its local DssHost process. See the starting code for the Dispatcher agent in WebCrawler example:

 

protected override void Start() { base.Start(); // Add service specific initialization here. _state.AgentType = "WebCrawler/Dispatcher"; host.NewNode newNode = new host.NewNode(new host.AgentInfo() { Address = this.ServiceInfo.Service, AgentType = _state.AgentType }); _hostPort.Post(newNode); }

The type of the agent is keep in its state.

This is a typical code, from Dispatcher agent, showing the treatment of an incoming message and the production of outgoing messages:

 

[ServiceHandler(ServiceHandlerBehavior.Concurrent)] public IEnumerator<ITask> PostMessageHandler(generic.PostMessage postMessage) { if (postMessage.Body.Action.Equals("Dispatch")) Dispatch(postMessage.Body); else if (postMessage.Body.Action.Equals("Resolve")) Resolve(postMessage.Body); postMessage.ResponsePort.Post(DefaultSubmitResponseType.Instance); yield break; } private void Dispatch(AgentMessage msg) { LogInfo("Entering Dispatcher with Action: " + msg.Action); LogInfo("URL: " + msg.Payload); DownloadTarget target = new DownloadTarget(); target.Uri = (string) msg.Payload; target.Depth = 1; AgentMessage postmsg = new AgentMessage() { Action = "Resolve", To = _state.AgentType, Payload = target }; host.PostMessage post = new host.PostMessage(postmsg); _hostPort.Post(post); } private void Resolve(AgentMessage msg) { LogInfo("Entering Dispatcher with Action: " + msg.Action); DownloadTarget downloadtarget = (DownloadTarget)msg.Payload; LogInfo("URL: " + downloadtarget.Uri + ", Depth: " + downloadtarget.Depth); DownloadTarget target = ProcessUrl(downloadtarget); if (target != null) { AgentMessage agentmsg = new AgentMessage() { To = "WebCrawler/Downloader", Action="Download", Payload = downloadtarget }; host.PostMessage postmsg = new host.PostMessage(agentmsg); _hostPort.Post(postmsg); } }

The AgentHost

There is one and only one AgentHost per running DssHost. The AgentHost receives new agent information (address and logical type), and keeps that information in its state.

It receives messages from local agents, and then it forward them to other local agents, or to a remote AgentHost. In the later case, it serialize the payload to a string, using XML serialization (a generic object cannot be send using the DSS generated proxies). This is the structure of a remote message:

 

[DataContract] public class RemoteAgentMessage { [DataMember] public string From { get; set; } [DataMember] public string To { get; set; } [DataMember] public string Action { get; set; } [DataMember] public string PayloadTypeName { get; set; } [DataMember] public string Payload { get; set; } }

Note that the remote message has an string Payload (XML serialization of the original payload), and its qualified type, so the target host can deserialize the payload to reconstruct the original object.

An AgentHost supports subscription. Other AgentHosts can subscribe to new agent informations. In general, if you have three machine, you start one AgentHost in each machine, and subcribe each agent host to the others. In this way, any AgentHost has all the information about the running agents, local and remote ones.

A distributed Web Crawler VPL example

The WebCrawlerVpl2 VPL example contains the diagram:

There are one Dispatcher, two Downloaders and two Harverters agents. The Dispatcher launch the initial URL to crawl, and keeps a list of downloaded URLs. A Downloader reads the content of each page to crawl. A Harvester examines that content and gets new links to download.

Note that they are two AgentHost, and they are related so each one sends new agent information to the other.

All these agents and components are distributed in two nodes:

Windows node will run on localhost:50000/50001, and Windows0 node uses localhost:50002/50003 as address. You can modify these settings, add more agents and nodes, without changing the code.

To run the distributed app, you must compile using the Build -> Compile as a Service option in VPL menu. Note: you must change the settings in VPL properties, now they are pointing to local directories in my machine:

VPL will show the compiling process:

After compiling the VPL example, go to MRDS DOS prompt, change to the bin directory, and launch the rundeployer.cmd:

I run the deployer in my local machine. If you plan to run the example in remote machines, you must start the deployer in each one.

Now, we are ready to run the web crawler. Select the Run -> Run on distributed nodes option, and the application will start. A dialog prompt for the URL to crawl. After entering a valid URL, the process begins to retrieve the pages in the site. You can see the first AgentHost state at:

http://localhost:50000/agenthost

There are three local agents and two remote ones.

In the other side, there is another AgentHost:

http://localhost:50002/agenthost0

See the difference: here are two local nodes, and three remote ones.

To see the advance of the process, point your explorer to

http://localhost:50000/console/output

Conclusions

With these ideas, we can implement grid-alike applications, running in many nodes. We lost the VPL orchestration, we can’t draw the road of the messages. But we gain load balancing and dynamic deploying. With some additional effort, we can write a controlling service, to starts and deploy the system in a new remote machine, even in the middle of a running process. The serialization of arbitrary objects is possible, but with custom serialization.

I could add subscription to messaging, in a future version. That is, an agent could receive some messages that are not for it, according to some subscription criteria. The suscriptions could be kept by the AgentHosts. When an AgentHost route an outcoming message, it could forward it to any interested local or remote agent.

Angel “Java” Lopez
http://www.ajlopez.com/en