読者です 読者をやめる 読者になる 読者になる

ruby の並行・並列ってどんな感じなのかな、とか。

ruby

 ユーザーストリームに接続して、タイムラインに流れる呟きを逐次取得するような関数 each_post() を作りました。中の実装はともかくとして↓こんな感じで使います。

each_post() {|st|
  if st['text'] then
    puts st['text'] # 呟きを表示
  end
}

 で、これを使って ruby の並列・並行処理*1とか、ちょっと触ってみようかなと。というわけで「5 秒に一度、直近の 5 秒間に発生した呟きを表示する」プログラムを書いていました。

1. スレッド
 まぁとにかく最初は標準の Thread を使ってみましょということで。
 スレッド 2 本作ります。ひとつは呟き収集スレッド、もうひとつは呟き表示スレッドです。
 

require 'thread'

q = Queue.new

collect_tw_t = Thread.new do
  each_post() {|st|
     if st['text'] then
       q.push( st['text'] )
    end
  }	
end

puts_tw_t = Thread.new do
  loop do
    sleep 5
    while ! q.empty? do
      puts q.pop()
    end
  end
end

collect_tw_t.join
puts_tw_t.join

 実行すると、5 秒に一度呟きがダンプされます。Queue がそのままグローバルに置いてあるとか、何だかナマイので嫌ですね。ここは mutex を使って Queue を安全に扱うべきですが、まぁ今回は省略。

2. Revactor
 試してみたんですが、うまく動いてくれませんでした。
 ↓ここのサイトでも苦戦した跡が書かれていますね。
 『Ruby: Revactor』hisak71の日記
 公式サイトは↓ここかな。
 『Revactor』

3. Celluloid
 日本語の記事は少ないですが、英語の記事はたくさん見つかりますね。「1. スレッド」で作ったプログラムは呟きを保持するキューがむき出しのままグローバル空間にありました。あまり嬉しくないので、これを Celluloid を Mix-in したクラスで包みます。

 ↑この Cellluloid の部分、いわゆる Actor ですね。

require 'thread'
require 'celluloid'

class Tweets_a
  include Celluloid

  def initialize()
    @q = Queue.new
  end

  def add(tw)
    @q.push(tw)
  end

  def flush()
    while ! @q.empty? do
      puts @q.pop()
    end
  end
end

Celluloid::Actor[:tweets] = Tweets_a.new

collect_tw_t = Thread.new do
  each_post() {|st|
    # 呟きを Actor に通知
    if st['text'] then
      Celluloid::Actor[:tweets].add!(st['text'])
    end
  }	
end

puts_tw_t = Thread.new do
  # 5 秒に一度、「呟きを表示してね」シグナルを送る。
  loop do
    sleep 5
    Celluloid::Actor[:tweets].flush!()
  end
end

collect_tw_t.join
puts_tw_t.join

 「1. スレッド」のときと比較してデータへのアクセスが安全になるうえ、Thread の do の中がシンプルになりました。
 Celluloid の Github は↓こちら。
 『celluloid / celluloid』github

4. その他の選択肢
 今回試したもの以外にもいくつか選択肢があるようです。例えば Rubinius には Actors ライブラリがあります。Rubinius は高速に動作する Ruby 実装だそうですけども、今回は試していません。あと、Actor といえば Scala ですけども、Akka の Actor がよく知られていますね。JRuby を利用すれば Akka の Actor の恩恵に預かれるそうです。これも今回は試していません。あと、並列・並行という文脈でいうと Ruby 1.9 から搭載された Fiber を使ったライブラリ NeverBlock というものがあります。これはノンブロッキング I/O を実現するためのもののようです。

5. 雑感
 いろいろな選択肢があるので、まぁ適材適所という状況は他の言語と同じですね。僕は Celluloid があればたいてい潰しが効くような気がするのでまぁ現状で満足です。
 今回、僕は「Revactor, Celluloid, Rubinius Actors というものがある」ということを gihyo.jp の RubyKaigi2011 まとめで知りました。最後に貼っておきますね。
 『RubyKaigi2011 スペシャルレポート』 gihyo.jp

*1:並列と並行の違いを正しく理解していないので本稿で両方書いてます。