构建轻量消息传递系统

Eric Gunnerson
Microsoft Corporation
2003 年 9 月 8 日

摘要: Eric Gunnerson 将讨论基于套接字的体系结构以及如何创建在 PC 和 Pocket PC 上都可以运行的高效、易用的消息传递系统。(本文包含一些指向英文站点的链接。)

下载 csharp09182003_sample.exe 文件。(请注意,在示例文件中,程序员的注释使用的是英文,本文中将其译为中文是为了便于读者理解。)

上个月我钻研了一下 Remoting,最后发现它不能在 Pocket PC 上使用。这个月,我将向您展示我构建的基于套接字的系统,但首先,我想先谈几个其他主题。

关于 Blog 和人类

以前我提到过我有一个 Web 日志(也称为 Blog),但是我没有提到如何读取它。您可以通过访问 Web 站点 http://blogs.gotdotnet.com/ericgu/ 来读取它,但是对每个要读取的 Blog,您都不得不访问该站点。这太麻烦了,您可能会对几个站点这样做,但是这样做的站点不会超过五个。

您需要的只是这样一种方法:通过这种方法,无需访问 Web 站点,就能了解何时有可用的新内容。要实现这种方法,可以使用 Blog 软件将 XML 内容作为 RSS 内容提供。然后,使用名为 RSS 阅读器(或 RSS 收集器)的软件程序监视此内容,就可以了解何时出现了新内容。可以在 http://backend.userland.com/directory/167/aggregators 找到收集器的列表。

现在有两种收集器。第一种是独立的程序,它定期提取您感兴趣的 Blog 的 RSS 内容,并告诉您何时出现了新内容。我使用的是名为 SharpReaderopen in new window 的免费收集器软件,它是用 C# 编写的。第二种收集器作为电子邮件(例如 Microsoft Outlook®)的外接程序运行,并将新的 Blog 条目转换为 Outlook 文件夹中的消息。NewsGatoropen in new window 就是这种收集器。

现在,让我们回到以前规划的程序。

回顾往事

在 Microsoft Visual Studio® 2002 的开发过程中,我是 C# 编译器测试主管,同时还是一名语言设计师。虽然我非常喜欢做一名语言设计师,但我对语言设计只是一知半解。

发布 Visual Studio 2002 后,我决定从测试方面转向程序管理 (PM) 方面。这样我就可以离开语言方面,集中精力钻研项目系统和我们的社团工作。

最近,我们的程序管理团队内进行了一些调整,使我们不得不寻找新的编译器程序经理,这样我又回到了原来的位置。我非常高兴可以再次与语言打交道。

套接字和消息

如今,大多数 Web 服务和所有远程应用程序都使用一种远程过程调用 (RPC) 方法。您只需要执行一种类似函数调用的操作,屏幕后面就会发生一系列魔术般的变化,使您的操作能够在服务器上实现。而在底层,系统会在两台计算机之间传递消息,但是所有这些您都不会直接看到。

但是,当您切换到套接字时,却是在一个完全基于消息的系统中进行编程。这将改变您所编写的代码的种类,因为取回数据的唯一方法是通过消息。这与使用没有返回值和输出参数的 .NET 类有些类似,这种类的所有信息都是通过事件返回的。

我希望服务器告诉客户端什么时候歌曲改变了,这样使用消息就真的很有好处,因为无需客户端明确申请,信息就可以从服务器传输到客户端。但是,确实需要您用不同的方式进行处理。

不过,在我开始之前,我想先谈谈安全性问题。如果您正在计算机上打开一个端口,其他人就可以尝试使用该端口进行一些非法活动。他们可能会向端口写入垃圾信息,以便了解他们是否能够控制您的计算机或使您的计算机崩溃。

编写这种程序时,最好要考虑到这种可能性。就我而言,程序会在防火墙后我的家庭网络上运行,所以我感觉相对安全。

简单的套接字

我将从可以把整数加 1 的服务器开始。下面是服务器端的代码:

public static void Main()
{
    IPAddress localAddr = IPAddress.Parse("127.0.0.1");

    TcpListener listener = new TcpListener(localAddr, 9999);

    Console.WriteLine("Waiting for initial connection");
    listener.Start();
    Socket socket = listener.AcceptSocket();
    Console.WriteLine("Connected");
    NetworkStream stream = new NetworkStream(socket);
    BinaryReader reader = new BinaryReader(stream);
    BinaryWriter writer = new BinaryWriter(stream);

    int i = reader.ReadInt32();
    i++;
    writer.Write(i);
}

开始时,它在本地主机的端口 9999 创建 TCP 侦听器,启动该侦听器,然后等待连接。获得连接后,它接收一个整数,将该整数加 1,然后将它发送回去。

我应该指出我使用的本地主机地址是 127.0.0.1。当客户端和服务器位于同一台计算机上时,这种方法工作得很好,但当它们运行在不同的计算机上时就不行了。我将在后面的示例中展示一些更复杂的代码。示例代码位于 SimpleSockets 子目录中。

传递消息

通过套接字传递原始数据并不是什么有趣的事,而通过套接字传递对象会好一些。要通过套接字来传递对象,我们需要一种方法来获取对象并将其转换为字节流。一种明显的解决方案是使用运行时提供的序列化支持。不幸的是,这种方法存在很多问题。

第一个问题是序列化的系统开销很大,这意味着它使用的字节数要远远多于传递数据所需的字节数。如果使用 SOAP 格式化,这个问题会更糟糕。当然,这是否会成为问题,取决于您的应用程序在性能方面的要求。第二个问题是 Compact Framework 中不提供序列化。因为没有什么简单的解决办法,所以我们只能自己来解决这个问题。在进程中,我们将进行一些远远小于序列化的操作。

我首先创建一个枚举来定义可以传递何种消息:

public enum MessageType
{
    RequestEmployee = 1,
    Employee,
}

对于每种消息类型,我们都需要用一个对象来定义该对象。

public class RequestEmployee: ISocketObject
{
    int id;
    public RequestEmployee(int id)
    {
        this.id = id;
    }

    public RequestEmployee(BinaryReader reader)
    {
        id = reader.ReadInt32();
    }

    public int ID
    {
        get
        {
            return id;
        }
    }

    public void Send(BinaryWriter writer)
    {
        writer.Write((int) MessageType.RequestEmployee);
        writer.Write(id);
    }
}

采用的方法与 ISerializable 接口非常相似。ISocketObject 接口定义了一个 Send() 函数,用于序列化通过的数据,然后用一个构造函数将数据还原序列化。

只要这些对象序列化其自身,它们发送的第一个内容就必须是消息标识符,这样接收者才能了解传入的是何种对象并创建该对象。下面是客户端的代码:

RequestEmployee requestEmployee = new RequestEmployee(15);
requestEmployee.Send(writer);

MessageType messageType = (MessageType) reader.ReadInt32();

switch (messageType)
{
    case MessageType.Employee:
        Employee employee = new Employee(reader);
        Console.WriteLine("{0} = {1}", employee.Name, employee.Address);
        break;
}

此代码创建一个 RequestEmployee 对象,并将其发送到服务器,然后指明所返回的对象的类型,并将该对象还原序列化。

虽然示例项目分别标上了“客户端”和“服务器”,但是它们之间仅有的真正区别是建立连接的方式。而建立连接之后,两者都使用类似的代码发送和接收消息,尽管两者都有自己要处理的消息集。示例代码位于 SocketObjects 子目录中。

面向对象设计与实用主义

使用此方法的一个缺点是要以一个大型 switch 语句结束,而我们中的许多人都接受过这样的教育:大型 switch 语句是不良设计的表现之一。通常的面向对象 (OO) 方法是使用多态。

要使用多态,我们需要定义抽象基类,然后从该类派生出所有的消息对象。每个类都需要实现各自的方法来完成序列化、还原序列化和消息处理。然后,主要代码将:

  • 读取消息类型
  • 创建实例(使用反射)
  • 调用 HandleMessage() 虚函数

这可以工作,但是我不喜欢它的一些效果。首先,编写用于创建实例的代码时要非常小心;其次,由于使用了反射,它的运行速度会比较慢。更重要的是,消息处理是在 HandleMessage() 函数中进行的,这意味着它将是共享库的一部分。这实在很不幸,因为消息处理会受到消息传递方式的影响。因为这些问题,我已经决定使用一种较少面向对象、而又易于编写的方法。

使它变得实用

上一个示例只能处理单个消息。在实际环境中,我们需要不断处理消息。

服务器线程

我的最终目标是向现有的应用程序中添加服务器功能。由于我不想修改现有应用程序的代码,因此我需要在一个线程中运行我的服务器。我还希望允许同时处理多个连接。我将先实现这个愿望。

上一个示例侦听端口 9999,但是因为每个端口上只能有一个客户端进行通信,所以我需要一种方法,对每个连接使用一个不同的端口。SocketListener 类将侦听端口 9999,无论何时有连接请求传入,它都会查找一个未被占用的端口并将其发送回客户端。下面是这个类的概况:

public class SocketListener
{
    int port;
    Thread thread;

    public SocketListener(int port)
    {
        this.port = port;
        ThreadStart ts = new ThreadStart(WaitForConnection);
        thread = new Thread(ts);
        thread.IsBackground = true;
        thread.Start();
    }

    public void WaitForConnection()
    {
        // 此处包含主要代码
    }
}

WaitForConnection() 方法是一种能够完成所有工作的方法。类的构造函数将创建用于执行 WaitForConnection() 的新线程。打开套接字并接受连接的过程与前面的示例一样。下面是此线程的主循环:

while (true)
{
    Console.WriteLine("Waiting for initial connection");
    listener.Start();
    Socket socket = listener.AcceptSocket();
    NetworkStream stream = new NetworkStream(socket);
    BinaryReader reader = new BinaryReader(stream);
    BinaryWriter writer = new BinaryWriter(stream);

    Console.WriteLine("Connection Requested");

    int userPort = port + 1;
    TcpListener specificListener;
    while (true)
    {
        try
        {
            specificListener = 
                new TcpListener(localAddr, userPort);
            specificListener.Start();
            break;
        }
        catch (SocketException)
        {
            userPort++;
        }
    }
    // 远程用户应该使用 specificListener。
    // 将该端口发送回远程用户,
    // 并在该端口创建服务器以供使用
    SocketServer socketServer = new SocketServer(specificListener);

    writer.Write(userPort);
    writer.Close();
    reader.Close();
    stream.Close();
    socket.Close();
}

我希望能够支持多个连接,因此我使用了一个端口,供客户端表明它们需要建立连接。然后,服务器将查找空闲的端口,并将该端口发送回客户端,该端口即被该客户端用于建立连接。

我没有发现任何方法可以找出未被占用的端口,因此只能使用 while 循环,一直尝试端口,直到找到空闲的端口为止。然后将端口号发送回客户端并清空内容。

这里我要指出一个很微妙的地方。最初版本的 SocketServer 将端口号作为参数。不幸的是,这意味着在指定的端口上设置侦听器之前客户端可能已经提出申请,这太糟糕了。为了避免这种情况,我在把端口号发送给客户端之前创建了 TcpListener,以便确保不出现争用的情况。

SocketServer 类创建一个附加线程,并使用以下主循环:

try
{
    while (true)
    {
        MessageType messageType =
            (MessageType) reader.ReadInt32();

        switch (messageType)
        {
            case MessageType.RequestEmployee:
                Employee employee = 
                new Employee("Eric Gunnerson", "One Microsoft Way");
                employee.Send(writer);
                break;
        }
    }
}
catch (IOException)
{

}
finally
{
    socket.Close();
}

主循环是一种简单的获取请求/处理请求的循环。try-catch-finally 将从客户端断开连接时引发的异常中恢复过来。

客户端的事件

在客户端,我将编写一个 Windows 窗体客户端,既可用于 PC 也可用于 Pocket PC。Windows 窗体环境是基于事件的,特别适合使用事件来处理套接字消息。这是通过 SocketClient 类完成的。第一步是为每个消息定义一个委托和事件:

public delegate void EmployeeHandler(Employee employee);
public event EmployeeHandler EmployeeReceived;

第二步是编写用于发送事件的代码:

    case MessageType.Employee:
        Employee employee = new Employee(reader);
        if (EmployeeReceived != null)
            form.Invoke(EmployeeReceived, new object[] {employee});
        break;

或许我们需要在此事件发生时更新窗体。要使它更加可靠,需要在主 UI 线程中进行更新。这可以通过对窗体调用 Invoke() 来实现,此函数将安排要在主 UI 线程中调用的委托。

由于体系结构基于消息,因此服务器本身就提供了对异步事件的支持。示例包含一个由服务器每秒发送一次的 CurrentCount 消息。示例代码位于 SocketFinal 子目录中。

小结

总而言之,我非常喜欢这个基于套接字的体系结构。它简便易用,并且在 PC 和 Pocket PC 上都可以运行。

下个月

PDC 很快就要发布了,我们将“透露”一点相关消息。我们将透露的消息是非正式的,正如我们在 Whidbey 完成以前发布的消息一样,但是我们会提供更多有关新语言特性的细节。我将在下个月讨论这些内容。


Eric Gunnerson 是 Visual C# 组的程序经理,以前曾是 C# 语言设计组的成员,现在又回到了这个组,他著有 A Programmer's Introduction to C#, 2nd Edition。他从事编程工作已经有很长时间,积累了丰富的编程经验。他知道 8 英寸磁盘,而且还曾经用一只手装过磁带。在业余时间,他是一位略显烦人的 Roomba 观察员。

Contributors: FHL