最近系統因為可靠性的考量,在中間加上一層 Message Queue(MQ)架構,採用目前最多人使用的 RabbitMQ 作為 server 端服務,並順便以百萬筆資料為單位塞入 MQ 來測試效能。(下方範例程式的迴圈只送出 100 筆,實際測試時請自行調整筆數。)
Producer端程式
// Producer: publishes a fixed number of JSON-serialized LogInfo messages to a
// durable queue through the default exchange, then prints the elapsed time.
// The timer covers connecting, declaring the queue, publishing and closing.
var stopwatch = System.Diagnostics.Stopwatch.StartNew(); // fresh stopwatch, already running
const int messageCount = 100; // number of messages published per run
string queue = "info";
try
{
    var factory = new RabbitMQ.Client.ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Arguments after the queue name: durable = true, then false, false, null
        // (same values the consumer passes when it declares the queue).
        channel.QueueDeclare(queue, true, false, false, null);

        // One properties object is shared by every publish; delivery mode 2
        // marks the messages as persistent.
        RabbitMQ.Client.IBasicProperties persistentProps = channel.CreateBasicProperties();
        persistentProps.DeliveryMode = 2;

        for (int sent = 0; sent < messageCount; sent++)
        {
            // A new LogInfo is built and serialized for each message, so the
            // serialization cost is part of the measured time.
            var entry = new LogInfo
            {
                SYS_ID = "系統",
                COMPANY_ID = "公司",
                STORE_ID = "店別",
                Content = "訊息"
            };
            byte[] payload = Encoding.UTF8.GetBytes(
                Newtonsoft.Json.JsonConvert.SerializeObject(entry));

            // Empty exchange name = default exchange; the queue name is the routing key.
            channel.BasicPublish("", queue, persistentProps, payload);
        }
    }
}
catch (System.Exception ex)
{
    // Report the failure; the elapsed time is still printed below.
    Console.WriteLine(ex.ToString());
}
stopwatch.Stop();
string elapsedMs = stopwatch.Elapsed.TotalMilliseconds.ToString();
Console.WriteLine("共花費:" + elapsedMs + "毫秒");
Console.ReadLine(); // keep the console window open until Enter is pressed
Consumer端程式
// Consumer: subscribes to the durable "info" queue, prints each received
// LogInfo message to the console and acknowledges it manually.
// Runs until Enter is pressed, then shuts the channel and connection down.
string queue = "info";
try
{
    var factory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    // The using blocks release the connection and channel even when queue
    // declaration or subscription throws; the original leaked both in that case.
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue, true, false, false, null); // declare the queue to consume from
        channel.BasicQos(0, 1, false); // prefetch count 1: one unacknowledged message at a time

        var consumer = new RabbitMQ.Client.Events.EventingBasicConsumer(channel);

        // Handler invoked for every message delivered from the queue.
        // NOTE(review): if deserialization or the console write throws, BasicAck
        // is never reached; with a prefetch of 1 this presumably stalls further
        // deliveries. Decide on a nack / dead-letter policy and confirm.
        consumer.Received += (sender, e) =>
        {
            byte[] body = e.Body.ToArray();
            string message1 = Encoding.UTF8.GetString(body);
            LogInfo log = JsonConvert.DeserializeObject<LogInfo>(message1); // JSON payload back to an object

            // Replace the line below with the real processing; for now just print it.
            // NOTE(review): the producer sample does not set DateTime explicitly;
            // verify how LogInfo initializes it.
            Console.WriteLine(log.DateTime.ToString("yyyy/MM/dd HH:mm:ss:FFF") + " " + log.Content);

            channel.BasicAck(e.DeliveryTag, false); // manual ack once processing is done
        };

        channel.BasicConsume(queue, false, consumer); // autoAck = false: start consuming
        Console.ReadLine(); // keep consuming until Enter is pressed

        // Close the channel before the connection that owns it
        // (the original closed them in the opposite order).
        channel.Close();
        connection.Close();
    }
}
catch (System.Exception ex)
{
    Console.WriteLine(ex.ToString());
}