マルチスレッドプログラミング
ThreadPool に仕事を投げ込むタイプのマルチスレッドプログラミングです.
とりあえず簡単なサンプルを書いてテストすることに.
やりたいことは,
- Person は Task を TaskManager に Post する
- TaskManager は Task を ThreadPool に投げる
- Person は自分が頼んだ Task をキャンセルすることができる
ぐらいだろうか.
BackgroundWorker で良いんじゃないか?とも思いましたが,Pool にポイポイ投げ込む感じがほしかったので,今回は使いませんでした.
/// <summary> /// 丸投げ /// </summary> public class Person { private readonly string id; public Person(string id) { this.id = id; } public string Id { get { return this.id; } } // タスクを受ける. public void Accept(Task task) { TaskManager.Instance.Post(this, task); } // 指定された ID のタスクをキャンセルする. public void CancelTask(params String[] ids) { TaskManager.Instance.Cancel(this, ids); } } /// <summary> /// タスク抽象 /// </summary> public abstract class Task { protected readonly string id; protected volatile Thread thread = null; protected volatile Exception exception = null; protected volatile bool cancel = false; protected Task(string id) { this.id = id; } public string Id { get { return this.id; } } public Exception ThrownException { set { this.exception = value; } get { return this.exception; } } // タスク実行 public virtual void Execute() { this.thread = Thread.CurrentThread; if (this.cancel) { // 事前にキャンセルされた場合 return; } // 実行 Main(); } // タスクのメイン処理 protected abstract void Main(); // タスクが完了したときに呼び出す. // Main が例外を throw するかもしれないので別メソッド public virtual void Finished() { } // タスクをキャンセルする. // すぐには止まらないけど... public virtual void Cancel() { this.cancel = true; if (this.thread == null) { return; } if (!this.thread.ManagedThreadId.Equals( Thread.CurrentThread.ManagedThreadId)) { this.thread.Interrupt(); } } } /// <summary> /// 一応管理クラス /// </summary> public sealed class TaskManager { private static readonly TaskManager self = new TaskManager(); private readonly Dictionary<Object, List<Task>> taskMap; private volatile Boolean enable; private TaskManager() { this.taskMap = new Dictionary<Object, List<Task>>(); this.enable = true; } public static TaskManager Instance { get { return self; } } // client の task として post する. public void Post(Object client, Task task) { if (task == null) { return; } AddTask(client, task); ThreadPool.QueueUserWorkItem(delegate(Object o) { try { task.Execute(); } catch (Exception e) { task.ThrownException = e; } finally { lock (this.taskMap) { this.taskMap[client].Remove(task); if (this.taskMap[client].Count < 1) { this.taskMap.Remove(client); } } task.Finished(); } }); } private void AddTask(Object client, Task task) { lock (this.taskMap) { if (!this.enable) { throw new InvalidOperationException(); } if (!this.taskMap.ContainsKey(client)) { this.taskMap.Add(client, new List<Task>()); } // 同一 ID のタスクが投入されても,そのまま Add する this.taskMap[client].Add(task); } } // 指定された client が post したタスク(idで指定)をキャンセルする. public void Cancel(Object client, params String[] ids) { Task[] tasks = new Task[0]; lock (this.taskMap) { if (!this.taskMap.ContainsKey(client)) { return; } tasks = FindAll(client, ids); } CallCancel(tasks); } private Task[] FindAll(Object client, String[] ids) { Task[] tasks = new Task[0]; lock (this.taskMap) { if (0 < ids.Length) { List<Task> targets = this.taskMap[client].FindAll(delegate(Task t) { foreach (String id in ids) { if (t.Id.Equals(id)) { return true; } } return false; }); tasks = targets.ToArray(); } else { tasks = this.taskMap[client].ToArray(); } } return tasks; } private void CallCancel(Task[] tasks) { foreach (Task task in tasks) { task.Cancel(); } } // すべてのタスクを終了させる. public void CleanUp() { Task[] tasks = new Task[0]; lock (this.taskMap) { this.enable = false; tasks = CollectAllTasks(); } CallCancel(tasks); } private Task[] CollectAllTasks() { List<Task> tasks = new List<Task>(); foreach (List<Task> list in this.taskMap.Values) { tasks.AddRange(list); } return tasks.ToArray(); } }
無駄かな?もっと良い方法があるのかな?
テストコード
/// <summary> /// Main /// </summary> class Program { static void Main(string[] args) { Person p1 = new Person("client-1"); p1.Accept(new TaskA("A1")); p1.Accept(new TaskB("B1")); System.Console.ReadLine(); p1.CancelTask("A1", "B1"); System.Console.WriteLine("Cancel A1 and B1"); p1.Accept(new TaskB("B2")); System.Console.WriteLine("Request B2"); System.Console.ReadLine(); p1.CancelTask(); System.Console.WriteLine("Cancel B2"); p1.Accept(new TaskB("B3")); p1.Accept(new TaskB("B4")); p1.Accept(new TaskB("B5")); System.Console.WriteLine("Request B2"); System.Console.ReadLine(); TaskManager.Instance.CleanUp(); try { p1.Accept(new TaskB("B6")); } catch (InvalidOperationException) { System.Console.WriteLine("TaskManager.enable == false."); } System.Console.WriteLine("Clean up"); System.Console.ReadLine(); } } /// <summary> /// 5秒タスク /// </summary> public class TaskA : Task { public TaskA(string id) : base(id) { } protected override void Main() { if (this.cancel) { System.Console.WriteLine( "TaskA(id=" + this.id + "): cancel"); return; } System.Console.WriteLine("TaskA(id=" + this.id + ")"); try { Thread.Sleep(5000); } catch (ThreadInterruptedException) { System.Console.WriteLine( "TaskA(id=" + this.id + "): throw ThreadInterruptedException"); } System.Console.WriteLine("TaskA(id=" + this.id + "): end thread"); } } /// <summary> /// 無限タスク /// </summary> public class TaskB : Task { public TaskB(string id) : base(id) { } protected override void Main() { while (true) { if (this.cancel) { System.Console.WriteLine( "TaskB(id=" + this.id + "): cancel"); break; } System.Console.WriteLine("TaskB(id=" + this.id + ")"); try { Thread.Sleep(1000); } catch (ThreadInterruptedException) { System.Console.WriteLine( "TaskB(id=" + this.id + "): throw ThreadInterruptedException"); break; } } System.Console.WriteLine("TaskB(id=" + this.id + "): end thread"); } }