Introduction
The example demonstrates a communication scenario where the capacity of one running service is not enough and you want to increase the performance by distributing the workload across multiple services.
To simulate the scenario, the example below implements a client application using a service to calculate π (PI) number. The calculation is split into many small intervals (400 in our example).
Therefore, the service gets overloaded and the performance drops down.
The solution for such scenarios could be using of load balancer. The load balancer is a component maintaining a farm of services (typically same services but running on different machines) and is exposed to receive messages instead of the real service. The client does not know about this setup and thinks it communicates directly with the service. When the client sends a request, the load balancer receives this message and forwards it to the service picked from the farm. The service processes the message and sends back the response that is routed via the load balancer back to the client.
The example can be downloaded from http://eneter.net/Downloads/Examples/LoadBalancing.zip.
To run the example applications must be executed in the following order:
1. PiCalculator up to 3 instances.
2. LoadBalancer
3. Client
The example bellow uses Eneter Messaging Framework that provides functionality for various communication scenarios.
(The framework is free for non-commercial use (License) and can be downloaded from http://www.eneter.net.
More detailed technical info can be found at http://www.eneter.net/OnlineHelp/EneterMessagingFramework/Index.html.)
Client
The client is a simple application containing a button sending the request to the service to calculate π (PI) number. The calculation is split into many small intervals. Therefore, the client sends about 400 messages each containing a small interval for the calculation.
Then it collects all responses and summarizes them together to get the final result, the PI number.
using System; using System.Windows.Forms; using Eneter.Messaging.EndPoints.TypedMessages; using Eneter.Messaging.MessagingSystems.MessagingSystemBase; using Eneter.Messaging.MessagingSystems.TcpMessagingSystem; namespace Client { public partial class Form1 : Form { // Requested calculation range. public class Range { public double From; public double To; } public Form1() { InitializeComponent(); OpenConnection(); } public void OpenConnection() { // Create TCP messaging for the communication. // Note: Requests are sent to the balancer that will forward them // to available services. IMessagingSystemFactory myMessaging = new TcpMessagingSystemFactory(); IDuplexOutputChannel anOutputChannel = myMessaging.CreateDuplexOutputChannel("tcp://127.0.0.1:8060/"); // Create sender to send requests. IDuplexTypedMessagesFactory aSenderFactory = new DuplexTypedMessagesFactory(); mySender = aSenderFactory.CreateDuplexTypedMessageSender<double, Range>(); // Subscribe to receive response messages. mySender.ResponseReceived += OnResponseReceived; // Attach the output channel and be able to send messages and receive responses. mySender.AttachDuplexOutputChannel(anOutputChannel); } private void Form1_FormClosed(object sender, FormClosedEventArgs e) { // Detach the output channel and stop listening for responses. mySender.DetachDuplexOutputChannel(); } private void CalculatePiBtn_Click(object sender, EventArgs e) { myCalculatedPi = 0.0; // Split calculation of PI to 400 ranges and sends them for the calculation. for (double i = -1.0; i <= 1.0; i += 0.005) { Range anInterval = new Range() { From = i, To = i + 0.005 }; mySender.SendRequestMessage(anInterval); } } private void OnResponseReceived(object sender, TypedResponseReceivedEventArgs<double> e) { // Receive responses (calculations for ranges) and calculate PI. myCalculatedPi += e.ResponseMessage; // Display the number. // Note: The UI control can be used only from the UI thread. InvokeInUIThread(() => ResultTextBox.Text = myCalculatedPi.ToString()); } // Helper method to invoke some functionality in UI thread. private void InvokeInUIThread(Action uiMethod) { // If we are not in the UI thread then we must synchronize via the invoke mechanism. if (InvokeRequired) { Invoke(uiMethod); } else { uiMethod(); } } private IDuplexTypedMessageSender<double, Range> mySender; private double myCalculatedPi; } }
Load Balancer
The load balancer is a simple console application acting as a real service. In this example it has a farm of three services. (To keep things simple, all three services run on the same computer. They listens to different ports.)
When a user presses the button in the client application, the load balancer receives about 400 request messages and distributes them to its three services from the farm.
To decide which service is picked from the farm, the load balancer uses Round-Robin algorithm. Therefore, all services are chosen evenly.
(The Round-Robin algorithm is suitable for load balancing if processing of requests takes approximately same time.)
The code is very simple. The whole implementation is here:
using System; using Eneter.Messaging.MessagingSystems.MessagingSystemBase; using Eneter.Messaging.MessagingSystems.TcpMessagingSystem; using Eneter.Messaging.Nodes.LoadBalancer; namespace LoadBalancer { class Program { static void Main(string[] args) { // Create TCP messaging for the communication with the client // and with services performing requests. IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory(); // Create load balancer. // It receives requests from the client and forwards them to available services // to balance the workload. ILoadBalancerFactory aLoadBalancerFactory = new RoundRobinBalancerFactory(aMessaging); ILoadBalancer aLoadBalancer = aLoadBalancerFactory.CreateLoadBalancer(); // Addresses of available services. string[] anAvailableServices = { "tcp://127.0.0.1:8071/", "tcp://127.0.0.1:8072/", "tcp://127.0.0.1:8073/" }; // Add IP addresses of services to the load balancer. foreach (string anIpAddress in anAvailableServices) { aLoadBalancer.AddDuplexOutputChannel(anIpAddress); } // Create input channel that will listen to requests from clients. IDuplexInputChannel anInputChannel = aMessaging.CreateDuplexInputChannel("tcp://127.0.0.1:8060/"); // Attach the input channel to the load balancer and start listening. aLoadBalancer.AttachDuplexInputChannel(anInputChannel); Console.WriteLine("Load Balancer is running.\r\nPress ENTER to stop."); Console.ReadLine(); // Stop lisening. aLoadBalancer.DetachDuplexInputChannel(); } } }
PI Calculator
The calculator is a simple service (console application). It receives requests to perform calculations for specific intervals. When the calculation is done, it sends back the calculation result.
using System; using System.Collections.Generic; using System.Net; using System.Net.NetworkInformation; using Eneter.Messaging.EndPoints.TypedMessages; using Eneter.Messaging.MessagingSystems.MessagingSystemBase; using Eneter.Messaging.MessagingSystems.TcpMessagingSystem; namespace PiCalculator { // Requested calculation range. public class Range { public double From; public double To; } class Program { static void Main(string[] args) { string aServiceAddress = GetMyAddress(); if (aServiceAddress == "") { Console.WriteLine("The service could not start because all possible ports are occupied."); return; } // Create TCP messaging for receiving requests. IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory(); IDuplexInputChannel anInputChannel = aMessaging.CreateDuplexInputChannel(aServiceAddress); // Create typed message receiver to receive requests. // It receives request messages of type Range and sends back // response messages of type double. IDuplexTypedMessagesFactory aReceiverFactory = new DuplexTypedMessagesFactory(); IDuplexTypedMessageReceiver<double, Range> aReceiver = aReceiverFactory.CreateDuplexTypedMessageReceiver<double, Range>(); // Subscribre to messages. aReceiver.MessageReceived += OnMessageReceived; // Attach the input channel and start listening. aReceiver.AttachDuplexInputChannel(anInputChannel); Console.WriteLine("Root Square Calculator listening to " + aServiceAddress + " is running.\r\n Press ENTER to stop."); Console.ReadLine(); // Detach the input channel and stop listening. aReceiver.DetachDuplexInputChannel(); } private static void OnMessageReceived(object sender, TypedRequestReceivedEventArgs<Range> e) { Console.WriteLine("Calculate From: {0} To: {1}", e.RequestMessage.From, e.RequestMessage.To); // Calculate requested range. double aResult = 0.0; double aDx = 0.000000001; for (double x = e.RequestMessage.From; x < e.RequestMessage.To; x += aDx) { aResult += 2 * Math.Sqrt(1 - x * x) * aDx; } // Response back the result. IDuplexTypedMessageReceiver<double, Range> aReceiver = (IDuplexTypedMessageReceiver<double, Range>)sender; aReceiver.SendResponseMessage(e.ResponseReceiverId, aResult); } // Helper method to get the address. // Note: Since our example services are running on the same machine, // we must ensure they do not try to listen to the same IP address. private static string GetMyAddress() { List<int> aPossiblePorts = new List<int>(new int[]{ 8071, 8072, 8073 }); // When we execute this service three time we want to get three instances // listening to different port. IPGlobalProperties anIpGlobalProperties = IPGlobalProperties.GetIPGlobalProperties(); IPEndPoint[] aTcpListeners = anIpGlobalProperties.GetActiveTcpListeners(); // Remove from the possible ports those which are used. foreach (IPEndPoint aListener in aTcpListeners) { aPossiblePorts.Remove(aListener.Port); } // Get the first available port. if (aPossiblePorts.Count > 0) { return "tcp://127.0.0.1:" + aPossiblePorts[0] + "/"; } // All three instances are used. return ""; } } }
And here are all applications communicating together.
Hi
ReplyDeleteHow to Implement Load Balancing to Distribute Workload with class AuthenticatedMessagingFactory ?
Thank you ~
how to run and install message framework code in java
ReplyDeletethank you in advance