diff --git a/src/FSharp.Control.Reactive.Testing/Marbles.fs b/src/FSharp.Control.Reactive.Testing/Marbles.fs index ba3e80e..be4e92f 100644 --- a/src/FSharp.Control.Reactive.Testing/Marbles.fs +++ b/src/FSharp.Control.Reactive.Testing/Marbles.fs @@ -2,7 +2,7 @@ /// Reactive Marbles representation to make working with recorded test notifications like visualising the notifications as marbles. module Marbles = - + open System open Microsoft.Reactive.Testing open FSharp.Control.Reactive.Testing @@ -20,8 +20,8 @@ module Marbles = | Error (t, ex) -> Error (f t, ex) | Done t -> Done (f t) - let private toNotifications xs = - List.fold (fun (time, ms) x -> + let private toNotifications xs = + List.fold (fun (time, ms) x -> match x with | Next (t, v) -> time + t, onNext (time + t) v | Error (t, ex) -> time + t, onError (time + t) ex @@ -31,53 +31,53 @@ module Marbles = |> List.rev type private Temperature = Hot | Cold - + let private frameMultiplier = 10L - - let private frsToTime xs = + + let private frsToTime xs = List.map (fun _ -> frameMultiplier) xs |> List.fold (+) 0L - + let private pframes = many (pchar '-') - + let private parseMarbles temp txt = let txt = txt |> String.filter (fun c -> c <> ' ') let pvalueOne = noneOf [ '#'; '^'; '|'; '('; ')' ] - let pvalueMultiple = - between - (pchar '(') - (pchar ')') + let pvalueMultiple = + between + (pchar '(') + (pchar ')') (many1 pvalueOne |>> (Array.ofList >> String)) let pvaluewrap = (pvalueOne |>> string) <|> pvalueMultiple - let pframevalue = + let pframevalue = pframes .>>. pvaluewrap |>> fun (frs, x) -> Next (frsToTime frs, x) - let pframeError = - pframes .>> pchar '#' + let pframeError = + pframes .>> pchar '#' |>> fun frs -> Error (frsToTime frs, exn "error") - let pemit = + let pemit = attempt pframevalue <|> attempt pframeError - - let pdone = - attempt (pframes .>> pchar '|' |>> (frsToTime >> Some)) + + let pdone = + attempt (pframes .>> pchar '|' |>> (frsToTime >> Some)) <|> (pframes |>> fun _ -> None) - let pemitsdone = - many pemit .>>. pdone - |>> fun (cs, c) -> - match c with + let pemitsdone = + many pemit .>>. pdone + |>> fun (cs, c) -> + match c with | Some x -> cs @ [Done x] | None -> cs - let psub = - pframes .>> pchar '^' + let psub = + pframes .>> pchar '^' |>> (frsToTime >> (+) frameMultiplier) - + let pemitssub = (many pemit) .>> psub .>>. pemitsdone |>> fun (xs, ys) -> @@ -94,7 +94,7 @@ module Marbles = let private parseMarblesAsSubscription txt = let txt = txt |> String.filter (fun c -> c <> ' ') - let psub = pframes .>> pchar '^' |>> frsToTime + let psub = pframes .>> pchar '^' |>> frsToTime let punsub = pframes .>> pchar '!' |>> (frsToTime >> (+) frameMultiplier) let psubscriptions = psub .>>. punsub |>> fun (s, u) -> Subscription (s, s + u) @@ -104,7 +104,7 @@ module Marbles = /// Creates from the given marble representation a Cold Observable /// using the specified `TestScheduler`. - /// Note that the marble representation can't have a subscription marker (`^`), + /// Note that the marble representation can't have a subscription marker (`^`), /// only Hot Observables can have these. /// /// ## Parameters @@ -125,7 +125,7 @@ module Marbles = parseMarbles Cold txt /// Verifies that the given marble representation is indeed the same as the observed messages found in the given test observer. - let expectMessages sch txt obs = + let expectMessages sch txt obs = TestSchedule.subscribeTestObserverStart sch obs |> TestObserver.messages =! (parseMarbles Cold txt) @@ -136,9 +136,9 @@ module Marbles = let subscriptions = List.map parseMarblesAsSubscription /// Verifies that the given marble representation is indeed the same as the subscription found in the given test observable sequence. - let expectSubscription txt xs = + let expectSubscription txt xs = subscription txt |> (=!) xs /// Verifies that the given marble representation is indeed the same as the subscriptions found in the given test observable sequence. - let expectSubscriptions txt xs = + let expectSubscriptions txt xs = subscriptions txt |> (=!) xs \ No newline at end of file diff --git a/src/FSharp.Control.Reactive.Testing/TestNotifications.fs b/src/FSharp.Control.Reactive.Testing/TestNotifications.fs index 8553bbe..1494cea 100644 --- a/src/FSharp.Control.Reactive.Testing/TestNotifications.fs +++ b/src/FSharp.Control.Reactive.Testing/TestNotifications.fs @@ -9,10 +9,10 @@ type TestNotifications<'a> = TestNotifications of TestNotification<'a> list /// The Reactive module provides operators for working with TestObserver<_> in F#. [] -module TestNotification = +module TestNotification = open System - let wrap = TestNotifications + let wrap = TestNotifications /// Unwraps the 'TestNotifications' pattern into a 'TestNotification list'. let unwrap (TestNotifications xs) = xs @@ -37,9 +37,9 @@ module TestNotification = /// Converts the incoming TestNotification to an Result with: /// 'OnNext()' notifications as Some and 'OnError()' and 'OnCompleted()' as Error. - let toResult = function - | Next x -> Result.Ok x - | Error ex -> Result.Error ex + let toResult = function + | Next x -> Result.Ok x + | Error ex -> Result.Error ex | _ -> Result.Error (exn "'OnCompleted'") /// Filters the incoming TestNotifications on a given predicate function. @@ -61,7 +61,7 @@ module TestNotification = let onCompleted t = ReactiveTest.OnCompleted t /// Filters the OnNexts recorded notifiation values of the specified list. - let nexts (TestNotifications xs) = + let nexts (TestNotifications xs) = List.choose (function | Next x -> Some x | _ -> None) xs /// Maps the OnNext recorded notification values of the specified list to other values. @@ -80,12 +80,12 @@ open TestNotification type GenTestNotification = static member GenNotifications<'a> () = - let nexts_errors = - Gen.frequency [ + let nexts_errors = + Gen.frequency [ (1, Gen.constant (fun t _ -> onError t (new System.Exception ()))) - (2, Gen.constant onNext) ] + (2, Gen.constant onNext) ] |> Gen.listOf - + let emits = Gen.constant (fun h t -> h :: t) <*> Gen.constant (fun t _ -> onCompleted t) @@ -93,17 +93,17 @@ type GenTestNotification = |> Gen.map List.rev let realisticMs = 100L - let growingNumbers l = + let growingNumbers l = Arb.generate |> Gen.map (fun (x, y) -> (abs x) + realisticMs, y) |> Gen.listOfLength l |> Gen.map (List.sortBy fst) - let zipGrowingNumbers es = + let zipGrowingNumbers es = growingNumbers (List.length es) |> Gen.map (List.zip es) - - emits + + emits >>= zipGrowingNumbers |> Gen.map (List.map (fun (f, (x, y)) -> f x y) >> TestNotifications) |> Arb.fromGen diff --git a/src/FSharp.Control.Reactive.Testing/TestObserver.fs b/src/FSharp.Control.Reactive.Testing/TestObserver.fs index 3c7fa48..8d68fc2 100644 --- a/src/FSharp.Control.Reactive.Testing/TestObserver.fs +++ b/src/FSharp.Control.Reactive.Testing/TestObserver.fs @@ -7,7 +7,7 @@ open Microsoft.Reactive.Testing /// Gets recorded timestamped notification messages received by the observer. [] -let all (o : ITestableObserver<'a>) = +let all (o : ITestableObserver<'a>) = o.Messages |> Seq.toList |> TestNotifications /// Gets recorded timestamped notification messages received by the observer. @@ -15,7 +15,7 @@ let messages (o : ITestableObserver<'a>) = o.Messages |> Seq.toList |> TestNotifications /// Gets recorded timestamped "OnNext" notification messages received by the observer. -let nexts (o : ITestableObserver<'a>) = +let nexts (o : ITestableObserver<'a>) = o.Messages |> Seq.toList |> TestNotifications |> TestNotification.nexts /// Gets recorded timestamped "OnError" notification messages received by the observer. diff --git a/src/FSharp.Control.Reactive.Testing/TestSchedule.fs b/src/FSharp.Control.Reactive.Testing/TestSchedule.fs index 985dd0b..8b94e5b 100644 --- a/src/FSharp.Control.Reactive.Testing/TestSchedule.fs +++ b/src/FSharp.Control.Reactive.Testing/TestSchedule.fs @@ -7,21 +7,21 @@ open FSharp.Control.Reactive open Microsoft.Reactive.Testing /// Virtual time scheduler used for testing applications and libraries built using Reactive Extensions. -let usage f = +let usage f = f <| TestScheduler () - + /// Creates a cold observable using the specified timestamped notification messages. let coldObservable (s : TestScheduler) (TestNotifications ms) = s.CreateColdObservable<'a> (ms |> List.toArray) /// Creates a hot observable using the specified timestamped notification messages. -let hotObservable (s : TestScheduler) (TestNotifications ms) = +let hotObservable (s : TestScheduler) (TestNotifications ms) = s.CreateHotObservable (ms |> List.toArray) - + /// Advances the scheduler's clock by the specified relative time, running all work scheduled for that timespan. let advanceBy (s : TestScheduler) time = s.AdvanceBy time; s - + /// Advances the scheduler's clock to the specified time, running all work till that point. let advanceTo (s : TestScheduler) time = s.AdvanceTo time; s diff --git a/src/FSharp.Control.Reactive/Observable.fs b/src/FSharp.Control.Reactive/Observable.fs index b4d1598..33d691f 100644 --- a/src/FSharp.Control.Reactive/Observable.fs +++ b/src/FSharp.Control.Reactive/Observable.fs @@ -79,7 +79,7 @@ module Builders = [] member __.Nth (s:IObservable<'a>, index ) = s.ElementAt(index) [] - member inline __.SumBy (s:IObservable<_>,[] valueSelector : _ -> _) = s.Select(valueSelector).Aggregate(Unchecked.defaultof<_>, new Func<_,_,_>( fun a b -> a + b)) + member inline __.SumBy (s:IObservable<_>,[] valueSelector : _ -> _) = s.Select(valueSelector).Aggregate(Unchecked.defaultof<_>, new Func<_,_,_>( fun a b -> a + b)) [] member __.GroupBy (s:IObservable<_>,[] keySelector : _ -> _) = s.GroupBy(new Func<_,_>(keySelector)) [] @@ -126,7 +126,7 @@ module Observable = type Observable with /// Creates an observable sequence from the specified Subscribe method implementation. static member Create (subscribe: IObserver<'T> -> unit -> unit) = - let subscribe o = + let subscribe o = let m = subscribe o Action(m) Observable.Create(subscribe) @@ -134,7 +134,7 @@ module Observable = /// Creates an observable sequence from the specified asynchronous Subscribe method implementation. static member CreateAsync (subscribe: IObserver<'T> -> Async) = Observable.Create(subscribe >> Async.StartAsTask) - + type IObservable<'T> with /// Subscribes to the Observable with just a next-function. member this.Subscribe(onNext: 'T -> unit) = @@ -143,7 +143,7 @@ module Observable = /// Subscribes to the Observable with a next and an error-function. member this.Subscribe(onNext: 'T -> unit, onError: exn -> unit) = this.Subscribe(Action<_> onNext, Action onError) - + /// Subscribes to the Observable with a next and a completion callback. member this.Subscribe(onNext: 'T -> unit, onCompleted: unit -> unit) = this.Subscribe(Action<_> onNext, Action onCompleted) @@ -158,9 +158,9 @@ module Observable = ***************************************************************) - /// Applies an accumulator function over an observable sequence, returning the + /// Applies an accumulator function over an observable sequence, returning the /// result of the aggregation as a single element in the result sequence - let aggregate accumulator source = + let aggregate accumulator source = Observable.Aggregate(source, Func<_,_,_> accumulator ) /// Determines whether all elements of an observable satisfy a predicate @@ -181,11 +181,11 @@ module Observable = /// Determines whether an observable sequence contains any elements - let any (source:IObservable<'Source>) : IObservable = + let any (source:IObservable<'Source>) : IObservable = Observable.Any(source) - /// Hides the identy of an observable sequence + /// Hides the identy of an observable sequence let asObservable source : IObservable<'Source>= Observable.AsObservable( source ) @@ -196,77 +196,77 @@ module Observable = /// Lifts the values of f and m and applies f to m, returning an IObservable of the result. let apply f m = f |> bind (fun f' -> m |> bind (fun m' -> Observable.Return(f' m'))) - + /// Matches when both observable sequences have an available value - let both second first = Observable.And(first, second) + let both second first = Observable.And(first, second) // #region Buffers - let buffer (bufferClosingSelector:IObservable<'BufferClosing>) source = + let buffer (bufferClosingSelector:IObservable<'BufferClosing>) source = Observable.Buffer(source, bufferClosingSelector) - - /// Projects each element of an observable sequence into + + /// Projects each element of an observable sequence into /// consequtive non-overlapping buffers based on a sequence of boundary markers - let bufferBounded (boundaries:IObservable<'BufferClosing>) source : IObservable>= + let bufferBounded (boundaries:IObservable<'BufferClosing>) source : IObservable>= Observable.Buffer(source, boundaries) - /// Projects each element of an observable sequence into + /// Projects each element of an observable sequence into /// consequtive non-overlapping buffers produced based on count information - let bufferCount (count:int) source = + let bufferCount (count:int) source = Observable.Buffer(source, count) /// Projects each element of an observable sequence into zero or more buffers /// which are produced based on element count information - let bufferCountSkip (count:int) (skip:int) source = + let bufferCountSkip (count:int) (skip:int) source = Observable.Buffer(source,count, skip) - /// Projects each element of an observable sequence into + /// Projects each element of an observable sequence into /// consequtive non-overlapping buffers produced based on timing information - let bufferSpan (timeSpan:TimeSpan) source = + let bufferSpan (timeSpan:TimeSpan) source = Observable.Buffer(source, timeSpan) - - /// Projects each element of an observable sequence into consecutive non-overlapping buffers + + /// Projects each element of an observable sequence into consecutive non-overlapping buffers /// which are produced based on timing information, using the specified scheduler to run timers. let bufferSpanOn (scheduler:IScheduler) timeSpan source = Observable.Buffer (source, timeSpan, scheduler) - + /// Projects each element of an observable sequence into a buffer that goes /// sent out when either it's full or a specific amount of time has elapsed /// Analogy - A boat that departs when it's full or at its scheduled time to leave - let bufferSpanCount (timeSpan:TimeSpan) (count:int) source = + let bufferSpanCount (timeSpan:TimeSpan) (count:int) source = Observable.Buffer(source, timeSpan, count) - - /// Projects each element of an observable sequence into a buffer that's sent out + + /// Projects each element of an observable sequence into a buffer that's sent out /// when either it's full or a given amount of time has elapsed, using the specified scheduler to run timers. - /// Analogy - A ferry leaves the dock when all the seats are taken, or at the scheduled time or departure, + /// Analogy - A ferry leaves the dock when all the seats are taken, or at the scheduled time or departure, /// whichever event occurs first. let bufferSpanCountOn (scheduler:IScheduler) (timeSpan:TimeSpan) (count:int) source = Observable.Buffer(source, timeSpan, count, scheduler) - /// Projects each element of an observable sequence into zero of more buffers. + /// Projects each element of an observable sequence into zero of more buffers. /// bufferOpenings - observable sequence whose elements denote the opening of each produced buffer /// bufferClosing - observable sequence whose elements denote the closing of each produced buffer - let bufferFork ( bufferOpenings:IObservable<'BufferOpening>) - ( bufferClosingSelector: 'BufferOpening ->IObservable<'T> ) source = + let bufferFork ( bufferOpenings:IObservable<'BufferOpening>) + ( bufferClosingSelector: 'BufferOpening ->IObservable<'T> ) source = Observable.Buffer( source, bufferOpenings,Func<_,_> bufferClosingSelector) - /// Projects each element of an observable sequence into + /// Projects each element of an observable sequence into /// zero or more buffers produced based on timing information - let bufferSpanShift (timeSpan:TimeSpan) (timeShift:TimeSpan) source = + let bufferSpanShift (timeSpan:TimeSpan) (timeShift:TimeSpan) source = Observable.Buffer(source, timeSpan, timeShift) - + /// Projects each element of an observable sequence into /// zero or more buffers which are produced based on timing information, /// using the specified scheduler to run timers. @@ -275,7 +275,7 @@ module Observable = // #endregion - + /// Converts the elements of the sequence to the specified type let cast<'CastType> (source) = Observable.Cast<'CastType>(source) @@ -292,7 +292,7 @@ module Observable = let caseDefault selector (defaulSource:IObservable<'Result>) (sources:IDictionary<'Value,IObservable<'Result>>) = Observable.Case( Func<'Value> selector, sources, defaulSource ) - + /// Uses selector to determine which source in sources to return, /// choosing an empty sequence on the specified scheduler if no match is found. let caseOn (scheduler:IScheduler) selector sources = @@ -301,8 +301,8 @@ module Observable = /// Continues an observable sequence that is terminated /// by an exception with the next observable sequence. let catch (second: IObservable<'T>) first = - Observable.Catch(first, second) - + Observable.Catch(first, second) + /// Continues an observable sequence that is terminated by an exception /// with an optional as result type. @@ -313,7 +313,7 @@ module Observable = /// Continues an observable sequence that is terminated by an exception of - /// the specified type with the observable sequence produced by the handler, + /// the specified type with the observable sequence produced by the handler, /// wrapped in a 'Result' type. let catchResult handler (source : IObservable<_>) = let normal = source.Select(Func<_, _> Result.Ok) @@ -338,20 +338,20 @@ module Observable = /// Produces an enumerable sequence of consequtive (possibly empty) chunks of the source observable - let chunkify<'Source> source : seq> = + let chunkify<'Source> source : seq> = Observable.Chunkify<'Source>( source ) - /// Concatenates the observable sequences obtained by applying the map for each element in the given enumerable + /// Concatenates the observable sequences obtained by applying the map for each element in the given enumerable let collect ( map )( source:seq<'Source> ) : IObservable<'Result> = Observable.For( source, Func<'Source, IObservable<'Result>> map ) - + /// Produces an enumerable sequence that returns elements collected/aggregated from the source sequence between consecutive iterations. /// merge - Merges a sequence element with the current collector /// newCollector - Factory to create a new collector object. - let collectMerge newCollector merge source = + let collectMerge newCollector merge source = Observable.Collect( source, Func<_> newCollector,Func<_,_,_> merge ) @@ -360,7 +360,7 @@ module Observable = /// getNewCollector - Factory to replace the current collector by a new collector /// getInitialCollector - Factory to create the initial collector object. let collectMergeInit getInitialCollector merge getNewCollector source = - Observable.Collect( source , Func<_> getInitialCollector , + Observable.Collect( source , Func<_> getInitialCollector , Func<_,_,_> merge, Func<_,_> getNewCollector ) // #region CombineLatest Functions @@ -371,8 +371,8 @@ module Observable = let combineLatest ( source1 : IObservable<'T1> ) ( source2 : IObservable<'T2> ) = Observable.CombineLatest(source1, source2, fun t1 t2 -> (t1, t2) ) - /// Merges the specified observable sequences into one observable sequence by - /// emmiting a list with the latest source elements of whenever any of the + /// Merges the specified observable sequences into one observable sequence by + /// emmiting a list with the latest source elements of whenever any of the /// observable sequences produces an element. let combineLatestSeq (source :seq> ) : IObservable> = Observable.CombineLatest( source ) @@ -381,7 +381,7 @@ module Observable = /// Merges the specified observable sequences into one observable sequence by applying the map /// whenever any of the observable sequences produces an element. let combineLatestArray (source :IObservable<'T>[] ) = - Observable.CombineLatest( source ) + Observable.CombineLatest( source ) /// Merges the specified observable sequences into one observable sequence by applying the map @@ -391,28 +391,28 @@ module Observable = /// Concatenates the second observable sequence to the first observable sequence - /// upn the successful termination of the first + /// upn the successful termination of the first let concat (second: IObservable<'T>) (first: IObservable<'T>) = Observable.Concat(first, second) - + /// Concatenates all observable sequences within the sequence as long as - /// the previous observable sequence terminated successfully + /// the previous observable sequence terminated successfully let concatSeq (sources:seq>) : IObservable<'T> = Observable.Concat(sources) /// Concatenates all of the specified observable sequences as long as - /// the previous observable sequence terminated successfully + /// the previous observable sequence terminated successfully let concatArray (sources:IObservable<'T>[]) = Observable.Concat(sources) /// Concatenates all of the inner observable sequences as long as - /// the previous observable sequence terminated successfully + /// the previous observable sequence terminated successfully let concatInner (sources: IObservable>) = Observable.Concat( sources ) - + /// Concatenates all task results as long as /// the previous taskterminated successfully @@ -422,36 +422,36 @@ module Observable = /// Connects the observable wrapper to its source. All subscribed /// observers will recieve values from the underlying observable - /// sequence as long as the connection is established. + /// sequence as long as the connection is established. /// ( publish an Observable to get a ConnectableObservable ) - let connect ( source:Subjects.IConnectableObservable<_> ) = + let connect ( source:Subjects.IConnectableObservable<_> ) = source.Connect() - /// Determines whether an observable sequence contains a specified + /// Determines whether an observable sequence contains a specified /// element by using the default equality comparer. let contains value source = Observable.Contains( source, value ) - /// Determines whether an observable sequence contains a + /// Determines whether an observable sequence contains a /// specified element by using a specified EqualityComparer let containsCompare comparer value source = Observable.Contains( source, value, comparer ) /// Counts the elements - let count source = + let count source = Observable.Count(source) - /// Returns an observable sequence containing an int that represents how many elements + /// Returns an observable sequence containing an int that represents how many elements /// in the specified observable sequence satisfy a condition. - let countSatisfy predicate source = + let countSatisfy predicate source = Observable.Count( source, Func<_,_> predicate ) - /// Returns the elements of the specified sequence or the type parameter's default value + /// Returns the elements of the specified sequence or the type parameter's default value /// in a singleton sequence if the sequence is empty. let defaultIfEmpty ( source:IObservable<'Source> ): IObservable<'Source> = Observable.DefaultIfEmpty( source ) @@ -460,9 +460,9 @@ module Observable = /// Returns the elements of the specified sequence or the specified value in a singleton sequence if the sequence is empty. let defaultIfEmptyIs (defaultValue:'Source )( source:IObservable<'Source> ) : IObservable<'Source> = Observable.DefaultIfEmpty( source, defaultValue ) - - /// Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes. + + /// Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes. let defer ( observableFactory: unit -> IObservable<'Result> ): IObservable<'Result> = Observable.Defer(Func> observableFactory ) @@ -482,7 +482,7 @@ module Observable = /// The relative time intervals between the values are preserved. let delayUntil ( source:IObservable<'Source> ) ( dueTime:DateTimeOffset ) : IObservable<'Source> = Observable.Delay(source, dueTime ) - + /// Time shifts the observable sequence to start propagating notifications at the specified absolute time, /// using the specified scheduler to run timers. /// The relative time intervals between the values are preserved. @@ -496,7 +496,7 @@ module Observable = /// Time shifts the observable sequence based on a subscription delay and a delay selector function for each element. let delayMapFilter ( delayDurationSelector : 'Source -> IObservable<'TDelay>) - ( subscriptionDelay : IObservable<'TDelay>) + ( subscriptionDelay : IObservable<'TDelay>) ( source:IObservable<'Source> ) : IObservable<'Source> = Observable.Delay(source, subscriptionDelay, Func<'Source, IObservable<'TDelay>> delayDurationSelector) @@ -505,7 +505,7 @@ module Observable = let delaySubscription ( dueTime:TimeSpan) ( source:IObservable<'Source> ): IObservable<'Source> = Observable.DelaySubscription( source, dueTime ) - + /// Time shifts the observable sequence by delaying the subscription with the specified relative time duration, /// using the specified scheduler to run timers. let delaySubscriptionOn (scheduler:IScheduler) (dueTime:TimeSpan) source = @@ -515,7 +515,7 @@ module Observable = let delaySubscriptionUntil ( dueTime:DateTimeOffset) ( source:IObservable<'Source> ) : IObservable<'Source> = Observable.DelaySubscription( source, dueTime ) - + /// Time shifts the observable sequence by delaying the subscription to the specified absolute time, /// using the specified scheduler to run timers. let delaySubscriptionUntilOn (scheduler:IScheduler) (dueTime:DateTimeOffset) source = @@ -523,11 +523,11 @@ module Observable = /// Dematerializes the explicit notification values of an observable sequence as implicit notifications. - let dematerialize source = + let dematerialize source = Observable.Dematerialize(source) - /// Returns an observable sequence that only contains distinct elements + /// Returns an observable sequence that only contains distinct elements let distinct ( source:IObservable<'Source> ) : IObservable<'Source> = Observable.Distinct( source ) @@ -545,9 +545,9 @@ module Observable = /// Returns an observable sequence that contains only distinct elements according to the keySelector and the comparer. let distinctKeyCompare ( keySelector:'Source -> 'Key )( comparer:IEqualityComparer<'Key>)( source:IObservable<'Source> ) : IObservable<'Source> = Observable.Distinct( source, Func<'Source,'Key> keySelector, comparer ) - - /// Returns an observable sequence that only contains distinct contiguous elements + + /// Returns an observable sequence that only contains distinct contiguous elements let distinctUntilChanged ( source:IObservable<'Source> ) : IObservable<'Source> = Observable.DistinctUntilChanged(source) @@ -555,7 +555,7 @@ module Observable = /// Returns an observable sequence that contains only distinct contiguous elements according to the keySelector. let distinctUntilChangedKey ( keySelector:'Source -> 'Key )( source:IObservable<'Source> ) : IObservable<'Source> = Observable.DistinctUntilChanged( source, Func<'Source,'Key> keySelector ) - + /// Returns an observable sequence that contains only distinct contiguous elements according to the comparer. let distinctUntilChangedCompare ( comparer:IEqualityComparer<'Source> )( source:IObservable<'Source> ) : IObservable<'Source> = @@ -577,7 +577,7 @@ module Observable = Observable.ElementAt( source, index ) /// Ignores all elements in an observable sequence leaving only the completed/error notifications - let ignoreElements source = + let ignoreElements source = Observable.IgnoreElements (source) /// Returns an empty observable @@ -625,29 +625,29 @@ module Observable = /// Determines whether an observable sequence contains a specified value /// which satisfies the given predicate - let exists predicate source = + let exists predicate source = Observable.Any(source, Func<_,_> predicate) - /// Filters the observable elements of a sequence based on a predicate - let filter predicate (source: IObservable<'T>) = + /// Filters the observable elements of a sequence based on a predicate + let filter predicate (source: IObservable<'T>) = Observable.Where( source, Func<_,_> predicate ) - /// Filters the observable elements of a sequence based on a predicate by + /// Filters the observable elements of a sequence based on a predicate by /// incorporating the element's index - let filteri predicate (source: IObservable<'T>) = + let filteri predicate (source: IObservable<'T>) = Observable.Where( source, Func<_,_,_> (fun i x -> predicate x i) ) /// Invokes a specified action after the source observable sequence /// terminates gracefully or exceptionally let finallyDo finallyAction source = - Observable.Finally( source, Action finallyAction ) + Observable.Finally( source, Action finallyAction ) /// Returns the first element of an observable sequence - let first (source:IObservable<'T>) = + let first (source:IObservable<'T>) = source.FirstAsync() @@ -657,69 +657,69 @@ module Observable = source.FirstAsync( Func<_,_> predicate ) - /// Projects each element of an observable sequence to an observable sequence + /// Projects each element of an observable sequence to an observable sequence /// and merges the resulting observable sequences into one observable sequenc - let flatmap map source = + let flatmap map source = Observable.SelectMany(source, Func<'S,IObservable<'R>> map ) - /// Projects each element of an observable sequence to an observable sequence by incorporating the + /// Projects each element of an observable sequence to an observable sequence by incorporating the /// element's index and merges the resulting observable sequences into one observable sequence. -// let flatmapi map source = +// let flatmapi map source = // Observable.SelectMany(source,Func<'Source,int,seq<'Result>> map ) // - /// Projects each element of the source observable sequence to the other observable sequence + /// Projects each element of the source observable sequence to the other observable sequence /// and merges the resulting observable sequences into one observable sequence. let flatmapOther ( other:IObservable<'Other> ) ( source:IObservable<'Source> ): IObservable<'Other> = Observable.SelectMany( source, other ) - /// Projects each element of an observable sequence to an enumerable sequence and concatenates + /// Projects each element of an observable sequence to an enumerable sequence and concatenates /// the resulting enumerable sequences into one observable sequence. - let flatmapSeq map source = + let flatmapSeq map source = Observable.SelectMany(source, Func<'Source,seq<'Result>> map) - /// Projects each element of an observable sequence to an enumerable sequence by incorporating the + /// Projects each element of an observable sequence to an enumerable sequence by incorporating the /// element's index and concatenates the resulting enumerable sequences into one observable sequence. -// let flatmapSeqi map source = +// let flatmapSeqi map source = // Observable.SelectMany(source, Func<'Source,int,seq<'Result>> map) // -// +// /// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence. let flatmapTask ( map ) ( source:IObservable<'Source> ) : IObservable<'Result> = - Observable.SelectMany( source, Func<'Source,Threading.Tasks.Task<'Result>> map ) + Observable.SelectMany( source, Func<'Source,Threading.Tasks.Task<'Result>> map ) ///Turns an F# async workflow into an observable - let ofAsync asyncOperation = + let ofAsync asyncOperation = Observable.FromAsync (fun (token : Threading.CancellationToken) -> Async.StartAsTask(asyncOperation, cancellationToken = token)) ///Helper function for turning async workflows into observables - let liftAsync asyncOperation = + let liftAsync asyncOperation = asyncOperation >> ofAsync /// Projects each element of an observable sequence to a async workflow and merges all of the async worksflow results into one observable sequence. - let flatmapAsync asyncOperation (source : IObservable<'Source>) = + let flatmapAsync asyncOperation (source : IObservable<'Source>) = source.SelectMany(fun item -> liftAsync asyncOperation item) - - /// Projects each element of an observable sequence to a task by incorporating the element's index + + /// Projects each element of an observable sequence to a task by incorporating the element's index /// and merges all of the task results into one observable sequence. // let flatmapTaski ( map ) ( source:IObservable<'Source> ) : IObservable<'Result> = -// Observable.SelectMany( source, Func<'Source,int,Threading.Tasks.Task<'Result>> map ) +// Observable.SelectMany( source, Func<'Source,int,Threading.Tasks.Task<'Result>> map ) - /// Applies an accumulator function over an observable sequence, returning the + /// Applies an accumulator function over an observable sequence, returning the /// result of the fold as a single element in the result sequence /// init is the initial accumulator value let fold accumulator init source = Observable.Aggregate(source, init, Func<_,_,_> accumulator) - /// Applies an accumulator function over an observable sequence, returning the + /// Applies an accumulator function over an observable sequence, returning the /// result of the fold as a single element in the result sequence /// init is the initial accumulator value, map is performed after the fold let foldMap accumulator init map source = @@ -731,59 +731,59 @@ module Observable = let fromEvent ( addHandler )( removeHandler ) : IObservable = Observable.FromEvent( Action<'Delegate> addHandler, Action<'Delegate> removeHandler ) - + /// Converts an Action-based .NET event to an observable sequence. Each event invocation is surfaced through an OnNext message in the resulting sequence. /// For conversion of events conforming to the standard .NET event pattern, use any of the FromEventPattern overloads instead. let fromEventOn scheduler addHandler removeHandler = Observable.FromEvent(Action<_> addHandler, Action<_> removeHandler, scheduler) - + /// Converts an generic Action-based .NET event to an observable sequence. Each event invocation is surfaced through an OnNext message in the resulting sequence. /// For conversion of events conforming to the standard .NET event pattern, use any of the FromEventPattern overloads instead. let fromEventGeneric addHandler removeHandler : IObservable<'TEventArgs> = Observable.FromEvent(Action<'TEventArgs -> unit> addHandler, Action<'TEventArgs -> unit> removeHandler) - + /// Converts an generic Action-based .NET event to an observable sequence. Each event invocation is surfaced through an OnNext message in the resulting sequence. /// For conversion of events conforming to the standard .NET event pattern, use any of the FromEventPattern overloads instead. let fromEventGenericOn scheduler addHandler removeHandler = Observable.FromEvent(Action<#EventArgs -> unit> addHandler, Action<#EventArgs -> unit> removeHandler, scheduler) - /// Converts a .NET event to an observable sequence, using a conversion function to obtain the event delegate. + /// Converts a .NET event to an observable sequence, using a conversion function to obtain the event delegate. /// Each event invocation is surfaced through an OnNext message in the resulting sequence. /// For conversion of events conforming to the standard .NET event pattern, use any of the FromEventPattern functions instead. - let fromEventConversion conversion addHandler removeHandler = + let fromEventConversion conversion addHandler removeHandler = Observable.FromEvent( - conversion = Func, unit> (fun action -> conversion (fun args -> action.Invoke(args))), - addHandler = Action<_> addHandler, + conversion = Func, unit> (fun action -> conversion (fun args -> action.Invoke(args))), + addHandler = Action<_> addHandler, removeHandler = Action<_> removeHandler ) - - /// Converts a .NET event to an observable sequence, using a conversion function to obtain the event delegate, using a specified scheduler to run timers. + + /// Converts a .NET event to an observable sequence, using a conversion function to obtain the event delegate, using a specified scheduler to run timers. /// Each event invocation is surfaced through an OnNext message in the resulting sequence. /// For conversion of events conforming to the standard .NET event pattern, use any of the FromEventPattern functions instead. let fromEventConversionOn scheduler conversion addHandler removeHandler = - Observable.FromEventPattern + Observable.FromEventPattern (Func, 'TDelegate> conversion, Action<'TDelegate> addHandler, Action<'TDelegate> removeHandler, scheduler) - /// Converts a .NET event to an observable sequence, using a supplied event delegate type. + /// Converts a .NET event to an observable sequence, using a supplied event delegate type. /// Each event invocation is surfaced through an OnNext message in the resulting sequence. let fromEventHandler addHandler removeHandler = Observable.FromEventPattern<#EventArgs> ( - Action> addHandler, + Action> addHandler, Action> removeHandler) - - /// Converts a .NET event to an observable sequence, using a supplied event delegate type on a specified scheduler. + + /// Converts a .NET event to an observable sequence, using a supplied event delegate type on a specified scheduler. /// Each event invocation is surfaced through an OnNext message in the resulting sequence. let fromEventHandlerOn (scheduler:IScheduler) addHandler removeHandler = Observable.FromEventPattern<#EventArgs> ( - Action> addHandler, - Action> removeHandler, + Action> addHandler, + Action> removeHandler, scheduler) @@ -791,24 +791,24 @@ module Observable = let fromEventPattern eventName (target:obj) = Observable.FromEventPattern( target, eventName ) - + /// Generates an observable sequence by running a state-driven loop producing the sequence's elements. - let generate initialState condition iterator resultMap = - Observable.Generate( initialState, - Func<'State,bool> condition , - Func<'State,'State> iterator , + let generate initialState condition iterator resultMap = + Observable.Generate( initialState, + Func<'State,bool> condition , + Func<'State,'State> iterator , Func<'State,'Result> resultMap ) /// Generates an observable sequence by running a state-driven loop producing the sequence's elements. let generateOn (scheduler:IScheduler) initialState condition iterator resultMap = Observable.Generate( - initialState, - Func<'State, bool> condition, - Func<'State, 'State> iterator, - Func<'State, 'TResult> resultMap, + initialState, + Func<'State, bool> condition, + Func<'State, 'State> iterator, + Func<'State, 'TResult> resultMap, scheduler) - + /// Generates an observable sequence by running a state-driven and temporal loop producing the sequence's elements. let generateTimed( initialState:'State ) @@ -816,9 +816,9 @@ module Observable = ( iterate ) ( resultMap ) ( timeSelector ) : IObservable<'Result> = - Observable.Generate( initialState , - Func<'State,bool> condition , - Func<'State,'State> iterate , + Observable.Generate( initialState , + Func<'State,bool> condition , + Func<'State,'State> iterate , Func<'State,'Result> resultMap , Func<'State,DateTimeOffset>timeSelector ) @@ -831,9 +831,9 @@ module Observable = ( iterate ) ( resultMap ) ( timeSelector ) : IObservable<'Result> = - Observable.Generate( initialState , - Func<'State,bool> condition , - Func<'State,'State> iterate , + Observable.Generate( initialState , + Func<'State,bool> condition , + Func<'State,'State> iterate , Func<'State,'Result> resultMap , Func<'State,DateTimeOffset>timeSelector , scheduler) @@ -859,7 +859,7 @@ module Observable = Observable.GroupBy( source,Func<'Source,'Key> keySelector ) - /// Groups the elements of an observable sequence with the specified initial + /// Groups the elements of an observable sequence with the specified initial /// capacity according to a specified key selector function. // let groupByCapacity ( keySelector ) // ( capacity:int ) @@ -876,12 +876,12 @@ module Observable = /// Groups the elements of an observable sequence and selects the resulting elements by using a specified function. let groupByElement ( keySelector ) - ( elementSelector ) + ( elementSelector ) ( source ) : IObservable> = Observable.GroupBy( source, Func<'Source,'Key> keySelector, Func<'Source,'Element> elementSelector ) - /// Groups the elements of an observable sequence with the specified initial capacity + /// Groups the elements of an observable sequence with the specified initial capacity /// according to a specified key selector function and comparer. // let groupByCapacityCompare // ( keySelector ) @@ -889,7 +889,7 @@ module Observable = // ( comparer : IEqualityComparer<'Key> ) // ( source : IObservable<'Source> ) : IObservable> = // Observable.GroupBy( source, Func<'Source,'Key> keySelector, capacity, comparer ) -// +// /// Groups the elements of an observable sequence with the specified initial capacity /// and selects the resulting elements by using a specified function. @@ -901,7 +901,7 @@ module Observable = Observable.GroupBy( source, Func<'Source,'Key> keySelector, Func<'Source,'Element> elementSelector ) - /// Groups the elements of an observable sequence according to a specified key selector function + /// Groups the elements of an observable sequence according to a specified key selector function /// and comparer and selects the resulting elements by using a specified function. let groupByCompareElement ( keySelector ) @@ -912,19 +912,19 @@ module Observable = - /// Groups the elements of an observable sequence with the specified initial capacity according to a + /// Groups the elements of an observable sequence with the specified initial capacity according to a /// specified key selector function and comparer and selects the resulting elements by using a specified function. // let groupByCapacityCompareElement // ( keySelector ) // ( capacity : int ) -// ( comparer : IEqualityComparer<'Key> ) -// ( elementSelector ) +// ( comparer : IEqualityComparer<'Key> ) +// ( elementSelector ) // ( source : IObservable<'Source> ) : IObservable> = // Observable.GroupBy( source, Func<'Source,'Key> keySelector, Func<'Source,'Element> elementSelector, capacity, comparer ) // /// Groups the elements of an observable sequence according to a specified key selector function. - /// A duration selector function is used to control the lifetime of groups. When a group expires, + /// A duration selector function is used to control the lifetime of groups. When a group expires, /// it receives an OnCompleted notification. When a new element with the same /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request. let groupByUntil( keySelector ) @@ -934,32 +934,32 @@ module Observable = /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function. - /// A duration selector function is used to control the lifetime of groups. When a group + /// A duration selector function is used to control the lifetime of groups. When a group /// expires, it receives an OnCompleted notification. When a new element with the same /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request. // let groupByCapacityUntil // ( keySelector ) -// ( capacity : int ) +// ( capacity : int ) // ( durationSelector ) // ( source : IObservable<'Source> ): IObservable> = // Observable.GroupByUntil( source, Func<'Source,'Key> keySelector,Func,IObservable<'TDuration>> durationSelector, capacity) // /// Groups the elements of an observable sequence according to a specified key selector function and comparer. - /// A duration selector function is used to control the lifetime of groups. When a group expires, + /// A duration selector function is used to control the lifetime of groups. When a group expires, /// it receives an OnCompleted notification. When a new element with the same /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request. let groupByComparerUntil ( keySelector) - ( comparer: IEqualityComparer<'Key> ) + ( comparer: IEqualityComparer<'Key> ) ( durationSelector ) ( source:IObservable<'Source> ) : IObservable> = Observable.GroupByUntil( source, Func<'Source,'Key> keySelector, Func,IObservable<'TDuration>> durationSelector, comparer ) - /// Groups the elements of an observable sequence according to a specified key selector function + /// Groups the elements of an observable sequence according to a specified key selector function /// and selects the resulting elements by using a specified function. - /// A duration selector function is used to control the lifetime of groups. When a group expires, + /// A duration selector function is used to control the lifetime of groups. When a group expires, /// it receives an OnCompleted notification. When a new element with the same /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request. let groupByElementUntil @@ -973,41 +973,41 @@ module Observable = /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer. /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request. -// let groupByCapacityComparerUntil +// let groupByCapacityComparerUntil // ( keySelector ) -// ( durationSelector ) -// ( capacity : int ) +// ( durationSelector ) +// ( capacity : int ) // ( comparer : IEqualityComparer<'Key> ) // ( source : IObservable<'Source> ) : IObservable> = -// Observable.GroupByUntil( source, -// Func<'Source,'Key> keySelector, -// Func,IObservable<'TDuration>> durationSelector, -// capacity, +// Observable.GroupByUntil( source, +// Func<'Source,'Key> keySelector, +// Func,IObservable<'TDuration>> durationSelector, +// capacity, // comparer ) - /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key + /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key /// selector function and selects the resulting elements by using a specified function. - /// A duration selector function is used to control the lifetime of groups. When a group + /// A duration selector function is used to control the lifetime of groups. When a group /// expires, it receives an OnCompleted notification. When a new element with the same /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request. -// let groupByCapacityElementUntil +// let groupByCapacityElementUntil // ( keySelector ) -// ( capacity : int ) +// ( capacity : int ) // ( elementSelector ) // ( durationSelector ) // ( source : IObservable<'Source> ) : IObservable> = // Observable.GroupByUntil( source, Func<'Source,'Key>keySelector, Func<'Source,'Element>elementSelector, Func,IObservable<'TDuration>> durationSelector, capacity ) - /// Groups the elements of an observable sequence according to a specified key selector function and + /// Groups the elements of an observable sequence according to a specified key selector function and /// comparer and selects the resulting elements by using a specified function. /// A duration selector function is used to control the lifetime of groups. When a group expires, /// it receives an OnCompleted notification. When a new element with the same /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request. let groupByComparerElementUntil ( keySelector ) - ( comparer:Collections.Generic.IEqualityComparer<'Key>) + ( comparer:Collections.Generic.IEqualityComparer<'Key>) ( elementSelector ) ( durationSelector ) ( source:IObservable<'Source> ) : IObservable> = @@ -1016,7 +1016,7 @@ module Observable = /// Groups the elements of an observable sequence with the specified initial capacity according to a specified /// key selector function and comparer and selects the resulting elements by using a specified function. - /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives + /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives /// an OnCompleted notification. When a new element with the same /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request. // let groupByCapacityComparerElementUntil @@ -1029,25 +1029,25 @@ module Observable = // Observable.GroupByUntil( source, Func<'Source,'Key>keySelector, Func<'Source,'Element>elementSelector, Func,IObservable<'TDuration>>durationSelector, capacity, comparer ) - /// Correlates the elements of two sequences based on overlapping + /// Correlates the elements of two sequences based on overlapping /// durations and groups the results - let groupJoin ( left : IObservable<'Left> ) - ( right : IObservable<'Right> ) - ( leftselect : 'Left -> IObservable<'a> ) - ( rightselect : 'Right-> IObservable<'b> ) - ( resultselect: 'Left -> IObservable<'Right>->'Result ) = - Observable.GroupJoin( left, right, - Func<'Left , IObservable<'a>> leftselect , - Func<'Right, IObservable<'b>> rightselect , + let groupJoin ( left : IObservable<'Left> ) + ( right : IObservable<'Right> ) + ( leftselect : 'Left -> IObservable<'a> ) + ( rightselect : 'Right-> IObservable<'b> ) + ( resultselect: 'Left -> IObservable<'Right>->'Result ) = + Observable.GroupJoin( left, right, + Func<'Left , IObservable<'a>> leftselect , + Func<'Right, IObservable<'b>> rightselect , Func<'Left , IObservable<'Right>,'Result>resultselect) /// Creates an observable that calls the specified function (each time) - /// after an observer is attached to the observable. This is useful to - /// make sure that events triggered by the function are handled. - let guard f (source:IObservable<'Args>) = - Observable.Create (fun observer -> - let disposable = source.Subscribe observer in f () + /// after an observer is attached to the observable. This is useful to + /// make sure that events triggered by the function are handled. + let guard f (source:IObservable<'Args>) = + Observable.Create (fun observer -> + let disposable = source.Subscribe observer in f () disposable ) @@ -1056,69 +1056,69 @@ module Observable = let head obs = Observable.FirstAsync(obs) /// Returns an observable sequence that produces a value after each period - let interval period = + let interval period = Observable.Interval( period ) /// Returns an observable sequence that produces a value on the specified scheduler after each period - let intervalOn (scheduler : IScheduler) period = + let intervalOn (scheduler : IScheduler) period = Observable.Interval( period, scheduler ) - /// IsEmpty returns an Observable that emits true if and only if the - /// source Observable completes without emitting any items. - let isEmpty source = + /// IsEmpty returns an Observable that emits true if and only if the + /// source Observable completes without emitting any items. + let isEmpty source = Observable.IsEmpty source - /// Invokes an action for each element in the observable sequence, and propagates all observer - /// messages through the result sequence. This method can be used for debugging, logging, etc. of query + /// Invokes an action for each element in the observable sequence, and propagates all observer + /// messages through the result sequence. This method can be used for debugging, logging, etc. of query /// behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. let iter ( onNext ) ( source:IObservable<'Source> ): IObservable<'Source> = Observable.Do( source, Action<'Source> onNext ) - - /// Invokes an action for each element in the observable sequence and invokes an action + + /// Invokes an action for each element in the observable sequence and invokes an action /// upon graceful termination of the observable sequence. This method can be used for debugging, /// logging, etc. of query behavior by intercepting the message stream to run arbitrary /// actions for messages on the pipeline. let iterEnd ( onNext )( onCompleted ) ( source:IObservable<'Source> ): IObservable<'Source> = - Observable.Do( source, Action<'Source> onNext, Action onCompleted ) + Observable.Do( source, Action<'Source> onNext, Action onCompleted ) - /// Invokes an action for each element in the observable sequence and invokes an action upon - /// exceptional termination of the observable sequence. This method can be used for debugging, - /// logging, etc. of query behavior by intercepting the message stream to run arbitrary + /// Invokes an action for each element in the observable sequence and invokes an action upon + /// exceptional termination of the observable sequence. This method can be used for debugging, + /// logging, etc. of query behavior by intercepting the message stream to run arbitrary /// actions for messages on the pipeline. let iterError ( onNext)( onError ) ( source:IObservable<'Source> ): IObservable<'Source> = - Observable.Do( source, Action<'Source> onNext, Action onError ) - + Observable.Do( source, Action<'Source> onNext, Action onError ) - /// Invokes an action for each element in the observable sequence and invokes an action + + /// Invokes an action for each element in the observable sequence and invokes an action /// upon graceful or exceptional termination of the observable sequence. - /// This method can be used for debugging, logging, etc. of query behavior by intercepting + /// This method can be used for debugging, logging, etc. of query behavior by intercepting /// the message stream to run arbitrary actions for messages on the pipeline. let iterErrorEnd ( onNext )( onError ) ( onCompleted ) ( source:IObservable<'Source> ): IObservable<'Source> = - Observable.Do( source, Action<'Source> onNext, Action onError, Action onCompleted ) - + Observable.Do( source, Action<'Source> onNext, Action onError, Action onCompleted ) + /// Invokes the observer's methods for each message in the source sequence. - /// This method can be used for debugging, logging, etc. of query behavior by intercepting + /// This method can be used for debugging, logging, etc. of query behavior by intercepting /// the message stream to run arbitrary actions for messages on the pipeline. let iterObserver ( observer:IObserver<'Source> ) ( source:IObservable<'Source> ): IObservable<'Source> = - Observable.Do( source,observer ) + Observable.Do( source,observer ) /// Correlates the elements of two sequences based on overlapping durations. let join right f left = Observable.Join( left, right, Func<_, _> Observable.Return, Func<_, _> Observable.Return, Func<_, _, _> f ) - + /// Correlates the elements of two sequences based on overlapping durations. let joinMap right fLeft fRight fResult left = Observable.Join( left, right, Func<_, _> fLeft, Func<_, _> fRight, Func<_, _, _> fResult ) /// Joins together the results from several patterns - let joinWhen (plans:seq>): IObservable<'T> = + let joinWhen (plans:seq>): IObservable<'T> = Observable.When( plans ) @@ -1127,25 +1127,25 @@ module Observable = Observable.LastAsync( source ) - /// Returns the last element of an observable sequence that satisfies the condition in the predicate + /// Returns the last element of an observable sequence that satisfies the condition in the predicate let lastIf ( predicate ) ( source:IObservable<'Source> ) : IObservable<'Source> = Observable.LastAsync( source, Func<'Source,bool> predicate ) /// Returns an enumerable sequence whose enumeration returns the latest observed element in the source observable sequence. - /// Enumerators on the resulting sequence will never produce the same element repeatedly, + /// Enumerators on the resulting sequence will never produce the same element repeatedly, /// and will block until the next element becomes available. let latest source = Observable.Latest( source ) - /// Returns an observable sequence containing a int64 that represents - /// the total number of elements in an observable sequence + /// Returns an observable sequence containing a int64 that represents + /// the total number of elements in an observable sequence let longCount source = Observable.LongCount(source) - /// Returns an observable sequence containing an int that represents how many elements + /// Returns an observable sequence containing an int that represents how many elements /// in the specified observable sequence satisfy a condition. let longCountSatisfy predicate source = Observable.LongCount(source, Func<_,_> predicate) @@ -1155,7 +1155,7 @@ module Observable = let map f source = Observable.Select(source, Func<_,_>(f)) - /// Maps the given observable with the given function and the + /// Maps the given observable with the given function and the /// index of the element let mapi (f:int -> 'Source -> 'Result) (source:IObservable<'Source>) = Observable.Select (source, Func<_,_,_> (fun i x -> f x i)) @@ -1168,28 +1168,28 @@ module Observable = /// Combines 'map' and 'fold'. Builds an observable whose emits are the result of applying the given function to each of the emits of the source observable. /// The function is also used to accumulate a final value. let mapFold (f : 'TState -> 'T -> 'TResult * 'TState) (init : 'TState) source = - Observable.Aggregate(source, ([], init), - Func<_, _, _> (fun (ys, state) x -> + Observable.Aggregate(source, ([], init), + Func<_, _, _> (fun (ys, state) x -> let y, state = f state x y :: ys, state)) - + /// Maps every emission to a constant value. let mapTo x (source : IObservable<'T>) = Observable.Select (source, Func<_, _> (fun _ -> x)) - + /// Maps every emission to a constant lazy value. let mapToLazy (xLazy : Lazy<'T>) (source : IObservable<'T>) = Observable.Select (source, Func<_, _> (fun _ -> xLazy.Force ())) - - + + /// Materializes the implicit notifications of an observable sequence as /// explicit notification values - let materialize source = + let materialize source = Observable.Materialize( source ) - + @@ -1206,21 +1206,21 @@ module Observable = let mergeArray (sources:IObservable<'T>[]) = Observable.Merge(sources) - + /// Merges all the observable sequences into a single observable sequence, /// using a specified scheduler for enumeration of and subscription to the sources. let mergeArrayOn (scheduler:IScheduler) (sources:IObservable<'T>[]) = Observable.Merge(scheduler, sources) - /// Merges elements from all inner observable sequences + /// Merges elements from all inner observable sequences /// into a single observable sequence. let mergeInner (sources:IObservable>) = Observable.Merge(sources) - /// Merges elements from all inner observable sequences - /// into a single observable sequence limiting the number of concurrent + /// Merges elements from all inner observable sequences + /// into a single observable sequence limiting the number of concurrent /// subscriptions to inner sequences let mergeInnerMax (maxConcurrent:int) (sources:IObservable>) = Observable.Merge(sources, maxConcurrent) @@ -1242,7 +1242,7 @@ module Observable = let mergeSeqMax (maxConcurrent:int)(sources:seq>) = Observable.Merge(sources, maxConcurrent) - + /// Merges an enumerable sequence of observable sequences into an observable sequence, /// limiting the number of concurrent subscriptions to inner sequences, /// using a specified scheduler for enumeration of and subscription to the sources. @@ -1256,28 +1256,28 @@ module Observable = /// Returns the maximum element in an observable sequence. - let maxOf (source:IObservable<'T>) = + let maxOf (source:IObservable<'T>) = Observable.Max( source ) - /// Returns an enumerable sequence whose sequence whose enumeration returns the - /// most recently observed element in the source observable sequence, using - /// the specified - let mostRecent initialVal source = + /// Returns an enumerable sequence whose sequence whose enumeration returns the + /// most recently observed element in the source observable sequence, using + /// the specified + let mostRecent initialVal source = Observable.MostRecent( source, initialVal ) - /// Multicasts the source sequence notifications through the specified subject to - /// the resulting connectable observable. Upon connection of the connectable + /// Multicasts the source sequence notifications through the specified subject to + /// the resulting connectable observable. Upon connection of the connectable /// observable, the subject is subscribed to the source exactly one, and messages - /// are forwarded to the observers registered with the connectable observable. + /// are forwarded to the observers registered with the connectable observable. /// For specializations with fixed subject types, see Publish, PublishLast, and Replay. let multicast subject source = Observable.Multicast(source, subject) /// Multicasts the source sequence notifications through an instantiated subject into - /// all uses of the sequence within a selector function. Each subscription to the + /// all uses of the sequence within a selector function. Each subscription to the /// resulting sequence causes a separate multicast invocation, exposing the sequence /// resulting from the selector function's invocation. For specializations with fixed /// subject types, see Publish, PublishLast, and Replay. @@ -1285,31 +1285,31 @@ module Observable = Observable.Multicast(source, Func<_> subjectSelector, Func<_,_> selector) - /// Returns a non-terminating observable sequence, which can + /// Returns a non-terminating observable sequence, which can /// be used to denote an infinite duration (e.g. when using reactive joins). let infinite() = Observable.Never() - /// Returns a non-terminating observable sequence, which can be + /// Returns a non-terminating observable sequence, which can be /// used to denote an infinite duration (e.g. when using reactive joins). let neverWitness( witness ) = Observable.Never( witness ) /// Returns an observable sequence whose enumeration blocks until the next - /// element in the source observable sequence becomes available. + /// element in the source observable sequence becomes available. /// Enumerators on the resulting sequence will block until the next /// element becomes available. - let next source = - Observable.Next( source ) - + let next source = + Observable.Next( source ) + /// Returns the sequence as an observable let ofSeq<'Item>(source:'Item seq) : IObservable<'Item> = Observable.ToObservable source - + /// Returns the sequence as an observable, using the specified scheduler to run the enumeration loop let ofSeqOn<'Item>(scheduler:Concurrency.IScheduler) (items:'Item seq) : IObservable<'Item> = Observable.ToObservable (items, scheduler) @@ -1320,18 +1320,18 @@ module Observable = Observable.ObserveOn( source, scheduler ) - /// Wraps the source sequence in order to run its observer callbacks + /// Wraps the source sequence in order to run its observer callbacks /// on the specified synchronization context. let observeOnContext (context:SynchronizationContext) source = Observable.ObserveOn( source, context ) /// Filters the elements of an observable sequence based on the specified type - let ofType source = + let ofType source = Observable.OfType( source ) - /// Concatenates the second observable sequence to the first observable sequence + /// Concatenates the second observable sequence to the first observable sequence /// upon successful or exceptional termination of the first. let onErrorConcat ( second:IObservable<'Source> ) ( first:IObservable<'Source> ) : IObservable<'Source> = Observable.OnErrorResumeNext( first, second ) @@ -1342,7 +1342,7 @@ module Observable = Observable.OnErrorResumeNext( sources ) - /// Concatenates all observable sequences in the given enumerable sequence, even if the + /// Concatenates all observable sequences in the given enumerable sequence, even if the /// previous observable sequence terminated exceptionally. let onErrorConcatSeq ( sources:seq> ) : IObservable<'Source> = Observable.OnErrorResumeNext( sources ) @@ -1356,7 +1356,7 @@ module Observable = /// Logs the incoming emits with a given prefix to a specified target. let logTo prefix f source = let onNext = Action<_> (fun x -> f (sprintf "%s - OnNext(%A)" prefix x)) - let onError = Action (fun ex -> + let onError = Action (fun ex -> f (sprintf "%s - OnError:" prefix) f (sprintf "\t %A" ex)) let onCompleted = Action (fun () -> f (sprintf "%s - OnCompleted()" prefix)) @@ -1369,54 +1369,54 @@ module Observable = /// Invokes the finally action after source observable sequence terminates normally or by an exception. let performFinally f source = Observable.Finally(source, Action f) - + /// Returns a connectable observable sequence (IConnectableObsevable) that shares - /// a single subscription to the underlying sequence. This operator is a + /// a single subscription to the underlying sequence. This operator is a /// specialization of Multicast using a regular Subject - let publish source = + let publish source = Observable.Publish( source ) /// Returns a connectable observable sequence (IConnectableObsevable) that shares /// a single subscription to the underlying sequence and starts with the value /// initial. This operator is a specialization of Multicast using a regular Subject - let publishInitial (initial:'Source) (source:IObservable<'Source>) = + let publishInitial (initial:'Source) (source:IObservable<'Source>) = Observable.Publish( source, initial ) - /// Returns an observable sequence that is the result of invoking + /// Returns an observable sequence that is the result of invoking /// the selector on a connectable observable sequence that shares a - /// a single subscription to the underlying sequence. This operator is a + /// a single subscription to the underlying sequence. This operator is a /// specialization of Multicast using a regular Subject - let publishMap ( map:IObservable<'Source> -> IObservable<'Result> ) - ( source :IObservable<'Source> ) = + let publishMap ( map:IObservable<'Source> -> IObservable<'Result> ) + ( source :IObservable<'Source> ) = Observable.Publish( source, Func,IObservable<'Result>> map ) - /// Returns an observable sequence that is the result of + /// Returns an observable sequence that is the result of /// the map on a connectable observable sequence that shares a - /// a single subscription to the underlying sequence. This operator is a + /// a single subscription to the underlying sequence. This operator is a /// specialization of Multicast using a regular Subject let publishInitialMap ( initial : 'Source ) - ( map: IObservable<'Source> -> IObservable<'Result> ) - ( source : IObservable<'Source> ) = + ( map: IObservable<'Source> -> IObservable<'Result> ) + ( source : IObservable<'Source> ) = Observable.Publish( source, Func,IObservable<'Result>> map, initial ) - /// Returns an observable sequence that is the result of invoking - /// the selector on a connectable observable sequence containing - /// only the last notification This operator is a + /// Returns an observable sequence that is the result of invoking + /// the selector on a connectable observable sequence containing + /// only the last notification This operator is a /// specialization of Multicast using a regular Subject - let publishLast source = + let publishLast source = Observable.PublishLast( source ) - /// Returns an observable sequence that is the result of invoking + /// Returns an observable sequence that is the result of invoking /// the selector on a connectable observable sequence that shares a - /// a single subscription to the underlying sequence. This operator is a + /// a single subscription to the underlying sequence. This operator is a /// specialization of Multicast using a regular Subject - let publishLastMap ( map: IObservable<'Source> -> IObservable<'Result> ) source = + let publishLastMap ( map: IObservable<'Source> -> IObservable<'Result> ) source = Observable.PublishLast( source , Func,IObservable<'Result>> map ) @@ -1431,12 +1431,12 @@ module Observable = /// Reduces the observable let reduce f source = Observable.Aggregate(source, Func<_,_,_> f) - + /// Returns an observable that remains connected to the source as long - /// as there is at least one subscription to the observable sequence + /// as there is at least one subscription to the observable sequence /// ( publish an Observable to get a ConnectableObservable ) let refCount ( source )= - Observable.RefCount ( source ) + Observable.RefCount ( source ) /// Repeats the observable sequence indefinitely. @@ -1460,20 +1460,20 @@ module Observable = Observable.DoWhile( source, Func condition) - /// Returns a connectable observable sequence that shares a single subscription to the + /// Returns a connectable observable sequence that shares a single subscription to the /// underlying sequence replaying all notifications. - let replay ( source:IObservable<'Source>) : Subjects.IConnectableObservable<'Source> = - Observable.Replay( source ) + let replay ( source:IObservable<'Source>) : Subjects.IConnectableObservable<'Source> = + Observable.Replay( source ) - /// Returns a connectable observable sequence that shares a single subscription to the + /// Returns a connectable observable sequence that shares a single subscription to the /// underlying sequence replaying all notifications. let replayOn ( sch:IScheduler ) source = Observable.Replay( source, sch ) - /// Returns a connectable observable sequence that shares a single subscription to the underlying sequence + /// Returns a connectable observable sequence that shares a single subscription to the underlying sequence /// replaying notifications subject to a maximum element count for the replay buffer. let replayBuffer ( bufferSize:int )( source:IObservable<'Source>) : Subjects.IConnectableObservable<'Source> = - Observable.Replay( source, bufferSize ) + Observable.Replay( source, bufferSize ) /// Returns a connectable observable sequence that shares a single subscription to the underlying sequence /// replaying notifications subject to a maximum element count for the replay buffer and using the specified @@ -1481,61 +1481,61 @@ module Observable = let replayBufferOn ( sch:IScheduler )( bufferSize:int )( source:IObservable<'Source>) : Subjects.IConnectableObservable<'Source> = Observable.Replay( source, bufferSize, sch ) - /// Returns an observable sequence that is the result of invoking the selector on a connectable observable + /// Returns an observable sequence that is the result of invoking the selector on a connectable observable /// sequence that shares a single subscription to the underlying sequence replaying all notifications. let replayMap ( map )( source:IObservable<'Source>) : IObservable<'Result> = - Observable.Replay( source, Func,IObservable<'Result>> map ) + Observable.Replay( source, Func,IObservable<'Result>> map ) - /// Returns a connectable observable sequence that shares a single subscription to the underlying sequence + /// Returns a connectable observable sequence that shares a single subscription to the underlying sequence /// replaying notifications subject to a maximum time length for the replay buffer. let replayWindow ( window:TimeSpan ) ( source:IObservable<'Source>): Subjects.IConnectableObservable<'Source> = - Observable.Replay( source, window ) + Observable.Replay( source, window ) - /// Returns a connectable observable sequence that shares a single subscription to the underlying sequence + /// Returns a connectable observable sequence that shares a single subscription to the underlying sequence /// replaying notifications subject to a maximum time length for the replay buffer. let replayWindowOn (scheduler:Concurrency.IScheduler) ( window:TimeSpan ) ( source:IObservable<'Source>): Subjects.IConnectableObservable<'Source> = - Observable.Replay( source, window, scheduler ) + Observable.Replay( source, window, scheduler ) /// Returns a connectable observable sequence that shares a single subscription to the underlying sequence // replaying notifications subject to a maximum time length and element count for the replay buffer. let replayBufferWindow ( bufferSize:int )( window:TimeSpan )( source:IObservable<'Source>) : Subjects.IConnectableObservable<'Source> = - Observable.Replay( source, bufferSize, window ) + Observable.Replay( source, bufferSize, window ) /// Returns a connectable observable sequence that shares a single subscription to the underlying sequence // replaying notifications subject to a maximum time length and element count for the replay buffer. let replayBufferWindowOn (scheduler:Concurrency.IScheduler) ( bufferSize:int )( window:TimeSpan )( source:IObservable<'Source>) : Subjects.IConnectableObservable<'Source> = - Observable.Replay( source, bufferSize, window, scheduler ) - + Observable.Replay( source, bufferSize, window, scheduler ) + /// Returns an observable sequence that is the result of apply a map to a connectable observable sequence that /// shares a single subscription to the underlying sequence replaying notifications subject to - /// a maximum element count for the replay buffer. + /// a maximum element count for the replay buffer. let replayMapBuffer ( map ) ( bufferSize:int )( source:IObservable<'Source>) : IObservable<'Result> = - Observable.Replay( source, Func,IObservable<'Result>>map, bufferSize ) + Observable.Replay( source, Func,IObservable<'Result>>map, bufferSize ) /// Returns an observable sequence that is the result of apply a map to a connectable observable sequence that /// shares a single subscription to the underlying sequence replaying notifications subject to /// a maximum time length. let replayMapWindow ( map)( window:TimeSpan )( source:IObservable<'Source>) : IObservable<'Result> = - Observable.Replay( source,Func,IObservable<'Result>> map, window ) + Observable.Replay( source,Func,IObservable<'Result>> map, window ) /// Returns an observable sequence that is the result of apply a map to a connectable observable sequence that /// shares a single subscription to the underlying sequence replaying notifications subject to /// a maximum time length. let replayMapWindowOn (scheduler:Concurrency.IScheduler) ( map)( window:TimeSpan )( source:IObservable<'Source>) : IObservable<'Result> = - Observable.Replay( source,Func,IObservable<'Result>> map, window, scheduler ) + Observable.Replay( source,Func,IObservable<'Result>> map, window, scheduler ) /// Returns an observable sequence that is the result of apply a map to a connectable observable sequence that /// shares a single subscription to the underlying sequence replaying notifications subject to /// a maximum time length and element count for the replay buffer. let replayMapBufferWindow ( map )( bufferSize:int ) ( window:TimeSpan ) ( source:IObservable<'Source>): IObservable<'Result> = - Observable.Replay( source, Func, IObservable<'Result>> map, bufferSize, window ) + Observable.Replay( source, Func, IObservable<'Result>> map, bufferSize, window ) /// Returns an observable sequence that is the result of apply a map to a connectable observable sequence that /// shares a single subscription to the underlying sequence replaying notifications subject to /// a maximum time length and element count for the replay buffer. let replayMapBufferWindowOn (scheduler:Concurrency.IScheduler) ( map )( bufferSize:int ) ( window:TimeSpan ) ( source:IObservable<'Source>): IObservable<'Result> = - Observable.Replay( source, Func, IObservable<'Result>> map, bufferSize, window, scheduler ) + Observable.Replay( source, Func, IObservable<'Result>> map, bufferSize, window, scheduler ) /// Repeats the source observable sequence until it successfully terminates. let retry ( source:IObservable<'Source>) : IObservable<'Source> = @@ -1548,21 +1548,21 @@ module Observable = /// Returns an observable which emits a single value let result x : IObservable<_> = Observable.Return x - + /// Samples the observable at the given interval let sample (interval: TimeSpan) source = Observable.Sample(source, interval) - + /// Samples the observable sequence at each interval, using the specified scheduler to run sampling timers. - /// Upon each sampling tick, the latest element (if any) in the source sequence during the + /// Upon each sampling tick, the latest element (if any) in the source sequence during the /// last sampling interval is sent to the resulting sequence. let sampleOn scheduler interval source = Observable.Sample(source, interval, scheduler) /// Samples the source observable sequence using a samper observable sequence producing sampling ticks. - /// Upon each sampling tick, the latest element (if any) in the source sequence during the + /// Upon each sampling tick, the latest element (if any) in the source sequence during the /// last sampling interval is sent to the resulting sequence. let sampleWith (sampler:IObservable<'Sample>) (source:IObservable<'Source>) : IObservable<'Source> = Observable.Sample( source, sampler ) @@ -1573,7 +1573,7 @@ module Observable = Observable.Scan(source, Func<'a,'a,'a> accumulator ) - /// Applies an accumulator function over an observable sequence and returns each intermediate result. + /// Applies an accumulator function over an observable sequence and returns each intermediate result. /// The specified init value is used as the initial accumulator value. let scanInit (init:'TAccumulate) (accumulator) (source:IObservable<'Source>) : IObservable<'TAccumulate> = Observable.Scan( source, init, Func<'TAccumulate,'Source,'TAccumulate> accumulator ) @@ -1583,15 +1583,15 @@ module Observable = let selectIf condition thenSource = Observable.If( Func condition, thenSource ) - - /// If the condition evaluates true, select the "thenSource" sequence. + + /// If the condition evaluates true, select the "thenSource" sequence. /// Otherwise, return an empty sequence generated on the specified scheduler. let selectIfOn (scheduler:IScheduler) condition thenSource = Observable.If( Func condition, thenSource, scheduler) - /// If the condition evaluates true, select the "thenSource" sequence. Otherwise, select the else source - let selectIfElse condition ( elseSource : IObservable<'Result>) + /// If the condition evaluates true, select the "thenSource" sequence. Otherwise, select the else source + let selectIfElse condition ( elseSource : IObservable<'Result>) ( thenSource : IObservable<'Result>) = Observable.If( Func condition, thenSource, elseSource ) @@ -1614,7 +1614,7 @@ module Observable = let skipSpan (duration:TimeSpan ) (source:IObservable<'Source> ): IObservable<'Source> = Observable.Skip(source, duration) - + /// Skips elements for the specified duration from the start of the observable source sequence, /// using a specified scheduler to run timers. let skipSpanOn (scheduler:IScheduler) duration source = @@ -1622,7 +1622,7 @@ module Observable = /// Bypasses a specified number of elements at the end of an observable sequence. - let skipLast (count:int ) ( source:IObservable<'Source> ): IObservable<'Source> = + let skipLast (count:int ) ( source:IObservable<'Source> ): IObservable<'Source> = Observable.SkipLast (source, count ) @@ -1630,7 +1630,7 @@ module Observable = let skipLastSpan (duration:TimeSpan ) ( source:IObservable<'Source>) : IObservable<'Source> = Observable.SkipLast ( source, duration) - + /// Skips elements for the specified duration from the end of the observable source sequence, /// using the specified scheduler to run timers. let skipLastSpanOn (scheduler:IScheduler) duration source = @@ -1641,12 +1641,12 @@ module Observable = let skipUntil (startTime:DateTimeOffset ) ( source:IObservable<'Source> ) : IObservable<'Source> = Observable.SkipUntil(source, startTime ) - + /// Skips elements from the observable source sequence until the specified start time, /// using the specified scheduler to run timers. let skipUntilOn (scheduler:IScheduler) startTime source = Observable.SkipUntil(source, startTime, scheduler) - + /// Returns the elements from the source observable sequence only after the other observable sequence produces an element. let skipUntilOther ( other:IObservable<'Other> ) ( source:IObservable<'Source> ): IObservable<'Source> = @@ -1656,7 +1656,7 @@ module Observable = /// Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements. let skipWhile ( predicate:'Source -> bool ) ( source:IObservable<'Source> ): IObservable<'Source> = - Observable.SkipWhile ( source, Func<'Source,bool> predicate ) + Observable.SkipWhile ( source, Func<'Source,bool> predicate ) /// Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements. @@ -1665,12 +1665,12 @@ module Observable = Observable.SkipWhile ( source, Func<'Source,int,bool> predicate) - /// Prepends a sequence of values to an observable sequence. - let startWith (values: #seq<'T>) (source: IObservable<'T>) : IObservable<'T> = + /// Prepends a sequence of values to an observable sequence. + let startWith (values: #seq<'T>) (source: IObservable<'T>) : IObservable<'T> = // TODO: re-evaluate wrapping the overload that takes a params array when params are supported by F#. Observable.StartWith( source, values ) - + /// Prepends a sequence of values to an observable sequence. let startWithOn (scheduler:IScheduler) (values:#seq<'T>) source = Observable.StartWith(source, scheduler, values) @@ -1682,16 +1682,16 @@ module Observable = /// Subscribes to the Observable with a next and an error-function. - let subscribeWithError ( onNext : 'T -> unit ) - ( onError : exn -> unit ) + let subscribeWithError ( onNext : 'T -> unit ) + ( onError : exn -> unit ) ( observable : IObservable<'T> ) = observable.Subscribe( Action<_> onNext, Action onError ) - - + + /// Subscribes to the Observable with a next and a completion callback. let subscribeWithCompletion (onNext: 'T -> unit) (onCompleted: unit -> unit) (observable: IObservable<'T>) = observable.Subscribe(Action<_> onNext, Action onCompleted) - + /// Subscribes to the observable with all three callbacks let subscribeWithCallbacks onNext onError onCompleted (observable: IObservable<'T>) = @@ -1703,27 +1703,27 @@ module Observable = observable.Subscribe observer - /// Wraps the source sequence in order to run its subscription and unsubscription logic - /// on the specified scheduler. This operation is not commonly used; This only performs + /// Wraps the source sequence in order to run its subscription and unsubscription logic + /// on the specified scheduler. This operation is not commonly used; This only performs /// the side-effects of subscription and unsubscription on the specified scheduler. /// In order to invoke observer callbacks on a scheduler, use 'observeOn' let subscribeOn (scheduler:Reactive.Concurrency.IScheduler) (source:IObservable<'Source>) : IObservable<'Source> = Observable.SubscribeOn( source, scheduler ) - /// Wraps the source sequence in order to run its subscription and unsubscription logic - /// on the specified SynchronizationContext. This operation is not commonly used; This only performs + /// Wraps the source sequence in order to run its subscription and unsubscription logic + /// on the specified SynchronizationContext. This operation is not commonly used; This only performs /// the side-effects of subscription and unsubscription on the specified scheduler. /// In order to invoke observer callbacks on a scheduler, use 'observeOn' let subscribeOnContext (context:Threading.SynchronizationContext) (source:IObservable<'Source>) : IObservable<'Source> = Observable.SubscribeOn( source, context ) - + /// Subscribes to the specified source, re-routing synchronous exceptions during invocation of the /// Subscribe function to the observer's 'OnError channel. This function is typically used to write query operators. let subscribeSafe onNext (source : IObservable<_>) = source.SubscribeSafe (Observer.Create (Action<_> onNext, Action<_> ignore, Action ignore)) - + /// Subscribes to the specified source, re-routing synchronous exceptions during invocation of the /// Subscribe function to the observer's 'OnError channel. This function is typically used to write query operators. let subscribeSafeWithError onNext onError (source : IObservable<_>) = @@ -1748,22 +1748,22 @@ module Observable = source.SubscribeSafe (Observer.Create (Action<_> onNext, Action<_> onError, Action onCompleted)) - /// Transforms an observable sequence of observable sequences into an - /// observable sequence producing values only from the most recent + /// Transforms an observable sequence of observable sequences into an + /// observable sequence producing values only from the most recent /// observable sequence.Each time a new inner observable sequnce is recieved, /// unsubscribe from the previous inner sequence - let switch (sources:IObservable>) : IObservable<'Source>= + let switch (sources:IObservable>) : IObservable<'Source>= Observable.Switch(sources) - /// Transforms an observable sequence of tasks into an observable sequence + /// Transforms an observable sequence of tasks into an observable sequence /// producing values only from the most recent observable sequence. /// Each time a new task is received, the previous task's result is ignored. let switchTask (sources: IObservable>) : IObservable<'Source> = Observable.Switch( sources ) - /// Transforms an observable sequence of F# Asyncs into an observable sequence + /// Transforms an observable sequence of F# Asyncs into an observable sequence /// producing values only from the most recent Async. /// Each time a new Async is received, the previous Async is cancelled, /// and will not continue to run in the background. @@ -1772,29 +1772,29 @@ module Observable = /// Synchronizes the observable sequence so that notifications cannot be delivered concurrently - /// this overload is useful to "fix" an observable sequence that exhibits concurrent + /// this overload is useful to "fix" an observable sequence that exhibits concurrent /// callbacks on individual observers, which is invalid behavior for the query processor - let synchronize source : IObservable<'Source>= + let synchronize source : IObservable<'Source>= Observable.Synchronize( source ) - /// Synchronizes the observable sequence such that observer notifications - /// cannot be delivered concurrently, using the specified gate object.This - /// overload is useful when writing n-ary query operators, in order to prevent + /// Synchronizes the observable sequence such that observer notifications + /// cannot be delivered concurrently, using the specified gate object.This + /// overload is useful when writing n-ary query operators, in order to prevent /// concurrent callbacks from different sources by synchronizing on a common gate object. let synchronizeGate (gate:obj) (source:IObservable<'Source>): IObservable<'Source> = Observable.Synchronize( source, gate ) - + /// Bypasses the first element in an observable sequence and then returns the remaining elements. let tail source = skip 1 source - + /// Takes n elements (from the beginning of an observable sequence? ) - let take (n: int) source : IObservable<'Source> = - Observable.Take(source, n) + let take (n: int) source : IObservable<'Source> = + Observable.Take(source, n) + - /// Returns a specified number of contiguous elemenents from the start of an observable sequence, /// using the specified scheduler for the edge case of take(0). let takeOn (scheduler:IScheduler) (n:int) source = @@ -1805,7 +1805,7 @@ module Observable = let takeSpan (duration:TimeSpan) source = Observable.Take( source, duration ) - + /// Takes elements for a specified duration from the start of the observable source sequence, /// using the specified scheduler to run timers. let takeSpanOn (scheduler:IScheduler) (duration:TimeSpan) source = @@ -1813,7 +1813,7 @@ module Observable = /// Returns a specified number of contiguous elements from the end of an obserable sequence - let takeLast ( count:int ) source = + let takeLast ( count:int ) source = Observable.TakeLast(source, count) @@ -1827,7 +1827,7 @@ module Observable = let takeLastSpan ( duration:TimeSpan ) ( source:IObservable<'Source> ): IObservable<'Source> = Observable.TakeLast( source, duration ) - + /// Returns elements within the specified duration from the end of the observable source sequence, /// using the specified scheduler to run timers. let takeLastSpanOn (scheduler:IScheduler) (duration:TimeSpan) source = @@ -1838,7 +1838,7 @@ module Observable = let takeLastBuffer ( duration:TimeSpan )( source:IObservable<'Source> ): IObservable> = Observable.TakeLastBuffer( source, duration ) - + /// Returns a list with the elements within the specified duration from the end of the observable source sequence, /// using the specified scheduler to run timers. let takeLastBufferOn (scheduler:IScheduler) duration source = @@ -1900,21 +1900,21 @@ module Observable = Observable.Throw( ex, witness=witness ) - /// Returns an observable sequence that terminates with an exception, + /// Returns an observable sequence that terminates with an exception, /// using the specified scheduler to send out the single OnError message. let throwOn scheduler ex = Observable.Throw( ex, scheduler=scheduler ) - - /// Returns an observable sequence that terminates with an exception, + + /// Returns an observable sequence that terminates with an exception, /// using the specified scheduler to send out the single OnError message. let throwWitnessOn scheduler witeness ex = Observable.Throw( ex, witeness, scheduler ) - /// matches when the observable sequence has an available element and + /// matches when the observable sequence has an available element and /// applies the map - let thenMap map source = + let thenMap map source = Observable.Then( source, Func<'Source,'Result> map ) @@ -1934,7 +1934,7 @@ module Observable = let timeout ( timeout:System.DateTimeOffset ) ( source:IObservable<'Source>) = Observable.Timeout( source, timeout) - + /// Applies a timeout policy to the observable sequence based on an absolute time, using the specified scheduler to run timeout timers. /// If the sequence doesn't terminate before the specified absolute due time, a TimeoutException is propagated to the observer. let timeoutOn (scheduler:IScheduler) (timeout:DateTimeOffset) source = @@ -1942,15 +1942,15 @@ module Observable = /// Applies a timeout policy to the observable sequence based on an absolute time. - /// If the sequence doesn't terminate before the specified absolute due time, the other + /// If the sequence doesn't terminate before the specified absolute due time, the other /// observable sequence is used to produce future messages from that point on. let timeoutOther ( timeout:System.DateTimeOffset ) ( other:IObservable<'Source>) ( source:IObservable<'Source>) = Observable.Timeout( source, timeout, other) - + /// Applies a timeout policy to the observable sequence based on an absolute time, /// using the specified scheduler to run timeout timers. - /// If the sequence doesn't terminate before the specified absolute due time, the other + /// If the sequence doesn't terminate before the specified absolute due time, the other /// observable sequence is used to produce future messages from that point on. let timeoutOtherOn (scheduler:IScheduler) (timeout:DateTimeOffset) other source = Observable.Timeout( source, timeout, other, scheduler ) @@ -1961,24 +1961,24 @@ module Observable = /// predecessor, a TimeoutException is propagated to the observer. let timeoutSpan ( timeout:TimeSpan ) ( source:IObservable<'Source> ) = Observable.Timeout( source, timeout) - + /// Applies a timeout policy for each element in the observable sequence, using the specified scheduler to run timeout timers. /// If the next element isn't received within the specified timeout duration starting from its /// predecessor, a TimeoutException is propagated to the observer. let timeoutSpanOn (scheduler:IScheduler) (timeout:TimeSpan) source = - Observable.Timeout( source, timeout, scheduler ) + Observable.Timeout( source, timeout, scheduler ) /// Applies a timeout policy for each element in the observable sequence. - /// If the next element isn't received within the specified timeout duration starting from + /// If the next element isn't received within the specified timeout duration starting from /// its predecessor, the other observable sequence is used to produce future messages from that point on. let timeoutSpanOther( timeout:TimeSpan ) ( other:IObservable<'Source> ) ( source:IObservable<'Source> ) = Observable.Timeout( source, timeout, other) - + /// Applies a timeout policy for each element in the observable sequence, using the specified scheduler to run timeout timers. - /// If the next element isn't received within the specified timeout duration starting from + /// If the next element isn't received within the specified timeout duration starting from /// its predecessor, the other observable sequence is used to produce future messages from that point on. let timeoutSpanOtherOn (scheduler:IScheduler) (timeout:TimeSpan) other source = Observable.Timeout( source, timeout, other, scheduler) @@ -1991,12 +1991,12 @@ module Observable = Observable.Timeout( source, Func<'Source,IObservable<'Timeout>> durationSelector ) - /// Applies a timeout policy to the observable sequence based on an initial timeout duration + /// Applies a timeout policy to the observable sequence based on an initial timeout duration /// for the first element, and a timeout duration computed for each subsequent element. - /// If the next element isn't received within the computed duration starting from its predecessor, + /// If the next element isn't received within the computed duration starting from its predecessor, /// a TimeoutException is propagated to the observer. - let timeout2Duration ( timeout:IObservable<'Timeout> ) - ( durationSelector ) + let timeout2Duration ( timeout:IObservable<'Timeout> ) + ( durationSelector ) ( source:IObservable<'Source> ) = Observable.Timeout( source, timeout, Func<'Source, IObservable<'Timeout>> durationSelector) @@ -2004,11 +2004,11 @@ module Observable = /// Applies a timeout policy to the observable sequence based on an initial timeout duration for the first /// element, and a timeout duration computed for each subsequent element. - /// If the next element isn't received within the computed duration starting from its predecessor, + /// If the next element isn't received within the computed duration starting from its predecessor, /// the other observable sequence is used to produce future messages from that point on. - let timeout2DurationOther ( timeout: IObservable<'Timeout>) + let timeout2DurationOther ( timeout: IObservable<'Timeout>) ( durationSelector ) - ( other : IObservable<'Source> ) + ( other : IObservable<'Source> ) ( source : IObservable<'Source> ) = Observable.Timeout( source, timeout, Func<'Source, IObservable<'Timeout>> durationSelector, other) @@ -2020,7 +2020,7 @@ module Observable = let timer ( dueTime:DateTimeOffset ) : IObservable = Observable.Timer( dueTime ) - + /// Returns an observable sequence that produces a single value at the specified absolute due time, /// using the specified scheduler to run the timer. let timerOn (scheduler:IScheduler) (dueTime:DateTimeOffset) = @@ -2033,7 +2033,7 @@ module Observable = /// Returns an observable sequence that produces a single value after the specified relative due time has elapsed. - let timerSpan ( dueTime:TimeSpan ) : IObservable = + let timerSpan ( dueTime:TimeSpan ) : IObservable = Observable.Timer( dueTime ) @@ -2069,11 +2069,11 @@ module Observable = /// Creates an array from an observable sequence. - let toArray source = + let toArray source = Observable.ToArray(source) /// Creates an observable sequence according to a specified key selector function - let toDictionary keySelector source = + let toDictionary keySelector source = Observable.ToDictionary(source, Func<_,_> keySelector) @@ -2082,25 +2082,25 @@ module Observable = /// and an a comparer let toDictionaryComparer (keySelector:'Source->'Key) (comparer:'Key) (source:'Source) = Observable.ToDictionary( source, keySelector, comparer ) - + /// Creates an observable sequence according to a specified key selector function let toDictionaryElements (keySelector:'Source->'Key )(elementSelector:'Source->'Elm) (source:'Source) = - Observable.ToDictionary(source, keySelector, elementSelector) + Observable.ToDictionary(source, keySelector, elementSelector) /// Creates an observable sequence according to a specified key selector function let toDictionaryCompareElements ( keySelector : 'Source -> 'Key ) - ( elementSelector: 'Source ->' Elm ) + ( elementSelector: 'Source ->' Elm ) ( comparer:'Key ) ( source:'Source ) = - Observable.ToDictionary( source , - Func<'Source,'Key> keySelector , - Func<'Source,'Elm> elementSelector , - comparer ) - + Observable.ToDictionary( source , + Func<'Source,'Key> keySelector , + Func<'Source,'Elm> elementSelector , + comparer ) + /// Exposes an observable sequence as an object with an Action based .NET event - let toEvent (source:IObservable) = + let toEvent (source:IObservable) = Observable.ToEvent(source) @@ -2108,9 +2108,9 @@ module Observable = let toEventType ( source:IObservable<'Source> ) : IEventSource<'Source> = Observable.ToEvent(source) - + /// Creates a list from an observable sequence - let toList source = + let toList source = Observable.ToList(source) @@ -2138,15 +2138,15 @@ module Observable = let toObservable ( source: seq<'T> ) = Observable.ToObservable(source) - /// Constructs an observable sequence that depends on a resource object, whose + /// Constructs an observable sequence that depends on a resource object, whose /// lifetime is tied to the resulting observable sequence's lifetime. let using ( resourceFactory: unit ->'TResource ) (observableFactory: 'TResource -> IObservable<'Result> ) : IObservable<'Result> = Observable.Using ( Func<_> resourceFactory, Func<_,_> observableFactory ) - - /// Constructs an observable sequence that depends on a resource object, whose + + /// Constructs an observable sequence that depends on a resource object, whose /// lifetime is tied to the resulting observable sequence's lifetime. - /// The resource is obtained and used through asynchronous functions. + /// The resource is obtained and used through asynchronous functions. /// The cancellation token passed to the asyncrhonous functions is tied to the returned disposable subscription, /// allowing best-effor cancellation at any stage of the resource acquisition or usage. let usingAsync resourceFactory observableFactory = @@ -2156,31 +2156,31 @@ module Observable = /// waits for the observable sequence to complete and returns the last /// element of the sequence. If the sequence terminates with OnError /// notification, the exception is thrown - let wait source = + let wait source = Observable.Wait( source ) /// Repeats the given function as long as the specified condition holds - /// where the condition is evaluated before each repeated source is + /// where the condition is evaluated before each repeated source is /// subscribed to - let whileLoop condition source = - Observable.While( Func condition, source ) - + let whileLoop condition source = + Observable.While( Func condition, source ) + /// Projects each element of an observable sequence into consecutive non-overlapping windows. - /// windowClosingSelector - A function invoked to define the boundaries of the produced windows. + /// windowClosingSelector - A function invoked to define the boundaries of the produced windows. /// A new window is started when the previous one is closed let window ( windowClosingSelector ) ( source:IObservable<'Source> ) : IObservable> = Observable.Window( source, Func> windowClosingSelector) - /// Projects each element of an observable sequence into consecutive non-overlapping windows + /// Projects each element of an observable sequence into consecutive non-overlapping windows /// which are produced based on timing information. let windowTimeSpan ( timeSpan:TimeSpan )( source:IObservable<'Source> ) : IObservable> = Observable.Window( source, timeSpan ) - /// Projects each element of an observable sequence into consecutive non-overlapping windows + /// Projects each element of an observable sequence into consecutive non-overlapping windows /// which are produced based on timing information, using the specified scheduler to run timers. let windowTimeSpanOn (scheduler:IScheduler) timeSpan source = Observable.Window( source, timeSpan, scheduler ) @@ -2200,7 +2200,7 @@ module Observable = let windowTimeShift ( timeSpan:TimeSpan )( timeShift:TimeSpan )( source:IObservable<'Source> ) : IObservable> = Observable.Window( source, timeSpan, timeShift ) - + /// Projects each element of an observable sequence into consecutive non-overlapping windows, using the specified scheduler to run timers. /// windowBoundaries - Sequence of window boundary markers. The current window is closed and a new window is opened upon receiving a boundary marker. let windowTimeShiftOn (scheduler:IScheduler) (timeSpan:TimeSpan) (timeShift:TimeSpan) source = @@ -2208,7 +2208,7 @@ module Observable = /// Projects each element of an observable sequence into consecutive non-overlapping windows - /// windowBoundaries - Sequence of window boundary markers. The current window is closed + /// windowBoundaries - Sequence of window boundary markers. The current window is closed /// and a new window is opened upon receiving a boundary marker let windowBounded ( windowBoundaries:IObservable<'WindowBoundary> )( source:IObservable<'Source> ) : IObservable> = Observable.Window( source, windowBoundaries ) @@ -2219,23 +2219,23 @@ module Observable = Observable.Window( source, count, skip ) - /// Projects each element of an observable sequence into consecutive non-overlapping windows + /// Projects each element of an observable sequence into consecutive non-overlapping windows /// which are produced based on element count information. let windowCount ( count:int )( source:IObservable<'Source> ) : IObservable> = Observable.Window( source, count ) - /// Projects each element of an observable sequence into a window that is completed when either it's full or + /// Projects each element of an observable sequence into a window that is completed when either it's full or /// a given amount of time has elapsed. - /// A useful real-world analogy of this overload is the behavior of a ferry leaving the dock when all seats are + /// A useful real-world analogy of this overload is the behavior of a ferry leaving the dock when all seats are /// taken, or at the scheduled time of departure, whichever event occurs first let windowTimeCount ( timeSpan:TimeSpan ) (count:int) ( source:IObservable<'Source> ): IObservable> = Observable.Window( source, timeSpan, count ) - - /// Projects each element of an observable sequence into a window that is completed when either it's full or + + /// Projects each element of an observable sequence into a window that is completed when either it's full or /// a given amount of time has elapsed, using the specified scheduler to run timers. - /// A useful real-world analogy of this overload is the behavior of a ferry leaving the dock when all seats are + /// A useful real-world analogy of this overload is the behavior of a ferry leaving the dock when all seats are /// taken, or at the scheduled time of departure, whichever event occurs first let windowTimeCountOn (scheduler:IScheduler) (timeSpan:TimeSpan) (count:int) source = Observable.Window( source, timeSpan, count, scheduler) @@ -2265,25 +2265,25 @@ module Observable = /// Merges the specified observable sequences into one observable sequence by emitting a /// list with the elements of the observable sequences at corresponding indexes. - let zipSeq ( sources:seq>) : IObservable> = + let zipSeq ( sources:seq>) : IObservable> = Observable.Zip( sources ) - /// Merges the specified observable sequences into one observable sequence by emitting + /// Merges the specified observable sequences into one observable sequence by emitting /// a list with the elements of the observable sequences at corresponding indexe let zipArray ( sources:IObservable<'Source> []) : IObservable> = Observable.Zip( sources ) - - /// Merges the specified observable sequences into one observable sequence by using - /// the selector function whenever all of the observable sequences have produced an + + /// Merges the specified observable sequences into one observable sequence by using + /// the selector function whenever all of the observable sequences have produced an /// element at a corresponding index. let zipSeqMap ( resultSelector: IList<'S> ->'R) ( sources: seq>) : IObservable<'R> = Observable.Zip( sources, Func,'R> resultSelector) - - - /// Merges an observable sequence and an enumerable sequence into one + + + /// Merges an observable sequence and an enumerable sequence into one /// observable sequence by using the selector function. let zipWithSeq ( resultSelector: 'Source1 -> 'Source2 -> 'Result ) ( second : seq<'Source2> ) @@ -2294,7 +2294,7 @@ module Observable = (*************************************************************** * F# extra combinator wrappers for Rx extensions ***************************************************************) - + /// **Description** /// Periodically repeats the observable sequence exposing a responses or failures. /// @@ -2327,7 +2327,7 @@ module Observable = /// - `source` - The source observable to take a subset from. let choose f source = Observable.Create (fun (o : IObserver<_>) -> - subscribeSafeWithCallbacks + subscribeSafeWithCallbacks (fun x -> Option.iter o.OnNext (try f x with ex -> o.OnError ex; None)) o.OnError o.OnCompleted @@ -2352,7 +2352,7 @@ module Observable = /// Projects each source value to an Observable which is merged in the output /// Observable only if the previous projected Observable has completed. - // + // /// **Returns** /// Returns an Observable that emits items based on applying a function that you /// supply to each item emitted by the source Observable, where that function @@ -2363,7 +2363,7 @@ module Observable = /// that one completes, it will accept and flatten the next projected Observable /// and repeat this process. let exhaustMap f source = - Observable.Create (fun (o : IObserver<_>) -> + Observable.Create (fun (o : IObserver<_>) -> let mutable hasSubscription = false let mutable innerSub = None let onInnerCompleted () = @@ -2371,8 +2371,8 @@ module Observable = innerSub |> Option.iter Disposable.dispose let onOuterNext x = if not hasSubscription then - hasSubscription <- true - f x |> subscribeSafeWithCallbacks + hasSubscription <- true + f x |> subscribeSafeWithCallbacks o.OnNext o.OnError onInnerCompleted |> fun y -> innerSub <- Some y source @@ -2387,11 +2387,11 @@ module Observable = let private serveInternal sch max callOnError sourceFactory = Observable.Create (fun (o : IObserver<'a>) -> let subscribeSource self = - let onError ex = + let onError ex = if callOnError ex then o.OnError ex - else self () + else self () sourceFactory () - |> subscribeSafeWithCallbacks + |> subscribeSafeWithCallbacks o.OnNext onError self let com = Disposable.Composite @@ -2400,7 +2400,7 @@ module Observable = com.Add current sch |> Schedule.actionRec (fun self -> current - |> Disposable.setIndirectly + |> Disposable.setIndirectly (fun () -> subscribeSource self)) |> com.Add com :> IDisposable) @@ -2415,7 +2415,7 @@ module Observable = /// - `max` - The maximum number of observables to be subscribed simultaneously. /// - `callOnError` - Function to determine whether or not the given exception should call the `OnError`. /// - `sourceFactory` - Function that returns the observable to invoke concurrently. - let serveGateCustomOn sch gate max callOnError sourceFactory = + let serveGateCustomOn sch gate max callOnError sourceFactory = serveInternal sch max callOnError sourceFactory |> synchronizeGate gate /// **Description** @@ -2449,34 +2449,34 @@ module Observable = /// **Parameters** /// - `max` - The maximum number of observables to be subscribed simultaneously. /// - `sourceFactory` - Function that returns the observable to invoke concurrently. - let serve max sourceFactory = + let serve max sourceFactory = serveOn Scheduler.CurrentThread max sourceFactory /// Generates a sequence using the producer/consumer pattern. /// The purpose of the source sequence is simply to notify the consumer when out-of-band data becomes available. - /// The data in the source sequence provides additional information to the function, + /// The data in the source sequence provides additional information to the function, /// but it does not have to be the actual data being produced. - /// + /// /// The function is not necessarily called for every value in the source sequence. - /// It is only called if the previous consumer's observable has completed; otherwise, the current notification is ignored. This ensures + /// It is only called if the previous consumer's observable has completed; otherwise, the current notification is ignored. This ensures /// that only one consumer is active at any given time, but it also means that the function is not guaranteed /// to receive every value in the source sequence; therefore, the function must read /// data from out-of-band storage instead; e.g., from a shared stream or queue. - /// - /// The function may also be called when data is not available. For example, if the current consuming + /// + /// The function may also be called when data is not available. For example, if the current consuming /// observable completes and additional notifications from the source were received, then the function - /// is called again to check whether new data was missed. This avoids a race condition between the source sequence - /// and the consuming observable's completion notification. If no data is available when function is called, then - /// an empty sequence should be returned and the function will not be called again until another notification is observed + /// is called again to check whether new data was missed. This avoids a race condition between the source sequence + /// and the consuming observable's completion notification. If no data is available when function is called, then + /// an empty sequence should be returned and the function will not be called again until another notification is observed /// from the source. /// /// Producers and the single active consumer are intended to access shared objects concurrently, yet it remains their responsibility - /// to ensure thread-safety. The consume operator cannot do so without breaking concurrency. For example, + /// to ensure thread-safety. The consume operator cannot do so without breaking concurrency. For example, /// a producer/consumer implementation that uses an in-memory queue must manually ensure that reads and writes to the queue are thread-safe. /// - /// Multiple producers are supported. Simply create an observable sequence for each producer that notifies when data is generated, + /// Multiple producers are supported. Simply create an observable sequence for each producer that notifies when data is generated, /// merge them together using the merge operator, and use the merged observable as the source argument in the consume operator. - /// Multiple consumers are supported by calling consume once and then calling subscribe multiple times on the cold observable that is returned. + /// Multiple consumers are supported by calling consume once and then calling subscribe multiple times on the cold observable that is returned. /// Just be sure that the source sequence is hot so that each subscription will consume based on the same producers' notifications. /// /// ## Parameters @@ -2486,11 +2486,11 @@ module Observable = /// ## Returns /// An observable sequence that is the concatenation of all subscriptions to the consumer observable. let consumeMap (f : 'a -> IObservable<'b>) (source : IObservable<'a>) = - Observable.Create (fun (o : IObserver<_>) -> + Observable.Create (fun (o : IObserver<_>) -> let gate = obj () let consumingSubscription = new SerialDisposable () let schedule = new SerialDisposable () - + let mutable lastSkipped = Unchecked.defaultof<'a> let mutable hasSkipped = false let mutable consuming = false @@ -2498,24 +2498,24 @@ module Observable = let onNext value = lock gate (fun () -> - if consuming + if consuming then lastSkipped <- value hasSkipped <- true else consuming <- true hasSkipped <- false) - + let mutable additionalData = value let scheduleRec self = let onCompleted () = - let consumeAgain, completeNow = - lock gate (fun () -> + let consumeAgain, completeNow = + lock gate (fun () -> consuming <- hasSkipped if consuming then additionalData <- lastSkipped hasSkipped <- false true, false else false, stopped) - + if consumeAgain then self () if completeNow then o.OnCompleted () @@ -2526,11 +2526,11 @@ module Observable = |> Disposable.setIndirectly (fun () -> subscribeSafeWithCallbacks o.OnNext o.OnError onCompleted xs)) - + Scheduler.Immediate |> Schedule.actionRec scheduleRec |> Disposable.setInnerDisposalOf schedule - + let onCompleted () = let completeNow = lock gate (fun () -> stopped <- true; consuming) @@ -2544,29 +2544,29 @@ module Observable = /// Generates a sequence using the producer/consumer pattern. /// The purpose of the source sequence is simply to notify the consumer when out-of-band data becomes available. - /// The data in the source sequence provides additional information to the function, + /// The data in the source sequence provides additional information to the function, /// but it does not have to be the actual data being produced. - /// + /// /// The function is not necessarily called for every value in the source sequence. - /// It is only called if the previous consumer's observable has completed; otherwise, the current notification is ignored. This ensures + /// It is only called if the previous consumer's observable has completed; otherwise, the current notification is ignored. This ensures /// that only one consumer is active at any given time, but it also means that the function is not guaranteed /// to receive every value in the source sequence; therefore, the function must read /// data from out-of-band storage instead; e.g., from a shared stream or queue. - /// - /// The function may also be called when data is not available. For example, if the current consuming + /// + /// The function may also be called when data is not available. For example, if the current consuming /// observable completes and additional notifications from the source were received, then the function - /// is called again to check whether new data was missed. This avoids a race condition between the source sequence - /// and the consuming observable's completion notification. If no data is available when function is called, then - /// an empty sequence should be returned and the function will not be called again until another notification is observed + /// is called again to check whether new data was missed. This avoids a race condition between the source sequence + /// and the consuming observable's completion notification. If no data is available when function is called, then + /// an empty sequence should be returned and the function will not be called again until another notification is observed /// from the source. /// /// Producers and the single active consumer are intended to access shared objects concurrently, yet it remains their responsibility - /// to ensure thread-safety. The consume operator cannot do so without breaking concurrency. For example, + /// to ensure thread-safety. The consume operator cannot do so without breaking concurrency. For example, /// a producer/consumer implementation that uses an in-memory queue must manually ensure that reads and writes to the queue are thread-safe. /// - /// Multiple producers are supported. Simply create an observable sequence for each producer that notifies when data is generated, + /// Multiple producers are supported. Simply create an observable sequence for each producer that notifies when data is generated, /// merge them together using the merge operator, and use the merged observable as the source argument in the consume operator. - /// Multiple consumers are supported by calling consume once and then calling subscribe multiple times on the cold observable that is returned. + /// Multiple consumers are supported by calling consume once and then calling subscribe multiple times on the cold observable that is returned. /// Just be sure that the source sequence is hot so that each subscription will consume based on the same producers' notifications. /// /// ## Parameters @@ -2576,32 +2576,32 @@ module Observable = /// ## Returns /// An observable sequence that is the concatenation of all subscriptions to the consumer observable. let consume consumer source = consumeMap (fun _ -> consumer) source - + /// Generates a sequence using the producer/consumer pattern. /// The purpose of the source sequence is simply to notify the consumer when out-of-band data becomes available. - /// The data in the source sequence provides additional information to the function, + /// The data in the source sequence provides additional information to the function, /// but it does not have to be the actual data being produced. - /// + /// /// The function is not necessarily called for every value in the source sequence. - /// It is only called if the previous consumer's observable has completed; otherwise, the current notification is ignored. This ensures + /// It is only called if the previous consumer's observable has completed; otherwise, the current notification is ignored. This ensures /// that only one consumer is active at any given time, but it also means that the function is not guaranteed /// to receive every value in the source sequence; therefore, the function must read /// data from out-of-band storage instead; e.g., from a shared stream or queue. - /// - /// The function may also be called when data is not available. For example, if the current consuming + /// + /// The function may also be called when data is not available. For example, if the current consuming /// observable completes and additional notifications from the source were received, then the function - /// is called again to check whether new data was missed. This avoids a race condition between the source sequence - /// and the consuming observable's completion notification. If no data is available when function is called, then - /// an empty sequence should be returned and the function will not be called again until another notification is observed + /// is called again to check whether new data was missed. This avoids a race condition between the source sequence + /// and the consuming observable's completion notification. If no data is available when function is called, then + /// an empty sequence should be returned and the function will not be called again until another notification is observed /// from the source. /// /// Producers and the single active consumer are intended to access shared objects concurrently, yet it remains their responsibility - /// to ensure thread-safety. The consume operator cannot do so without breaking concurrency. For example, + /// to ensure thread-safety. The consume operator cannot do so without breaking concurrency. For example, /// a producer/consumer implementation that uses an in-memory queue must manually ensure that reads and writes to the queue are thread-safe. /// - /// Multiple producers are supported. Simply create an observable sequence for each producer that notifies when data is generated, + /// Multiple producers are supported. Simply create an observable sequence for each producer that notifies when data is generated, /// merge them together using the merge operator, and use the merged observable as the source argument in the consume operator. - /// Multiple consumers are supported by calling consume once and then calling subscribe multiple times on the cold observable that is returned. + /// Multiple consumers are supported by calling consume once and then calling subscribe multiple times on the cold observable that is returned. /// Just be sure that the source sequence is hot so that each subscription will consume based on the same producers' notifications. /// /// ## Parameters @@ -2612,41 +2612,41 @@ module Observable = /// An observable sequence that is the concatenation of the values returned by the consumeNext function. let consumeNextOn sch f source = let rec onNext (o : IObserver<_>) x = - try f x + try f x with ex -> o.OnError ex; None |> function | Some next -> o.OnNext next; onNext o x | None -> o.OnCompleted () source |> consumeMap (fun x -> - Observable.Create (fun o -> + Observable.Create (fun o -> Schedule.action (fun () -> onNext o x) sch)) /// Generates a sequence using the producer/consumer pattern. /// The purpose of the source sequence is simply to notify the consumer when out-of-band data becomes available. - /// The data in the source sequence provides additional information to the function, + /// The data in the source sequence provides additional information to the function, /// but it does not have to be the actual data being produced. - /// + /// /// The function is not necessarily called for every value in the source sequence. - /// It is only called if the previous consumer's observable has completed; otherwise, the current notification is ignored. This ensures + /// It is only called if the previous consumer's observable has completed; otherwise, the current notification is ignored. This ensures /// that only one consumer is active at any given time, but it also means that the function is not guaranteed /// to receive every value in the source sequence; therefore, the function must read /// data from out-of-band storage instead; e.g., from a shared stream or queue. - /// - /// The function may also be called when data is not available. For example, if the current consuming + /// + /// The function may also be called when data is not available. For example, if the current consuming /// observable completes and additional notifications from the source were received, then the function - /// is called again to check whether new data was missed. This avoids a race condition between the source sequence - /// and the consuming observable's completion notification. If no data is available when function is called, then - /// an empty sequence should be returned and the function will not be called again until another notification is observed + /// is called again to check whether new data was missed. This avoids a race condition between the source sequence + /// and the consuming observable's completion notification. If no data is available when function is called, then + /// an empty sequence should be returned and the function will not be called again until another notification is observed /// from the source. /// /// Producers and the single active consumer are intended to access shared objects concurrently, yet it remains their responsibility - /// to ensure thread-safety. The consume operator cannot do so without breaking concurrency. For example, + /// to ensure thread-safety. The consume operator cannot do so without breaking concurrency. For example, /// a producer/consumer implementation that uses an in-memory queue must manually ensure that reads and writes to the queue are thread-safe. /// - /// Multiple producers are supported. Simply create an observable sequence for each producer that notifies when data is generated, + /// Multiple producers are supported. Simply create an observable sequence for each producer that notifies when data is generated, /// merge them together using the merge operator, and use the merged observable as the source argument in the consume operator. - /// Multiple consumers are supported by calling consume once and then calling subscribe multiple times on the cold observable that is returned. + /// Multiple consumers are supported by calling consume once and then calling subscribe multiple times on the cold observable that is returned. /// Just be sure that the source sequence is hot so that each subscription will consume based on the same producers' notifications. /// /// ## Parameters diff --git a/src/FSharp.Control.Reactive/Scheduler.fs b/src/FSharp.Control.Reactive/Scheduler.fs index 5c686d8..6ca94f8 100644 --- a/src/FSharp.Control.Reactive/Scheduler.fs +++ b/src/FSharp.Control.Reactive/Scheduler.fs @@ -12,9 +12,9 @@ open System.Threading.Tasks module Scheduler = /// Returns the 'ISchedulerLongRunning' implementation of the specified scheduler, or 'None' if no such implementation is available. - let asLongRunning (sch : IScheduler) = - match sch.AsLongRunning () with - | null -> None + let asLongRunning (sch : IScheduler) = + match sch.AsLongRunning () with + | null -> None | x -> Some x type Scheduler () = @@ -29,7 +29,7 @@ module Scheduler = module Schedule = /// Schedule multiple 'schedule' operations together - let multiple (fs : (IScheduler -> IDisposable) list) sch = + let multiple (fs : (IScheduler -> IDisposable) list) sch = List.map (fun f -> f sch) fs |> Disposables.compose (*************************************************************** @@ -42,7 +42,7 @@ module Schedule = /// Schedules an function to be executed at a specified absolute time. let actionOffset offset f sch = Scheduler.Schedule (sch, (offset : DateTimeOffset), Action f) - + /// Schedules an function to be executed. let actionLong f sch = Scheduler.ScheduleLongRunning (sch, Action f) @@ -106,7 +106,7 @@ module Schedule = /// If the scheduler supports periodic scheduling, the request will be forwarded to the periodic scheduling implementation. /// If the scheduler provides stopwatch functionality, the periodic task will be emulated using recursive scheduling with a stopwatch to correct for time slippage. /// Otherwise, the periodic task will be emulated using recursive scheduling. - let periodicAccAction state period f sch = + let periodicAccAction state period f sch = Scheduler.SchedulePeriodic (sch, state, period, Action<_> f) /// Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. @@ -122,14 +122,14 @@ module Schedule = /// Returns a scheduler that represents the original scheduler, without any of its interface-based optimizations (e.g. long running scheduling). let disableOptimizations = Scheduler.DisableOptimizations - + /// Returns a scheduler that represents the original scheduler, without any of its interface-based optimizations (e.g. long running scheduling). let disableOptimizationsTypes (optimizationInterfaces : Type list) sch = Scheduler.DisableOptimizations (sch, optimizationInterfaces |> List.toArray) /// Returns a scheduler that wraps the original scheduler, adding exception handling for scheduled actions. let catch f sch = Scheduler.Catch (sch, Func<_, _> f) - + (*************************************************************** * Async ***************************************************************) @@ -153,7 +153,7 @@ module Schedule = /// Suspends execution of the current work item on the scheduler for the specified duration. /// The caller should await the result of calling 'sleep' to schedule the remainder of the current work item (known as the continuation) after the specified duration. let sleep dueTime sch = Scheduler.Sleep (sch, (dueTime : TimeSpan)) |> asAsync - + /// Suspends execution of the current work item on the scheduler for the specified duration. /// The caller should await the result of calling 'sleep' to schedule the remainder of the current work item (known as the continuation) after the specified duration. let sleepCancel dueTime ct sch = Scheduler.Sleep (sch, (dueTime : TimeSpan), ct) |> asAsync @@ -178,8 +178,8 @@ module Schedule = /// Schedules the work using an asynchonous function, allowing for cooperative scheduling in a imperative coding style. let async f sch = - let ff = Func (fun sc ct -> - f sc ct |> asTask ct :> Task) + let ff = Func (fun sc ct -> + f sc ct |> asTask ct :> Task) Scheduler.ScheduleAsync (sch, ff) /// Schedules the work using an asynchonous function, allowing for cooperative scheduling in a imperative coding style. @@ -190,7 +190,7 @@ module Schedule = /// Schedules the work using an asynchonous function, allowing for cooperative scheduling in a imperative coding style. let asyncAccUnit state f sch = - let ff = Func (fun sc st ct -> + let ff = Func (fun sc st ct -> f sc st ct |> asTask ct :> Task) Scheduler.ScheduleAsync (sch, state, ff) @@ -205,7 +205,7 @@ module Schedule = let ff = Func (fun sc ct -> f sc ct |> asTask ct :> Task) Scheduler.ScheduleAsync (sch, (dueTime : TimeSpan), ff) - + /// Schedules the work using an asynchonous function, allowing for cooperative scheduling in a imperative coding style. let asyncSpanResult dueTime f sch = let ff = Func> (fun sc ct -> @@ -226,7 +226,7 @@ module Schedule = /// Schedules the work using an asynchonous function, allowing for cooperative scheduling in a imperative coding style. let asyncAccSpanUnit state dueTime f sch = - let ff = Func (fun sc st ct -> + let ff = Func (fun sc st ct -> f sc st ct |> asTask ct :> Task) Scheduler.ScheduleAsync (sch, state, (dueTime : TimeSpan), ff) @@ -248,4 +248,3 @@ module Schedule = f sc st ct |> asTask ct :> Task<_>) Scheduler.ScheduleAsync (sch, state, (dueTime : DateTimeOffset), ff) - \ No newline at end of file diff --git a/src/FSharp.Control.Reactive/Subject.fs b/src/FSharp.Control.Reactive/Subject.fs index 43c4727..0c9b5de 100644 --- a/src/FSharp.Control.Reactive/Subject.fs +++ b/src/FSharp.Control.Reactive/Subject.fs @@ -3,20 +3,20 @@ open System.Reactive.Subjects type Subject<'a> private () = - + /// Represents and object that is both an observable sequence as well as an observer. /// Each notification is broadcasted to all subscribed observers. - static member broadcast + static member broadcast with get () = new System.Reactive.Subjects.Subject<'a> () /// Represents the result of an asynchronous operation. /// The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers. - static member async + static member async with get () = new AsyncSubject<'a> () /// Represents an object that is both an observable sequence as well as an observer. /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies. - static member replay + static member replay with get () = new ReplaySubject<'a> () /// Represents a value that changes over time. @@ -34,7 +34,7 @@ module Subject = /// Notifies all subscribed observers about the arrival of the specified elements in the sequence. let onNexts xs s = Seq.iter (fun x -> onNext x s |> ignore) xs; s - + /// Notifies all subscribed observers about the specified exception. let onError ex (s : SubjectBase<'a>) = s.OnError ex; s @@ -42,7 +42,7 @@ module Subject = /// Notifies all subscribed observers about the end of the sequence. let onCompleted (s : SubjectBase<'a>) = s.OnCompleted (); s - + /// An empty subject representing a null-sink /// combining an Observable.empty and Observer.empty let empty = { new System.Reactive.Subjects.ISubject<_> with @@ -53,12 +53,12 @@ module Subject = member _.OnNext(_) = () member _.Subscribe(_) = Disposable.empty } - + /// An empty subject representing a null-sink /// for IObserver<'source> and IObservable<'result> let empty2 = { new System.Reactive.Subjects.ISubject<_,_> with member _.OnCompleted() = () member _.OnError(_) = () member _.OnNext(_) = () - member _.Subscribe(_) = Disposable.empty - } + member _.Subscribe(_) = Disposable.empty + }