Thomas Darimont
Erfahrenes Mitglied
Hallo,
hier mal ein Beispiel für ein kleines Task-Executions System zur sequentiellen und parallelen Ausführung von Tasks. Dabei kann man den Code der innerhalb eines Tasks ausgeführt wird über zwei Arten bereit Stellen. Zum einen kann man ein delegate (siehe Runnable) definieren und dieses an einen generischen Task übergeben oder von Task ableiten und die Execute Methode überschreiben.
Viel Spaß ;-)
Das Beispiel:
Das ITask interface:
Hier mal ein selbst definierter Task:
Unser Runnable delegate:
Unser generisches ITask interface
Unsere Task Implementierung:
Unser TaskExecutor interface:
Unsere TaskExecutor Implementierung:
Unser ITaskExecutionStrategyinterface:
Unsere DefaultTaskExecutionStrategy Implementierung:
(Führt den angegeben Task synchron im Thread des Aufrufers aus)
Unsere ThreadPoolTaskExecutionStrategy:
(Führt den Task asynchron in einem neuen Thread aus)
Unser IFuture interface:
Damit können wir auf das beenden eines Tasks warten)
Unsere Future Implementierung:
Beispiellauf:
Gruß Tom
hier mal ein Beispiel für ein kleines Task-Executions System zur sequentiellen und parallelen Ausführung von Tasks. Dabei kann man den Code der innerhalb eines Tasks ausgeführt wird über zwei Arten bereit Stellen. Zum einen kann man ein delegate (siehe Runnable) definieren und dieses an einen generischen Task übergeben oder von Task ableiten und die Execute Methode überschreiben.
Viel Spaß ;-)
Das Beispiel:
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public class TastExecutorExample
{
public static void Main(string[] args)
{
ITaskExecutor taskExecutor = new TaskExecutor();
Stopwatch stopWatch = new Stopwatch();
int numberOfTasks = 5;
Console.WriteLine("Sequential Task Execution");
stopWatch.Start();
for (int i = 0; i < numberOfTasks; i++)
{
//Example for custom Tasks
ITask<int, int> task = new CustomTask();
task.Argument = i;
taskExecutor.Execute(task);
Console.WriteLine(task + " result -> " + task.Result);
}
stopWatch.Stop();
Console.WriteLine("Execution took: " + stopWatch.Elapsed.Seconds + " seconds");
Console.WriteLine("###############################");
Console.WriteLine("Parallel Task Execution:");
taskExecutor.TaskExecutionStrategy = new ThreadPoolTaskExecutionStrategy();
Runnable<ITask<int, int>> runnable = delegate(ITask<int, int> task)
{
for (int i = 0; i < 5; i++)
{
Console.WriteLine(task + ": " + i);
Thread.Sleep(1000);
}
task.Result = task.Argument * task.Argument;
};
stopWatch.Reset();
stopWatch.Start();
IList<IFuture<ITask<int, int>>> futures = new List<IFuture<ITask<int, int>>>();
for (int i = 0; i < numberOfTasks ; i++)
{
//Example for generic Tasks
ITask<int, int> task = new Task<int, int>(runnable);
task.Argument = i;
futures.Add(taskExecutor.Schedule(task));
}
foreach (IFuture<ITask<int, int>> future in futures)
{
ITask<int, int> task = future.Get();
Console.WriteLine(task + " result -> " + task.Result);
}
stopWatch.Stop();
Console.WriteLine("Execution took: " + stopWatch.Elapsed.Seconds + " seconds");
Console.WriteLine("##################");
Console.WriteLine("Get Single result with no timeout");
ITask<int, int> singleTask = new Task<int, int>(runnable);
singleTask.Argument = 11;
Console.WriteLine(taskExecutor.Schedule(singleTask).Get() + ": " + singleTask.Result);
Console.WriteLine("##################");
Console.WriteLine("Get Single result with timeout 3s");
singleTask = new Task<int, int>(runnable);
singleTask.Argument = 11;
try
{
Console.WriteLine(taskExecutor.Schedule(singleTask).Get(3000) + ": " + singleTask.Result);
Console.WriteLine("Should never be reached");
}
catch (TimeoutException timeoutException)
{
Console.WriteLine(timeoutException.Message);
}
finally
{
Console.WriteLine("###");
}
}
}
}
Das ITask interface:
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public interface ITask
{
Guid Id
{
get;
}
bool IsScheduled
{
get;
set;
}
void Schedule();
}
}
Hier mal ein selbst definierter Task:
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public class CustomTask : Task<int, int>
{
protected override void Execute()
{
for (int i = 0; i < 5; i++)
{
Console.WriteLine(this + ": " + i);
Thread.Sleep(1000);
}
this.Result = this.Argument * this.Argument;
}
}
}
Unser Runnable delegate:
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public delegate void Runnable<TTask>(TTask task) where TTask : ITask;
}
Unser generisches ITask interface
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public interface ITask<TArgument, TResult> : ITask
{
TArgument Argument
{
get;
set;
}
TResult Result
{
get;
set;
}
Runnable<ITask<TArgument, TResult>> Run
{
get;
set;
}
}
}
Unsere Task Implementierung:
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public class Task<TArgument, TResult> : ITask<TArgument, TResult>
{
#region ITask Member
public Task()
{
this.id = Guid.NewGuid();
this.run = delegate(ITask<TArgument, TResult> task)
{
Execute();
};
}
public Task(Runnable<ITask<TArgument, TResult>> run)
: this()
{
this.run = run;
}
public bool IsScheduled
{
get { return isScheduled; }
set { isScheduled = value; }
}private bool isScheduled;
public Guid Id
{
get { return id; }
}private Guid id;
public TArgument Argument
{
get { return argument; }
set { argument = value; }
} private TArgument argument;
protected virtual void Execute()
{
//noop
}
public void Schedule()
{
this.Run(this);
}
public TResult Result
{
get { return result; }
set { result = value; }
}
private TResult result;
public Runnable<ITask<TArgument, TResult>> Run
{
get { return run; }
set { run = value; }
}private Runnable<ITask<TArgument, TResult>> run;
public override string ToString()
{
return string.Format("Task ID:{0}", id);
}
public override bool Equals(object obj)
{
if (null == obj)
{
return false;
}
if (this == obj)
{
return true;
}
if (!GetType().IsAssignableFrom(obj.GetType()))
{
return false;
}
return id.Equals(((ITask)obj).Id);
}
public override int GetHashCode()
{
return this.id.GetHashCode();
}
#endregion
}
}
Unser TaskExecutor interface:
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public interface ITaskExecutor
{
ITaskExecutionStrategy TaskExecutionStrategy
{
get;
set;
}
void Execute(ITask task);
IFuture<TTask> Schedule<TTask>(TTask task) where TTask : ITask;
}
}
Unsere TaskExecutor Implementierung:
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public class TaskExecutor : ITaskExecutor
{
public ITaskExecutionStrategy TaskExecutionStrategy
{
get { return taskExecutionStrategy; }
set { taskExecutionStrategy = value; }
} private ITaskExecutionStrategy taskExecutionStrategy = new DefaultTaskExecutionStrategy();
public virtual void Execute(ITask task)
{
TaskExecutionStrategy.Execute(task);
}
public virtual IFuture<TTask> Schedule<TTask>(TTask task) where TTask : ITask
{
return TaskExecutionStrategy.Schedule(task);
}
}
}
Unser ITaskExecutionStrategyinterface:
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public interface ITaskExecutionStrategy
{
void Execute(ITask task);
IFuture<TTask> Schedule<TTask>(TTask task) where TTask : ITask;
}
}
Unsere DefaultTaskExecutionStrategy Implementierung:
(Führt den angegeben Task synchron im Thread des Aufrufers aus)
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public class DefaultTaskExecutionStrategy : ITaskExecutionStrategy
{
#region ITaskExecutionStrategy Member
public virtual void Execute(ITask task)
{
if (null != task)
{
task.Schedule();
}
}
public virtual IFuture<TTask> Schedule<TTask>(TTask task) where TTask : ITask
{
throw new NotImplementedException("Scheduling not implemented for single threaded task execution");
}
#endregion
}
}
Unsere ThreadPoolTaskExecutionStrategy:
(Führt den Task asynchron in einem neuen Thread aus)
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public class ThreadPoolTaskExecutionStrategy : DefaultTaskExecutionStrategy
{
internal IDictionary<Guid, object> currentTasksSet = new Dictionary<Guid, object>();
internal IDictionary<Guid, Thread> taskIdToExecutingThreadMap = new Dictionary<Guid, Thread>();
internal object tasksLock = new object();
public override void Execute(ITask task)
{
ThreadPool.QueueUserWorkItem(delegate(object target)
{
lock (tasksLock)
{
if (currentTasksSet.ContainsKey(task.Id))
{
throw new Exception("Task: " + task + " has already scheduled...");
}
else
{
currentTasksSet.Add(task.Id, null);
task.IsScheduled = true;
}
taskIdToExecutingThreadMap.Add(task.Id, Thread.CurrentThread);
}
base.Execute(task);
lock (tasksLock)
{
if (taskIdToExecutingThreadMap.ContainsKey(task.Id))
{
taskIdToExecutingThreadMap.Remove(task.Id);
}
if (currentTasksSet.ContainsKey(task.Id))
{
currentTasksSet.Remove(task.Id);
}
}
});
}
public override IFuture<TTask> Schedule<TTask>(TTask task)
{
this.Execute(task);
return new Future<TTask>(task, this);
}
}
}
Unser IFuture interface:
Damit können wir auf das beenden eines Tasks warten)
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public interface IFuture<TTask>
{
TTask Get();
TTask Get(long timeOutInMilliSeconds);
}
}
Unsere Future Implementierung:
C#:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace De.Tutorials.Training
{
public class Future<TTask> : IFuture<TTask> where TTask : ITask
{
private TTask task;
private ThreadPoolTaskExecutionStrategy threadPoolTaskExecutionStrategy;
public Future(TTask task, ThreadPoolTaskExecutionStrategy threadPoolTaskExecutionStrategy)
{
this.task = task;
this.threadPoolTaskExecutionStrategy = threadPoolTaskExecutionStrategy;
}
public TTask Get()
{
return this.Get(0);
}
public TTask Get(long timeOutInMilliSeconds)
{
bool resultAvailable = false;
Stopwatch stopWatch = new Stopwatch();
if (timeOutInMilliSeconds > 0)
{
stopWatch.Start();
}
bool gotTimeout = false;
while (!resultAvailable)
{
lock (threadPoolTaskExecutionStrategy.tasksLock)
{
resultAvailable = !threadPoolTaskExecutionStrategy.currentTasksSet.ContainsKey(task.Id) && task.IsScheduled;
if (stopWatch.ElapsedMilliseconds > timeOutInMilliSeconds)
{
if (threadPoolTaskExecutionStrategy.taskIdToExecutingThreadMap.ContainsKey(task.Id))
{
Thread executingThread = threadPoolTaskExecutionStrategy.taskIdToExecutingThreadMap[task.Id];
threadPoolTaskExecutionStrategy.taskIdToExecutingThreadMap.Remove(task.Id);
executingThread.Abort();
}
gotTimeout = true;
break;
}
}
Thread.Sleep(200);
}
this.threadPoolTaskExecutionStrategy = null;
TTask result = this.task;
this.task = default(TTask);
if (gotTimeout)
{
throw new TimeoutException(string.Format("Task Id:{0} did not complete before timeout", result.Id));
}
return result;
}
}
}
Beispiellauf:
Code:
Sequential Task Execution
Task ID:a0642399-486a-414e-8c28-3b303f314680: 0
Task ID:a0642399-486a-414e-8c28-3b303f314680: 1
Task ID:a0642399-486a-414e-8c28-3b303f314680: 2
Task ID:a0642399-486a-414e-8c28-3b303f314680: 3
Task ID:a0642399-486a-414e-8c28-3b303f314680: 4
Task ID:a0642399-486a-414e-8c28-3b303f314680: result -> 0
Task ID:fb9e2730-d146-4d1b-add1-e824e55397bc: 0
Task ID:fb9e2730-d146-4d1b-add1-e824e55397bc: 1
Task ID:fb9e2730-d146-4d1b-add1-e824e55397bc: 2
Task ID:fb9e2730-d146-4d1b-add1-e824e55397bc: 3
Task ID:fb9e2730-d146-4d1b-add1-e824e55397bc: 4
Task ID:fb9e2730-d146-4d1b-add1-e824e55397bc: result -> 1
Task ID:129f1138-bed9-445f-8e91-f1ec408bd5e1: 0
Task ID:129f1138-bed9-445f-8e91-f1ec408bd5e1: 1
Task ID:129f1138-bed9-445f-8e91-f1ec408bd5e1: 2
Task ID:129f1138-bed9-445f-8e91-f1ec408bd5e1: 3
Task ID:129f1138-bed9-445f-8e91-f1ec408bd5e1: 4
Task ID:129f1138-bed9-445f-8e91-f1ec408bd5e1: result -> 4
Task ID:730fbea2-bcb9-45ab-b69f-bf3c938819af: 0
Task ID:730fbea2-bcb9-45ab-b69f-bf3c938819af: 1
Task ID:730fbea2-bcb9-45ab-b69f-bf3c938819af: 2
Task ID:730fbea2-bcb9-45ab-b69f-bf3c938819af: 3
Task ID:730fbea2-bcb9-45ab-b69f-bf3c938819af: 4
Task ID:730fbea2-bcb9-45ab-b69f-bf3c938819af: result -> 9
Task ID:e4765815-956a-4ab6-b12b-102043c4568f: 0
Task ID:e4765815-956a-4ab6-b12b-102043c4568f: 1
Task ID:e4765815-956a-4ab6-b12b-102043c4568f: 2
Task ID:e4765815-956a-4ab6-b12b-102043c4568f: 3
Task ID:e4765815-956a-4ab6-b12b-102043c4568f: 4
Task ID:e4765815-956a-4ab6-b12b-102043c4568f: result -> 16
Execution took: 25 seconds
###############################
Parallel Task Execution:
Task ID:3d437e72-5492-4c77-b348-96b9e6878dda: 0
Task ID:e9c4969c-4d57-4820-8f77-130032de787a: 0
Task ID:3d437e72-5492-4c77-b348-96b9e6878dda: 1
Task ID:e9c4969c-4d57-4820-8f77-130032de787a: 1
Task ID:1e1abf9d-8f01-4da0-8b5f-b36c3b05bfd1: 0
Task ID:268d30f1-44b9-461b-b723-290ca5733c80: 0
Task ID:e9c4969c-4d57-4820-8f77-130032de787a: 2
Task ID:1e1abf9d-8f01-4da0-8b5f-b36c3b05bfd1: 1
Task ID:3d437e72-5492-4c77-b348-96b9e6878dda: 2
Task ID:f742cabe-89e1-407f-adad-46e70d8a7932: 0
Task ID:268d30f1-44b9-461b-b723-290ca5733c80: 1
Task ID:f742cabe-89e1-407f-adad-46e70d8a7932: 1
Task ID:e9c4969c-4d57-4820-8f77-130032de787a: 3
Task ID:1e1abf9d-8f01-4da0-8b5f-b36c3b05bfd1: 2
Task ID:3d437e72-5492-4c77-b348-96b9e6878dda: 3
Task ID:268d30f1-44b9-461b-b723-290ca5733c80: 2
Task ID:3d437e72-5492-4c77-b348-96b9e6878dda: 4
Task ID:f742cabe-89e1-407f-adad-46e70d8a7932: 2
Task ID:e9c4969c-4d57-4820-8f77-130032de787a: 4
Task ID:1e1abf9d-8f01-4da0-8b5f-b36c3b05bfd1: 3
Task ID:268d30f1-44b9-461b-b723-290ca5733c80: 3
Task ID:f742cabe-89e1-407f-adad-46e70d8a7932: 3
Task ID:1e1abf9d-8f01-4da0-8b5f-b36c3b05bfd1: 4
Task ID:3d437e72-5492-4c77-b348-96b9e6878dda result -> 0
Task ID:e9c4969c-4d57-4820-8f77-130032de787a result -> 1
Task ID:268d30f1-44b9-461b-b723-290ca5733c80: 4
Task ID:f742cabe-89e1-407f-adad-46e70d8a7932: 4
Task ID:1e1abf9d-8f01-4da0-8b5f-b36c3b05bfd1 result -> 4
Task ID:268d30f1-44b9-461b-b723-290ca5733c80 result -> 9
Task ID:f742cabe-89e1-407f-adad-46e70d8a7932 result -> 16
Execution took: 7 seconds
##################
Get Single result with no timeout
Task ID:68314aa9-90ca-4b5d-a932-86586ca32260: 0
Task ID:68314aa9-90ca-4b5d-a932-86586ca32260: 1
Task ID:68314aa9-90ca-4b5d-a932-86586ca32260: 2
Task ID:68314aa9-90ca-4b5d-a932-86586ca32260: 3
Task ID:68314aa9-90ca-4b5d-a932-86586ca32260: 4
Task ID:68314aa9-90ca-4b5d-a932-86586ca32260: result ->121
##################
Get Single result with timeout 3s
Task ID:6bddecbf-f8be-4460-93c1-af2907b17639: 0
Task ID:6bddecbf-f8be-4460-93c1-af2907b17639: 1
Task ID:6bddecbf-f8be-4460-93c1-af2907b17639: 2
Task ID:6bddecbf-f8be-4460-93c1-af2907b17639: 3
Task Id:6bddecbf-f8be-4460-93c1-af2907b17639 did not complete before timeout
###
Drücken Sie eine beliebige Taste . . .
Gruß Tom