Ruby Fiber指南(三)過(guò)濾器
Posted on 2010-03-11 23:49 dennis 閱讀(4538) 評(píng)論(1) 編輯 收藏 所屬分類: 動(dòng)態(tài)語(yǔ)言 、my open-sourceRuby Fiber指南(一)基礎(chǔ)
Ruby Fiber指南(二)參數(shù)傳遞
Ruby Fiber指南(三)過(guò)濾器
Ruby Fiber指南(四)迭代器
Ruby Actor指南(五)實(shí)現(xiàn)Actor
在學(xué)習(xí)了Fiber的基礎(chǔ)知識(shí)之后,可以嘗試用Fiber去做一些比較有趣的事情。這一節(jié)將講述如何使用Fiber來(lái)實(shí)現(xiàn)類似unix系統(tǒng)中的管道功能。在unix系統(tǒng)中,可以通過(guò)管道將多個(gè)命令組合起來(lái)做一些強(qiáng)大的功能,最常用的例如查找所有的java進(jìn)程:
ps aux|grep java
通過(guò)組合ps和grep命令來(lái)實(shí)現(xiàn),ps的輸出作為grep的輸入,如果有更多的命令就形成了一條過(guò)濾鏈。過(guò)濾器本質(zhì)上還是生產(chǎn)者和消費(fèi)者模型,前一個(gè)過(guò)濾器產(chǎn)生結(jié)果,后一個(gè)過(guò)濾器消費(fèi)這個(gè)結(jié)果并產(chǎn)生新的結(jié)果給下一個(gè)過(guò)濾器消費(fèi)。因此我們就從最簡(jiǎn)單的生產(chǎn)者消費(fèi)者模型實(shí)現(xiàn)說(shuō)起。我們要展示的這個(gè)例子場(chǎng)景是這樣:生產(chǎn)者從標(biāo)準(zhǔn)輸入讀入用戶輸入并發(fā)送給消費(fèi)者,消費(fèi)者打印這個(gè)輸入,整個(gè)程序是由消費(fèi)者驅(qū)動(dòng)的,消費(fèi)者喚醒生存者去讀用戶輸入,生產(chǎn)者讀到輸入后讓出執(zhí)行權(quán)給消費(fèi)者去打印,整個(gè)過(guò)程通過(guò)生產(chǎn)者和消費(fèi)者的協(xié)作完成。
生產(chǎn)者發(fā)送是通過(guò)yield返回用戶輸入給消費(fèi)者(還記的上一節(jié)嗎?):
def send(x)
Fiber.yield(x)
end
而消費(fèi)者的接收則是通過(guò)喚醒生產(chǎn)者去生產(chǎn):
def receive(prod)
prod.resume
end
prod.resume
end
生產(chǎn)者是一個(gè)Fiber,它的任務(wù)就是等待用戶輸入并發(fā)送結(jié)果給消費(fèi)者:
def producer()
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
消費(fèi)者負(fù)責(zé)驅(qū)動(dòng)生產(chǎn)者,并且在接收到結(jié)果的時(shí)候打印,消費(fèi)者是root fiber:
def consumer(producer)
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
最終的調(diào)用如下:
consumer(producer())
完整的程序如下:#生產(chǎn)者消費(fèi)者
require 'fiber'
def send(x)
Fiber.yield(x)
end
def receive(prod)
prod.resume
end
def producer()
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
def consumer(producer)
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
if $0==__FILE__
consumer(producer())
end
require 'fiber'
def send(x)
Fiber.yield(x)
end
def receive(prod)
prod.resume
end
def producer()
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
def consumer(producer)
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
if $0==__FILE__
consumer(producer())
end
讀者可以嘗試在ruby1.9下運(yùn)行這個(gè)程序,每次程序都由消費(fèi)者驅(qū)動(dòng)生產(chǎn)者去等待用戶輸入,用戶輸入任何東西之后回車,生產(chǎn)者開始運(yùn)行并將讀到的結(jié)果發(fā)送給消費(fèi)者并讓出執(zhí)行權(quán)(通過(guò)yield),消費(fèi)者在接收到y(tǒng)ield返回的結(jié)果后打印這個(gè)結(jié)果,因此整個(gè)交互過(guò)程是一個(gè)echo的例子。
最終的調(diào)用consumer(producer())已經(jīng)有過(guò)濾器的影子在了,如果我們希望在producer和consumer之間插入其他過(guò)程對(duì)用戶的輸入做處理,也就是安插過(guò)濾器,那么新的過(guò)濾器也將作為fiber存在,新的fiber消費(fèi)producer的輸出,并輸出新的結(jié)果給消費(fèi)者,例如我們希望將用戶的輸入結(jié)果加上行號(hào)再打印,那么就插入一個(gè)稱為filter的fiber:
def filter(prod)
return Fiber.new do
line=1
while true
value=receive(prod)
value=sprintf("%5d %s",line,value)
send(value)
line=line.succ
end
end
end
return Fiber.new do
line=1
while true
value=receive(prod)
value=sprintf("%5d %s",line,value)
send(value)
line=line.succ
end
end
end
最終組合的調(diào)用如下:
consumer(filter(producer()))
類似unix系統(tǒng)那樣,簡(jiǎn)單的加入新的fiber組合起來(lái)就可以為打印結(jié)果添加行號(hào)。類似consumer(filter(producer()))的調(diào)用方式盡管已經(jīng)很直觀,但是我們還是希望能像unix系統(tǒng)那樣調(diào)用,也就是通過(guò)豎線作為管道操作符:
producer | filter | consumer
這樣的調(diào)用方式更將透明直觀,清楚地表明整個(gè)過(guò)濾器鏈的運(yùn)行過(guò)程。幸運(yùn)的是在Ruby中支持對(duì)|方法符的重載,因此要實(shí)現(xiàn)這樣的操作符并非難事,只要對(duì)Fiber做一層封裝即可,下面給出的代碼來(lái)自《Programming ruby》的作者Dave Thomas的blog:class PipelineElement
attr_accessor :source
def initialize
@fiber_delegate = Fiber.new do
process
end
end
def |(other)
other.source = self
other
end
def resume
@fiber_delegate.resume
end
def process
while value = input
handle_value(value)
end
end
def handle_value(value)
output(value)
end
def input
@source.resume
end
def output(value)
Fiber.yield(value)
end
end
attr_accessor :source
def initialize
@fiber_delegate = Fiber.new do
process
end
end
def |(other)
other.source = self
other
end
def resume
@fiber_delegate.resume
end
def process
while value = input
handle_value(value)
end
end
def handle_value(value)
output(value)
end
def input
@source.resume
end
def output(value)
Fiber.yield(value)
end
end
這段代碼非常巧妙,將Fiber和Ruby的功能展示的淋漓盡致。大致解說(shuō)下,PipelineElement作為任何一個(gè)過(guò)濾器的父類,其中封裝了一個(gè)fiber,這個(gè)fiber默認(rèn)執(zhí)行process,在process方法中可以看到上面生產(chǎn)者和消費(fèi)者例子的影子,input類似receive方法調(diào)用前置過(guò)濾器(source),output則將本過(guò)濾器處理的結(jié)果作為參數(shù)傳遞給yield并讓出執(zhí)行權(quán),讓這個(gè)過(guò)濾器的調(diào)用者(也就是后續(xù)過(guò)濾器)得到結(jié)果并繼續(xù)處理。PipelineElement實(shí)現(xiàn)了“|”方法,用于組合過(guò)濾器,將下一個(gè)過(guò)濾器的前置過(guò)濾器設(shè)置為本過(guò)濾器,并返回下一個(gè)過(guò)濾器。整個(gè)過(guò)濾鏈的驅(qū)動(dòng)者是最后一個(gè)過(guò)濾器。
有了這個(gè)封裝,那么上面生產(chǎn)者消費(fèi)者的例子可以改寫為:
class Producer < PipelineElement
def process
while true
value=readline.chomp
handle_value(value)
end
end
end
class Filter < PipelineElement
def initialize
@line=1
super()
end
def handle_value(value)
value=sprintf("%5d %s",@line,value)
output(value)
@line=@line.succ
end
end
class Consumer < PipelineElement
def handle_value(value)
puts value
end
end
def process
while true
value=readline.chomp
handle_value(value)
end
end
end
class Filter < PipelineElement
def initialize
@line=1
super()
end
def handle_value(value)
value=sprintf("%5d %s",@line,value)
output(value)
@line=@line.succ
end
end
class Consumer < PipelineElement
def handle_value(value)
puts value
end
end
現(xiàn)在的調(diào)用方式可以跟unix的管道很像了:
producer=Producer.new
filter=Filter.new
consumer=Consumer.new
pipeline = producer | filter | consumer
pipeline.resume
如果你打印pipeline對(duì)象,你將看到一條清晰的過(guò)濾鏈,filter=Filter.new
consumer=Consumer.new
pipeline = producer | filter | consumer
pipeline.resume
#<Consumer:0x8f08bf4 @fiber_delegate=#<Fiber:0x8f08a88>, @source=#<Filter:0x8f08db4 @line=1, @fiber_delegate=#<Fiber:0x8f08d60>, @source=#<Producer:0x8f09054 @fiber_delegate=#<Fiber:0x8f09038>>>>