c# – 使用BlockingCollection的活动框架作为消息队列

前端之家收集整理的这篇文章主要介绍了c# – 使用BlockingCollection的活动框架作为消息队列前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我最近一直在与“反应框架”一起做了一些工作,到目前为止绝对喜欢.我正在考虑用一些过滤的IObservables替换传统的轮询消息队列,以清理我的服务器操作.以旧的方式,我处理了像这样进入服务器的消息:
// Start spinning the process message loop
   Task.Factory.StartNew(() =>
   {
       while (true)
       {
           Command command = m_CommandQueue.Take();
           ProcessMessage(command);
       }
   },TaskCreationOptions.LongRunning);

这导致连续轮询线程将命令从客户机委派到ProcessMessage方法,其中我有一系列if / else-if语句来确定命令的类型,并根据其类型委派工作

我正在用一个事件驱动的系统替换这个,使用了Reactive,我写了以下代码

private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
 private IObservable<BesiegedMessage> m_MessagePublisher;

 m_MessagePublisher = m_MessageQueue
       .GetConsumingEnumerable()
       .ToObservable(TaskPoolScheduler.Default);

        // All generic Server messages (containing no properties) will be processed here
 IDisposable genericServerMessageSubscriber = m_MessagePublisher
       .Where(message => message is GenericServerMessage)
       .Subscribe(message =>
       {
           // do something with the generic server message here
       }

我的问题是,尽管如此,使用阻塞集合作为IObservable的支持是很好的做法吗?我没有看到Take()曾经被称为这种方式,这使我认为消息将在处理完毕后不被删除而在队列上堆积?

查看Subjects作为后备集合来驱动已经过滤的IObservables将会收到这些消息会更有效率吗?有没有我在这里错过的可能会有益于这个系统的架构?

解决方法

这里有一些直接从我的后方获得的东西 – 任何真正的解决方案都将取决于你的实际使用情况,但是这里是“最便宜的伪消息队列系统”:

思想/动机:

>有意识地暴露IObservable< T>使得订阅者可以执行他们想要的任何过滤/交叉订阅
>整个队列是无类型的,但是注册和发布是类型安全的(ish)
> YMMV与Publish()在哪里 – 尝试尝试移动它
一般来说,Subject是一个no-no,尽管在这种情况下,它确实会产生一些SIMPLE代码.
>可以“内部化”注册,以实际执行订阅,但是队列将需要管理创建的IDisposables – bah,让您的消费者处理它!

代码

public class TheCheapestPubSubEver
{    
    private Subject<object> _inner = new Subject<object>();

    public IObservable<T> Register<T>()
    {
        return _inner.OfType<T>().Publish().RefCount();
    }
    public void Publish<T>(T message)
    {
        _inner.OnNext(message);
    }
}

用法

void Main()
{
    var queue = new TheCheapestPubSubEver();

    var ofString = queue.Register<string>();
    var ofInt = queue.Register<int>();

    using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}",i)))
    using(ofString.Subscribe(s => Console.WriteLine("A string! {0}",s)))
    {
        queue.Publish("Foo");
        queue.Publish(1);
        Console.ReadLine();
    }
}

输出

A string! Foo
An int! 1

然而,这并不是严格执行“消费消费者” – 特定类型的多个注册表将导致多个观察者呼叫 – 即:

var queue = new TheCheapestPubSubEver();

var ofString = queue.Register<string>();
var anotherOfString = queue.Register<string>();
var ofInt = queue.Register<int>();

using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}",i)))
using(ofString.Subscribe(s => Console.WriteLine("A string! {0}",s)))
using(anotherOfString.Subscribe(s => Console.WriteLine("Another string! {0}",s)))

{
    queue.Publish("Foo");
    queue.Publish(1);
    Console.ReadLine();
}

结果是:

A string! Foo
Another string! Foo
An int! 1
原文链接:https://www.f2er.com/csharp/95179.html

猜你在找的C#相关文章