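I'm experimenting with ZeroMQ and trying to get something working. My first idea was to set up a REP/REQ pair over the inproc transport and see whether I could send messages between two threads. Most of the code below comes from the clrzmq examples, but it doesn't seem to work.
Both the server and the client bind to the transport, but when the client tries to send, it blocks and just sits there. I have no experience with ZeroMQ, so I'm not sure where to look first; any help would be appreciated. Here is the offending code: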
    using System;
    using System.Diagnostics;
    using System.Threading;
    using NUnit.Framework;
    using ZMQ;

    namespace PostBox
    {
        [TestFixture]
        public class Class1
        {
            private const string Address = "inproc://test";
            private const uint MessageSize = 10;
            private const int RoundtripCount = 100;

            [Test]
            public void Should()
            {
                var clientThread = new Thread(StartClient);
                clientThread.Start();

                var serverThread = new Thread(StartServer);
                serverThread.Start();

                clientThread.Join();
                serverThread.Join();

                Console.WriteLine("Done with life");
            }

            private void StartServer()
            {
                // Initialise 0MQ infrastructure
                using (var ctx = new Context(1))
                {
                    using (var skt = ctx.Socket(SocketType.REP))
                    {
                        skt.Bind(Address);
                        Console.WriteLine("Server has bound");

                        // Bounce the messages.
                        for (var i = 0; i < RoundtripCount; i++)
                        {
                            var msg = skt.Recv();
                            Debug.Assert(msg.Length == MessageSize);
                            skt.Send(msg);
                        }

                        Thread.Sleep(1000);
                    }
                }

                Console.WriteLine("Done with server");
            }

            private void StartClient()
            {
                Thread.Sleep(2000);

                // Initialise 0MQ infrastructure
                using (var ctx = new Context(1))
                {
                    using (var skt = ctx.Socket(SocketType.REQ))
                    {
                        skt.Bind(Address);
                        Console.WriteLine("Client has bound");

                        // Create a message to send.
                        var msg = new byte[MessageSize];

                        // Start measuring the time.
                        var watch = new Stopwatch();
                        watch.Start();

                        // Start sending messages.
                        for (var i = 0; i < RoundtripCount; i++)
                        {
                            skt.Send(msg);
                            msg = skt.Recv();
                            Debug.Assert(msg.Length == MessageSize);
                            Console.Write(".");
                        }

                        // Stop measuring the time.
                        watch.Stop();
                        var elapsedTime = watch.ElapsedTicks;

                        // Print out the test parameters.
                        Console.WriteLine("message size: " + MessageSize + " [B]");
                        Console.WriteLine("roundtrip count: " + RoundtripCount);

                        // Compute and print out the latency.
                        var latency = (double)(elapsedTime) / RoundtripCount / 2 * 1000000 / Stopwatch.Frequency;
                        Console.WriteLine("Your average latency is {0} [us]", latency.ToString("f2"));
                    }
                }

                Console.WriteLine("Done with client");
            }
        }
    }
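Edit:
I got this working with the help of the answer below, but I also had to change one of the binds to a connect. That makes sense when you think about it: the server binds to a local transport and the client connects to a remote transport. Here is the updated code: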
    using System;
    using System.Diagnostics;
    using System.Threading;
    using NUnit.Framework;
    using ZMQ;

    namespace PostBox
    {
        [TestFixture]
        public class Class1
        {
            private const string Address = "inproc://test";
            private const uint MessageSize = 10;
            private const int RoundtripCount = 100;

            // One context shared by both threads.
            private static Context ctx;

            [Test]
            public void Should()
            {
                using (ctx = new Context(1))
                {
                    var clientThread = new Thread(StartClient);
                    clientThread.Start();

                    var serverThread = new Thread(StartServer);
                    serverThread.Start();

                    clientThread.Join();
                    serverThread.Join();

                    Console.WriteLine("Done with life");
                }
            }

            private void StartServer()
            {
                try
                {
                    using (var skt = ctx.Socket(SocketType.REP))
                    {
                        skt.Bind(Address);
                        Console.WriteLine("Server has bound");

                        // Bounce the messages.
                        for (var i = 0; i < RoundtripCount; i++)
                        {
                            var msg = skt.Recv();
                            Debug.Assert(msg.Length == MessageSize);
                            skt.Send(msg);
                        }

                        Thread.Sleep(1000);
                    }

                    Console.WriteLine("Done with server");
                }
                catch (System.Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }

            private void StartClient()
            {
                Thread.Sleep(2000);

                try
                {
                    // Initialise 0MQ infrastructure
                    using (var skt = ctx.Socket(SocketType.REQ))
                    {
                        skt.Connect(Address);
                        Console.WriteLine("Client has bound");

                        // Create a message to send.
                        var msg = new byte[MessageSize];

                        // Start measuring the time.
                        var watch = new Stopwatch();
                        watch.Start();

                        // Start sending messages.
                        for (var i = 0; i < RoundtripCount; i++)
                        {
                            skt.Send(msg);
                            msg = skt.Recv();
                            Debug.Assert(msg.Length == MessageSize);
                            Console.Write(".");
                        }

                        // Stop measuring the time.
                        watch.Stop();
                        var elapsedTime = watch.ElapsedTicks;

                        // Print out the test parameters.
                        Console.WriteLine("message size: " + MessageSize + " [B]");
                        Console.WriteLine("roundtrip count: " + RoundtripCount);

                        // Compute and print out the latency.
                        var latency = (double)(elapsedTime) / RoundtripCount / 2 * 1000000 / Stopwatch.Frequency;
                        Console.WriteLine("Your average latency is {0} [us]", latency.ToString("f2"));
                    }

                    Console.WriteLine("Done with client");
                }
                catch (System.Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }
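Solution
I believe both threads need to use the same context. The ZeroMQ guide advises against using more than one context in a process.
Create one context and share it between the two threads. That should work.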
From http://zguide.zeromq.org/chapter:all:
You MUST create a ‘context’ object for your process, and pass that to all threads. The context collects ØMQ’s state. To create a connection across the inproc: transport, both server and client thread must share the same context object.
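The advice above could be sketched roughly as follows. This is only an illustration, assuming the same clrzmq API that the question's code uses (`Context`, `Socket`, `Bind`, `Connect`, `Send`, `Recv`); the class name `SharedContextSketch`, the method `Run`, and the endpoint `inproc://sketch` are made up for the example. It replaces the `Thread.Sleep(2000)` in the client with a `ManualResetEvent`, on the assumption that the point of the sleep was to make sure the server has bound before the client connects.

```csharp
using System;
using System.Threading;
using ZMQ; // clrzmq binding, assumed to be the same version as in the question

public static class SharedContextSketch
{
    // Hypothetical endpoint name, used only for this sketch.
    private const string Address = "inproc://sketch";

    // Runs `roundtrips` request/reply exchanges and returns how many completed.
    public static int Run(int roundtrips)
    {
        var completed = 0;

        using (var serverBound = new ManualResetEvent(false))
        using (var ctx = new Context(1)) // the single context both threads share
        {
            var server = new Thread(() =>
            {
                using (var rep = ctx.Socket(SocketType.REP))
                {
                    rep.Bind(Address);   // the server side binds
                    serverBound.Set();   // tell the client it may connect now

                    // Echo every request back to the sender.
                    for (var i = 0; i < roundtrips; i++)
                    {
                        rep.Send(rep.Recv());
                    }
                }
            });

            var client = new Thread(() =>
            {
                serverBound.WaitOne();   // wait until the endpoint exists

                using (var req = ctx.Socket(SocketType.REQ))
                {
                    req.Connect(Address); // the client side connects, it does not bind

                    var msg = new byte[10];
                    for (var i = 0; i < roundtrips; i++)
                    {
                        req.Send(msg);
                        msg = req.Recv();
                        completed++;
                    }
                }
            });

            server.Start();
            client.Start();

            // Join both threads before the context is disposed.
            server.Join();
            client.Join();
        }

        return completed;
    }
}
```

Calling `SharedContextSketch.Run(100)` would correspond to the 100 round trips in the question. The sketch drops the `Thread.Sleep(1000)` the question's server performs before closing its socket; if the last reply turns out to get lost in your version of the library, putting that delay back is a reasonable first thing to try.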