这里我们会使用这个设计模式开发一个示例,分析F#函数式编程语言中的反馈进度事件,其中部分示例代码来自于F# JAOO Tutorial。
- type AsyncWorker<'T>(jobs: seq
>) = - // This declares an F# event that we can raise
- let jobCompleted = new Event
() - /// Start an instance of the work
- member x.Start() =
- // Capture the synchronization context to allow us to raise events back on the GUI thread
- let syncContext = SynchronizationContext.CaptureCurrent()
- // Mark up the jobs with numbers
- let jobsjobs = jobs |> Seq.mapi (fun i job -> (job,i+1))
- let work =
- Async.Parallel
- [ for (job,jobNumber) in jobs ->
- async { let! result = job
- syncContext.RaiseEvent jobCompleted (jobNumber,result)
- return result } ]
- Async.Start(work |> Async.Ignore)
- /// Raised when a particular job completes
- member x.JobCompleted = jobCompleted.Publish
- type SynchronizationContext with
- /// A standard helper extension method to raise an event on the GUI thread
- member syncContext.RaiseEvent (event: Event<_>) args =
- syncContext.Post((fun _ -> event.Trigger args),state=null)
- /// A standard helper extension method to capture the current synchronization context.
- /// If none is present, use a context that executes work in the thread pool.
- static member CaptureCurrent () =
- match SynchronizationContext.Current with
- | null -> new SynchronizationContext()
- | ctxt -> ctxt
- 您现在便可以使用这个组件来管理一系列CPU密集型异步任务:
- let rec fib i = if i < 2 then 1 else fib (i-1) + fib (i-2)
- let worker =
- new AsyncWorker<_>( [ for i in 1 .. 100 -> async { return fib (i % 40) } ] )
- worker.JobCompleted.Add(fun (jobNumber, result) ->
- printfn "job %d completed with result %A" jobNumber result)
- worker.Start()
- job 1 completed with result 1
- job 2 completed with result 2
- ...
- job 39 completed with result 102334155
- job 77 completed with result 39088169
- job 79 completed with result 102334155
- open System.IO
- open System.Net
- open Microsoft.FSharp.Control.WebExtensions
- /// Fetch the contents of a web page, asynchronously.
- let httpAsync(url:string) =
- async { let req = WebRequest.Create(url)
- use! resp = req.AsyncGetResponse()
- use stream = resp.GetResponseStream()
- use reader = new StreamReader(stream)
- let text = reader.ReadToEnd()
- return text }
- let urls =
- [ "http://www.live.com";
- "http://news.live.com";
- "http://www.yahoo.com";
- "http://news.yahoo.com";
- "http://www.google.com";
- "http://news.google.com"; ]
- let jobs = [ for url in urls -> httpAsync url ]
- let worker = new AsyncWorker<_>(jobs)
- worker.JobCompleted.Add(fun (jobNumber, result) ->
- printfn "job %d completed with result %A" jobNumber result.Length)
- worker.Start()
- job 5 completed with result 8521
- job 6 completed with result 155767
- job 3 completed with result 117778
- job 1 completed with result 16490
- job 4 completed with result 175186
- job 2 completed with result 70362
- open System
- open System.Threading
- open System.IO
- open Microsoft.FSharp.Control.WebExtensions
- type AsyncWorker<'T>(jobs: seq
>) = - // Each of these lines declares an F# event that we can raise
- let allCompleted = new Event<'T[]>()
- let error = new Event
() - let canceled = new Event
() - let jobCompleted = new Event
() - let cancellationCapability = new CancellationTokenSource()
- /// Start an instance of the work
- member x.Start() =
- // Capture the synchronization context to allow us to raise events back on the GUI thread
- let syncContext = SynchronizationContext.CaptureCurrent()
- // Mark up the jobs with numbers
- let jobsjobs = jobs |> Seq.mapi (fun i job -> (job,i+1))
- let work =
- Async.Parallel
- [ for (job,jobNumber) in jobs ->
- async { let! result = job
- syncContext.RaiseEvent jobCompleted (jobNumber,result)
- return result } ]
- Async.StartWithContinuations
- ( work,
- (fun res -> raiseEventOnGuiThread allCompleted res),
- (fun exn -> raiseEventOnGuiThread error exn),
- (fun exn -> raiseEventOnGuiThread canceled exn ),
- cancellationCapability.Token)
- member x.CancelAsync() =
- cancellationCapability.Cancel()
- /// Raised when a particular job completes
- member x.JobCompleted = jobCompleted.Publish
- /// Raised when all jobs complete
- member x.AllCompleted = allCompleted.Publish
- /// Raised when the composition is cancelled successfully
- member x.Canceled = canceled.Publish
- /// Raised when the composition exhibits an error
- member x.Error = error.Publish我们可以使用最普通的做法来响应这些额外的事件,例如:
- let worker = new AsyncWorker<_>(jobs)
- worker.JobCompleted.Add(fun (jobNumber, result) ->
- printfn "job %d completed with result %A" jobNumber result.Length)
- worker.AllCompleted.Add(fun results ->
- printfn "all done, results = %A" results )
- worker.Start()
如上,这个监视中异步工作流可以支持任务的取消操作。反馈进度的事件模式可用于相当部分需要全程汇报进度的场景。在下一个示例中,我们使用这个模式来封装后台对于一系列Twitter采样消息的读取操作。运行这个示例需要一个Twitter帐号和密码。在这里只会发起一个事件,如果需要的话您也可以在某些情况下发起更多事件。F# JAOO Tutorial中也包含了这个示例。
- // F# Twitter Feed Sample using F# Async Programming and Event processing
- //
- #r "System.Web.dll"
- #r "System.Windows.Forms.dll"
- #r "System.Xml.dll"
- open System
- open System.Globalization
- open System.IO
- open System.Net
- open System.Web
- open System.Threading
- open Microsoft.FSharp.Control.WebExtensions
- /// A component which listens to tweets in the background and raises an
- /// event each time a tweet is observed
- type TwitterStreamSample(userName:string, password:string) =
- let tweetEvent = new Event<_>()
- let streamSampleUrl = "http://stream.twitter.com/1/statuses/sample.xml?delimited=length"
- /// The cancellation condition
- let mutable group = new CancellationTokenSource()
- /// Start listening to a stream of tweets
- member this.StartListening() =
- // Capture the synchronization context to allow us to raise events back on the GUI thread
- // Capture the synchronization context to allow us to raise events back on the GUI thread
- let syncContext = SynchronizationContext.CaptureCurrent()
- /// The background process
- let listener (syncContext: SynchronizationContext) =
- async { let credentials = NetworkCredential(userName, password)
- let req = WebRequest.Create(streamSampleUrl, Credentials=credentials)
- use! resp = req.AsyncGetResponse()
- use stream = resp.GetResponseStream()
- use reader = new StreamReader(stream)
- let atEnd = reader.EndOfStream
- let rec loop() =
- async {
- let atEnd = reader.EndOfStream
- if not atEnd then
- let sizeLine = reader.ReadLine()
- let size = int sizeLine
- let buffer = Array.zeroCreate size
- let _numRead = reader.ReadBlock(buffer,0,size)
- let text = new System.String(buffer)
- syncContext.RaiseEvent tweetEvent text
- return! loop()
- }
- return! loop() }
- Async.Start(listener, group.Token)
- /// Stop listening to a stream of tweets
- member this.StopListening() =
- group.Cancel();
- group <- new CancellationTokenSource()
- /// Raised when the XML for a tweet arrives
- member this.NewTweet = tweetEvent.Publish在Twitter的标准采样消息流中每出现一条消息便会触发一个事件,并同时提供消息的内容。我们可以这样监听事件流:
- let userName = "..." // set Twitter user name here
- let password = "..." // set Twitter user name here
- let twitterStream = new TwitterStreamSample(userName, password)
- twitterStream.NewTweet
- |> Event.add (fun s -> printfn "%A" s)
- twitterStream.StartListening()
- twitterStream.StopListening()
程序运行后便会不断打印出每条消息的XML数据。您可以从Twitter API页面中来了解采样消息流的使用方式。如果您想同时解析这些消息,以下便是这一工作的示例代码。不过,也请关注Twitter API页面中的指导准则。例如,如果需要构建一个高可靠性的系统,您最好在处理前进行保存,或是使用消息队列。
- #r "System.Xml.dll"
- #r "System.Xml.Linq.dll"
- open System.Xml
- open System.Xml.Linq
- let xn (s:string) = XName.op_Implicit s
- /// The results of the parsed tweet
- type UserStatus =
- { UserName : string
- ProfileImage : string
- Status : string
- StatusDate : DateTime }
- /// Attempt to parse a tweet
- let parseTweet (xml: string) =
- let document = XDocument.Parse xml
- let node = document.Root
- if node.Element(xn "user") <> null then
- Some { UserName = node.Element(xn "user").Element(xn "screen_name").Value;
- ProfileImage = node.Element(xn "user").Element(xn "profile_image_url").Value;
- Status = node.Element(xn "text").Value |> HttpUtility.HtmlDecode;
- StatusDate = node.Element(xn "created_at").Value |> (fun msg ->
- DateTime.ParseExact(msg, "ddd MMM dd HH:mm:ss +0000 yyyy",
- CultureInfo.CurrentCulture)); }
- else
- None基于事件流还可以使用组合式的编程:
- twitterStream.NewTweet
- |> Event.choose parseTweet
- |> Event.add (fun s -> printfn "%A" s)
- twitterStream.StartListening()或是收集统计数据:
- let addToMultiMap key x multiMap =
- let prev = match Map.tryFind key multiMap with None -> [] | Some v -> v
- Map.add x.UserName (x::prev) multiMap
- /// An event which triggers on every 'n' triggers of the input event
- let every n (ev:IEvent<_>) =
- let out = new Event<_>()
- let count = ref 0
- ev.Add (fun arg -> incr count; if !count % n = 0 then out.Trigger arg)
- out.Publish
- twitterStream.NewTweet
- |> Event.choose parseTweet
- // Build up the table of tweets indexed by user
- |> Event.scan (fun z x -> addToMultiMap x.UserName x z) Map.empty
- // Take every 20’ˉth index
- |> every 20
- // Listen and display the average of #tweets/user
- |> Event.add (fun s ->
- let avg = s |> Seq.averageBy (fun (KeyValue(_,d)) -> float d.Length)
- printfn "#users = %d, avg tweets = %g" s.Count avg)
- #users = 19, avg tweets = 1.05263
- #users = 39, avg tweets = 1.02564
- #users = 59, avg tweets = 1.01695
- #users = 79, avg tweets = 1.01266
- #users = 99, avg tweets = 1.0101
- #users = 118, avg tweets = 1.01695
- #users = 138, avg tweets = 1.01449
- #users = 158, avg tweets = 1.01266
- #users = 178, avg tweets = 1.01124
- #users = 198, avg tweets = 1.0101
- #users = 218, avg tweets = 1.00917
- #users = 237, avg tweets = 1.01266
- #users = 257, avg tweets = 1.01167
- #users = 277, avg tweets = 1.01083
- #users = 297, avg tweets = 1.0101
- #users = 317, avg tweets = 1.00946
- #users = 337, avg tweets = 1.0089
- #users = 357, avg tweets = 1.0084
- #users = 377, avg tweets = 1.00796
- #users = 396, avg tweets = 1.0101
- #users = 416, avg tweets = 1.00962
- #users = 435, avg tweets = 1.01149
- #users = 455, avg tweets = 1.01099
- #users = 474, avg tweets = 1.01266
- #users = 494, avg tweets = 1.01215
- #users = 514, avg tweets = 1.01167
- #users = 534, avg tweets = 1.01124
- #users = 554, avg tweets = 1.01083
- #users = 574, avg tweets = 1.01045
- #users = 594, avg tweets = 1.0101
- open System.Drawing
- open System.Windows.Forms
- let form = new Form(Visible = true, Text = "A Simple F# Form", TopMost = true, SizeSize = Size(600,600))
- let data = new DataGridView(Dock = DockStyle.Fill, Text = "F# Programming is Fun!",
- Font = new Font("Lucida Console",12.0f),
- ForeColor = Color.DarkBlue)
- form.Controls.Add(data)
- data.DataSource <- [| (10,10,10) |]
- data.Columns.[0].Width <- 200
- data.Columns.[2].Width <- 500
- twitterStream.NewTweet
- |> Event.choose parseTweet
- // Build up the table of tweets indexed by user
- |> Event.scan (fun z x -> addToMultiMap x.UserName x z) Map.empty
- // Take every 20’ˉth index
- |> every 20
- // Listen and display those with more than one tweet
- |> Event.add (fun s ->
- let moreThanOneMessage = s |> Seq.filter (fun (KeyValue(_,d)) -> d.Length > 1)
- data.DataSource <-
- moreThanOneMessage
- |> Seq.map (fun (KeyValue(user,d)) -> (user, d.Length, d.Head.Status))
- |> Seq.filter (fun (_,n,_) -> n > 1)
- |> Seq.sortBy (fun (_,n,_) -> -n)
- |> Seq.toArray)
- /// Raised when a particular job completes
- [
] - member x.JobCompleted = jobCompleted.Publish
- /// Raised when all jobs complete
- [
] - member x.AllCompleted = allCompleted.Publish
- /// Raised when the composition is cancelled successfully
- [
] - member x.Canceled = canceled.Publish
- /// Raised when the composition exhibits an error
- [
] - member x.Error = error.Publish模式的限制
反馈进度的事件模式会有一些假设:并行处理组件的使用者是那些GUI应用程序(如Windows Forms),服务器端应用程序(如ASP.NET)或其他一些能够将事件交由监控方使用场景。我们也可以调整这一模式中发起事件的方式,例如将消息发送给一个MailboxProcessor或简单地记录它们。然而这里还是有一些假设,需要有个主线程或是其他某个监控者来监听这些事件,或是合理的保存它们。
如果您对于.NET 4.0中的IObservable接口较为熟悉,您可能会考虑让TwitterStreamSample类型实现这个接口。然而,对于最终数据源来说,这个做法的好处不大。例如,以后TwitterStreamSample类型可能会需要提供更多种事件,例如在发生错误并自动重建连接时汇报,或是汇报暂停或延迟状况。在这样的场景中,发起.NET事件就够了,部分原因是为了让更多.NET程序员熟悉这个对象。在F#种,所有发布出去的IEvent<_>对象会自动实现IObservable,这样其他人在使用时便可以直接使用Observable组合器。
所有的JavaScript,ASP.NET以及GUI框架的程序员(如Windows Forms)都明白,框架的单线程特性既是优势也是劣势──问题变得简单了(没有数据竞争),但并行和异步编程却变得很困难。在.NET编程中,I/O和繁重的CPU计算必须交由后台线程去处理。上面的设计模式可以同时给您两个世界的优势:您得到了独立的,可以互操作的,通信丰富的后台处理组件,其中包括了对I/O及并行计算的支持,同时还在您的大部分代码中保留了单线程GUI编程的简单性。正如之前表现的那样,这些组件还保持了很高的通用性及可复用性,这使得独立的单元测试也变得非常容易。
