マルチスレッドプログラミング

ThreadPool に仕事を投げ込むタイプのマルチスレッドプログラミングです.
とりあえず簡単なサンプルを書いてテストすることに.


やりたいことは,

  • Person は Task を TaskManager に Post する
  • TaskManager は Task を ThreadPool に投げる
  • Person は自分が頼んだ Task をキャンセルすることができる

ぐらいだろうか.


BackgroundWorker で良いんじゃないか?とも思いましたが,Pool にポイポイ投げ込む感じがほしかったので,今回は使いませんでした.

/// <summary>
/// 丸投げ
/// </summary>
public class Person
{
    private readonly string id;

    public Person(string id)
    {
        this.id = id;
    }

    public string Id
    {
        get { return this.id; }
    }

    // タスクを受ける.
    public void Accept(Task task)
    {
        TaskManager.Instance.Post(this, task);
    }

    // 指定された ID のタスクをキャンセルする.
    public void CancelTask(params String[] ids)
    {
        TaskManager.Instance.Cancel(this, ids);
    }
}

/// <summary>
/// タスク抽象
/// </summary>
public abstract class Task
{
    protected readonly string id;

    protected volatile Thread thread = null;

    protected volatile Exception exception = null;

    protected volatile bool cancel = false;

    protected Task(string id)
    {
        this.id = id;
    }

    public string Id
    {
        get { return this.id; }
    }

    public Exception ThrownException
    {
        set { this.exception = value; }
        get { return this.exception; }
    }

    // タスク実行
    public virtual void Execute()
    {
        this.thread = Thread.CurrentThread;
        if (this.cancel)
        {
            // 事前にキャンセルされた場合
            return;
        }

        // 実行
        Main();
    }

    // タスクのメイン処理
    protected abstract void Main();

    // タスクが完了したときに呼び出す.
    // Main が例外を throw するかもしれないので別メソッド
    public virtual void Finished()
    { }

    // タスクをキャンセルする.
    // すぐには止まらないけど...
    public virtual void Cancel()
    {
        this.cancel = true;

        if (this.thread == null)
        {
            return;
        }
        if (!this.thread.ManagedThreadId.Equals(
            Thread.CurrentThread.ManagedThreadId))
        {
            this.thread.Interrupt();
        }
    }
}

/// <summary>
/// 一応管理クラス
/// </summary>
public sealed class TaskManager
{
    private static readonly TaskManager self = new TaskManager();

    private readonly Dictionary<Object, List<Task>> taskMap;

    private volatile Boolean enable;

    private TaskManager()
    {
        this.taskMap = new Dictionary<Object, List<Task>>();
        this.enable = true;
    }

    public static TaskManager Instance
    {
        get { return self; }
    }

    // client の task として post する.
    public void Post(Object client, Task task)
    {
        if (task == null)
        {
            return;
        }
        AddTask(client, task);

        ThreadPool.QueueUserWorkItem(delegate(Object o)
        {
            try
            {
                task.Execute();
            }
            catch (Exception e)
            {
                task.ThrownException = e;
            }
            finally
            {
                lock (this.taskMap)
                {
                    this.taskMap[client].Remove(task);
                    if (this.taskMap[client].Count < 1)
                    {
                        this.taskMap.Remove(client);
                    }
                }
                task.Finished();
            }
        });
    }

    private void AddTask(Object client, Task task)
    {
        lock (this.taskMap)
        {
            if (!this.enable)
            {
                throw new InvalidOperationException();
            }
            if (!this.taskMap.ContainsKey(client))
            {
                this.taskMap.Add(client, new List<Task>());
            }
            // 同一 ID のタスクが投入されても,そのまま Add する
            this.taskMap[client].Add(task);
        }
    }

    // 指定された client が post したタスク(idで指定)をキャンセルする.
    public void Cancel(Object client, params String[] ids)
    {
        Task[] tasks = new Task[0];
        lock (this.taskMap)
        {
            if (!this.taskMap.ContainsKey(client))
            {
                return;
            }
            tasks = FindAll(client, ids);
        }
        CallCancel(tasks);
    }

    private Task[] FindAll(Object client, String[] ids)
    {
        Task[] tasks = new Task[0];
        lock (this.taskMap)
        {
            if (0 < ids.Length)
            {
                List<Task> targets = this.taskMap[client].FindAll(delegate(Task t)
                {
                    foreach (String id in ids)
                    {
                        if (t.Id.Equals(id))
                        {
                            return true;
                        }
                    }
                    return false;
                });
                tasks = targets.ToArray();
            }
            else
            {
                tasks = this.taskMap[client].ToArray();
            }
        }
        return tasks;
    }

    private void CallCancel(Task[] tasks)
    {
        foreach (Task task in tasks)
        {
            task.Cancel();
        }
    }

    // すべてのタスクを終了させる.
    public void CleanUp()
    {
        Task[] tasks = new Task[0];
        lock (this.taskMap)
        {
            this.enable = false;
            tasks = CollectAllTasks();
        }
        CallCancel(tasks);
    }

    private Task[] CollectAllTasks()
    {
        List<Task> tasks = new List<Task>();
        foreach (List<Task> list in this.taskMap.Values)
        {
            tasks.AddRange(list);
        }
        return tasks.ToArray();
    }
}

無駄かな?もっと良い方法があるのかな?


テストコード

/// <summary>
/// Main
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        Person p1 = new Person("client-1");
        p1.Accept(new TaskA("A1"));
        p1.Accept(new TaskB("B1"));

        System.Console.ReadLine();

        p1.CancelTask("A1", "B1");
        System.Console.WriteLine("Cancel A1 and B1");

        p1.Accept(new TaskB("B2"));
        System.Console.WriteLine("Request B2");

        System.Console.ReadLine();

        p1.CancelTask();
        System.Console.WriteLine("Cancel B2");

        p1.Accept(new TaskB("B3"));
        p1.Accept(new TaskB("B4"));
        p1.Accept(new TaskB("B5"));
        System.Console.WriteLine("Request B2");

        System.Console.ReadLine();

        TaskManager.Instance.CleanUp();
        try
        {
            p1.Accept(new TaskB("B6"));
        }
        catch (InvalidOperationException)
        {
            System.Console.WriteLine("TaskManager.enable == false.");
        }
        System.Console.WriteLine("Clean up");

        System.Console.ReadLine();
    }
}

/// <summary>
/// 5秒タスク
/// </summary>
public class TaskA : Task
{
    public TaskA(string id)
        : base(id)
    { }

    protected override void Main()
    {
        if (this.cancel)
        {
            System.Console.WriteLine(
                "TaskA(id=" + this.id + "): cancel");
            return;
        }
        System.Console.WriteLine("TaskA(id=" + this.id + ")");
        try
        {
            Thread.Sleep(5000);
        }
        catch (ThreadInterruptedException)
        {
            System.Console.WriteLine(
                "TaskA(id=" + this.id + "): throw ThreadInterruptedException");
        }
        System.Console.WriteLine("TaskA(id=" + this.id + "): end thread");
    }
}

/// <summary>
/// 無限タスク
/// </summary>
public class TaskB : Task
{
    public TaskB(string id)
        : base(id)
    { }

    protected override void Main()
    {
        while (true)
        {
            if (this.cancel)
            {
                System.Console.WriteLine(
                    "TaskB(id=" + this.id + "): cancel");
                break;
            }
            System.Console.WriteLine("TaskB(id=" + this.id + ")");
            try
            {
                Thread.Sleep(1000);
            }
            catch (ThreadInterruptedException)
            {
                System.Console.WriteLine(
                    "TaskB(id=" + this.id + "): throw ThreadInterruptedException");
                break;
            }
        }
        System.Console.WriteLine("TaskB(id=" + this.id + "): end thread");
    }
}