一、為什么要使用MSMQ在一個(gè)分布式的環(huán)境中,我們往往需要根據(jù)具體的情況采用不同的方式進(jìn)行數(shù)據(jù)的傳輸。比如在一個(gè)Intranet內(nèi),我們一般通過TCP進(jìn)行高效的數(shù)據(jù)通信;而在一個(gè)Internet的環(huán)境中,我們則通常使用Http進(jìn)行跨平臺(tái)的數(shù)據(jù)交換。而這些通信方式具有一個(gè)顯著的特點(diǎn),那就是他們是基于Connection的,也就是說,交互雙方在進(jìn)行通信的時(shí)候必須保證有一個(gè)可用的Connection存在于他們之間。而在某些時(shí)候,比如那些使用撥號(hào)連接的用戶、以及使用便攜式計(jì)算機(jī)的用戶,我們不能保證在他們和需要訪問的Server之間有一個(gè)的可靠的連接,在這種情況下,基于Messaging Queue的連接就顯得尤為重要了。我們今天就來談?wù)勗赪CF中如何使用MSMQ。
MSMQ不僅僅是作為支持客戶端連接工具而存在,合理的使用MSMQ可以在很大程度上提升系統(tǒng)的Performance和Scalability。我們先來看看MSMQ能給我們帶來怎樣的好處:
1.MSMQ是基于Disconnection
MSMQ通過Message Queue進(jìn)行通信,這種通信方式為離線工作成為了可能。比如在介紹MSMQ時(shí)都會(huì)提到的Order Delivery的例子:在一個(gè)基于B2C的系統(tǒng)中,訂單從各種各樣的客戶傳來,由于 客戶的各異性,不能保證每個(gè)客戶在每時(shí)每刻都和用于接收訂單的Server保持一個(gè)可靠的連接,我們有時(shí)候甚至允許客戶即使在離線的情況下也可以遞交訂單(雖然訂單不能發(fā)送到訂單的接收方,但是我們可以通過某種機(jī)制保證先在本地保存該訂單,一旦連接建立,則馬上向接收方遞交訂單),而MSMQ則有效地提供了這樣的機(jī)制:Server端建立一個(gè)Message Queue來接收來個(gè)客戶的訂單,客戶端通過向該Message Queue發(fā)送承載了訂單數(shù)據(jù)的Message實(shí)現(xiàn)訂單的遞交。如果在客戶離線的情況下,他仍然可以通過客戶端程序進(jìn)行訂單遞交的操作,存儲(chǔ)著訂單數(shù)據(jù)的Message會(huì)被暫時(shí)保存在本地的Message Queue中,一旦客戶聯(lián)機(jī),MSMQ將Message從中取出,發(fā)送到真正的接收方,而這個(gè)動(dòng)作對(duì)于用戶的透明的。
2.MSMQ天生是One-way、異步的
在MSMQ中,Message始終以O(shè)ne-way的方式進(jìn)行發(fā)送,所以MSMQ具有天生的異步特性。所以MSMQ使用于那些對(duì)于用戶的請(qǐng)求,Server端無需立即響應(yīng)的場景。也就是說Server對(duì)數(shù)據(jù)的處理無需和Client的數(shù)據(jù)的發(fā)送進(jìn)行同步,它可以獨(dú)自地按照自己的Schedule進(jìn)行工作。這可以避免峰值負(fù)載。比如Server端可以在一個(gè)相對(duì)低負(fù)載的時(shí)段(比如深夜)來對(duì)接收到的Order進(jìn)行批處理,而無需一天24小時(shí)一直進(jìn)行Order的監(jiān)聽、接收和處理。
3.MSMQ能夠提供高質(zhì)量的Reliable Messaging
我們知道,在一般的情況下,如果Client端以異步的方式對(duì)Service進(jìn)行調(diào)用就意味著:Client無法獲知Message是否成功抵達(dá)Service端;也不會(huì)獲得Service端執(zhí)行的結(jié)果和出錯(cuò)信息。但是我們?nèi)匀徽fMSMQ為我們提供了可靠的傳輸(Reliable Messaging),這主要是因?yàn)镸SMQ為我們提供一些列Reliable Messaging的機(jī)制:
-
超時(shí)機(jī)制(Timeout):可以設(shè)置發(fā)送和接收的時(shí)間,超出該時(shí)間則被認(rèn)為操作失敗。
-
確認(rèn)機(jī)制(Acknowledgement):當(dāng)Message成功抵達(dá)Destination Queue,或者被成功接收,向發(fā)送端發(fā)送一個(gè)Acknowledgement message用以確認(rèn)操作的狀態(tài)。
-
日志機(jī)制(Journaling):當(dāng)Message被發(fā)送或接收后,被Copy一份存放在Journal Queue中。
此外,MSMQ還提供了死信隊(duì)列(Dead letter Queue)用以保存發(fā)送失敗的message。這一切保證了保證了Reliable Messaging。
二、 MSMQ在WCF的運(yùn)用
在WCF中,MSMQ提供的數(shù)據(jù)傳輸功能被封裝在一個(gè)Binding中,提供WCF Endpoint之間、以及Endpoint和現(xiàn)有的基于MSMQ的Application進(jìn)行通信的實(shí)現(xiàn)。為此WCF為我們提供了兩種不同的built-in binding:
-
NetMsmqBinding:從提供的功能和使用 方式上看,NetMsmqBinding和一般使用的binding,比如basicHttpBinding,netTcpBinding沒有什么區(qū)別:在兩個(gè)Endpoint之間實(shí)現(xiàn)了數(shù)據(jù)的通信,所不同的是,它提供的是基于MSMQ的Reliable Messaging。從變成模式上看,和一般的binding完全一樣。
-
MsmqIntegrationBinding:從命名上我們可以看出,MsmqIntegrationBinding主要用于需要將我們的WCF Application和現(xiàn)有的基于MSMQ的Application集成的情況。MsmqIntegrationBinding實(shí)現(xiàn)了WCF Endpoint和某個(gè)Message Queue進(jìn)行數(shù)據(jù)的通信,具體來說,就是實(shí)現(xiàn)了單一的向某個(gè)Message Queue 發(fā)送Message,和從某個(gè)Message Queue中接收Message的功能。從編程模式上看,也有所不同,比如Operation只接收一個(gè)MsmqMessage<T>的參數(shù)。
這是Client和Service通信的圖示:
三、MSMQ和Transaction
MSMQ提供對(duì)Transaction的支持。在一般的情況下,MSMQ通過Message Queue Transaction實(shí)現(xiàn)對(duì)Transaction的原生的支持,借助Message Queue Transaction,可以把基于一個(gè)或多個(gè)Message Queue的相關(guān)操作納入同一個(gè)Transaction中。
Message Queue Transaction僅僅限于基于Message Queue的操作,倘若操作涉及到另外一些資源,比如SQL Server, 則可以使用基于DTC的分布式Transaction。
對(duì)于WCF中MSMQ,由于Client和Service的相對(duì)獨(dú)立(可能Client發(fā)送Message到Service處理Message會(huì)相隔很長一段時(shí)間),所以Client和Service的操作只能納入不同的Transaction中,如下圖。
四、Sample1:NetMsmqBinding
我們首先做一個(gè)基于NetMsmqBinding Sample,實(shí)現(xiàn)的功能就是我們開篇所提出的Order Delivery。我們說過,NetMsmqBinding和一般的binding在實(shí)現(xiàn)的功能和變成模式上完全一樣。下面是我們熟悉的4層結(jié)構(gòu):
1.Contract
DataContract:Order & OrderItem
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
namespace Artech.QueuedService.Contract
{
[DataContract]
[KnownType(typeof(OrderItem))]
public class Order
{
Private Fields#region Private Fields
private Guid _orderNo;
private DateTime _orderDate;
private Guid _supplierID;
private string _supplierName;
private IList<OrderItem> _orderItems;
#endregion
Constructors#region Constructors
public Order()
{
this._orderItems = new List<OrderItem>();
}
public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName)
{
this._orderNo = orderNo;
this._orderDate = orderDate;
this._supplierID = supplierID;
this._supplierName = supplierName;
this._orderItems = new List<OrderItem>();
}
#endregion
Public Properties#region Public Properties
[DataMember]
public Guid OrderNo
{
get { return _orderNo; }
set { _orderNo = value; }
}
[DataMember]
public DateTime OrderDate
{
get { return _orderDate; }
set { _orderDate = value; }
}
[DataMember]
public Guid SupplierID
{
get { return _supplierID; }
set { _supplierID = value; }
}
[DataMember]
public string SupplierName
{
get { return _supplierName; }
set { _supplierName = value; }
}
[DataMember]
public IList<OrderItem> OrderItems
{
get { return _orderItems; }
set { _orderItems = value; }
}
#endregion
Public Methods#region Public Methods
public override string ToString()
{
string description = string.Format("General Informaion:\n\tOrder No.\t: {0}\n\tOrder Date\t: {1}\n\tSupplier No.\t: {2}\n\tSupplier Name\t: {3}", this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName);
StringBuilder productList = new StringBuilder();
productList.AppendLine("\nProducts:");
int index = 0;
foreach (OrderItem item in this._orderItems)
{
productList.AppendLine(string.Format("\n{4}. \tNo.\t\t: {0}\n\tName\t\t: {1}\n\tPrice\t\t: {2}\n\tQuantity\t: {3}", item.ProductID, item.ProductName, item.UnitPrice, item.Quantity, ++index));
}
return description + productList.ToString();
}
#endregion
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
namespace Artech.QueuedService.Contract
{
[DataContract]
public class OrderItem
{
Private Fields#region Private Fields
private Guid _productID;
private string _productName;
private decimal _unitPrice;
private int _quantity;
#endregion
Constructors#region Constructors
public OrderItem()
{ }
public OrderItem(Guid productID, string productName, decimal unitPrice, int quantity)
{
this._productID = productID;
this._productName = productName;
this._unitPrice = unitPrice;
this._quantity = quantity;
}
#endregion
Public Properties#region Public Properties
[DataMember]
public Guid ProductID
{
get { return _productID; }
set { _productID = value; }
}
[DataMember]
public string ProductName
{
get { return _productName; }
set { _productName = value; }
}
[DataMember]
public decimal UnitPrice
{
get { return _unitPrice; }
set { _unitPrice = value; }
}
[DataMember]
public int Quantity
{
get { return _quantity; }
set { _quantity = value; }
}
#endregion
}
}
ServiceContract: IOrderProcessor
using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;
namespace Artech.QueuedService.Contract
{
[ServiceContract]
[ServiceKnownType(typeof(Order))]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void Submit(Order order);
}
}
2.Service:IOrderProcessor:
using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;
using System.ServiceModel;
namespace Artech.QueuedService.Service
{
public class OrderProcessorService:IOrderProcessor
{
ISubmitOrder Members#region ISubmitOrder Members
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void Submit(Order order)
{
Orders.Add(order);
Console.WriteLine("Receive an order.");
Console.WriteLine(order.ToString());
}
#endregion
}
}
using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;
namespace Artech.QueuedService.Service
{
public static class Orders
{
private static IDictionary<Guid, Order> _orderList = new Dictionary<Guid, Order>();
public static void Add(Order order)
{
_orderList.Add(order.OrderNo, order);
}
public static Order GetOrder(Guid orderNo)
{
return _orderList[orderNo];
}
}
}
3.Hosting
Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="msmqBinding">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
<message clientCredentialType="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<services>
<service name="Artech.QueuedService.Service. OrderProcessorService">
<endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"
bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor" />
</service>
</services>
</system.serviceModel>
</configuration>
在默認(rèn)的情況下,netMsmqBinding 的msmqAuthenticationMode為WindowsDomain,由于基于WindowsDomain必須安裝AD,利于在本機(jī)模擬,我把msmqAuthenticationMode改為None,相應(yīng)的ProtectionLevel和clientCredentialType改為None。
Program:
using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.ServiceModel;
using Artech.QueuedService.Service;
namespace Artech.QueuedService.Hosting
{
class Program
{
static void Main(string[] args)
{
string path = @".\private$\orders";
if(!MessageQueue.Exists(path))
{
MessageQueue.Create(path,true);
}
using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
{
host.Opened += delegate
{
Console.WriteLine("Service has begun to listen\n\n");
};
host.Open();
Console.Read();
}
}
}
}
在Host Service之前,通過MessageQueue.Create創(chuàng)建一個(gè)Message Queue,第二個(gè)參數(shù)為表明Queue是否支持Transaction的indicator,這里支持Transaction。
4.Client:
Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="msmqBinding">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
<message clientCredentialType="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<client>
<endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"
bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor"
name="defaultEndpoint" />
</client>
</system.serviceModel>
</configuration>
Program
using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;
using System.ServiceModel;
using System.Transactions;
namespace Artech.QueuedService.Client
{
class Program
{
static void Main(string[] args)
{
ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
IOrderProcessor channel = channelFactory.CreateChannel();
Order order = new Order(Guid.NewGuid(),DateTime.Today,Guid.NewGuid(),"A Company");
order.OrderItems.Add(new OrderItem(Guid.NewGuid(),"PC",5000,20));
order.OrderItems.Add(new OrderItem(Guid.NewGuid(),"Printer",7000,2));
Console.WriteLine("Submit order to server");
using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
{
channel.Submit(order);
scope.Complete();
}
Console.Read();
}
}
}
先后運(yùn)行Host和Client,Host端有下面的輸出: