delphi – TThreadedQueue不能多个消费者?

前端之家收集整理的这篇文章主要介绍了delphi – TThreadedQueue不能多个消费者?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
试图在单个生产者多消费者方案中使用TThreadedQueue(Generics.Collections)。 (Delphi-XE)。
想法是将对象推入队列,并让几个工作线程排空队列。

它不能按预期工作,但。
当两个或多个工作线程调用PopItem时,会从TThreadedQueue抛出访问冲突。

如果PopItem的调用序列化了一个临界区,一切都很好。

当然TThreadedQueue应该能够处理多个消费者,所以我缺少的东西或者这是一个纯的bug在TThreadedQueue?

这里是一个简单的例子来产生错误

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',Windows,Messages,Classes,SysUtils,SyncObjs,Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName,': ',E.Message);
        ReadLn;
      end;
  end;
end.

更新:导致TThreadedQueue崩溃的TMonitor中的错误是在Delphi XE2中修复的。

更新2:上面的测试强调队列在空状态。 Darian Miller发现,在满状态下强调队列,仍然可以再现XE2中的错误错误再次出现在TMonitor中。有关更多信息,请参阅下面的答案。还有一个链接到QC101114。

更新3:
有了Delphi-XE2更新4,有一个宣布的修正TMonitor将解决问题在TThreadedQueue。我的测试到目前为止不能再现任何错误在TThreadedQueue了。
当队列为空且已满时,测试单个生产者/多个消费者线程。
还测试了多个生产者/多个消费者。我改变了读者线程和写线程从1到100没有任何毛刺。但是知道历史,我敢于别人打破TMonitor。

解决方法

嗯,很难确定没有大量的测试,但它肯定看起来像是一个错误,无论是在TThreadedQueue或在TMonitor。无论哪种方式,它在RTL而不是你的代码。您应该将此文件作为QC报告,并使用上面的示例作为“如何再现”代码

猜你在找的Delphi相关文章