Bei meinem ThreadManager Code werden Threads durch von BaseWorkItem abgeleiteten Klassen repräsentiert. ThreadManager macht so einiges um verschiedene Arten von Threads zu verwalten. Ist aber alles sehr alt und nicht so OOP-konform wie mir jetzt lieb wäre, und wahrscheinlich sowieso nicht mehr wirklich zeitgemäß.
using System;
using System.Threading;
using System.Collections;
using System.Collections.Specialized;
using System.Web;
using System.Diagnostics;
using System.Globalization;
using DSNTS.BaseServices;
namespace DSNTS.ExtendedServices
{
/// <summary>
/// ThreadManager encapsulates thread processing for all kinds of threads
/// </summary>
[CodeCategory("ThreadMgr")]
public sealed class SThreadManager : IObjectStats
{
// This is the global thread manager singleton code
private static SThreadManager __threadMgr = null;
public static void CreateThreadManager(int reservedThreadsCount, int _
maxSystemThreads)
{
if (__threadMgr==null) __threadMgr = new SThreadManager( _
reservedThreadsCount, maxSystemThreads);
} // CreateThreadManager()
public static SThreadManager ThreadManager {get{return __threadMgr;}}
// Allow only a single instance of ThreadManager
private SThreadManager() {}
private SThreadManager(int reservedThreadsCount, int maxSystemThreads)
{
_reservedThreadsCount = reservedThreadsCount;
_maxSystemThreads = maxSystemThreads;
if (SObjectStats.ObjectStats!=null) SObjectStats.ObjectStats.RegisterObject( _
"ThreadManager", this as IObjectStats);
}
private static int _maxSystemThreads = 10;
public static long GetWorkItemId()
{
return Interlocked.Increment(ref _workItemNum);
} // GetWorkItemId()
// Application closedown flag and event
private bool _closeDown = false;
public bool CloseDown
{
get {return _closeDown;}
}
ManualResetEvent _cdevent = new ManualResetEvent(false);
ManualResetEvent _cdevent1 = null;
// Queue of waiting work items
private Queue _workItems = new Queue(100);
// System/timer/pool threads running
private Hashtable _runningThreads = new Hashtable(_maxSystemThreads);
// Timer threads
private Hashtable _timerThreads = new Hashtable(5);
// Stats stuff
// Threading Counters
private short _runningSystemThreadsCount = 0;
private short _maxRunningSystemThreadsCount = 0;
private short _runningThreadsCount = 0;
private short _maxRunningThreadsCount = 0;
// Each work item gets a unique id (number)
private static long _workItemNum = 0;
// Note: queued worker count is the count of threads which have been passed _
to threadpool
// but have not yet started execution.
private int _queuedWorkerThreadsCount = 0;
private int _maxQueuedWorkerThreadsCount = 0;
private int _runningWorkerThreadsCount = 0;
private int _maxRunningWorkerThreadsCount = 0;
private int _maxHeldWorkerThreadsCount = 0;
private int _queuedTimerThreadsCount = 0;
private int _maxQueuedTimerThreadsCount = 0;
private int _runningTimerThreadsCount = 0;
private int _maxRunningTimerThreadsCount = 0;
private int _reservedThreadsCount = 2;
// Application post closedown (interval in seconds)
public void DoCloseDown1(int interval)
{
if (TC.TraceOn) TC.Trace(1, "Thread manager stopping active threads at:" & _
"{tsms}");
_closeDown = true;
ListActiveThreads();
lock(_runningThreads.SyncRoot)
{
if (TC.TraceOn) TC.Trace(1, "Thread manager running count 1: {0}", _
_runningSystemThreadsCount);
foreach(DictionaryEntry de in _runningThreads)
{
try
{
if (de.Value is BaseThread)
{
BaseThread ct = (BaseThread)de.Value;
ct.CloseDown();
}
}
catch(System.Exception ex)
{EH.UE(ex);}
}
if (TC.TraceOn) TC.Trace(1, "Thread manager running count 2: {0}", _
_runningSystemThreadsCount);
_cdevent.Set();
scheduleQueuedPoolThreads();
foreach(DictionaryEntry de in _timerThreads)
{
if (de.Value is TimerThread)
{
TimerThread ct = (TimerThread)de.Value;
ct.KillTimer();
}
}
_timerThreads.Clear();
ListActiveThreads();
if (_runningThreadsCount>0) _cdevent1 = new ManualResetEvent(false);
if (TC.TraceOn) TC.Trace(1, "Thread manager running count 3: {0}", _
_runningSystemThreadsCount);
} // lock()
// Wait for all threads to end
if (_cdevent1!=null) _cdevent1.WaitOne(interval*1000, false);
if (TC.TraceOn)
{
lock(_runningThreads.SyncRoot)
{
ListActiveThreads();
TC.Trace(1, "Thread manager running count 4: {0}", _
_runningSystemThreadsCount);
foreach(DictionaryEntry de in _runningThreads)
{
if (de.Value is SystemThread)
{
BaseThread ct = (BaseThread)de.Value;
TC.Trace(1, "Thread manager closedown (1) still running: {0}", _
ct.WI.ProcessId);
}
}
}
TC.Trace(1, "Thread manager closedown complete at:" & _
"{tsms}"+DateTime.Now.ToLongTimeString());
}
} // DoCloseDown1()
public void DoCloseDown2(int interval)
{
_closeDown = true;
if (TC.TraceOn) TC.Trace(1, "Thread manager aborting active threads at:" & _
"{tsms}");
lock(_runningThreads.SyncRoot)
{
foreach(DictionaryEntry de in _runningThreads)
{
if (de.Value is SystemThread)
{
SystemThread cst = (SystemThread)de.Value;
cst.Abort();
}
}
if (_runningThreadsCount>0) _cdevent1 = new ManualResetEvent(false);
if (TC.TraceOn) TC.Trace(1, "Thread manager aborting active threads" & _
"complete at: {tsms}");
}
// Wait for all threads to exit
if (_cdevent1==null) _cdevent1.WaitOne(interval*1000, false);
ListActiveThreads();
if (TC.TraceOn) TC.Trace(1, "Thread manager running count 5: {0}", _
_runningSystemThreadsCount);
} // DoCloseDown2()
// General WorkItem Thread Scheduler
public BaseThread RunWorkItem(IWorkItem wi)
{
switch(wi.ThreadType)
{
case WorkItemThreadType.Pool:
IPoolWorkItem pwi = wi as IPoolWorkItem;
if (pwi==null) return null;
QueueWorkItem(pwi);
break;
case WorkItemThreadType.System:
ISystemWorkItem swi = wi as ISystemWorkItem;
if (swi==null) return null;
return StartSystemThread(wi, swi.Start, swi.Wait);
case WorkItemThreadType.Timed:
ITimedWorkItem twi = wi as ITimedWorkItem;
if (twi==null) return null;
return StartTimerThread(wi, twi.DelayMilliseconds, twi.RepeatMilliseconds);
}
return null;
}
// System Threads
// Start a ThreadManager System Thread
// A system thread is (meant to be) a permanently running thread
// The number of such threads is limited (for performance reasons)
// to the _maxSystemThreads value defined somewhere above.
// Each such thread is named by its WorkItem.ProcessId value, and each such _
thread
// should therefore have a unique process id.
public SystemThread StartSystemThread(IWorkItem wi, bool start, bool wait)
{
SystemThread t = null;
// Add thread to collection
lock(_runningThreads.SyncRoot)
{
if (_runningThreads[wi.ProcessId]==null)
{
if (_runningSystemThreadsCount<_maxSystemThreads)
{
int tn = ++_runningSystemThreadsCount;
if (tn>_maxRunningSystemThreadsCount)
_maxRunningSystemThreadsCount = _runningSystemThreadsCount;
tn = ++_runningThreadsCount;
if (tn>_maxRunningThreadsCount)
_maxRunningThreadsCount = _runningThreadsCount;
t = new SystemThread(wi);
if (TC.TraceOn) TC.Trace(1, "System thread created: {0} current count:" & _
"{1} {tsms}",wi, _runningSystemThreadsCount);
_runningThreads[wi.ProcessId] = t;
t.T.Priority = wi.Priority;
}
else
if (TC.TraceOn) TC.Trace(1, "System thread request: {0} would exceed max" & _
"count: {1} {tsms}", wi, _runningSystemThreadsCount);
}
else
if (TC.TraceOn) TC.Trace(1, "System thread request: {0} already running:" & _
"{1} {tsms}", wi, _runningSystemThreadsCount);
}
if (t==null)
throw new OperationException("Invalid system thread request for: {0}", wi);
if (start) t.Start();
if (wait) t.Wait();
return t;
} // StartSystemThread()
// Start of system thread registration
internal void RegisterSystemThread(SystemThread cst)
{
IWorkItem wi = cst.WI;
if (wi==null) return;
InitThread(wi);
if (TC.TraceOn) TC.Trace(1, "System thread started: {0} current count: {1}" & _
"{tsms}", wi, _runningSystemThreadsCount);
} // RegisterSystemThread()
// End of system thread deregistration
internal void DeregisterSystemThread(SystemThread cst)
{
IWorkItem wi = cst.WI;
if (wi==null) return;
lock(_runningThreads.SyncRoot)
{
try
{
_runningThreads.Remove(wi.ProcessId);
}
finally
{
_runningSystemThreadsCount--;
_runningThreadsCount--;
if (_closeDown&&(_runningThreadsCount==0)&&(_cdevent1!=null)) _
_cdevent1.Set();
if (TC.TraceOn) TC.Trace(1, "System thread ended {0} current count: {1}" & _
"{tsms}", wi, _runningSystemThreadsCount);
TermThread(wi);
}
}
} // DeregisterSystemThread()
// Common thread start/end
private void InitThread(IWorkItem wi)
{
if (wi.Trace) TC.InitThreadContext(wi.ContextData);
} // InitThread()
private void TermThread(IWorkItem wi)
{
if (wi.Trace) TC.TermThreadContext();
} // TermThread()
// Determine if a system thread is running
public bool IsSystemThreadRunning(string processId)
{
lock(_runningThreads.SyncRoot)
{
return _runningThreads[processId]!=null;
}
} // IsSystemThreadRunning()
// Wait for a while
public void ThreadWait(int seconds)
{
_cdevent.WaitOne(seconds*1000, false);
} // ThreadWait()
// Threadpool Threads
// Schedule a unit of work on a threadpool thread
public void QueueWorkItem(IWorkItem wi)
{
IPoolWorkItem pwi = wi as IPoolWorkItem;
if (pwi!=null&&pwi.PassThrough)
// if passthru from another threadpool thread just call DoWork()
wi.DoWork();
else
{
if (_closeDown) return;
PoolThread cpt = new PoolThread(wi);
// Queue a threadpoool item
lock(_runningThreads.SyncRoot)
{
if (pwi!=null&&pwi.ImmediateThread)
{
// If system option start thread immediately
if (TC.TraceOn) TC.Trace(1, "Queue WorkItem: {0}", wi);
incQWC();
cpt.Schedule();
}
else
{
// otherwise put it on queue
_workItems.Enqueue(cpt);
if (wi.Trace&&TC.TraceOn) TC.Trace(1, "Queue WorkItem held thread count:" & _
"{0}", _workItems.Count);
if (_workItems.Count>_maxHeldWorkerThreadsCount)
{
_maxHeldWorkerThreadsCount++;
if (wi.Trace&&TC.TraceOn) TC.Trace(1, "Queue WorkItem max held thread" & _
"count: {0}", _maxHeldWorkerThreadsCount);
}
// and see if anything can be scheduled now
try
{
scheduleQueuedPoolThreads();
}
catch(System.Exception ex)
{EH.UE(ex);}
}
}
}
} // QueueWorkItem()
// Start of worker thread registration
internal void RegisterWorker(PoolThread cpt)
{
IWorkItem wi = cpt.WI;
if (wi==null) return;
InitThread(wi);
lock(_runningThreads.SyncRoot)
{
_runningWorkerThreadsCount++;
_queuedWorkerThreadsCount--;
string id = wi.ProcessId+cpt.T.GetHashCode().ToString( _
CultureInfo.InvariantCulture);
_runningThreads[id] = cpt;
int tn = ++_runningThreadsCount;
if (tn>_maxRunningThreadsCount)
_maxRunningThreadsCount = _runningThreadsCount;
if (TC.TraceOn)
{
TC.Trace(1, "Work thread started: {0}", wi);
TC.Trace(1, "Activeworker count: {0}", _runningWorkerThreadsCount);
TC.Trace(1, "Queuedworker count: {0}", _queuedWorkerThreadsCount);
}
}
} // RegisterWorker()
// End of worker thread deregistration
internal void DeregisterWorker(PoolThread cpt)
{
IWorkItem wi = cpt.WI;
if (wi==null) return;
lock(_runningThreads.SyncRoot)
{
_runningWorkerThreadsCount--;
_runningThreadsCount--;
if (_closeDown&&(_runningThreadsCount==0)&&(_cdevent1!=null)) _cdevent1.Set( _
);
string id = wi.ProcessId+cpt.T.GetHashCode().ToString( _
CultureInfo.InvariantCulture);
if (TC.TraceOn) TC.Trace(1, "Worker thread ended: {0}", wi);
_runningThreads.Remove(id);
try
{
scheduleQueuedPoolThreads();
}
catch(System.Exception ex)
{EH.UE(ex);}
TC.Trace(1, "Activeworker count: {0}", _runningWorkerThreadsCount);
TC.Trace(1, "Queuedworker count: {0}", _queuedWorkerThreadsCount);
}
TermThread(wi);
} // DeregisterWorker()
// Increment queued worker count
private void incQWC()
{
_queuedWorkerThreadsCount++;
TC.Trace(1, "Queuedworker count: {0}", _queuedWorkerThreadsCount);
if (_queuedWorkerThreadsCount>_maxQueuedWorkerThreadsCount)
{
_maxQueuedWorkerThreadsCount++;
TC.Trace(1, "MaxQueuedworker count: {0}", _maxQueuedWorkerThreadsCount);
}
}
// Private schedule thread(s) in the waiting queue
// (Must be invoked from within lock(_runningThreads.SyncRoot))
private void scheduleQueuedPoolThreads()
{
// We assume this is called by another threadpool thread so use _
availableThreads+1
// to decide if we can schedule another thread. The only alternative would _
be if this was called from
// a ThreadManager SystemThread, but we do not (yet) cater for this _
possibility.
while ((availableThreads()+1)>_reservedThreadsCount&&_workItems.Count>0)
{
PoolThread cpt = (PoolThread)_workItems.Dequeue();
if (!_closeDown) // Flush entries if closedown
{
if (TC.TraceOn) TC.Trace(1, "Scheduling: {0}", cpt.WI);
incQWC();
cpt.Schedule();
}
}
} // scheduleQueuedPoolThreads()
// Public schedule thread(s) in the waiting queue
// This may be called for example at end of web request thread.
public void ScheduleQueuedPoolThreads()
{
lock(_runningThreads.SyncRoot)
{
scheduleQueuedPoolThreads();
}
} // ScheduleQueuedPoolThreads()
// Get available threadpool threads
// Determine the clr available threadpool threads
private int availableThreads()
{
int available_threads;
int cpta;
ThreadPool.GetAvailableThreads(out available_threads, out cpta);
if (TC.TraceOn) TC.Trace(1, "Available worker threads: {0}", _
available_threads);
return available_threads;
} // availableThreads()
// Timer threads
// Start a timer (millisecond units)
public TimerThread StartTimerThread(IWorkItem wi, int delayMS, int repeatMS)
{
if (TC.TraceOn) TC.Trace(1, "Starting timer thread: {0}", wi);
if ((_timerThreads[wi.ProcessId])!=null)
throw new OperationException("Invalid timer threading request for: {0}", _
wi);
lock(_runningThreads.SyncRoot)
{
TimerThread ctt = new TimerThread(wi, delayMS, repeatMS);
_timerThreads[wi.ProcessId] = ctt;
int tn = _queuedTimerThreadsCount++;
if(tn>_maxQueuedTimerThreadsCount)
_maxQueuedTimerThreadsCount = tn;
if (TC.TraceOn) TC.Trace(1, "Queued timer threads: {0}", _
_queuedTimerThreadsCount);
return ctt;
}
} // StartTimerThread()
// Stop a timer
internal void StopTimerThread(TimerThread ctt)
{
IWorkItem wi = ctt.WI;
if (wi==null) return;
if (TC.TraceOn) TC.Trace(1, "Stopping timer thread: {0}", wi);
try
{
ctt.KillTimer();
}
catch(System.Exception ex)
{EH.UE(ex);}
if(_timerThreads[wi.ProcessId]==null) return;
lock(_runningThreads.SyncRoot)
{
_timerThreads.Remove(wi.ProcessId);
_queuedTimerThreadsCount--;
if (TC.TraceOn) TC.Trace(1, "Queued timer threads: {0}", _
_queuedTimerThreadsCount);
}
} // StopTimerThread()
// Start of timer thread registration
internal void RegisterTimer(TimerThread ctt)
{
IWorkItem wi = ctt.WI;
if (wi==null) return;
lock(_runningThreads.SyncRoot)
{
_runningTimerThreadsCount++;
if (_runningTimerThreadsCount>_maxRunningTimerThreadsCount)
_maxRunningTimerThreadsCount++;
int tn = ++_runningThreadsCount;
if (tn>_maxRunningThreadsCount)
_maxRunningThreadsCount = _runningThreadsCount;
string id = wi.ProcessId+ctt.T.GetHashCode().ToString( _
CultureInfo.InvariantCulture);
_runningThreads[id] = ctt;
InitThread(wi);
if (TC.TraceOn) TC.Trace(1, "Timer thread starting: {0} {tsms}", wi);
}
} // RegisterTimer()
// End of timer thread deregistration
internal void DeregisterTimer(TimerThread ctt)
{
IWorkItem wi = ctt.WI;
if (wi==null) return;
lock(_runningThreads.SyncRoot)
{
_runningTimerThreadsCount--;
_runningThreadsCount--;
if (_closeDown&&(_runningThreadsCount==0)&&(_cdevent1!=null)) _cdevent1.Set( _
);
string id = wi.ProcessId+ctt.T.GetHashCode().ToString( _
CultureInfo.InvariantCulture);
_runningThreads.Remove(id);
if (TC.TraceOn) TC.Trace(1, "Timer thread ending: {0} {tsms}", wi);
TermThread(wi);
}
} // DeregisterTimer()
// Test! Routine to list active threads
public void ListActiveThreads()
{
if (!TC.TraceOn) return;
lock(_runningThreads.SyncRoot)
{
TC.Trace(1, "Active threads: {0}/{1}", _runningSystemThreadsCount, _
_runningThreads.Count);
foreach(DictionaryEntry de in _runningThreads)
{
BaseThread ct = (BaseThread)de.Value;
if (ct.T!=null&&ct.WI!=null)
TC.Trace(1, "Thread: {0} {1}",ct.T.GetHashCode(), ct.WI);
else
TC.Trace(1, "Thread: ???? work item ???? id ????");
}
}
} // ListActiveThreads()
// Test! Routine to list some thread data
public void ListThreadData(string at, BaseThread ct)
{
if (TC.TraceOn) TC.Trace(1, "At: {0} {1} data current: {2} ct.T: {3}",
at,
ct.WI!=null?ct.WI.ToString():"No Work Item",
Thread.CurrentThread.GetHashCode(),
ct.T!=null?ct.T.GetHashCode().ToString( _
CultureInfo.InvariantCulture):"no thread in ct.T");
}
// Statistics Routines
// Thread statistics properties
public long HeldWorkerThreadsCount
{
get
{
lock(_runningThreads.SyncRoot)
{
return _workItems.Count;
}
}
}
public long MaxHeldWorkerThreadsCount
{
get {return _maxHeldWorkerThreadsCount;}
}
public long TotalWorkItemsProcessed
{
get {return _workItemNum;}
}
public long QueuedWorkerThreadsCount
{
get {return _queuedWorkerThreadsCount;}
}
public long RunningWorkerThreadsCount
{
get {return _runningWorkerThreadsCount;}
}
public long MaxQueuedWorkerThreadsCount
{
get {return _maxQueuedWorkerThreadsCount;}
}
public long MaxRunningWorkerThreadsCount
{
get {return _maxRunningWorkerThreadsCount;}
}
public long RunningSystemThreadsCount
{
get {return _runningSystemThreadsCount;}
}
public long MaxRunningSystemThreadsCount
{
get {return _maxRunningSystemThreadsCount;}
}
public long RunningThreadsCount
{
get {return _runningThreadsCount;}
}
public long MaxRunningThreadsCount
{
get {return _maxRunningThreadsCount;}
}
public long QueuedTimerThreadsCount
{
get {return _queuedTimerThreadsCount;}
}
public long MaxQueuedTimerThreadsCount
{
get {return _maxQueuedTimerThreadsCount;}
}
public long RunningTimerThreadsCount
{
get {return _runningTimerThreadsCount;}
}
public long MaxRunningTimerThreadsCount
{
get {return _maxRunningTimerThreadsCount;}
}
// Interface IObjectStats
public void GetObjectStatistics(string prefix)
{
SObjectStats.ObjectStats.Add(prefix, _
"ThreadManager:Total/TimerThreadObjectsCount", _timerThreads.Count);
SObjectStats.ObjectStats.Add(prefix, _
"ThreadManager:Total/RunningThreadObjectsCount", _runningThreads.Count);
SObjectStats.ObjectStats.Add(prefix, _
"ThreadManager:Total/QueuedWorkItemObjectsCount", _workItems.Count);
}
} // class ThreadManager
} ________
Alle Angaben ohne Gewähr. Keine Haftung für Vorschläge, Tipps oder sonstige Hilfe, falls es schiefgeht, nur Zeit verschwendet oder man sonst nicht zufrieden ist |