Sunday, January 28, 2007

Thread safe TStack (TThreadStack)

I've moved my blog and its posts to my new blog; please go to Thread safe TStack (TThreadStack) on Landman Code. Recently I've been doing a lot of multithreading, and luckily Delphi provides a handy bunch of classes to make the developer's life easier. You've got some basic synchronization classes (TMutex, TEvent, TCriticalSection, TMultiReadExclusiveWriteSynchronizer...) and a basic data container (TThreadList). But for one program I needed a thread-safe stack. In this post I will describe how I created my own.

There is no thread-safe stack in Borland Turbo Delphi 2006, so of course I started with a Google search for delphi TThreadStack, which at this time gives zero results. Searching the newsgroups I found an interesting group called comp.programming.threads, and searching that group I found one Pascal lock-free stack (which basically means it does not use a critical section, or any other mechanism, to lock the data). But after testing it, FastMM pointed out a memory leak. I provided the author with a test case that reproduces the memory leak, but after 5 months I have received no reply.

But I'm a hard person to please, and although it is indeed lock-free, I was wondering whether in my situation this lock-free solution wasn't too complicated and perhaps slower. So I created a simple TThreadStack modelled on the principle of TThreadList (very simple!), and compared it with the lock-free solution.

The source below is the test project.

program Project2;

{$APPTYPE CONSOLE}

uses
  FastMM4,
  unThreadStack,
  FreeStack, Math,
  Windows, SysUtils, Classes;
const
  // Number of push (or pop) operations each worker thread performs per run.
  NumberOfAllocations = 1000;
  // Delay in milliseconds passed to Sleep after each popper iteration.
  PopTimeOut = 1;
  // Delay in milliseconds passed to Sleep after each pusher iteration.
  PushTimeOut = 1;
  NumberOfThreads = 8; // must be a multiple of 4 so each stack gets as many pushers as poppers
  // Number of timed benchmark runs per stack implementation.
  NumberOfTest = 10;
type
  // Payload record that pusher threads allocate with New and popper threads
  // release with Dispose.
  TTestRec = packed record
    BigField: array[0..254] of Char;
    SmallerField: Extended;
    SmallField: Byte;
  end;
  PTestRec = ^TTestRec;

  // Worker thread that exercises a TFreeStack.
  TFreeStackThread = class(TThread)
    // True: the thread pops and disposes records; False: it allocates and pushes.
    FPopper: Boolean;
    // Stack this thread works on.
    FDestination: TFreeStack;
    // Semaphore handle released once when Execute has finished its work.
    FFinished: THandle;
  protected
    // Performs up to NumberOfAllocations pushes or pops, then signals FFinished.
    procedure Execute; override;
  public
    // Stores the arguments and creates the thread non-suspended.
    constructor Create(ADestination: TFreeStack; APopper: Boolean; AFinished: THandle);
  end;

  // Worker thread that exercises a TThreadStack.
  TThreadStackThread = class(TThread)
    // True: the thread pops and disposes records; False: it allocates and pushes.
    FPopper: Boolean;
    // Stack this thread works on.
    FDestination: TThreadStack;
    // Semaphore handle released once when Execute has finished its work.
    FFinished: THandle;
  protected
    // Performs up to NumberOfAllocations pushes or pops, then signals FFinished.
    procedure Execute; override;
  public
    // Stores the arguments and creates the thread non-suspended.
    constructor Create(ADestination: TThreadStack; APopper: Boolean; AFinished: THandle);
  end;

  { TThreadStackThread }

constructor TThreadStackThread.Create(
  ADestination: TThreadStack;
  APopper: Boolean;
  AFinished: THandle);
begin
  // Fill in the worker's configuration first: the inherited constructor is
  // called with CreateSuspended = False, so the fields must already be in
  // place by the time Execute gets to run.
  FFinished := AFinished;
  FPopper := APopper;
  FDestination := ADestination;

  inherited Create(False);
end;

{ Benchmark worker for TThreadStack.

  Performs NumberOfAllocations successful operations on FDestination (or
  fewer if the thread is terminated): a pusher allocates a TTestRec and
  pushes it, a popper pops a record and disposes it. Each iteration is done
  while holding the stack lock, followed by a short Sleep outside the lock.

  FFinished is released exactly once when the thread is done. The release is
  placed in a finally block because the main program waits on that semaphore
  with INFINITE; if an exception escaped the loop without signalling, the
  main thread would never wake up. }
procedure TThreadStackThread.Execute;
var
  TempStack: TStack;
  p: PTestRec;
  counter: Int64;
begin
  try
    counter := 0;
    while (not Terminated) and (counter < NumberOfAllocations) do
    begin
      TempStack := FDestination.LockStack;
      try
        if FPopper then
        begin
          // An empty stack is not an error: just retry after the sleep below
          // without counting this iteration.
          if TempStack.Count > 0 then
          begin
            Dispose(PTestRec(TempStack.Pop));
            inc(counter);
          end;
        end
        else
        begin
          New(p);
          TempStack.Push(p);
          inc(counter);
        end;
      finally
        FDestination.UnlockStack;
      end;
      // Sleep outside the lock so other threads can use the stack meanwhile.
      if FPopper then
        Sleep(PopTimeOut)
      else
        Sleep(PushTimeOut);
    end;
  finally
    // The previous count is not needed, so no out-variable is passed.
    ReleaseSemaphore(FFinished, 1, nil);
  end;
end;

{ TFreeStackThread }

constructor TFreeStackThread.Create(
  ADestination: TFreeStack;
  APopper: Boolean;
  AFinished: THandle);
begin
  // Fill in the worker's configuration first: the inherited constructor is
  // called with CreateSuspended = False, so the fields must already be in
  // place by the time Execute gets to run.
  FFinished := AFinished;
  FPopper := APopper;
  FDestination := ADestination;

  inherited Create(False);
end;

{ Benchmark worker for TFreeStack.

  Performs NumberOfAllocations successful operations on FDestination (or
  fewer if the thread is terminated): a pusher allocates a TTestRec and
  pushes it, a popper pops a record and disposes it. Push and Pop report
  success through their Boolean result; a failed operation is retried on the
  next iteration and is not counted.

  FFinished is released exactly once when the thread is done. The release is
  placed in a finally block because the main program waits on that semaphore
  with INFINITE; if an exception escaped the loop without signalling, the
  main thread would never wake up. }
procedure TFreeStackThread.Execute;
var
  p: PTestRec;
  counter: Int64;
begin
  p := nil;
  try
    counter := 0;
    while (not Terminated) and (counter < NumberOfAllocations) do
    begin
      if FPopper then
      begin
        if FDestination.Count > 0 then
        begin
          if FDestination.Pop(TObject(p)) then
          begin
            Dispose(p);
            inc(counter);
          end;
        end;
      end
      else
      begin
        // p stays non-nil after a failed Push so the same record is reused
        // for the retry instead of allocating a new one each time.
        if p = nil then
          New(p);
        if FDestination.Push(TObject(p)) then
        begin
          p := nil;
          inc(counter);
        end;
      end;
      if FPopper then
        Sleep(PopTimeOut)
      else
        Sleep(PushTimeOut);
    end;
  finally
    // A pusher can leave the loop still owning a record whose Push failed
    // (termination or an exception before the retry). The stack never took
    // it, so release it here. Poppers are excluded: their p is either nil or
    // already disposed above.
    if (not FPopper) and (p <> nil) then
      Dispose(p);
    // The previous count is not needed, so no out-variable is passed.
    ReleaseSemaphore(FFinished, 1, nil);
  end;
end;

{ Benchmark driver.

  For each stack implementation, runs NumberOfTest timed rounds. In a round,
  NumberOfThreads workers are started; thread i works on stack (i mod 2) and
  is a popper when (i mod 4) >= 2, otherwise a pusher. Every worker releases
  the Finished semaphore once when it is done, so the round is over after
  NumberOfThreads successful waits. Elapsed performance-counter ticks are
  collected per round and reported as mean and standard deviation, both in
  raw ticks and converted to milliseconds. }
var
  AThreadStacksTests: array[0..NumberOfThreads - 1] of TThreadStackThread;
  AFreeStacksTests: array[0..NumberOfThreads - 1] of TFreeStackThread;
  i, j: Integer;
  Start, Stop, Freq: Int64;
  ThreadStacks: array[0..1] of TThreadStack;
  FreeStacks: array[0..1] of TFreeStack;
  ResultThread: array[0..NumberOfTest - 1] of Double;
  ResultFree: array[0..NumberOfTest - 1] of Double;
  Finished: THandle;
  Mean, StdDev: Extended;
begin
  // The semaphore is deliberately unnamed. A named semaphore is visible to
  // other processes, so two copies of this benchmark running at the same time
  // would open the same object and consume each other's completion signals.
  Finished := CreateSemaphore(nil, 0, NumberOfThreads, nil);
  if Finished = 0 then
    RaiseLastOSError;

  ThreadStacks[0] := TThreadStack.Create;
  ThreadStacks[1] := TThreadStack.Create;
  FreeStacks[0] := TFreeStack.Create;
  FreeStacks[1] := TFreeStack.Create;

  Writeln('Starting ThreadStack threads');
  for j := 0 to NumberOfTest - 1 do
  begin
    QueryPerformanceCounter(Start);
    for i := 0 to NumberOfThreads - 1 do
      AThreadStacksTests[i] := TThreadStackThread.Create(ThreadStacks[i mod 2], (i mod 4) >= 2, Finished);
    // One wait per worker: each worker releases the semaphore exactly once.
    for i := 0 to NumberOfThreads - 1 do
      WaitForSingleObject(Finished, INFINITE);
    QueryPerformanceCounter(Stop);
    for i := 0 to NumberOfThreads - 1 do
      AThreadStacksTests[i].Free;
    ResultThread[j] := Stop - Start;
  end;
  Writeln('ThreadStack done.');

  Writeln('Starting TFreeStack threads');
  for j := 0 to NumberOfTest - 1 do
  begin
    QueryPerformanceCounter(Start);
    for i := 0 to NumberOfThreads - 1 do
      AFreeStacksTests[i] := TFreeStackThread.Create(FreeStacks[i mod 2], (i mod 4) >= 2, Finished);
    // One wait per worker: each worker releases the semaphore exactly once.
    for i := 0 to NumberOfThreads - 1 do
      WaitForSingleObject(Finished, INFINITE);
    QueryPerformanceCounter(Stop);
    for i := 0 to NumberOfThreads - 1 do
      AFreeStacksTests[i].Free;
    ResultFree[j] := Stop - Start;
  end;
  Writeln('TFreeStack done.');

  Writeln(Format('Calculating the mean and the standard deviation out of %d runs.', [NumberOfTest]));
  // Freq is in ticks per second, so Freq / 1000 is ticks per millisecond.
  QueryPerformanceFrequency(Freq);
  MeanAndStdDev(ResultThread, Mean, StdDev);
  Writeln(Format('TThreadStack: %f (%f)', [Mean, StdDev]));
  Writeln(Format('TThreadStack: %fms (%fms)', [Mean / (Freq / 1000), StdDev / (Freq / 1000)]));
  MeanAndStdDev(ResultFree, Mean, StdDev);
  Writeln(Format('TFreeStack:   %f (%f)', [Mean, StdDev]));
  Writeln(Format('TFreeStack:   %fms (%fms)', [Mean / (Freq / 1000), StdDev / (Freq / 1000)]));
  // Keep the console window open when started under the debugger.
  if DebugHook <> 0 then
    Readln;
  { freeing everything}
  ThreadStacks[0].Free;
  ThreadStacks[1].Free;
  FreeStacks[0].Free;
  FreeStacks[1].Free;
  CloseHandle(Finished);
end.


This test project also triggers the memory leak. Running the code on a P4 2.26 returns this result:

Starting ThreadStack threads
ThreadStack done.
Starting TFreeStack threads
TFreeStack done.
Calculating the mean and the standard deviation out of 10 runs.
TThreadStack: 7152374,20 (26574,65)
TThreadStack: 1998,12ms (7,42ms)
TFreeStack: 7339945,80 (310570,01)
TFreeStack: 2050,52ms (86,76ms)

So the TFreeStack is almost the same speed as the simple critical-section-based TThreadStack, although I know the test isn't what you'd call typical, because it's constantly trying to push and pop. So perhaps a more normal situation would give different results. But in my program this situation was expected.

I also liked the simplicity of the TThreadStack over the complexity of the FreeStack, where you had to compile a piece of TASM (containing the CAS) for it to work. Another show stopper was the memory leak. But actually I just wanted to post my simple TThreadStack so that next time I have to use it, I can just wander over to my own blog. So without further ado, I bring you the following source.

unit unThreadStack;

{ TThreadStack: a Contnrs.TStack guarded by a Windows critical section.

  Two ways of using it:
    - Single operations: Count, Push, Pop and Peek each take the lock, perform
      one call on the inner stack and release the lock again.
    - Compound operations: LockStack returns the inner stack with the lock
      held; the caller may perform several calls on it and must then call
      UnlockStack, preferably from a finally block. }

interface

uses
  Windows, Contnrs;

type
  // Re-exported so that users of LockStack can name the stack type without
  // adding Contnrs to their own uses clause.
  TStack = Contnrs.TStack;

  TThreadStack = class
  private
    FStack: TStack;
    FLock: TRTLCriticalSection;
  public
    constructor Create();
    destructor Destroy; override;
    // Acquires the lock and returns the inner stack. Pair with UnlockStack.
    function LockStack: TStack;
    // Releases the lock taken by LockStack.
    procedure UnlockStack;
    // Number of items on the stack at the moment of the call.
    function Count: Integer;
    // Pushes AItem and returns the result of the inner stack's Push.
    function Push(AItem: Pointer): Pointer;
    // Removes and returns the top item.
    function Pop: Pointer;
    // Returns the top item without removing it.
    function Peek: Pointer;
  end;

implementation

{ TThreadStack }

constructor TThreadStack.Create();
begin
  inherited Create();
  InitializeCriticalSection(FLock);
  FStack := Contnrs.TStack.Create;
end;

destructor TThreadStack.Destroy;
begin
  // Take the lock before tearing down so that a thread still inside a locked
  // section finishes first, and free the protected stack before deleting the
  // critical section that guards it.
  EnterCriticalSection(FLock);
  try
    FStack.Free;
    FStack := nil;
  finally
    LeaveCriticalSection(FLock);
  end;
  DeleteCriticalSection(FLock);
  inherited;
end;

function TThreadStack.LockStack: TStack;
begin
  EnterCriticalSection(FLock);
  Result := FStack;
end;

procedure TThreadStack.UnlockStack;
begin
  LeaveCriticalSection(FLock);
end;

function TThreadStack.Count: Integer;
begin
  EnterCriticalSection(FLock);
  try
    Result := FStack.Count;
  finally
    LeaveCriticalSection(FLock);
  end;
end;

function TThreadStack.Push(AItem: Pointer): Pointer;
begin
  EnterCriticalSection(FLock);
  try
    Result := FStack.Push(AItem);
  finally
    LeaveCriticalSection(FLock);
  end;
end;

function TThreadStack.Pop: Pointer;
begin
  EnterCriticalSection(FLock);
  try
    // try/finally guarantees the lock is released even if the inner stack
    // raises.
    Result := FStack.Pop;
  finally
    LeaveCriticalSection(FLock);
  end;
end;

function TThreadStack.Peek: Pointer;
begin
  EnterCriticalSection(FLock);
  try
    Result := FStack.Peek;
  finally
    LeaveCriticalSection(FLock);
  end;
end;

end.

If you use this source, I would like it if you just left a comment on this blog.