えいのうにっき

a-knowの日記です

パイプとソケットでのプロセス間通信を Ruby で再確認した

前回までの続き。

今回は、「パイプ」と「ソケット」を使った複数のプロセス間で情報をやりとりする方法について。「プロセス間通信」(IPC : Inter Process Communication)と呼ばれる分野の話。

  • パイプを用いたプロセス間での情報のやりとり
  • 「ストリーム」と「メッセージ」、そして「ソケット」

読んでいるのは なるほどUnixプロセス ― Rubyで学ぶUnixの基礎 - 達人出版会

tatsu-zine.com

パイプを用いたプロセス間での情報のやりとり

プロセス間での情報のやりとりを行うための方法のひとつめ、「パイプ」。「パイプを開く」とは、プロセスの片方の端を、別のプロセスの片方の端に繋ぐことを指す。これにより、パイプを通じてデータを流すことができるようになるが、その方向は単方向に限られる

Ruby では、以下のようなコードによりパイプを作ることができるらしい。

reader, writer = IO.pipe
# => [#<IO:fd 5>, #<IO:fd 6>]

なるほど。パイプをつくると、読み込み用と書き込み用の、ふたつのファイルディスクリプタが得られるわけか。入り口と出口。ちなみに IO.pipepipe(2) に対応している。

Ruby だと、このふたつのオブジェクトはいずれも IO オブジェクトなので、ファイルとかを扱うときと同じインタフェースを使うことができる。こういう↓かんじで。

reader, writer = IO.pipe
writer.write("Into the pipe I go...")
writer.close
puts reader.read
# => Into the pipe I go...

なるほど。ちなみに「パイプを用いた通信は単方向」なので、

reader.write("Trying to get the reader to write something")
# => IOError: not opened for writing

双方向となる通信を行おうとするとエラーになる。

もひとつちなみに writer.close は、「これがパイプを通しての出力の終わりですよ」と reader 側に伝える役割があるそうだ。これをしないと、reader は無限に読み込みを続けようとする、ってことかな。

パイプの概要はわかったけど、このままでは全然おもしろくない。ので、複数のプロセスでパイプを扱ってみる。

reader, writer = IO.pipe

fork do
  # fork ブロックの中なので、ここは子プロセスが行う処理。
  # なので、親プロセスと同じファイルディスクリプタを持っている
  reader.close
  
  10.times do
    writer.puts "Another one bites the dust"
  end
end

writer.close
while message = reader.gets('e')
  $stdout.puts message
end

このコードの実行結果は、もちろん Another one bites the dust が10回表示されるわけだけど、気をつけるべき点が1点ある。コード中のコメントにも書いているけれど、fork により作った子プロセスは、その時点での親プロセスのもつファイルディスクリプタの複製を持つ。なので、一行目の reader, writer = IO.pipe で開かれたふたつのファイルディスクリプタももちろんそう。親子間で共有しているイメージ。

ここでは、パイプへ書き出す側は子プロセス・パイプから読み出す側は親プロセスなので、それぞれ使用しない方のディスクリプタclose している。これは意識して読んでいないと「はっ?」となりそう。

「ストリーム」と「メッセージ」、そして「ソケット」

ここまでパイプを用いて行ってきた通信の方式は「ストリーム」というらしい。「開始と終了の概念を持たないデータの読み書き」が「ストリーム」なんだそうだ(ただ「本書では」という言葉が添えられているので、実際のところはもっと難しい定義なのかもしれない)。

...うーん、close が"終了"っぽいかんじもしないでもないけど、「開始と終了の概念を持たないデータ」というのは「データの始まりと終わり」について、という意味なのかな。

「ストリーム」と表されるようなデータの流れを、読み込み側はどう読み込むのか。さきほどのコードだと reader.gets の部分。少々のデータ量ならいちどに全てを読みこんでも大丈夫なのかもしれないけど、常に安全なデータ量とは限らないし、なんらかの単位で区切って読み込む必要がある。

その区切りの目安として、先程のコードであれば「改行」が使われている。というのが、reader.gets は、区切り文字(デリミタ)が現れるまでのデータをひとかたまりとして、一度に読み込むようになっている。とはいえ、改行がデリミタになっているのはあくまでデフォルトで、reader.gets に任意の文字を渡せばそれをデリミタとしてくれる(instance method IO#gets (Ruby 2.5.0))、ので読み込む単位を変えることができる。

試しに while message = reader.getswhile message = reader.gets('e') に変えて実行してみると、出力結果も以下のように変わる。

Anothe
r one
 bite
s the
 dust
Anothe
r one
...

うんうん。(dust のところでも区切られているように見えるけど、ここはたぶん dust\nAnothe を一つの塊として出力してるだけだと思う)

ここまで見てきたのが「ストリーム」。一方で、「開始と終了の概念を持つデータの読み書き」により通信を行う方法もある。それが「メッセージ」。メッセージの場合、パイプは使えないとのことだが、その代わりに「Unixソケット」なるものを使うらしい。以下がその実装例。

require 'socket'

child_socket, parent_socket = Socket.pair(:UNIX, :DGRAM, 0)
maxlen = 1000

fork do
  parent_socket.close
  4.times do
    instruction = child_socket.recv(maxlen)
    child_socket.send("#{instruction} accomplished!", 0)
  end
end

child_socket.close

2.times do
  parent_socket.send("Heavy lifting", 0)
end
2.times do
  parent_socket.send("Feather lifting", 0)
end

4.times do
  $stdout.puts parent_socket.recv(maxlen)
end

child_socket, parent_socket = Socket.pair(:UNIX, :DGRAM, 0) で作られているのがソケット。socketpair(2) に対応している。

引数はそれぞれ、ドメイン タイプ プロトコル を指定するためのものっぽいんだけど、どんなドメインやタイプがあって、プロトコルが...というのを知るには、どうやら「ソケット」そのものにダイブする必要がありそうな雰囲気を察した。

例えばドメインは、ここでは :UNIX とすることで Unix ソケットを作っているけど、:INET っていう指定もできて、そうればインターネットドメインソケットを作れるみたい。プロトコル0 だと TCP になるっぽい。

こんなかんじなので、今回ソケットそのものについては「こういうもん」ってことにして先に進む。

先程のコードに話を戻す。親プロセスの処理では、2回+2回の計4回、ソケットに対して何かを send し、その後4回、親プロセス用のソケットでの受信を待ち受けている。

一方の子プロセスの処理では、いきなり4回、子プロセス用のソケットでの受信を待ち受けている。で、受け取れたものに文字列を付与して send しなおしている。

このコードの出力結果は↓こう。

Heavy lifting accomplished!
Heavy lifting accomplished!
Feather lifting accomplished!
Feather lifting accomplished!

このコードでいう parent_socket は、child_socket に繋がっている。parent_socket に送った情報は、child_socket で受信ができる。パイプと違う点は「逆もしかり」なところで、child_socket に対して情報を送り、parent_socket でそれを受け取ることもできる。ソケットはプロセス間での双方向での通信を行うための手段、ということになる。

ストリームとの違いだった「開始と終了の概念を持つデータ」だけど、きっと parent_socket.send("Heavy lifting", 0) こんなかんじで、送りたいデータ単位での送信ができる、これがメッセージだ、ということなんだろう。データが開始と終了の情報を持っているというよりは「一回の送信」がそのままデータの区切りになる、ってかんじかな。

ここでまたちょっと好奇心がでてきたのでいろいろ試してみる。まずは、親プロセスの最後のループ、4.times do5.times do に変えてみる。

Heavy lifting accomplished!
Heavy lifting accomplished!
Feather lifting accomplished!
Feather lifting accomplished!
Errno::ECONNRESET: Connection reset by peer - recvfrom(2)

なるほど。子プロセスが send するよりも多く待ち受けてみようと思ったんだけど、5個目を待ち受けようにももう子プロセスは終了していて、child_socket ももう close されたものしか存在しないからエラーになってしまった...みたいなかんじだろうか。

もうひとつ。子プロセスの中のループにおいて、send するごとに1秒スリープさせてみた。

4.times do
  instruction = child_socket.recv(maxlen)
  child_socket.send("#{instruction} accomplished!", 0)
  sleep 1
end

こうすると、親プロセスの方での文字列の出力も1秒おきにされるようになった。なるほどこの recv 、呼ぶともう片方からの受信を待ち続けるっぽい。

以上

正直言って今回のサンプルコードはあんまし面白くなかったんだけど、「これを使ってどんなことができるか?」っていう観点では、特にソケットは面白そうだなと思った。たとえばチャットとか。

でも今回のはあくまでプロセス間での通信だから、この仕組みでチャットをやろうと思ったらユーザーごとにプロセスを持たなくちゃいけなくなる(よね?)。ハイそこで WebSocket!ってな感じだろうか。

WebSocket、その言葉が出てきたくらいのタイミングで、その名前とそれを使えばどんなことができるのかについては少し知っていたんだけど、「ソケット」というもの自体にちゃんと向き合ったのは今回のこれまでしてこなかったな。よくない。

僕にはまだまだこういうところが多いと思うので、引き続きやっていこう。

チラ見してみたページとか



follow us in feedly