我最近一直在与“反应框架”一起做了一些工作,到目前为止绝对喜欢.我正在考虑用一些过滤的IObservables替换传统的轮询消息队列,以清理我的服务器操作.以旧的方式,我处理了像这样进入服务器的消息:
// Start spinning the process message loop Task.Factory.StartNew(() => { while (true) { Command command = m_CommandQueue.Take(); ProcessMessage(command); } },TaskCreationOptions.LongRunning);@H_403_4@这导致连续轮询线程将命令从客户机委派到ProcessMessage方法,其中我有一系列if / else-if语句来确定命令的类型,并根据其类型委派工作 @H_403_4@我正在用一个事件驱动的系统替换这个,使用了Reactive,我写了以下代码:
private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>(); private IObservable<BesiegedMessage> m_MessagePublisher; m_MessagePublisher = m_MessageQueue .GetConsumingEnumerable() .ToObservable(TaskPoolScheduler.Default); // All generic Server messages (containing no properties) will be processed here IDisposable genericServerMessageSubscriber = m_MessagePublisher .Where(message => message is GenericServerMessage) .Subscribe(message => { // do something with the generic server message here }@H_403_4@我的问题是,尽管如此,使用阻塞集合作为IObservable的支持是很好的做法吗?我没有看到Take()曾经被称为这种方式,这使我认为消息将在处理完毕后不被删除而在队列上堆积? @H_403_4@查看Subjects作为后备集合来驱动已经过滤的IObservables将会收到这些消息会更有效率吗?有没有我在这里错过的可能会有益于这个系统的架构?
解决方法
这里有一些直接从我的后方获得的东西 – 任何真正的解决方案都将取决于你的实际使用情况,但是这里是“最便宜的伪消息队列系统”:
@H_403_4@思想/动机:
@H_403_4@>有意识地暴露IObservable< T>使得订阅者可以执行他们想要的任何过滤/交叉订阅
>整个队列是无类型的,但是注册和发布是类型安全的(ish)
> YMMV与Publish()在哪里 – 尝试尝试移动它
一般来说,Subject是一个no-no,尽管在这种情况下,它确实会产生一些SIMPLE代码.
>可以“内部化”注册,以实际执行订阅,但是队列将需要管理创建的IDisposables – bah,让您的消费者处理它! @H_403_4@代码:
>整个队列是无类型的,但是注册和发布是类型安全的(ish)
> YMMV与Publish()在哪里 – 尝试尝试移动它
一般来说,Subject是一个no-no,尽管在这种情况下,它确实会产生一些SIMPLE代码.
>可以“内部化”注册,以实际执行订阅,但是队列将需要管理创建的IDisposables – bah,让您的消费者处理它! @H_403_4@代码:
public class TheCheapestPubSubEver { private Subject<object> _inner = new Subject<object>(); public IObservable<T> Register<T>() { return _inner.OfType<T>().Publish().RefCount(); } public void Publish<T>(T message) { _inner.OnNext(message); } }@H_403_4@用法:
void Main() { var queue = new TheCheapestPubSubEver(); var ofString = queue.Register<string>(); var ofInt = queue.Register<int>(); using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}",i))) using(ofString.Subscribe(s => Console.WriteLine("A string! {0}",s))) { queue.Publish("Foo"); queue.Publish(1); Console.ReadLine(); } }@H_403_4@输出:
A string! Foo An int! 1@H_403_4@然而,这并不是严格执行“消费消费者” – 特定类型的多个注册表将导致多个观察者呼叫 – 即:
var queue = new TheCheapestPubSubEver(); var ofString = queue.Register<string>(); var anotherOfString = queue.Register<string>(); var ofInt = queue.Register<int>(); using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}",i))) using(ofString.Subscribe(s => Console.WriteLine("A string! {0}",s))) using(anotherOfString.Subscribe(s => Console.WriteLine("Another string! {0}",s))) { queue.Publish("Foo"); queue.Publish(1); Console.ReadLine(); }@H_403_4@结果是:
A string! Foo Another string! Foo An int! 1