前回までの続き。
- Unixプロセスとリソースの基礎を再確認した - えいのうにっき
- プロセスとの情報のやりとりについて再確認した - えいのうにっき
- プロセスの適切な扱い方を再確認した - えいのうにっき
- Unixプロセスとシグナルの基礎をRubyで再確認した - えいのうにっき
今回は、「パイプ」と「ソケット」を使った複数のプロセス間で情報をやりとりする方法について。「プロセス間通信」(IPC : Inter Process Communication)と呼ばれる分野の話。
- パイプを用いたプロセス間での情報のやりとり
- 「ストリーム」と「メッセージ」、そして「ソケット」
読んでいるのは なるほどUnixプロセス ― Rubyで学ぶUnixの基礎 - 達人出版会 。
パイプを用いたプロセス間での情報のやりとり
プロセス間での情報のやりとりを行うための方法のひとつめ、「パイプ」。「パイプを開く」とは、プロセスの片方の端を、別のプロセスの片方の端に繋ぐことを指す。これにより、パイプを通じてデータを流すことができるようになるが、その方向は単方向に限られる。
Ruby では、以下のようなコードによりパイプを作ることができるらしい。
reader, writer = IO.pipe # => [#<IO:fd 5>, #<IO:fd 6>]
なるほど。パイプをつくると、読み込み用と書き込み用の、ふたつのファイルディスクリプタが得られるわけか。入り口と出口。ちなみに IO.pipe
は pipe(2)
に対応している。
Ruby だと、このふたつのオブジェクトはいずれも IO オブジェクトなので、ファイルとかを扱うときと同じインタフェースを使うことができる。こういう↓かんじで。
reader, writer = IO.pipe writer.write("Into the pipe I go...") writer.close puts reader.read # => Into the pipe I go...
なるほど。ちなみに「パイプを用いた通信は単方向」なので、
reader.write("Trying to get the reader to write something") # => IOError: not opened for writing
双方向となる通信を行おうとするとエラーになる。
もひとつちなみに writer.close
は、「これがパイプを通しての出力の終わりですよ」と reader 側に伝える役割があるそうだ。これをしないと、reader は無限に読み込みを続けようとする、ってことかな。
パイプの概要はわかったけど、このままでは全然おもしろくない。ので、複数のプロセスでパイプを扱ってみる。
reader, writer = IO.pipe fork do # fork ブロックの中なので、ここは子プロセスが行う処理。 # なので、親プロセスと同じファイルディスクリプタを持っている reader.close 10.times do writer.puts "Another one bites the dust" end end writer.close while message = reader.gets('e') $stdout.puts message end
このコードの実行結果は、もちろん Another one bites the dust
が10回表示されるわけだけど、気をつけるべき点が1点ある。コード中のコメントにも書いているけれど、fork
により作った子プロセスは、その時点での親プロセスのもつファイルディスクリプタの複製を持つ。なので、一行目の reader, writer = IO.pipe
で開かれたふたつのファイルディスクリプタももちろんそう。親子間で共有しているイメージ。
ここでは、パイプへ書き出す側は子プロセス・パイプから読み出す側は親プロセスなので、それぞれ使用しない方のディスクリプタを close
している。これは意識して読んでいないと「はっ?」となりそう。
「ストリーム」と「メッセージ」、そして「ソケット」
ここまでパイプを用いて行ってきた通信の方式は「ストリーム」というらしい。「開始と終了の概念を持たないデータの読み書き」が「ストリーム」なんだそうだ(ただ「本書では」という言葉が添えられているので、実際のところはもっと難しい定義なのかもしれない)。
...うーん、close
が"終了"っぽいかんじもしないでもないけど、「開始と終了の概念を持たないデータ」というのは「データの始まりと終わり」について、という意味なのかな。
「ストリーム」と表されるようなデータの流れを、読み込み側はどう読み込むのか。さきほどのコードだと reader.gets
の部分。少々のデータ量ならいちどに全てを読みこんでも大丈夫なのかもしれないけど、常に安全なデータ量とは限らないし、なんらかの単位で区切って読み込む必要がある。
その区切りの目安として、先程のコードであれば「改行」が使われている。というのが、reader.gets
は、区切り文字(デリミタ)が現れるまでのデータをひとかたまりとして、一度に読み込むようになっている。とはいえ、改行がデリミタになっているのはあくまでデフォルトで、reader.gets
に任意の文字を渡せばそれをデリミタとしてくれる(instance method IO#gets (Ruby 2.5.0))、ので読み込む単位を変えることができる。
試しに while message = reader.gets
を while message = reader.gets('e')
に変えて実行してみると、出力結果も以下のように変わる。
Anothe r one bite s the dust Anothe r one ...
うんうん。(dust
のところでも区切られているように見えるけど、ここはたぶん dust\nAnothe
を一つの塊として出力してるだけだと思う)
ここまで見てきたのが「ストリーム」。一方で、「開始と終了の概念を持つデータの読み書き」により通信を行う方法もある。それが「メッセージ」。メッセージの場合、パイプは使えないとのことだが、その代わりに「Unixソケット」なるものを使うらしい。以下がその実装例。
require 'socket' child_socket, parent_socket = Socket.pair(:UNIX, :DGRAM, 0) maxlen = 1000 fork do parent_socket.close 4.times do instruction = child_socket.recv(maxlen) child_socket.send("#{instruction} accomplished!", 0) end end child_socket.close 2.times do parent_socket.send("Heavy lifting", 0) end 2.times do parent_socket.send("Feather lifting", 0) end 4.times do $stdout.puts parent_socket.recv(maxlen) end
child_socket, parent_socket = Socket.pair(:UNIX, :DGRAM, 0)
で作られているのがソケット。socketpair(2)
に対応している。
引数はそれぞれ、ドメイン
タイプ
プロトコル
を指定するためのものっぽいんだけど、どんなドメインやタイプがあって、プロトコルが...というのを知るには、どうやら「ソケット」そのものにダイブする必要がありそうな雰囲気を察した。
例えばドメインは、ここでは :UNIX
とすることで Unix ソケットを作っているけど、:INET
っていう指定もできて、そうればインターネットドメインソケットを作れるみたい。プロトコルは 0
だと TCP になるっぽい。
こんなかんじなので、今回ソケットそのものについては「こういうもん」ってことにして先に進む。
先程のコードに話を戻す。親プロセスの処理では、2回+2回の計4回、ソケットに対して何かを send
し、その後4回、親プロセス用のソケットでの受信を待ち受けている。
一方の子プロセスの処理では、いきなり4回、子プロセス用のソケットでの受信を待ち受けている。で、受け取れたものに文字列を付与して send
しなおしている。
このコードの出力結果は↓こう。
Heavy lifting accomplished! Heavy lifting accomplished! Feather lifting accomplished! Feather lifting accomplished!
このコードでいう parent_socket
は、child_socket
に繋がっている。parent_socket
に送った情報は、child_socket
で受信ができる。パイプと違う点は「逆もしかり」なところで、child_socket
に対して情報を送り、parent_socket
でそれを受け取ることもできる。ソケットはプロセス間での双方向での通信を行うための手段、ということになる。
ストリームとの違いだった「開始と終了の概念を持つデータ」だけど、きっと parent_socket.send("Heavy lifting", 0)
こんなかんじで、送りたいデータ単位での送信ができる、これがメッセージだ、ということなんだろう。データが開始と終了の情報を持っているというよりは「一回の送信」がそのままデータの区切りになる、ってかんじかな。
ここでまたちょっと好奇心がでてきたのでいろいろ試してみる。まずは、親プロセスの最後のループ、4.times do
を 5.times do
に変えてみる。
Heavy lifting accomplished! Heavy lifting accomplished! Feather lifting accomplished! Feather lifting accomplished! Errno::ECONNRESET: Connection reset by peer - recvfrom(2)
なるほど。子プロセスが send
するよりも多く待ち受けてみようと思ったんだけど、5個目を待ち受けようにももう子プロセスは終了していて、child_socket
ももう close されたものしか存在しないからエラーになってしまった...みたいなかんじだろうか。
もうひとつ。子プロセスの中のループにおいて、send
するごとに1秒スリープさせてみた。
4.times do instruction = child_socket.recv(maxlen) child_socket.send("#{instruction} accomplished!", 0) sleep 1 end
こうすると、親プロセスの方での文字列の出力も1秒おきにされるようになった。なるほどこの recv
、呼ぶともう片方からの受信を待ち続けるっぽい。
以上
正直言って今回のサンプルコードはあんまし面白くなかったんだけど、「これを使ってどんなことができるか?」っていう観点では、特にソケットは面白そうだなと思った。たとえばチャットとか。
でも今回のはあくまでプロセス間での通信だから、この仕組みでチャットをやろうと思ったらユーザーごとにプロセスを持たなくちゃいけなくなる(よね?)。ハイそこで WebSocket!ってな感じだろうか。
WebSocket、その言葉が出てきたくらいのタイミングで、その名前とそれを使えばどんなことができるのかについては少し知っていたんだけど、「ソケット」というもの自体にちゃんと向き合ったのは今回のこれまでしてこなかったな。よくない。
僕にはまだまだこういうところが多いと思うので、引き続きやっていこう。