分布式命令模式——互联系统的一种可扩展的命令模式



分布式命令模式是一种用来解决架构设计的建议模式。相比设计通常的应用而言,在互联系统中应该更多地考虑使用它。这种模式的目标是让独立系统与互联系统都有相同的设计。这种模式允许开发者将精力集中在设计一个遵循命令模式的通常的应用上,而不需要考虑该应用将与其他的系统相连。这种模式在设计命令时将开发者从考虑传输和通信协议中解放出来,并且能保持系统的简洁性。当某些命令被执行时,设计者不需要担心怎样发送必要的数据给服务器,以及怎样处理从服务器接收到的数据然后反应到互联应用的UI上。当一条命令在一个本地运行系统上被执行,分布式命令模式会考虑在所有的互联系统上同时执行相同命令的障碍,来让它们保持相同的状态。从一个设计者的角度出发,这样的架构仍然和那些使用通常的命令模式且没有网络意识的桌面应用一样简单,然而如果命令已在一个系统内部被调用,它能够执行所有必要的通信以在所有的互联系统上执行命令。



 

命令模式概要



命令模式是一种在web和桌面系统中都被广泛采用的设计模式。它允许开发者在用户和系统的执行操作上思考和设计架构。例如,当一个文件被打开,一个FileOpenCommand发生,必要的代码只服务于该命令。这种简化的架构设计缘于,当实现了一个操作,需要完成的任务已经非常地明确。命令模式也强制设计出一个可复用的系统,因为那些很小的命令的“类”是可复用的,并且既可以被用户执行也可以被系统执行。例如,一个用户通过选择File菜单下的Open命令可能产生一个FileOpenCommand,它有可能被一个插件或者宏自动执行,或者需要打开一个文件的其他命令执行。但结果是,该命令仅被写入一次,在一个地方被维护,但它却提供广泛的复用性。

 

分布式系统的问题



当设计一个互联系统的架构时,设计时需要考虑到怎样执行命令。不仅是在“源系统”本身,还要在所有其他的互联系统上。例如,一个聊天系统与一个聊天室相连。如果一个用户发出一条消息,消息需要被显示在用户的屏幕上,同时也需要显示在所有在聊天室中在线用户的屏幕上。这引发了一个架构设计的问题——一个命令需要在“源系统”被执行,并且必要的数据需要被发往服务器以“传播”消息到所有其他的相连用户。每一个连接着的用户都需要从服务器接收到数据,传输它,并且在屏幕上显示。

这种情况导致了一个额外的架构设计以及为每一个操作执行的通信而编码。首先,遵循命令模式,我们只能在相同的系统上执行命令,更特殊的是,只能在相同的进程上。然后,我们需要准备网络传输。所以,需要有某些用来维持服务端和客户端通信的特殊的协议待开发。服务端和客户端都需要维护一段处理该协议的代码。当一个消息从网络上被接收,消息首先需要被处理,然后需要写某些特别的代码来给该消息执行命令。

这里有一个设计,它展示了一个传统的聊天应用怎样被开发:



这样设计的缺点是:

(1)    对每一个操作,都会有一个命令,都会有一次关于传输必要的数据到服务器或者从服务端接收数据的协议解析。

(2)    如果一个命令改变了,为了反映执行的改变,协议也需要被修改。

(3)    对每一个操作,开发者都需要思考三个问题:

i一个命令需要被开发以在本地应用程序上执行

ii一条消息发送到服务器以及包装所需数据的逻辑

iii一个用来接收那条消息的接收器,处理消息的逻辑和执行必要的操作

 

分布式命令模式



分布式命令模式通过引入一种架构来解决这些问题,当任何一个系统执行一个命令时,它能够使相同的命令在互联系统上自动执行。这种模式提供一种方式以在从一个连接系统到另一个连接系统的进程内执行命令。不需要发送一个自定义的消息(该消息包含要求的数据给服务器来告知命令的执行的),对客户端来讲也不需要读取和转换一条消息来提取必要的数据。系统上执行的命令都将被序列化并传输到其他系统,然后在本地执行,以模拟该命令发生自“源系统”。

分布式命令模式有如下的好处:

(1)    一个命令“类”总是以和一个简单的没有网络意识的独立系统同样的方式开发。

(2)    对设计者来讲,没有必要开发自定义的协议或者为了给每个命令在发送和接受时携带必要的数据而设计消息。

(3)    当一个命令被执行,它不仅会在本地进程上会被执行,而且在另一个连接系统的进程上也会被执行。

(4)    它能够使互联系统对彼此来讲是“内同步”的,而且不需要知道其他系统的存在。

(5)    它将开发者从思考和实现某些问题中解放出来,那些问题是:怎样使所有的互联系统彼此是“内同步”地执行命令,就像是在每个连接系统的本地调用一样。

(6)    开发者不需要担心是否命令在相同的进程上执行或者将来在其他的进程执行。

(7)    它拒绝大量的为通信而构建的代码,有一个统一的方式发送和接收任何命令

(8)    很有可能无缝地实现撤销/重做命令。如果一个应用执行“撤销”,所有的连接系统都以同样的方式执行“撤销”操作。

这里有一个插图,它展示了一条命令如何在所有的互联系统上以统一的方式执行:



下面的一段命令类型的简单代码,它展示了命令总是对互联的系统是无意识的:

[Serializable]
public class SendMessageCommand : UICommand, ICommand
{
public string From;
public string Message;

public SendMessageCommand(
string from,
string message)
{
this.From = from;
this.Message = message;
}

void ICommand.Execute()
{
base.UI.ChatHistoryTextBox.AppendText( this.From
+ " says:" + Environment.NewLine );
base.UI.ChatHistoryTextBox.AppendText( this.Message
+ Environment.NewLine );
}
}


分布式命令模式的开发



下面一步一步展示分布式命令模式的开发,逐步展示怎样将一个通常的命令模式转变成一个分布式的命令模式

第一步  通常的命令模式



这是一个通常的命令模式。这里,调用者创建一个命令并且直接调用命令的执行方法来执行命令。

ICommand command = new FileSaveCommand( fileContent );
command.Execute();


下面是另一个变形过的命令模式,命令不是被调用者直接执行。它通过一个命令的执行器来执行命令,将其命名为命令的门面【外观】。变动如下:



这种模式的不同之处在于,一个命令将不会执行被调用者执行。调用者创建命令的实例,然后将该实例传递给Command Facade。Command Facade总是调用命令的执行方法。

ICommand command = new FileSaveCommand( fileContent );
CommandFacade.Execute( command );


这种模式的好处是,Command Facade可以与任何的命令的执行交互并且它能够提供某些具有上下文的命令和命令的执行需要的额外的“元数据”。它也扮演着中央命令执行室的角色。在分布式命令模式中,这种特殊的“属性”很有用。

第二步  引进一个总线

在分布式命令模式中,Command Facade事实上是一个命令总线。命令总线是遵循消息总线模式而创建的。消息总线引进一个总线,它能够知道怎样格式化一条消息以及怎样用一些通道来发送消息。在分布式命令模式中,命令总线也类似于消息总线。它持有一个命令执行器的管线【pipeline】。无论何时,一条命令被发往消息总线,它将通知管线中所有的执行器来执行命令。执行器的工作类似于通道,实际上是做一些命令的执行操作。



命令总线不改变命令被执行的方式。调用者像上面那样书写相同的代码:

ICommand command = new FileSaveCommand( fileContent );
CommandBus.Execute( command );


然而,命令总线的内部,它会调用管线内所有的执行器来执行命令:

CommandBus.execute( ICommand command )
{
foreach( ICommandExecutor executor in this.Pipeline )
{
executor.Execute( command );
}
}


第三步  介绍执行器

执行器为命令提供执行环境。尽管命令执行需要包含必要的代码,但命令执行器提供命令执行期间的必要支持。例如,画一个带有文本框的windows form。一个命令需要在执行时为文本框中加入某些文本。但命令是一个独立的类,并且它没有对窗体中的文本框的引用。所以,该窗体就是命令的执行器,从命令总线上截取命令,并且给命令对象提供文本框的引用。它然后执行命令:

class MainForm : ICommandExecutor
{
ICommandExecutor.Execute( ICommand command )
{
TextAddCommand cmd = command as TextAddCommand;
cmd.TheTextBox = this.MessageTextBox;
cmd.Execute();
}
}


这是本地执行器的一种方式,这里命令在进程内执行。当然还有另外一种命令的执行方式,它被称为分布式命令执行器,它有责任通过传输器发送和接受命令。这种类型的执行器负责维持所有的互联系统是内部同步的,并且提供命令的分布式服务。

然而,分布式执行器并不直接执行命令。它仅仅使用一个传输器发送命令到另一个连接系统上。在另一端,有另一个分布式命令执行器,它从它自己的传输器接收命令。然后通知命令总线——一个命令被接收!然后总线通过管线再次发送执行命令。这次,在另一端的本地执行器接受到命令然后在它自己的上下文执行命令。这导致在另一台计算机上相同命令的执行就好像命令在那里被创建一样。



第四步  介绍传输器

传输器提供通信服务。它位于分布式命令执行器和网络通信库之间,提供透明的消息传输服务。一个传输器是基于应用程序使用的通信媒介创建的。例如,当一个客户端程序使用一个TCP/IP连接到服务器,它就会使用有发送和接收数据的Sockets的二进制模式的传输器。在成功地建立一个连接之后,一个分布式的执行器将基于传输器而创建,然后执行器被压入命令总线来传递命令。比如:

ITransporter transporter = new BinaryTransporter( address, port );
ICommandExecutor executor = new BinaryDistributedExecutor( transporter );
CommandBus.AddExecutor( executor );

CommandBus.Execute( new HiCommand() );


上面的伪代码展示了如何将一个分布式执行器与一个传输器关联起来。当执行器被加入总线,它已经准备好接收和发送任何用来在总线上执行的命令了。因此,当一个总线上的命令被执行时,命令将会被发送给分布式执行器,执行器将命令发送给传输器,轮到它使用它创建的Socket来传输消息。

分布式命令模式的架构是一种管线类型的架构,它可以被看成是如下的形式:



技术架构



 DCP的技术架构图如下:



执行顺序



当一个命令被发送到命令总线上,接下来的执行将会发生:

(1)    命令总线传递命令给命令管线里的每一个执行器

(2)    如果执行器是一个本地执行器,它将直接执行命令

(3)    如果执行器是一个分布式执行器,它将使用一个RemoteCommand类来装饰命令【采用装饰器模式】。该类包含了实际的命令,并且另外还包含一个传输器ID,它用来识别已经被发送的命令。

(4)    远程命令然后被发送给传输器,以使用某种通信协议将命令发送给目的地。

(5)    在另一端,传输器接收消息。它基于消息来创建命令对象。

(6)    被发送给分布式命令执行器的命令是被附属在传输器上的。执行器接收命令,然后它修改RemoteCommand的Source属性,将其设置为接收命令的传输器的ID。

(7)    执行器通知它的命令总线——命令到达。

(8)    命令总线通知所有在它管线内的命令执行器执行命令。

(9)    在这段时间内,原本通知命令总线已接收到命令的执行器将被命令总线再次调用,因为它在管线内。但它从remote command的source属性看到,是它自身发送命令到总线的。所以,它并不会再次发送命令给传输器,以此来防止无限循环。

怎样应用该模式



对实现该模式有兴趣的应用需要做以下事情:

(1)    创建一个命令总线仅一次,并且一旦创建完成,它就不可更改

(2)    创建一个传输层,它需要实现Itransporter仅一次,并且一旦创建完成,它将不可更改

(3)    创建一个本地命令执行器来执行本地命令,并且一旦创建完成,通常情况下它将不需要改动

(4)    创建一个分布式命令执行器使用传输器来发送命令给其他的连接系统,并且一旦创建完成,通常情况下将不必改动

(5)    开发需要的命令

简单的聊天应用



这是一个演示分布式命令模式的很好的例子,它是一个聊天应用。该聊天应用有两个模式——服务端模式和客户端模式。



服务端模式



服务端开启一个传输器来侦听来自客户端的连接。它也创建一个本地的命令总线,并注册它自身以接收发送到总线上的命令,这样它就可以执行本地命令。

// 1. Create the command bus and attach myself as one of the command executors

this._MyBus = new CommandBus( true );
this._MyBus.AddExecutor( this );

// 2. Establish a collaboration server, this is the transporter

this._Server = new CollaboratorServer(
int.Parse( myPortTextBox.Text ),
Guid.NewGuid().ToString(), SERVER_NAME,
new PacketReceiveHandler( PacketReceived ),
new NewClientHandler( NewClientConnected ),
new ClientClosedHandler( ClientClosed ) );

// 3. Goto listen mode

this._Server.Listen();


无论什么时候,只要有客户端连接到服务器,为客户端创建的传输器将被附有一个新创建的分布式命令执行器,然后它将被加入命令总线。最终,任何被发送给总线的命令既可以在本地被执行,同时也可以被发送给连接着的客户端。

private bool NewClientConnected( CollaborationClient client,
out string uniqueID, out string name )
{
uniqueID = Guid.NewGuid().ToString();
name = uniqueID;

// This client will participate in distributed command execution

ICommandExecutor executor = new DistributedExecutorHTTP( client );
this._MyBus.AddExecutor( executor );

// Accept the client

return true;
}


只要客户端发送任何的命令,它们都将被总线接收,然后总线将再次通过管线发送以执行命令。命令在本地服务器被执行,同时也被广播到所有其他与服务器关联着的客户端。这将导致,如果一个客户端说——Hi,那么所有的客户端和服务器将显示Hi。

客户端模式



客户端首先创建一个命令总线,并将自身看做一个命令执行器,以便可以在本地执行命令。

当它连接到服务器,它将创建一个分布式的命令执行器,然后将执行器加入到命令总线。这样,任何在客户端执行的命令也将被发送到服务器。

// 1. Create the command bus and attach myself as one executor of commands

this._MyBus = new CommandBus( false );
this._MyBus.AddExecutor( this );

// 2. Establish connection

this._Client = new CollaborationClient(
serverTextBox.Text,
int.Parse( serverPortTextBox.Text ),
Guid.NewGuid().ToString(),
nameTextBox.Text,
new PacketReceiveHandler( PacketReceived ),
new ClientClosedHandler( ClientClosed ) );

// 3. This client will provide distributed command execution service

this._MyBus.AddExecutor( new DistributedExecutorHTTP( this._Client ) );

// 4. Create the command to inform new client

ICommand command = new NewClientCommand(
new ConnectedClient( this._Client.UniqueId, nameTextBox.Text ) );

// 5. Execute the command

this._MyBus.Execute( command );




执行命令—发送消息



执行一个命令非常得简单,它不需要额外的“努力”就能使所有的客户端和服务器保持同步。例如,需要发送一条消息,接下来的代码就已经足够了:

this._MyBus.Execute( new SendMessageCommand( nameTextBox.Text,
messageTextBox.Text ) );


相似的,为了断开连接并且通知每个人——你要离开聊天室,下面的代码就足够了:

this._MyBus.Execute( new ClientLeftCommand( client ) );




本地命令执行器



本地命令执行器从命令总线上接收命令,在本地执行它们。例如,main窗体是一个本地的命令执行器。它基本上就是接收命令,然后装饰必要的UI支持,最终执行命令。例如,下面的通用代码对一个本地命令执行器来讲就足够了:

void ICommandExecutor.Execute(ICommand command)
{
ICommand actualCommand = command;

// If this is a remote command,

// then the actual command is packed inside it

if( command is RemoteCommand )
{
RemoteCommand remoteCommand = command as RemoteCommand;
actualCommand = remoteCommand.ActualCommand;
}

// Call the execute method on UI thread

if( this.InvokeRequired )
this.Invoke( new MethodInvoker( command.Execute ) );
else
command.Execute();
}




传输层——HTTP传输



该应用程序有一个非常有用的传输层,它使用HTTP协议通信。总共有两个类,一个是CollaborationServer,它扮演者服务器的角色,另一个是CollaborationClient,它扮演者服务器的客户端。

传输层是一个完全通用的类库。它接收HTTP包,并通过网络发送包。该类库也可以用在多种应用上,以交换文本和二进制数据。它也可以被用来创建一个HTTP服务器,而不需要写任何的通信代码,因为它已经使用了HTTP协议来交换数据。下面是一个简单的基本会话:

请求:

GET / HTTP/1.1
Correlation-ID: 0B83745D-2AAB-4bce-8AC9-B8A590F07768


响应:

HTTP/1.1 200 Ok
Correlation-ID: 0B83745D-2AAB-4bce-8AC9-B8A590F07768


该类库是定制的并且增强了客户端-服务器的协作。这个库的特殊性在于,它只打开一个TCP/IP套接字,但提供在相同套接字上的双向通信方式。所以,它是一个针对点对点通信的且防火墙友好的解决方案。

该协作库是完全面向对象的。它以http消息的方式来接收和发送命令。这里有一个HTTPMessage,它包含了被发送和接收的消息的明细。该HTTPMessage被一个称为Packet的类继承。这里有两种类型的packets,PacketRequest被用来发送消息给另一方,PacketResponse被从另一方返回。



一个应用程序只需要扩展PacketRequest 以及GenerciSuccessPacket(可选)来提供自己的消息。例如,WhatIsYourNameRequest类可以被扩展PacketRequest来发送可识别的请求,另一端可以返回MyIdentityResponse(扩展GenerciSuccessPacket),以此来响应该请求。

而CommandPacketRequest携带一个命令,且序列化了该命令,将其转换成了一个HTTP 消息的包。简单得说,它获得了一个HTTPMessage,然后从消息体重构建了一个命令。

/// <summary>

/// HTTP Request Packet which encapsulates a distributed command

/// </summary>

public class CommandPacketRequest : PacketRequest
{
private RemoteCommand _Command;

private static BinaryFormatter _Serializer = new BinaryFormatter();

public RemoteCommand Command
{
get { return this._Command; }
set { this._Command = value; }
}

public CommandPacketRequest( HttpMessage msg ) : base( msg )
{
// Deserialize the message body and extract the command from it

MemoryStream stream = new MemoryStream( base.Body );
this._Command = _Serializer.Deserialize( stream ) as RemoteCommand;
}

public CommandPacketRequest(RemoteCommand command)
{
// Serialize the command

MemoryStream stream = new MemoryStream( 256 );
_Serializer.Serialize( stream, command );

// Store the data in the body of the message

base.Body = stream.GetBuffer();
}
}


接下来的这些类,它们在HTTP协议上提供分布式命令执行:





这个简单的应用程序里面有什么



在这个简单的应用程序里,你将看到如下内容:

(1)    一个可供分布式命令执行的类库

(2)    一个非常强大的通信类库,它提供双向的HTTP通信

(3)    一个可供直接使用的聊天应用

源代码下载