本日は引き続き、前回に読み進めてみた、調べものをしているときに見つけて気になった Concurrency
による並行処理と セマフォ
についての技術ブログの続きを眺めていきます。前回はその材料的なところを確認していくところを読み終えたので、今日は実際の具体的な実装についての思考を読み進めながら、書かれている事柄を理解していけたらいいなと思ってます。どうぞよろしくお願いしますね。
——————————————————————————————————————————— 熊谷さんのやさしい Swift 勉強会 #202
00:00 開始 00:10 今回の展望 01:00 セマフォをスレッドのブロックに使うのはアリ? 04:30 ロック 06:11 Concurrency にセマフォは必要? 07:00 セマフォ実装の方向性 08:35 CheckedContinuation は Sendable 12:30 これでセマフォは作れそう? 14:15 複数のタスクを待つ場面を想定 16:04 セマフォの状態管理まわりの実装 17:52 wait の実装 19:55 中断ポイントの特性に注意 21:49 Actor で通常はプロパティーを外から変更できない 23:23 処理タスクを継承する特例 25:11 isolated 環境における不自然さの検証 27:30 Actor のプロパティーの編集操作は isolated で保護される対象 28:18 isolated 環境の暗黙継承 29:15 アンダースコアの関数はリリースで使わないのが良さそう 30:03 クロージング ———————————————————————————————————————————
Transcription & Summarize : 熊谷さんのやさしい Swift 勉強会 #202
では始めていきますね。要は前回の続きで、興味が湧いて読み始めたブログを引き続き読み進めていきたいと思っています。前回は、このコンカレンシー(並行性)周りの基本的なセマフォを作る上での材料となり得るものと、その雰囲気を眺めるという内容のブログを読み終えました。今回は実際にそれらを使ってセマフォを作っていけるのではないかと思います。このビデオを読み進めていけばまだしっかり読んでいないのでわかりませんが。
前回お話ししていてとても気になったのは、自分のセマフォの使い方が特殊な可能性があるということです。それは、ズバリその使い方が特殊だと指摘されたからです。確かに、違和感は感じていたのですが、間違っているかどうかはわかりません。要はブロックするためにセマフォを使うという感じでやっていて、前回も紹介したコードがあります。
例えば、Dispatchを使ってセマフォを操作します。以下のようにコードを書きます。
let semaphore = DispatchSemaphore(value: 0)
DispatchQueue.global().async {
Thread.sleep(forTimeInterval: 5) // スレッドを5秒間スリープさせる
semaphore.signal()
}
semaphore.wait() // セマフォを待つ
このようにしてスレッドをブロックし、結果を得るというつもりでした。以下のように、結果を持っておいて、処理が終わったら結果を表示します。
var result = 0
DispatchQueue.global().async {
Thread.sleep(forTimeInterval: 5)
result = 100
semaphore.signal()
}
semaphore.wait() // セマフォを待つ
print(result) // 5秒後に100が表示される
これが私が頻繁に使っていたセマフォの使い方ですが、これは普通なのでしょうか、それとも異常なのでしょうか、この点がわからなくなってしまいました。このような使い方が比較的されているようですが、注意深く教えてもらいたいと思います。
確かに、前回伺ったようなリソース管理で「このリソースはどこまで使える」という形での使用はセマフォでは行っていなかったと思います。もともとセマフォを使い始めたのは、マルチスレッドでロックをかけたいときに探して見つけたのがきっかけでした。それ以来、この使い方が定着してしまいました。他にもロックでNSLock
などがありますが、忘れてしまいました。例えば、以下のように書きます。
let lock = NSLock()
lock.lock()
// クリティカルセクションの操作
lock.unlock()
これがプロパティのオプションをするときの使い方です。同様にセマフォでは1からスタートするパターンもあり、例えば以下のように他のスレッドでsignal
を呼ぶときも使います。
let semaphore = DispatchSemaphore(value: 1)
DispatchQueue.global().async {
semaphore.wait()
// クリティカルセクション
semaphore.signal()
}
これはロックされる形になります。セマフォが1からスタートし、バイナリセマフォの形になる感じですね。この中で共有リソースを操作したり、異なる操作をすることができます。これは昔の同期化の手法とも似ています。例えば、Objective-Cでよく使いました。
@synchronized (self) {
// クリティカルセクションの操作
}
懐かしいですね。このようにして同期化を行っていました。
今回はここまでです。また次回も同様に進めていきたいと思います。ありがとうございました。 とりあえず、セマフォについてですが、ウェイトとシグナルが実現できれば、その書き方はどうであれ、リソースの扱い方が重要です。また、非同期処理でこれを行う場合、セマフォをわざわざ作成する必要があるのか疑問に思うこともあります。それならファクターを利用するのがよいかもしれませんね。これについて考えながら読み進めていきたいです。
何だかちょっと方向性が見えなくなってきましたが、とにかく読んで実装部分に取り掛かりましょう。その前に前提についておさらいします。「チェックド・コンティニューション」について前回話しましたよね。これをもう一度確認しましょう。既存のコードをSwiftコンパイラに移行するときによく使います。確かに、コールバックをawait
に変換するツールとして便利です。
セマフォとして使えるかどうか、シンプルなケースについて考えます。セマフォのウェイト
側でawait
コンティニューションを使い、シグナル
側でresume
を呼べたら、実装できる可能性があります。
要は、resume
ともう一つ、何でしたっけ。普通、await
のブロック内でresume
を使いますが、これを別の場所で使いたいということですかね。それは少し無理があるかもしれませんが。
チェックド・コンティニューション
についてもう一度調べます。この型そのものではなく、withCheckedContinuation
がボディに渡され、これがセンダブルなのかということですね。別のタスクに渡してよいのでしょうか。
何もブロックしないコードだけ書き始めますが、まずコンティニューションでこの部分をタスクに渡せるかどうか見てみます。 現在、エラーが出ているのはawait
が抜けているからと思われます。これは不要かもしれません。
withCheckedContinuation
を分けましたが、再度書き直します。これでコンティニューションがT
のNever
から始まります。リターンポイントを指定すれば、Tが適切に推論されるはずです。
ここでタスクを飛ばし、await
のところで再度チェックします。タスクが終了するまで待つ必要がありますね。これでセマフォが適切に動作するはずです。
具体的なコードとして、以下のようになります:
withCheckedContinuation { continuation in
Task {
try await Task.sleep(nanoseconds: 2_000_000_000)
continuation.resume()
}
}
これで、タスクが飛んでいる状態で、withCheckedContinuation
は待ち状態となります。このタスクが終了すると、resume
が呼ばれる仕組みです。プリントで確認すると、2秒後に待ち状態が解除されるはずです。 1秒後にBが出てすぐCが出るという感じでしょうか。そうなんですね、コンティニューションを別のトラックに投げられるんです。1000Wだから全然問題ないですね。もうこれでいいんじゃないですかね。要は、タスクを立ち上げてその中でいろいろやって「リデウム」するまで待つというのを取ればいいんです。要するに、セマフォで作っちゃえばいいんじゃないかなと思います。例えば、プラスプロトコルいいか、プロトコルセマフォでやって、それでプライベート変数にコンティニューションをプリントし、ボイドやネバーのイニシャライザーじゃないですね。ここはオプショナルにしました。これでファンクションとして「ウェイト」を使ったときに、「ウェイト」の場合は待って「Async」ファンクションにしないといけないですね。理解が追いついてないところもありますが、この辺りどうやって実装を合わせていくんでしょうかね。
少し読んでみましょう。「ウェイト」側でwithCheckedContinuation
を呼びますね。シグナル側でリターンの「リデウム」を呼ぶので実装できそうです。ただ、「ウェイト」でどうブロックするのかが問題です。もしawait
メソッドで作っちゃったとしたら、自分の理解が追いついてないかもしれませんが、まず読み進めてみましょう。傾向でさまざまなタスクからアクセスされても、セマフォの内部状態である配列と値……何を言っているんだろう、読み飛ばしたかな。
同時に複数のタスクが待つ可能性があるので、一つのリソースに対して複数のタスクから同時アクセスを試みるといった話ですね。そういうときには一つのCheckedContinuation
が足りませんが、いろんなところから「ウェイト」するので、その「ウェイト」を配列で扱えば良いでしょう。カウンタブルなセマフォを作ろうとしているので、そのカウンターが全部ゼロになったときにすべての待ちに対して「ウェイト」をパスさせる、要は「ウェイト」をパスするのです。
なるほど、これをするには、コンティニューションの管理タスクも必要になってくるのかな。いずれにしても、複数のコンティニューションを配列で管理すれば、これらすべてを待たせることができるんです。その配列、たくさんあるコンティニューションを矛盾なく管理するためにはアクターが向いていそうです。でもアクターだけじゃ駄目なのか、まあいいや、そんな疑問は後にして先に進みましょう。
では実装してみましょう。状態はとりあえず待機中のコンティニューションの配列とセマフォが必要です。コンティニューションセマフォをアクターで実装して、waiters
がコンティニューションのリストですね。
出力機器が空ってセマフォの値、リソースの数ですね。なるほど。イニシャライザーはこのカウントを保持して、すごくしっかりアサートを使っています。その上で「有効な状態」はセマフォの値が0以上であって、waiters
が……アサートでやっているんですね。ウェイターズがエンプティだから0以上なんですね。ウェイターズがいないか、またはウェイターズカウントがマイナス値であってはならない。ああ、そういうことか。カウントがマイナスになると待ち状態じゃないのに待ってる人がいるみたいな感じになるんですね。
これは今適当に理解したけど合ってるのかな。マイナスだとロック状態で、ウェイターズがいる状態ですね。これはそこで通知するんだ。「有効な状態」がちょっとしたサポート関数みたいな感じなのかな。実際に使うのかもしれませんが、次に進みましょう。
「ウェイト」の実装がやっぱりですね。まず見てみますかね。「ウェイト」関数がasync
になっていて、バリューが1ある、そこで0以下だったらブロックする状態になるわけですね。0未満だったら、0だったときにウェイトで引いてたからそうですね。そうしたときにawait
でコンティニューションを「ウェイターズ」にアップエンドして、バリデーションを取る。あと出せればいいよね、って確かにそうですね。 その辺の管理はプログラマーがやればいいのですが、バリデーションについては気をつけるべきです。バリデーションが何をしているのか、ウェイターがどうなっているかということの方が重要でしょう。
例えば、コードの中で await
を使って特定のポイントを待っているとします。await
で待っている間に処理が中断され、再開されるタイミングというのは注意が必要です。コンティニュエーションの中で、例えば「ボタンがクリックされた」というイベントを待つとします。
それでは具体的な例を見てみましょう。ある箇所で以下のように await
を使うとします。
// `await someAsyncFunction()` を呼ぶ
let result = await someAsyncFunction()
ここで処理が中断され、再開された時点で特定の変数 value
が更新されている可能性があります。それによってバリューが狂ってしまう場合があります。そのため、並行処理や非同期処理を行う際には特に注意が必要です。
コードの実装が大丈夫かどうかを確認する際には、以下のようなポイントを考慮します。
await
を呼んだ後に別のタスクに移る可能性がある。value
が変わる可能性があり、それが非同期処理や並行処理に影響する。
次に、アクターが登場する場合の話に移ります。アクターにとって、あるメソッドや関数がそのアクターのアイソレイティブに属するかどうかは重要です。特定のアクター内で関数が実装される場合、その関数は await
を使わずにそのアクターのコンテキストで動作します。しかし、アクター外で動作する場合には、その関数は非同期に動作します。
例えば、以下のようにアクターのプロパティを変更するためには、適切なコンテキストで操作される必要があります。
actor MyActor {
var value: Int = 0
func updateValue(newValue: Int) {
self.value = newValue
}
func asyncUpdateValue(newValue: Int) async {
self.value = newValue
}
}
上記のコードでは、updateValue
関数はアクターのコンテキストで動作するので await
は不要ですが、asyncUpdateValue
関数は非同期処理を行うため await
が必要です。
結論として、非同期処理や並行処理を行う際に特に注意すべきなのは、それぞれの処理がどのタイミングで動作し、変数がどのように影響を受けるかをちゃんと管理することです。これにより、安全で効率的なコードを書くことができます。 今回のビットチェックとコンティニューエーションは、アイソレイティブをパラメータで使っていないのにおかしいね、という話がされていました。こうした掘り下げの姿勢には好感が持てますね。コンパイルレベルを上げてエラーをしっかり確認する手順がブログにまとめられていると、とても助かります。
また、シンプルなケースについての説明もありました。例えば、非非同期な関数の中で何も引数を取らないクロージャを実行する場合、アクターの中で普通の関数を呼び出すことは問題ありません。しかし、これを非同期関数にしてみると、非同期メソッドの中で関数を呼び出す際にawait
を使用する必要があります。
その際、アクター全体のアイソレイトを超える可能性があり、別のタスクやアクターを超えて関数が実行される可能性があるため、Sendable
でないと警告が出ます。クロージャが別のタスクに渡される可能性があるため、Sendable
を付けないとエラーが発生します。
さらに、Sendable
を付けるとエラーになる部分についてですが、例えばミュータブルな状態(mutated
)を持っている場合です。アクターがアイソレイティブであるため、クロージャを別の環境から渡すとアイソレイティブではなくなります。そのため、メソッドなどを通じて一単位で処理させる必要があります。
このようにして作成された関数は、ビットチェックコンティニューエーションのようになりますが、ここではawait
は必要ないという話になります。そして、Sendable
は付いていませんが、@unsafeInheritExecutor
という属性が付いています。この属性を使うと、非同期関数であってもアクターから呼ばれる際に実行環境(エグゼキュータ)が提唱されます。したがって、Sendable
やスケーピングではなく、そのままシフトしていく感じです。
リリースで使う場合にはこの属性に注意する必要があります。通常、リリースではアンセーフなものは避けるべきですが、個人の楽しみとしては使用しても良いとのことです。また、このブログのパターンによると、アンセーフは注意すべきとのことも述べられています。
勉強会の最後に、await
関数を詳しく見ていく段階に進むところで時間になったため、今回はこれぐらいにしておきます。また次回、await
の実装やシグナルについてさらに詳しく見ていければと思います。
今日はこれで勉強会を終わりにします。お疲れ様でした。ありがとうございました。