英語版
このページの英語版を見る

std.parallelism

std.parallelismSMP並列処理のための高レベルな基本要素を実装する。 これには、並列foreach、並列reduce、並列eager map、パイプライン処理、 およびfuture/promise並列処理が含まれる。 std.parallelismは、 異なるデータに対して同じ操作を並列に実行する場合や、 関数をバックグラウンドスレッドで実行し、その結果を メインスレッドに返す場合などに推奨される。任意のスレッド間の通信については、std.concurrency を参照のこと。
std.parallelismTask の概念に基づいている。Task は このライブラリにおける基本的な作業単位を表すオブジェクトであり、 他のTask と並列に実行される可能性がある。Taskを直接使用することで、 future/promiseパラダイムによるプログラミングが可能になる。 サポートされているその他の並列処理パラダイム(parallel foreach、map、reduce、pipelining)は、Task に対する追加の抽象化レベルを表す。 自動的に1つ以上のTask オブジェクト、または密接に関連する型を 作成します。これらは概念的には同一ですが、パブリックAPIの一部ではありません。
作成後、Task は新しいスレッドで実行されるか、TaskPool に送信されて実行される。TaskPool はタスクキューと そのワーカー・スレッドをカプセル化する。 その目的は、多数のTaskを少数のスレッドに効率的にマッピングすることである。 タスクキューはTask オブジェクトの FIFO キューであり、TaskPool に送信されて実行待ちの状態にある。 ワーカー・スレッドは 1つのタスクキューに関連付けられている。 キューに処理可能な作業がある場合、Task をキューの先頭で実行し、 作業がない場合は待機する。 各タスクキューは、0個以上のワーカー・スレッドに関連付けられている。 ワーカー・スレッドによる実行が開始される前にTask の結果が必要な場合は、Task をタスク・キューから削除して、 結果が必要なスレッドで即座に実行することができる。

警告 @trusted または@safe としてマークされていない場合、このモジュール内のアーティファクトは スレッド間の暗黙的なデータ共有を許可し、 クライアントコードが低レベルのデータ競合から解放されていることを保証できない。

著者 David Simcha

struct Task(alias fun, Args...);
Task作業の基本単位を表す。 Task他のものと並行して実行される可能性がある。 Task。 この構造体を直接使用することで 将来的な並列処理が可能になる。 このパラダイムでは、関数(またはデリゲート、 またはその他の呼び出し可能関数)は、呼び出し元のスレッドとは別のスレッドで実行される。 関数が実行されている間、呼び出し元のスレッドはブロックされない。workForceyieldForce 、またはspinForce への呼び出しは、 Task実行が完了したことを確認し、戻り値がある場合はそれを取得する。 これらの関数とdone は完全なメモリバリアとしても機能し、 つまり、実行スレッドでメモリへの書き込みが行われた場合、 Task これらの関数のいずれかが返却された後、呼び出し元のスレッドでその書き込みが確実に可視化されることが保証される。
std.parallelism.taskおよび std.parallelism.scopedTask関数を使用して、 この構造体のインスタンスを作成することができる。使用例については、task を参照のこと。
関数の結果は、yieldForcespinForceworkForce から参照渡しされる。fun が参照渡しされる場合、その参照はfun の返された参照を指す。そうでない場合は、 この構造体のフィールドを指す。
この構造体のコピーは無効である。なぜなら、有用なセマンティクスを提供しないからだ。 この構造体を渡したい場合は、参照またはポインタで渡す必要がある。 "Struct"構造体
Bugs:
refout の引数に対する変更は、 コールサイトには伝播されず、この構造体のargs のみに伝播される。
alias args = _args[1 .. __dollar];
関数が呼び出されたときの引数。outref の引数に変更が加えられると、 ここにも反映される。
alias ReturnType = typeof(fun(_args));
このTask によって呼び出された関数の戻り値の型。これはvoid である可能性もある。
@property ref @trusted ReturnType spinForce();
Task がまだ開始されていない場合は、現在のスレッドで実行する。 すでに実行されている場合は、その戻り値がある場合はそれを返す。実行中である場合は、 それが完了するまでビジースピンし、完了後に戻り値を返す。例外が発生した場合は、 その例外を再スローする。
この関数は、Task の結果が OS のコンテキストスイッチよりも短い時間で利用可能になると予想される場合に 使用すべきである。
@property ref @trusted ReturnType yieldForce();
Task がまだ開始されていない場合は、現在のスレッドで実行する。 終了している場合は、もしあればその戻り値を返す。実行中の場合は、 条件変数で待機する。例外が発生した場合は、その例外を再スローする
この関数は、条件変数で待機すると遅延が発生するものの、CPUサイクルの無駄を回避できるため、コストのかかる関数に対して使用すべきである。
@property ref @trusted ReturnType workForce();
このTask がまだ開始されていない場合は、現在のスレッドで実行する。 終了している場合は、その結果を返す。実行中の場合は、TaskTaskPool インスタンスから実行する。Task が送信されたインスタンスから、 この が終了するまで。 例外が発生した場合は、その例外を再スローする。 他のタスクが利用できない場合、またはTaskexecuteInNewThread を使用して実行された場合は、 条件変数で待機する。
@property @trusted bool done();
Task の実行が完了した場合は、true を返す。
Throws:
Task の実行中にスローされた例外を再スローする。
@trusted void executeInNewThread();

@trusted void executeInNewThread(int priority);
このTask を実行するための新しいスレッドを作成し、 新しく作成したスレッドで実行し、その後スレッドを終了する。これは、 将来の/約束された並列処理に使用できる。Task に明示的な優先度を指定できる。 指定された場合、その値は core.thread.Thread.prioritystd.parallelism.task使用例については こちらを参照のこと。
auto task(alias fun, Args...)(Args args);
GCヒープ上にエイリアスを呼び出すTask を作成する。これは、Task.executeInNewThread 経由で、またはstd.parallelism.TaskPool。グローバルにアクセス可能なTaskPool のインスタンスは、 によって提供される std.parallelism.taskPool
Returns: xml-ph-0000@deepl.internal へのポインタ。
Task へのポインタ。

// 同時に2つのファイルをメモリに読み込む。
import std.file;

void main()
{
    // foo.txtを読み込むタスクを作成し、
    // 実行する。
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // 並行してbar.txtを読む。
    auto file2Data = read("bar.txt");

    // foo.txtの読み込み結果を取得する。
    auto file1Data = file1Task.yieldForce;
}
// 並列クイックソートアルゴリズムを使って配列をソートする。
// 最初のパーティションはシリアルに行われる。その後、
// 両方の再帰分岐が並列に実行される。
//
// Athlon 64 X2デュアルコアマシンで1,000,000ダブルスの
// 配列をソートする際のタイミング:
//
// この実装:               176 ミリ秒。
// 同等のシリアル実装:      280 ミリ秒
void parallelSort(T)(T[] data)
{
    // 小さなサブアレイをシリアルにソートする。
    if (data.length < 100)
    {
         std.algorithm.sort(data);
         return;
    }

    // 配列を分割する。
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // 両方の再帰分岐を並列に実行する。
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}

auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);
GCヒープ上にTask を作成し、関数ポインタ、デリゲート、または オーバーロードされたopCallを持つクラス/構造体を呼び出す。

// 再び2つのファイルを同時に読み込むが、
// 今回はstd.file.read.を表すエイリアスではなく
// 関数ポインタを使用する。
import std.file;

void main()
{
    // foo.txtを読み込むタスクを作成し、
    // 実行する。
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // 並行してbar.txtを読む。
    auto file2Data = read("bar.txt");

    // foo.txtの読み込み結果を取得する。
    auto file1Data = file1Task.yieldForce;
}

注釈 この関数は非スコープデリゲートを受け取る。つまり、 クロージャと併用できる。スタック上のオブジェクトがスコープ付きで破棄されるためにクロージャを割り当てられない場合は、scopedTask を参照すること。これはスコープ付きデリゲートを受け取る。

@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F);
xml-ph-0000@deepl.internal のコードで使用できるバージョン。 task@safe のコードから使用できる。使用方法は @safeでない場合と同一だが、安全性を確保するためにいくつかの制限が設けられている。
  1. fun@safeまたは@trustedでなければならない。
  2. F で定義されている非共有エイリアスを一切持たないこと std.traits.hasUnsharedAliasing。つまり、 共有されていないデリゲートや、オーバーロードされたopCall を持つ共有されていないクラスまたは構造体であってはならない。 また、テンプレートエイリアスパラメータの受け入れもできない。
  3. Args 共有されていないエイリアスを使用してはならない。
  4. fun参照を返してはならない。
  5. 戻り値の型は、共有されていないエイリアシングを持たないようにしなければならない。ただし、 funpure であるか、またはTaskexecuteInNewThread 経由で実行される場合を除き、TaskPool を使用しない。
auto scopedTask(alias fun, Args...)(Args args);

auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);

@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F);
これらの関数により、Task オブジェクトをGCヒープではなくスタック上に作成することが可能となる。Task の有効期間は、 scopedTask そのスコープの寿命を超えることはできません。
scopedTasktask よりも好まれる場合がある。
  1. デリゲートを呼び出すTask が作成される際に、スタック上のオブジェクトがスコープ外の破壊状態にあるためクロージャを割り当てることができない場合、 デリゲートのオーバーロードは scopedTaskscope デリゲートを受け取る。
  2. マイクロ最適化として、task またはクロージャの作成に関連するヒープの割り当てを回避する。
それ以外はtask と全く同じである。

注釈Task オブジェクトは、 を使用して作成される scopedTask自動的に 必要に応じてデストラクタ内でTask.yieldForce を呼び出し、Task が完了したことを確認してから、そのオブジェクトが存在するスタックフレームが破棄されます。

alias totalCPUs = __lazilyInitializedConstant!(immutable(uint), 4294967295u, totalCPUsImpl).__lazilyInitializedConstant;
オペレーティングシステムが報告する、現在のマシンで利用可能なCPUコアの総数。
class TaskPool;
このクラスは、タスクキューとワーカー・スレッドのセットをカプセル化する。その目的は、 多数のTaskを少数のスレッドに効率的にマッピングすることである。 タスクキューは、Task オブジェクトのFIFOキューであり、 TaskPool実行待ちの状態にある。 ワーカー・スレッドは、 キューの先頭にあるTask を実行するスレッドであり、 実行可能な状態になると起動され、キューが空になると停止する。
通常、このクラスはグローバルなインスタンス化を通じて使用されるべきである。 std.parallelism.taskPool。 時には明示的にインスタンス化することが有用な場合もある TaskPool
  1. TaskPool複数の優先度を持つインスタンスが必要な場合、例えば 優先度の低いプールと優先度の高いプールなど。
  2. グローバルタスクプールのスレッドが同期プリミティブ(例えば、mutex)で待機しており、 これらのスレッドが再開される前に実行する必要があるコードを並列化したい場合。
注 このプール内のワーカー スレッドは、 xml-ph-0000@deepl.internal または xml-ph-0001@deepl.internal が呼び出されるまで停止しません。メイン スレッドが

注釈 このプール内のワーカー スレッドは、 stop またはfinish が呼び出されるまで停止しない。たとえメイン スレッドが すでに終了していても同様である。これにより、プログラムが 終了しなくなる可能性がある。この動作を望まない場合は、isDaemonを true に設定できる。

@trusted this();
TaskPooltotalCPUs - 1 ワーカー・スレッドで初期化するデフォルトのコンストラクタ。マイナス 1 が含まれているのは、 メイン・スレッドも作業に使用されるためである。

注釈 シングルコアのマシンでは、TaskPoolが提供するプリミティブは シングルスレッドモードで透過的に動作する。

@trusted this(size_t nWorkers);
ワーカー スレッドの数をカスタマイズできる。
ParallelForeach!R parallel(R)(R range, size_t workUnitSize);

ParallelForeach!R parallel(R)(R range);
範囲にわたる並列 foreach ループを実装する。これは、 各ワーカー スレッドごとに 1 つのTaskTaskPool に暗黙的に作成および送信することで動作する。 作業単位は、 range他のスレッドとの通信の間にワーカー・スレッドによって処理される。 ワーク・ユニットごとに処理される要素の数は、パラメータによって制御される workUnitSize。より小さいワークユニットは負荷分散に優れているが、 より大きいワークユニットは、次のワークユニットを取得するために他のスレッドと頻繁に通信するオーバーヘッドを回避できる。 また、大きなワークユニットは、範囲が変更される場合に誤った共有を回避できる。 ループの1回の反復にかかる時間が短いほど、 workUnitSize。ループ本体が非常に高価な場合は、 workUnitSize1にすべきである。デフォルトの作業単位サイズを選択するオーバーロードも 利用可能である。

// 1から10_000_000までのすべての数の対数を
// 並列に求める。
auto logs = new double[10_000_000];

// 並列foreachはインデックス変数があってもなくても
// 動作する。もしrange.frontがrefで返すなら、
// refで反復できる。

// サイズ100のワークユニットを使ってログを反復処理する。
foreach (i, ref elem; taskPool.parallel(logs, 100))
{
    elem = log(i + 1.0);
}

// 同じことだが、デフォルトのワークユニットサイズを使用する。
//
//  Athlon 64 X2デュアルコアマシンのタイミング:
//
// 並列foreach:     388ミリ秒
// 通常のforeach:   619ミリ秒
foreach (i, ref elem; taskPool.parallel(logs))
{
    elem = log(i + 1.0);
}

注釈 この実装のメモリ使用量は、 一定であることが保証されている range.length

break、ラベル付きbreak、ラベル付きcontinue、return、またはgoto文を使用して並列foreachループを中断すると、 "ParallelForeachError"が発生する。
ランダムアクセス範囲以外の場合は、並列foreachは遅延して " workUnitSizeループの並列部分を実行する前に 。 例外として、並列 foreach が asyncBuf またはmap によって返された範囲で実行される場合は、コピー処理が省略され 、バッファが単に交換される。 この場合、 workUnitSizeは無視され、ワークユニットのサイズはバッファサイズに設定される range
ループの終了時にはメモリバリアが確実に実行されるため、 すべてのスレッドで生成された結果が呼び出し元のスレッドで確認できる。
Exception Handling :
並列 foreach ループ内で少なくとも 1 つの例外がスローされた場合、Task オブジェクトの追加の送信は、 非決定論的な方法でできるだけ早く終了する。実行中または キューに入れられたすべてのワークユニットは完了が許可される。その後、Throwable.next を使用して、すべてのワークユニットでスローされたすべての例外が連結され、 再スローされる。例外の連結の順序は非決定論的である。

template amap(functions...)
auto amap(Args...)(Args args)
if (isRandomAccessRange!(Args[0]));
eager parallel map。この関数のeagerな性質は、 遅延評価されるTaskPool.map よりもオーバーヘッドが少なく、 eagerな性質によるメモリ要件が許容できる場合は、こちらが優先されるべきである。functions は評価される関数であり、テンプレート化された std.algorithm.iteration.map。 最初の引数はランダムアクセス可能な範囲でなければならない。パフォーマンス上の理由から、 amapは範囲の要素がまだ初期化されていないと仮定する。 要素はデストラクタを呼び出したり代入を行なわずに上書きされる。そのため、範囲には意味のあるデータを含めてはならない。 :初期化されていないオブジェクト、または.init 状態のオブジェクト。
auto numbers = iota(100_000_000.0);

// 数字の平方根を求めよ。
//
// Athlon 64 X2デュアルコアマシンのタイミング:
//
// 並列のeager map:                      0.802 s
// 同等のシリアル実装:                   1.768 s
auto squareRoots = taskPool.amap!sqrt(numbers);
range引数の直後に、オプションのwork unit size引数を指定できる。 ここで使用されるwork unitは、 amapパラレル foreach で定義されているものと同じである。 ワークユニットサイズが指定されていない場合は、 デフォルトのワークユニットサイズが使用される。
// 同じことだが、作業単位のサイズを100にする。
auto squareRoots = taskPool.amap!sqrt(numbers, 100);
結果を返す出力範囲は、最後の引数として指定できる。 指定しない場合は、適切な型の配列が ガベージコレクション対象のヒープ上に割り当てられる。指定する場合は、 割り当て可能な要素を持つランダムアクセス範囲でなければならず、 。異なるスレッドから隣接する要素への書き込みは 安全でなければならない。
"@safe"@safe
// 同じことだが、結果を返す配列を明示的に
// に割り当てる。配列の要素型は、
// 関数によって返される正確な型、
// または暗黙的な変換対象のどちらでもよい。
//
auto squareRoots = new float[numbers.length];
taskPool.amap!sqrt(numbers, squareRoots);

// 複数の関数、明示的な出力範囲、
// 明示的な作業単位サイズ。
auto results = new Tuple!(float, real)[numbers.length];
taskPool.amap!(sqrt, log)(numbers, 100, results);
注 メモリバリアは、すべての結果が書き込まれた後 、呼び出し元のスレッドですべてのスレッドが生成した結果が可視となるように、 呼び出し元が返される前に実行されることが保証されている。

注釈 メモリバリアは、すべての結果が書き込まれた後 、呼び出し元のスレッドですべてのスレッドが生成した結果が可視となるように、呼び出し元が返される前に実行されることが保証されている。

ヒント マッピング操作をその場で実行するには、入力と出力の範囲に同じ範囲を指定する。

評価に時間がかかる要素を含む範囲のコピーを配列に並列化するには、アイデンティティ関数(引数に何を与えても同じ値を返す関数)を amap
Exception Handling :
マップ関数内部から少なくとも1つの例外がスローされた場合、Task オブジェクトの追加の送信は、 非決定論的な方法でできるだけ早く終了する。現在実行中または キューに入れられたすべてのワークユニットは完了が許可される。その後、すべてのワークユニットからスローされたすべての例外はThrowable.next を使用して連結され、 再スローされる。例外の連結の順序は非決定論的である。

template map(functions...)
auto map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
if (isInputRange!S);
パイプライン処理に使用できる準レイジーな並列マップ。マップ 関数は最初の要素に対して評価され bufSize要素が評価され、 バッファに格納され、popFront で利用可能になる。その間、 バックグラウンドで同じサイズの2番目のバッファが満たされる。最初の バッファが使い果たされると、2番目のバッファと入れ替えられ、 元々は2番目のバッファであった値が読み込まれる間に満たされる。この 実装により、各書き込みに対してアトミック操作や同期を行う必要なく、 マッピング関数を並列で効率的に評価することが可能になる。
mapamapで使用されているよりシンプルな手順よりもオーバーヘッドは大きいものの、 すべての結果を同時にメモリに保持する必要がなく、 ランダムアクセスではない範囲でも動作する。
Parameters:
S source マッピングされる入力範囲sourceランダムアクセスでない場合は、 遅延バッファリングにより、 bufSize前に マップ関数が評価される。(このルールに対する例外については、"注釈:"を参照のこと。)
size_t bufSize 評価された要素を格納するバッファのサイズ。
size_t workUnitSize 1回のTask で評価する要素の数。 この値は、 bufSize、 かつ、 bufSizeすべてのワーカー・スレッドが使用できる ようにしなければならない。size_t.maxのデフォルト値を使用する場合は、workUnitSizeは プール全体のデフォルト値に設定される。
Returns:
マップの結果を表す入力範囲。この範囲は、 長さを持つ場合、 source長さを持つ。

注釈 が返した範囲、またはxml-ph-0000@deepl.internalが mapまたはasyncBuf によって返された範囲が入力として使用される場合、 map場合、最適化のため、最初の範囲の出力バッファから 2番目の範囲の入力バッファへのコピーは省略される。 たとえ、 mapasyncBuf が返す範囲はランダムアクセス可能な範囲ではない。 つまり、 bufSize現在の呼び出しに渡されたパラメータは map無視され、バッファのサイズは のバッファサイズとなる source

// パイプラインはファイルを読み込み、各行を
// 数値に変換し、その数値の対数を取り、
// 対数の合計を求めるために
// 必要な加算を行う。

auto lineRange = File("numberList.txt").byLine();
auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
auto nums = taskPool.map!(to!double)(dupedLines);
auto logs = taskPool.map!log10(nums);

double sum = 0;
foreach (elem; logs)
{
    sum += elem;
}
Exception Handling :
反復処理中またはマップ関数の計算中にスローされた例外は、 source またはマップ関数の計算中にスローされた例外は、popFront への呼び出し時に再スローされるか、 または構築中にスローされた場合は、単に呼び出し側に伝搬される。 マップ関数の計算中にスローされた例外の場合、 例外はTaskPool.amap のように連結される。

auto asyncBuf(S)(S source, size_t bufSize = 100)
if (isInputRange!S);
source反復処理がコスト高になる範囲が与えられた場合、 入力範囲を返します。 その内容は非同期でバッファに source要素のバッファに bufSize要素のバッファに 非同期で内容をバッファリングし、 bufSize、返されたオブジェクトの範囲インターフェースを介して利用可能にする。 返された範囲の長さは、hasLength!S の場合にのみ有効である。 asyncBufこれは、例えば、 ディスクやネットワーク上のデータを表す範囲の要素に対して高価な操作を実行する際に

import std.conv, std.stdio;

void main()
{
    // 以前に取得した行を処理しながら、
    // バックグラウンドスレッドでファイルの行を取得し、
    // すべての行を積極的に複製することで、
    // byLineのバッファのリサイクルに対応する。
    auto lines = File("foo.txt").byLine();
    auto duped = std.algorithm.map!"a.idup"(lines);

    // メモリに読み込んだ行をダブルの行列に処理している間、
    // バックグラウンドで
    // さらに多くの行を読み込む。
    double[][] matrix;
    auto asyncReader = taskPool.asyncBuf(duped);

    foreach (line; asyncReader)
    {
        auto ls = line.split("\t");
        matrix ~= to!(double[])(ls);
    }
}
Exception Handling :
反復処理中にスローされた例外はすべて、 sourcepopFront への呼び出し時に再スローされます。 または、構築中にスローされた場合は、 単に呼び出し側に伝播されます。

auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
if (is(typeof(C2.init()) : bool) && (Parameters!C1.length == 1) && (Parameters!C2.length == 0) && isArray!(Parameters!C1[0]));
コール可能なオブジェクトが nextユーザーが提供したバッファに書き込みを行うオブジェクトと、 2番目の呼び出し可能なオブジェクトが emptyより多くのデータが 書き込み可能かどうかを判断し、 next、入力範囲を返す。 非同期で呼び出しを行う nextバッファのサイズのセットで nBuffersバッファのセットを 非同期で呼び出し、 返されたオブジェクトの入力範囲インターフェースを介して取得された順番に結果を利用できるようにする。 入力範囲のオーバーロードと同様に、 asyncBuf、バッファの前半分は 範囲インターフェイス経由で利用可能になり、後半分が 埋められ、その逆も同様である。
Parameters:
C1 next 単一の引数を受け取り、その引数は変更可能な要素を持つ配列でなければならないコール可能なオブジェクト。 コールされた場合、 next呼び出し元が提供した配列にデータを書き込む 。
C2 empty 引数を取らず、暗黙的にbool に変換できる型を返すコール可能オブジェクト。 これは、 これ以上コールしてもデータが取得できないことを示すために使用される next
size_t initialBufSize 各バッファの初期サイズ。 next配列を 参照として取得している場合、バッファのサイズを変更できる。
size_t nBuffers 呼び出し時に順に処理するバッファの数 next

// 以前に取得した行を処理しながら、
// バックグラウンドスレッドでファイルの行を取得し、
// 行を重複させない。
auto file = File("foo.txt");

void next(ref char[] buf)
{
    file.readln(buf);
}

// メモリに読み込んだ行をダブルの行列に処理している間、
// バックグラウンドで
// さらに多くの行を読み込む。
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

foreach (line; asyncReader)
{
    auto ls = line.split("\t");
    matrix ~= to!(double[])(ls);
}
Exception Handling :
range を繰り返し処理中にスローされた例外は、popFront をコールした際に再スローされる。

警告 この関数によって返された範囲を並列 foreach ループで使用すると、 バッファがキューに入れられている間に上書きされる可能性があるため、動作しない。 これはコンパイル時にチェックされ、"static assert" の失敗につながる。

template reduce(functions...)
auto reduce(Args...)(Args args);
ランダムアクセス範囲での並列 reduce。特に注釈がない限り、 使用法は std.algorithm.iteration.reduce。 また、 fold異なるパラメータ順序で同じことを行うものもある 。
この関数は、削減される範囲をワークユニットに分割することで動作する。 ワークユニットは、並列に削減されるスライスである。すべてのワークユニットの結果が 計算されると、最終的な結果を計算するために、これらの結果に対して最終的なシリアル削減が実行される。 したがって、シード値を適切に選択する必要がある。
並列で減算が実行されるため、functionsは 連想配列でなければならない。表記を簡潔にするため、# を functions を表す中置演算子とする。 すると、(a # b) # c は a # (b # c) と等しくなければならない。浮動小数点数の加算は、 正確な算術における加算は連想配列ではないが、 浮動小数点数の合計をこの関数を使用して計算すると、 。しかし、多くの実用的な目的では浮動小数点数の加算は 結合性があるものとして扱うことができる。
functions は結合性があることが想定されているため、 還元アルゴリズムの直列部分に追加の最適化が施されている。 これらは、このモジュールの残りの部分が利用するスレッドレベルの並列処理に加えて、 最新のCPUの命令レベルの並列処理を活用する。 これにより、 std.algorithm.iteration.reduce。特に ドット積のような細粒度のベンチマークではその傾向が強い。
明示的なシード値を最初の引数として指定できる。 指定した場合は、すべてのワークユニットと、すべてのワークユニットからの結果の最終的な 削減のシードとして使用される。したがって、 実行する演算の値が std.algorithm.iteration.reduce、または 使用されるワークユニットの数によって異なる結果が生成される可能性があります。次の引数は、 縮小される範囲でなければなりません。
// 明示的なシードを使用して、
// 範囲の平方和を並列で求める。
//
// Athlon 64 X2デュアルコアマシンのタイミング:
//
// 並列のreduce:                        72ミリ秒
// 代わりにstd.algorithm.reduceを使う:  181ミリ秒
auto nums = iota(10_000_000.0f);
auto sumSquares = taskPool.reduce!"a + b"(
    0.0, std.algorithm.map!"a * a"(nums)
);
明示的なシードが提供されない場合、各ワークユニットの最初の要素が シードとして使用される。最後の削減では、最初のワークユニットの結果が シードとして使用される。
// 各ワークユニットの最初の要素をシードとして使用し、
// 並列に範囲の合計を求める。
auto sum = taskPool.reduce!"a + b"(nums);
明示的なワークユニットのサイズは最後の引数として指定できる。 ワークユニットのサイズを小さくしすぎると、事実上、 各ワークユニットの結果の最終的な削減が計算時間を支配するため、 削減がシリアル化される。このインスタンスのTaskPool.size がゼロの場合、 このパラメータは無視され、1つのワークユニットが使用される。
// ワークユニットのサイズは100とする。
auto sum2 = taskPool.reduce!"a + b"(nums, 100);

// ワークユニットサイズ100と明示的なシード。
auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
並列リダクションは、複数の関数をサポートしている。 std.algorithm.reduce
// numsの最小値と最大値の両方を見つける。
auto minMax = taskPool.reduce!(min, max)(nums);
assert(minMax[0] == reduce!min(nums));
assert(minMax[1] == reduce!max(nums));
Exception Handling :
この関数の実行が終了すると、スローされた例外は Throwable.next 経由で連結され、再スローされる。 連結の 順序は非決定論的である。
See Also:
fold関数と機能的に同等であるが、 reduceただし、 範囲パラメータが最初に来るため、 tuple複数のシードを使用する必要がない。
template fold(functions...)
auto fold(Args...)(Args args);
同音異義関数(accumulatecompressinjectfoldl とも呼ばれる)を実装する。これは、関数型のさまざまなプログラミング言語に存在する。
fold関数的に同等である。ただし、 reduceただし、範囲 パラメータが最初に来るため、 tuple複数のシードを使用する必要がない。
適用する呼び出し可能なエンティティ(functions 引数)が1つ以上ある可能性がある。
Parameters:
Args args 折り返す範囲のみ、または範囲と関数ごとの1つのシード 、または範囲、関数ごとの1つのシード、および ワークユニットサイズ
Returns: 単一関数に対する単一値としての累積結果、および複数の関数に対する値のタプルとして
単一関数に対する単一値としての累積結果、および 複数の関数に対する値のタプルとして
See Also:
と同様、 std.algorithm.iteration.foldfoldラッパーである。 reduce

static int adder(int a, int b)
{
    return a + b;
}
static int multiplier(int a, int b)
{
    return a * b;
}

// rangeだけ
auto x = taskPool.fold!adder([1, 2, 3, 4]);
assert(x == 10);

// 範囲とシード(以下0と1; またこの例では
// 複数の関数にも注目)
auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
assert(y[0] == 10);
assert(y[1] == 24);

// 範囲、シード(0)、ワークユニットサイズ(20)
auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
assert(z == 10);

const nothrow @property @safe size_t workerIndex();
このTaskPool に対する現在のスレッドのインデックスを取得する。 このプールに属さないスレッドはすべてインデックス0を受け取る。このプール内のワーカー・スレッドは、 1からthis.size までの一意のインデックスを受け取る。
この関数は、ワーカーローカルのリソースを管理するのに役立つ。

// 0から999までのすべての数値と42の
// 最大公約数を計算するループを
// 並列に実行する。結果を各スレッドごとに
// ファイルに書き出す。これにより、
// synchronizedを使用せずに結果を書き出すことができる。

import std.conv, std.range, std.numeric, std.stdio;

void main()
{
    auto filesHandles = new File[taskPool.size + 1];
    scope(exit) {
        foreach (ref handle; fileHandles)
        {
            handle.close();
        }
    }

    foreach (i, ref handle; fileHandles)
    {
        handle = File("workerResults" ~ to!string(i) ~ ".txt");
    }

    foreach (num; parallel(iota(1_000)))
    {
        auto outHandle = fileHandles[taskPool.workerIndex];
        outHandle.writeln(num, '\t', gcd(num, 42));
    }
}

struct WorkerLocalStorage(T);
ワーカーローカルストレージを作成するための構造体。ワーカーローカルストレージは、 スレッドローカルストレージであり、特定のワーカースレッドのみに存在する。TaskPool とプール外の単一スレッドである。これは、 "ガベージコレクション"ヒープ上に割り当てられ、"誤った共有"を回避する。 必ずしも、どのスレッド内でもグローバルスコープを持つとは限らない。TaskPool 内の任意のワーカースレッドからアクセスでき、 このTaskPool の外にあるスレッドもアクセスできる。 ワーカーローカルストレージのインスタンスを作成したプール外にあるすべてのスレッドは、
この構造体の基礎となるデータはヒープ上に割り当てられるため、この構造体は 関数間で受け渡しされる際に参照セマンティクスを持つ。
主な使用例は次のとおりである。 WorkerLocalStorage次のとおりである。
  1. 関数型プログラミングとは対照的に、命令型で並列的な集約を行う。 この場合、 WorkerLocalStorageアルゴリズムの並列処理部分のみ、各スレッドのローカルとして 扱うことが
  2. 並列 foreach ループの反復処理で一時バッファを再利用する。

// 概要の例のように円周率を計算するが、
// 関数スタイルではなく命令型を使用する。
immutable n = 1_000_000_000;
immutable delta = 1.0L / n;

auto sums = taskPool.workerLocalStorage(0.0L);
foreach (i; parallel(iota(n)))
{
    immutable x = ( i - 0.5L ) * delta;
    immutable toAdd = delta / ( 1.0 + x * x );
    sums.get += toAdd;
}

// 各ワーカースレッドからの結果を合計する。
real pi = 0;
foreach (threadResult; sums.toRange)
{
    pi += 4.0L * threadResult;
}
現在のスレッドのインスタンスを取得する。参照渡しで返す。 呼び出しに注意すること。

@property ref auto get(this Qualified)();
現在のスレッドのインスタンスを取得する。参照渡しで返す。 注釈: getこのインスタンスを作成したTaskPool 以外のスレッドから呼び出すと、 同じ参照が返されることに注意すること。 そのため、ワーカーローカルストレージのインスタンスは、 それを生成したプール外の1つのスレッドからのみアクセスすべきである。この ルールが破られた場合、"未定義の動作"となる。
アサーションが有効で、toRange が呼び出された場合、この WorkerLocalStorage インスタンスはもはやワーカーローカルではなくなり、 このメソッドを呼び出すとアサーションエラーが発生する。これは、 パフォーマンス上の理由でアサーションが無効になっている場合はチェックされない。
@property void get(T val);
現在のスレッドのインスタンスに値を代入する。この関数には、 オーバーロードされた関数と同じ注意点がある。
@property WorkerLocalStorageRange!T toRange();
すべてのスレッドの値の範囲ビューを返す。これは、 アルゴリズムの並列部分を実行した後に各スレッドの結果をさらに処理する ために使用できる。このメソッドをアルゴリズムの並列部分で使用してはならない 。
この関数を呼び出すと、この構造体がワーカーローカルではなくなることを示すフラグが設定され、get メソッドを再度使用しようとすると、 アサーションが有効になっている場合はアサーションエラーが発生する。
struct WorkerLocalStorageRange(T);
ワーカーローカルストレージ用の範囲プリミティブ。この目的は、 複数のスレッドからワーカーローカルストレージを使用しなくなった場合に、 各ワーカースレッドが生成した結果を単一のスレッドからアクセスできるようにすることである。 この構造体をアルゴリズムの並列部分で使用しないこと。
このオブジェクトをインスタンス化する適切な方法は、WorkerLocalStorage.toRange を呼び出すことである。インスタンス化されると、このオブジェクトは 割り当て可能な"l値"要素を持つ有限のランダムアクセス範囲として動作し、TaskPool 内のワーカー・スレッドの数に1を加えた長さを持つ。
WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init);
与えられた値で初期化されたワーカーローカルストレージのインスタンスを作成する。 値はlazy なので、例えば、 各ワーカーごとにクラスのインスタンスを簡単に作成できる。使用例については、WorkerLocalStorage 構造体を参照のこと。
@trusted void stop();
ワーカーのスレッドすべてに、現在のTask が完了次第、またはTask を実行していない場合は即時に終了するよう通知する。 キューに入っているTaskは、 Task.workForceTask.yieldForce 、またはTask.spinForceへの呼び出しによって実行されない限り、実行されない。
Task をすべて待っており、キューが空であることを確認している場合、または 推測でいくつかのタスクを実行し、もはや結果が必要ない場合にのみ使用する。
@trusted void finish(bool blocking = false);
キューが空になったときにワーカー・スレッドを終了させる。
ブロック引数が true の場合、すべてのワーカー スレッドが終了するまで待機してから 戻る。 このオプションは、 タスクの結果が決して消費されないアプリケーションで使用される。例えば、TaskPool が 戻り値以外の方法で通信を行うタスクの基本的なスケジューラとして採用されている場合などである 。

警告 この関数を、blocking = true を指定して、 呼び出されるワーカースレッドと同じTaskPool のメンバーである finishデッドロックが発生します。

const pure nothrow @property @safe size_t size();
プール内のワーカー スレッドの数を返す。
void put(alias fun, Args...)(ref Task!(fun, Args) task)
if (!isSafeReturn!(typeof(task)));

void put(alias fun, Args...)(Task!(fun, Args)* task)
if (!isSafeReturn!(typeof(*task)));
Task オブジェクトをタスクキューの最後尾に追加する。Task オブジェクトはポインタまたは参照で渡すことができる。

import std.file;

// タスクを作成する。
auto t = task!read("foo.txt");

// 実行キューに追加する。
taskPool.put(t);

注釈: この関数の"@trusted"オーバーロードは、Taskstd.traits.hasUnsharedAliasingTask の戻り値の型が false である場合、またはTask が実行する関数がpure である場合 Task オブジェクトは、 @trusted のオーバーロードで指定されている他のすべての要件を満たす taskscopedTask は作成 され、@safe コードからTask.executeInNewThread を介して実行できるが、TaskPool を介しては実行できない。

この関数はスタック上に存在する可能性のある変数のアドレスを取得するが、 一部のオーバーロードは@trustedとしてマークされている。Task には、タスクが完了するまで待機するデストラクタが含まれており、 割り当てられたスタックフレームを破棄する前にタスクが完了する。したがって、 タスクが完了する前にスタックフレームが破棄されることは不可能であり、TaskPool によって参照されなくなることもない。

@property @trusted bool isDaemon();

@property @trusted void isDaemon(bool newVal);
これらのプロパティは、ワーカースレッドがデーモンスレッドであるかどうかを制御する。 デーモンスレッドは、非デーモンスレッドがすべて終了すると自動的に終了する。 非デーモンスレッドは、 プログラムが終了するまで、プログラムの終了を妨げる。
非デーモン・スレッドを持つTaskPool がアクティブな場合、プログラムを終了させる前に、stop またはfinish を呼び出す必要がある。
taskPool プロパティによって返されるTaskPool インスタンスのワーカー・スレッドは、 デフォルトでデーモンとなる。手動でインスタンス化されたタスク・プールのワーカー・スレッドは、 デフォルトで非デーモンとなる。

注釈 サイズゼロのプールでは、ゲッターは任意で true を返し、 セッターは効果がない。

@property @trusted int priority();

@property @trusted void priority(int newPriority);
これらの関数は、このTaskPool 内のワーカー・スレッドのOSスケジューリング優先度の取得と設定を許可する。 これらは core.thread.Thread.priorityので、ここで指定する優先度値はcore.thread における同一の優先度値と同じ意味を持つ。

注釈 サイズゼロのプールでは、ゲッターは任意でcore.thread.Thread.PRIORITY_MIN を返し、セッターは効果を持たない。

@property @trusted TaskPool taskPool();
TaskPool の遅延初期化されたグローバルなインスタンスを返す。 この関数は、複数の非ワーカー・スレッドから同時に安全に呼び出すことができる。 このプール内のワーカー・スレッドはデーモン・スレッドであり、 メイン・スレッドを終了する前にTaskPool.stop またはTaskPool.finish を呼び出す必要はない。
@property @trusted uint defaultPoolThreads();

@property @trusted void defaultPoolThreads(uint newVal);
これらのプロパティは、TaskPoolインスタンスのワーカースレッドの数を取得および設定する。taskPool が返す。 デフォルト値はtotalCPUs - 1である。taskPool への最初の呼び出し後にセッターを呼び出しても、taskPool が返すインスタンスのワーカースレッドの数は変更されない。
ParallelForeach!R parallel(R)(R range);

ParallelForeach!R parallel(R)(R range, size_t workUnitSize);
転送する便利な関数 taskPool.parallel。 これらの目的は、並列 foreach をより簡潔で読みやすくすることである。

// デフォルトのTaskPoolインスタンスを使用して、
// 1から1_000_000までの
// すべての数の対数を並列に求める
auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
    elem = log(i + 1.0);
}
example例