Requirements
To follow this article, you need basic knowledge of OOP, Visual Studio and C# programming.
I have tried to present the article in a way that simulates a typical software project's environment. The program developed here does not actually publish a message to any queue. To simulate a message being encrypted and its priority being set, I have added properties to the Message class that record those states. In an actual project the job would likely be done differently.
Existing system
Company ABC wants to build a new application called “Messenger” that publishes messages to MSMQ. They expect two new consumers for this application as soon as it is developed. One consumer will publish sensitive data, so it expects the message to be encrypted when it is published to the queue and also wants to set the message priority. The other consumer will publish simple text data that does not need encryption and only sets the message priority.
New requests from other consumers are expected soon, so the application should be built in such a way that future changes can be incorporated into the existing architecture.
So the application manager approaches the technical team with the following problem statement:
“I want to publish a message to a queue and it should undergo certain transformations or perform certain operations before publishing it.
I want my code design to support adding new operations or removing the existing operations.”
Then the Architect/Technical lead jumps in and proposes the “Pipes and Filters Pattern” as the solution to the problem and provides the design to the development team with the high-level details as in Figure 1.
Figure 1: Publishing message to the Queue
In the preceding figure, the incoming message is shown in green. It turns blue after its priority is set, then orange after it is encrypted, and is then sent on as the output.
High-Level Details
- The Pipes and Filters architectural pattern is one of the messaging patterns. It splits a large set of operations on a message into separate processes so that each process works on the message independently and completes one step of its transformation.
- Each process is called a “filter”, and the filters are connected by channels or connectors called “pipes”.
- All the filters implement a common interface so that they all conform to the same contract.
- The message from the source is the input to the first process, the output of each process becomes the input to the next, and so on until the message reaches the sink or destination. (Note: the terms “process” and “filter” are used interchangeably here.)
- In the preceding diagram the first process receives the incoming message through a pipe and sets its priority, then sends its output to another process that encrypts the message.
- Then the outgoing message is published to the Queue.
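The flow described in these points can be sketched, before any of the article's classes are introduced, as a chain of functions where each function's output becomes the next one's input. This is only a minimal illustration: `Msg`, `PipesSketch`, `SetPriority`, `Encrypt` and `Run` are hypothetical names, and "encryption" is just a flag, as in the rest of the article.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a message; not the article's Message class.
public sealed class Msg
{
    public string Body = "";
    public int Priority;
    public bool IsEncrypted;
}

public static class PipesSketch
{
    // Each filter takes a message and returns a message.
    public static Msg SetPriority(Msg m) { m.Priority = 1; return m; }
    public static Msg Encrypt(Msg m) { m.IsEncrypted = true; return m; }

    // The "pipe": hand the output of one filter to the next, in order.
    public static Msg Run(Msg source, IEnumerable<Func<Msg, Msg>> filters)
    {
        Msg current = source;
        foreach (Func<Msg, Msg> filter in filters)
        {
            current = filter(current);
        }
        return current; // what would reach the sink (the queue in Figure 1)
    }

    public static void Main()
    {
        var filters = new List<Func<Msg, Msg>> { SetPriority, Encrypt };
        Msg result = Run(new Msg { Body = "hello" }, filters);
        Console.WriteLine("Priority={0}, Encrypted={1}", result.Priority, result.IsEncrypted);
    }
}
```

Removing `Encrypt` from the list gives the second consumer's behavior without touching `SetPriority`.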
Advantages and disadvantages of this architectural pattern
- All the filters (operations) are independent, and each can be plugged in or unplugged without affecting the others. The set of filters can therefore be tailored to each consumer.
- Because the filters are independent of each other, they cannot communicate with one another directly, and the message can only pass through them sequentially, in other words in a linear fashion.
Then the architect provides the low-level design for implementing the solution, as shown in Figure 2.
Class Diagram
Figure 2: Class Diagram
Class Diagram in detail
All operations implement one interface so that they share the same contract. In our example, setting the message priority and encrypting the message are the two operations, or filters, that must be applied to the source message before publishing.
So we created an interface, “IOperation<T>”, that takes a generic type parameter so that each implementing class states the type it works on. The interface has only one method, “Execute(T input)”, which takes an input of type T, performs the operation and returns a value of the same type. Because the interface is generic, filters for different types can implement the same interface.
Note: Since we are working with messages, a message will always be the input and output for the filter or operation methods and so our method Execute also supports that.
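To illustrate that the generic parameter is not tied to messages, here is a small sketch with two filters over other types. The interface has the same shape as the one described above. `TrimText`, `ClampToByte` and `OperationDemo` are hypothetical names used only for this illustration and are not part of the article's project.

```csharp
using System;

// Same shape as the article's IOperation<T>.
public interface IOperation<T>
{
    T Execute(T input);
}

// Hypothetical filter over plain strings.
public sealed class TrimText : IOperation<string>
{
    public string Execute(string input)
    {
        return input.Trim();
    }
}

// Hypothetical filter over integers: limits the value to the range 0..255.
public sealed class ClampToByte : IOperation<int>
{
    public int Execute(int input)
    {
        return Math.Max(0, Math.Min(255, input));
    }
}

public static class OperationDemo
{
    public static void Main()
    {
        IOperation<string> trim = new TrimText();
        IOperation<int> clamp = new ClampToByte();
        Console.WriteLine("[" + trim.Execute("  hello  ") + "]");
        Console.WriteLine(clamp.Execute(300));
    }
}
```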
Now getting to the operations
We need to create classes that each perform one operation, taking an input and returning a result of the same type. Based on the requirements, we need two classes, “EncryptMessage” and “MessagePriority”, that encrypt the message and set its priority respectively.
Now there must be some way to decide which operations are applied to the incoming message. This job is done by the pipeline classes. We created an abstract base class, “Pipelinebase”, that supports generic types and has methods for registering the operations and performing them. For our requirements we need one pipeline that sends a message from the client to the messaging queue, so we named our pipeline class “SendPipeline”; it inherits the behavior of the base class “Pipelinebase”. We also created an interface, “IMessage”, that defines our message, and the SendPipeline class works with classes that implement IMessage.
The SendPipeline class also has a constructor that takes one Boolean parameter per message operation. Since we have only two operations, the constructor takes two Boolean parameters. If you have more operations, it is advisable to use an enum with the Flags attribute or a settings type whose instance carries all the options.
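The flags-enum alternative mentioned above might look like the following sketch. `SendOptions` and `SendOptionsDemo` are hypothetical and not part of the article's code. To keep the example self-contained, it only returns the names of the operations that would be registered. In a real constructor each branch would call Register with the corresponding operation instead.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical options type; each member is a distinct bit so values can be combined.
[Flags]
public enum SendOptions
{
    None = 0,
    SetPriority = 1,
    Encrypt = 2
}

public static class SendOptionsDemo
{
    // Returns the names of the operations a pipeline would register, in order.
    public static List<string> OperationsFor(SendOptions options)
    {
        var names = new List<string>();
        if (options.HasFlag(SendOptions.SetPriority)) names.Add("MessagePriority");
        if (options.HasFlag(SendOptions.Encrypt)) names.Add("EncryptMessage");
        return names;
    }

    public static void Main()
    {
        SendOptions both = SendOptions.SetPriority | SendOptions.Encrypt;
        Console.WriteLine(string.Join(", ", OperationsFor(both)));
        Console.WriteLine(string.Join(", ", OperationsFor(SendOptions.SetPriority)));
    }
}
```

With this shape, adding a third operation means adding one enum member (with value 4) and one branch, rather than another constructor parameter.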
Then the development team comes up with the implementation based on the class diagram and specifications provided by the architect, as shown in the following section.
Coding in Action
IOperation interface
This interface takes a generic type parameter, and every class that implements it provides an Execute method that takes and returns that same type.
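    namespace PipesAndFiltersExample
    {
        /// <summary>
        /// Contract implemented by every filter in the pipeline.
        /// </summary>
        public interface IOperation<T>
        {
            /// <summary>
            /// Applies the operation to the input and returns the result.
            /// </summary>
            T Execute(T input);
        }
    }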
EncryptMessage class
This class encrypts the incoming message and implements the interface IOperation<IMessage>.
    namespace PipesAndFiltersExample
    {
        // Filter that marks the message as encrypted.
        class EncryptMessage : IOperation<IMessage>
        {
            public IMessage Execute(IMessage input)
            {
                return Encrypt(input);
            }

            // Simulation only: sets a flag instead of encrypting the body.
            private IMessage Encrypt(IMessage input)
            {
                input.IsEncrypted = true;

                return input;
            }
        }
    }
MessagePriority class
This class sets the message priority and implements the interface IOperation<IMessage>.
    namespace PipesAndFiltersExample
    {
        // Filter that sets the message priority.
        class MessagePriority : IOperation<IMessage>
        {
            public IMessage Execute(IMessage input)
            {
                // Any non-zero value counts as "priority set" (see Message.IsMessagePrioritySet).
                input.Priority = 1;

                return input;
            }
        }
    }
IMessage interface
    namespace PipesAndFiltersExample
    {
        // Contract for the messages that flow through the pipeline.
        public interface IMessage
        {
            // State written by the filters.
            int Priority { get; set; }

            bool IsEncrypted { get; set; }

            // Queries used by the clients to inspect the result.
            bool IsMessageEncrypted();

            bool IsMessagePrioritySet();

            // Read-only message content.
            string MessageId { get; }

            string Body { get; }

            string Header { get; }

            string Subject { get; }
        }
    }
Message class
    namespace PipesAndFiltersExample
    {
        // Concrete message; Priority and IsEncrypted only simulate the real operations.
        public class Message : IMessage
        {
            private string messageId;
            private string body;
            private string header;
            private string subject;

            public int Priority { get; set; }

            public bool IsEncrypted { get; set; }

            public string MessageId
            {
                get { return messageId; }
            }

            public string Body
            {
                get { return body; }
            }

            public string Header
            {
                get { return header; }
            }

            public string Subject
            {
                get { return subject; }
            }

            public Message(string messageId, string body, string header, string subject)
            {
                this.messageId = messageId;
                this.body = body;
                this.header = header;
                this.subject = subject;
            }

            public bool IsMessageEncrypted()
            {
                return IsEncrypted;
            }

            // A priority of 0 (the default) is treated as "not set".
            public bool IsMessagePrioritySet()
            {
                return (Priority != 0);
            }
        }
    }
Pipelinebase abstract class
This class has two methods, Register and PerformOperation. The Register method takes an IOperation<T> as input and adds it to the class's list of operations, so all the operations a client needs are added to the list through Register. The PerformOperation method then loops through the operations and calls the “Execute” method of each one. Each operation takes the input message, performs its work and passes the resulting message to the next operation, until all the operations are completed.
    using System.Collections.Generic;
    using System.Linq;

    namespace PipesAndFiltersExample
    {
        // Base class for pipelines: holds the registered operations and runs them in order.
        public abstract class Pipelinebase<T>
        {
            private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

            // Adds an operation to the end of the pipeline.
            // Returns the pipeline itself so that calls can be chained.
            public Pipelinebase<T> Register(IOperation<T> operation)
            {
                this.operations.Add(operation);
                return this;
            }

            // Runs the operations in registration order, feeding each
            // operation's output to the next one.
            public T PerformOperation(T input)
            {
                return this.operations.Aggregate(input, (current, operation) => operation.Execute(current));
            }
        }
    }
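The Aggregate call in PerformOperation is a fold: it starts with the input and passes the running result through each registered operation in turn. The sketch below shows that it is equivalent to a plain foreach loop. It uses hypothetical integer steps instead of message operations so that the order of execution is visible in the result, and `AggregateDemo` and its members are illustrative names only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AggregateDemo
{
    // Hypothetical integer "filters", chosen so the order of execution is visible.
    static readonly List<Func<int, int>> Steps = new List<Func<int, int>>
    {
        x => x + 1,
        x => x * 10
    };

    // Same shape as PerformOperation: fold the input through every step.
    public static int WithAggregate(int input)
    {
        return Steps.Aggregate(input, (current, step) => step(current));
    }

    // The equivalent explicit loop.
    public static int WithLoop(int input)
    {
        int current = input;
        foreach (Func<int, int> step in Steps)
        {
            current = step(current);
        }
        return current;
    }

    public static void Main()
    {
        Console.WriteLine(WithAggregate(2)); // (2 + 1) * 10 = 30
        Console.WriteLine(WithLoop(2));      // 30
    }
}
```

Because the steps run in registration order, registering them the other way round would give 2 * 10 + 1 = 21, which is why the order of Register calls matters for filters that depend on each other's output.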
SendPipeline class
This class has a constructor that takes two parameters, bool setPriority and bool encryptMessage, and registers the operation classes based on the parameter values.
    namespace PipesAndFiltersExample
    {
        // Pipeline used to send a message from a client to the queue.
        public class SendPipeline : Pipelinebase<IMessage>
        {
            // Registers only the operations the client asked for.
            // The priority is set before the message is encrypted.
            public SendPipeline(bool setPriority, bool encryptMessage)
            {
                if (setPriority)
                {
                    Register(new MessagePriority());
                }

                if (encryptMessage)
                {
                    Register(new EncryptMessage());
                }
            }
        }
    }
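To show how a later consumer could be supported without changing the existing filters, here is a self-contained sketch. It repeats a trimmed copy of the interface and the base class so that it compiles on its own. Everything else (`DemoMessage`, `SetPriority`, `AuditMessage`, `AuditedSendPipeline`, `ThirdConsumerDemo`) is hypothetical and only illustrates the idea. It is not part of the article's project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IOperation<T> { T Execute(T input); }

// Trimmed copy of the article's base class.
public abstract class Pipelinebase<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public Pipelinebase<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public T PerformOperation(T input)
    {
        return operations.Aggregate(input, (current, operation) => operation.Execute(current));
    }
}

// Hypothetical, simplified message used only in this sketch.
public sealed class DemoMessage
{
    public int Priority;
    public bool IsEncrypted;
    public bool IsAudited;
}

public sealed class SetPriority : IOperation<DemoMessage>
{
    public DemoMessage Execute(DemoMessage input) { input.Priority = 1; return input; }
}

// Hypothetical new filter requested by a third consumer.
public sealed class AuditMessage : IOperation<DemoMessage>
{
    public DemoMessage Execute(DemoMessage input) { input.IsAudited = true; return input; }
}

// Hypothetical pipeline for the third consumer; the existing filter is reused unchanged.
public sealed class AuditedSendPipeline : Pipelinebase<DemoMessage>
{
    public AuditedSendPipeline()
    {
        // Register returns the pipeline, so the calls can be chained.
        Register(new SetPriority()).Register(new AuditMessage());
    }
}

public static class ThirdConsumerDemo
{
    public static void Main()
    {
        DemoMessage result = new AuditedSendPipeline().PerformOperation(new DemoMessage());
        Console.WriteLine("Priority={0}, Encrypted={1}, Audited={2}",
            result.Priority, result.IsEncrypted, result.IsAudited);
    }
}
```

The new requirement is met by adding one filter class and one pipeline class, and neither the base class nor the existing filter had to change.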
Two console applications have been created to represent the two clients. One client wants the message to be encrypted and its priority set, and the other only wants the message priority to be set.
Client 1 program
    using PipesAndFiltersExample;
    using System;

    namespace Client1
    {
        class Program
        {
            static void Main(string[] args)
            {
                IMessage message = new Message(messageId: "1",
                                               body: "This is the message body",
                                               header: "Header Information",
                                               subject: "Please set priority and encrypt the message");

                // Client 1 asks for both operations: set priority and encrypt.
                SendPipeline sendPipeline = new SendPipeline(true, true);
                var publishedMessage = sendPipeline.PerformOperation(message);
                Console.WriteLine("I am client1 and my messages should be prioritized and encrypted!!!");
                Console.WriteLine("My message Priority Set? {0}", publishedMessage.IsMessagePrioritySet());
                Console.WriteLine("My message Encrypted? {0}", publishedMessage.IsMessageEncrypted());
                Console.WriteLine();

                // Keep the console window open.
                Console.Read();
            }
        }
    }
Client 2 program
    using PipesAndFiltersExample;
    using System;

    namespace Client2
    {
        class Program
        {
            static void Main(string[] args)
            {
                IMessage message = new Message(messageId: "1",
                                               body: "This is the message body",
                                               header: "Header Information",
                                               subject: "Please set priority but do not encrypt the message");

                // Client 2 asks only for the priority to be set.
                SendPipeline sendPipeline = new SendPipeline(true, false);
                var publishedMessage = sendPipeline.PerformOperation(message);
                Console.WriteLine("I am client2 and my messages should only be prioritized and not encrypted!!!");
                Console.WriteLine("My message Priority Set? {0}", publishedMessage.IsMessagePrioritySet());
                Console.WriteLine("My message Encrypted? {0}", publishedMessage.IsMessageEncrypted());
                Console.WriteLine();

                // Keep the console window open.
                Console.Read();
            }
        }
    }
When you run the code, you should see output as in Figures 3 and 4.
Client 1 program
Figure 3: Client 1 program output shows that the message is encrypted and the priority is set
Client 2 program
Figure 4: Client 2 program output shows that the message priority is set but not encrypted
To run both console applications at once, right-click the solution, select “Set StartUp Projects” and set the “Action” column to “Start” for the two client projects, as in the following screenshot: