Skip to content

Commit

Permalink
Merge pull request #8 from Froussios/Polishing-content
Browse files Browse the repository at this point in the history
Polished book content
  • Loading branch information
Froussios committed Jul 6, 2015
2 parents f8f024b + b7805ae commit 4f1ea02
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 37 deletions.
8 changes: 4 additions & 4 deletions Part 2 - Sequence Basics/1. Creating a sequence.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ var ob = Observable.Create<string>(

Now when a consumer disposes of their subscription, the underlying `Timer` will be disposed of too.

`Observable.Creat`e also has an overload that requires your `Func` to return an `Action` instead of an `IDisposable`. In a similar example to above, this one shows how you could use an action to un-register the event handler, preventing a memory leak by retaining the reference to the timer.
`Observable.Create` also has an overload that requires your `Func` to return an `Action` instead of an `IDisposable`. In a similar example to above, this one shows how you could use an action to un-register the event handler, preventing a memory leak by retaining the reference to the timer.

```C#
//Example code only
Expand Down Expand Up @@ -555,7 +555,7 @@ There is also an [overload that converts a Task (non generic) to an `IObservable

### `FromAsync`

Another convenient method for converting tasks into observables is `Observable.FromAsync`. It takes a `Task<TResult>` and returns an `IObservable<TResult>`. This may sound the same as using `task.ToObservable()` like be saw previously. However, `Task` is a special entity in C#. The "async" in the name refers to the [`async`/`await`](https://msdn.microsoft.com/en-us/library/hh191443.aspx) model with which C# supports asynchronous blocking operations. With `async` we can create `Tasks` directly out of functions.
Another convenient method for converting tasks into observables is `Observable.FromAsync`. It takes a `Task<TResult>` and returns an `IObservable<TResult>`. This may sound the same as using `task.ToObservable()` like we saw previously. However, `Task` is a special entity in C#. The "async" in the name refers to the [`async`/`await`](https://msdn.microsoft.com/en-us/library/hh191443.aspx) model, with which C# supports asynchronous blocking operations. With `async` we can create `Tasks` directly out of functions.

```C#
Observable.FromAsync(async () =>
Expand All @@ -570,9 +570,9 @@ Output
Asynchronous operation complete
```

By marking a method that returns an instance of `TResult` with the `async` keyword, the function will then be implicitly understood to return a `Task<TResult>`. Such methods are non-blocking: calling an asynchronous method will initiate the corresponding work but won't complete it. The result will become available later in the `Task` and work may or may not progress concurrently, depending on the nature of the task and the platform. To get the result (and block until it's ready), you must use `await` on the `Task`. Any function which has to `await` on a `Task` can itself be declared asynchronous. `await` will block the sequence of execution for the function that uses it, but not for the outer function, unless it also `await`s.
A method which returns an a `TResult` type and is marked with the `async` keyword is implicitly understood to return a `Task<TResult>`. Such methods are non-blocking: calling an asynchronous method will initiate the corresponding work but won't complete it. Work will progress concurrently and the result will become available later in the `Task`. To get the result (and block until it's ready), you must use `await` on the `Task`. Any function which has to `await` on a `Task` must itself be declared asynchronous. `await` will block the sequence of execution for the function that uses it, but not for the outer function, unless it also `await`s.

Asynchronous methods look a lot like traditional methods, but the callstack does not apply to them in the same way. Instead, they are composed of tasks in continuation passing style. If you find the concept confusing, then don't worry, because you don't need to understand C#'s support for `async`/`await` to work with Rx. There is a lot of magic underneath those simple keywords, and there are better places to learn about it in detail. Rx supports the `async`/`await` paradigm very neatly, but it only becomes relevant where you would have to work with `Task` anyway, such as IO operations. Rx code doesn't need to `await`.
Asynchronous methods look a lot like traditional methods, but the callstack does not apply to them like you're used to. Instead, they are composed of tasks in continuation passing style. We'll see them some more in when we'll be discussing blocking and concurrency. If you find them confusing, then don't worry, because you don't need to understand `async`/`await` to work with Rx. There is some magic underneath those simple keywords, and there are [better places to learn](https://msdn.microsoft.com/en-us/library/hh191443.aspx) about it in detail. Rx supports the `async`/`await` paradigm very neatly, but it is only relevant when another library would force to use Tasks. Rx code doesn't need to `await`.

### From `IEnumerable<T>`

Expand Down
1 change: 0 additions & 1 deletion Part 2 - Sequence Basics/2. Reducing a sequence.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,6 @@ subject.OnNext(4);
subject.OnNext(5);
subject.OnNext(6);
subject.OnNext(7);
subject.OnNext(8);
subject.OnCompleted();
```

Expand Down
4 changes: 3 additions & 1 deletion Part 2 - Sequence Basics/3. Inspection.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ The `All()` extension method works just like the `Any` method, except that all v
var subject = new Subject<int>();
subject.Subscribe(Console.WriteLine, () => Console.WriteLine("Subject completed"));
var all = subject.All(i => i < 5);
all.Subscribe(b => Console.WriteLine("All values less than 5? {0}", b));
all.Subscribe(
b => Console.WriteLine("All values less than 5? {0}", b),
() => Console.WriteLine("all completed"));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(6);
Expand Down
71 changes: 49 additions & 22 deletions Part 4 - Concurrency/1. Scheduling and threading.md
Original file line number Diff line number Diff line change
Expand Up @@ -792,50 +792,77 @@ This is example code and your mileage may vary. I find that increasing the buffe

### ScheduleAsync

Using recursive scheduling allows a repetitive task not only to be cancelable, but also to share its thread with other tasks, as other tasks can be scheduled between the recursive steps. Scheduling recursive tasks can be confusing, but C# has built-in support for asynchronous operations sharing the same thread, using `async`/`await`. Since Rx 2.0, you can schedule tasks that use `async`/`await`.
Using recursive scheduling allows a repetitive task not only to be cancelable, but also to share its thread with other tasks, as other tasks can be scheduled between the recursive steps. Scheduling recursive tasks can be confusing and some cases can be written more clearly with a `for` loop. An `async` function can break it's flow of execution and yield control to other tasks. Since Rx 2.0, you can schedule tasks that use `async`/`await`.

Our next example comes from this [blog post](http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx) that introduces Rx 2.0. First consider a recursive solution to generating a range of numbers:

```C#
Observable.Create<int>(observer =>
public IObservable<int> RangeRecursive(int start, int count, IScheduler scheduler)
{
return scheduler.Schedule(0, (i, self) =>
return Observable.Create<int>(observer =>
{
if (i < count)
return scheduler.Schedule(0, (i, self) =>
{
observer.OnNext(start + i);
self(i + 1); /* Here is the recursive call */
}
else
{
observer.OnCompleted();
}
if (i < count)
{
observer.OnNext(start + i);
self(i + 1); /* Here is the recursive call */
}
else
{
observer.OnCompleted();
}
});
});
});
}
```

If you prefer using `async`/`await`, you can implement a range like this:

```C#
Observable.Create<int>(observer =>
public IObservable<int> RangeAsync(int start, int count, IScheduler scheduler)
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
return Observable.Create<int>(observer =>
{
for (int i = 0; i < count; i++)
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
observer.OnNext(start + i);
await ctrl.Yield(); /* Use a task continuation to schedule next event */
}
observer.OnCompleted();
for (int i = 0; i < count; i++)
{
observer.OnNext(start + i);
await ctrl.Yield(); /* Use a task continuation to schedule next event */
}
observer.OnCompleted();

return Disposable.Empty;
return Disposable.Empty;
});
});
});
}
```

The scheduled action takes two parameters: an instance of `IScheduler` and a `CancellationToken`. We use the reference to the scheduler to await on `Yield()`. `Yield` informs the scheduler to execute other methods from the queue. It returns a `SchedulerOperation` that becomes ready when the scheduler decides that it is our turn again. We `await` on the `SchedulerOperation`, which means that the execution of our function is suspended until the scheduler decides that our function should proceed.

The cancellation token is used to inform the method that it should terminate (in Rx it means that the subscription has been cancelled). In this case, we didn't need to use it, because our function doesn't resume if terminated while yielding.
The cancellation token is used to inform the method that it should terminate (in Rx it means that the subscription has been cancelled). In this case, we didn't need to use it, because our function doesn't resume if it's terminated while yielding.

```C#
var scheduler = Scheduler.Default;
RangeAsync(0, 3, scheduler)
.Subscribe(i => Console.WriteLine("First " + i));
RangeAsync(0, 3, scheduler)
.Subscribe(i => Console.WriteLine("Second " + i));
Console.WriteLine("Subscribed");
```
Output
```
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
```

We yield in every iteration, which means that the scheduler can perfectly interleave two such tasks. You have complete control over when to yield and you may decide that a coarser granularity fits better such a cheap generator.

### Combinations of scheduler features

Expand Down
16 changes: 8 additions & 8 deletions Part 4 - Concurrency/2. Testing Rx.md
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ public sealed class ImmediateSchedulers : ISchedulerService

The `TestScheduler` provides further advanced features. I find that I am able to get by quite well without these methods, but others may find them useful. Perhaps this is because I have found myself accustomed to testing without them from using earlier versions of Rx.

### Start(Func<IObservable<T>>)
### `Start(Func<IObservable<T>>)`

There are three overloads to `Start`, which are used to start an observable sequence at a given time, record the notifications it makes and dispose of the subscription at a given time. This can be confusing at first, as the parameterless overload of `Start` is quite unrelated. These three overloads return an `ITestableObserver<T>` which allows you to record the notifications from an observable sequence, much like the `Materialize` method we saw in the Transformation chapter.

Expand Down Expand Up @@ -691,20 +691,20 @@ We can see the major difference a Hot Observable bears by changing the virtual c
```C#
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable(
new Recorded>Notification>long<<(10000000, Notification.CreateOnNext(0L)),
new Recorded>Notification>long<<(20000000, Notification.CreateOnNext(1L)),
new Recorded>Notification>long<<(30000000, Notification.CreateOnNext(2L)),
new Recorded>Notification>long<<(40000000, Notification.CreateOnNext(3L)),
new Recorded>Notification>long<<(40000000, Notification.CreateOnCompleted>long<())
new Recorded<Notification<long>>(10000000, Notification.CreateOnNext(0L)),
new Recorded<Notification<long>>(20000000, Notification.CreateOnNext(1L)),
new Recorded<Notification<long>>(30000000, Notification.CreateOnNext(2L)),
new Recorded<Notification<long>>(40000000, Notification.CreateOnNext(3L)),
new Recorded<Notification<long>>(40000000, Notification.CreateOnCompleted<long>())
);
var testObserver = scheduler.Start(
() =< source,
() => source,
0,
TimeSpan.FromSeconds(1).Ticks,
TimeSpan.FromSeconds(5).Ticks);
Console.WriteLine("Time is {0} ticks", scheduler.Clock);
Console.WriteLine("Received {0} notifications", testObserver.Messages.Count);
foreach (Recorded>Notification>long<< message in testObserver.Messages)
foreach (Recorded<Notification<long>> message in testObserver.Messages)
{
Console.WriteLine(" {0} @ {1}", message.Value, message.Time);
}
Expand Down
2 changes: 1 addition & 1 deletion Part 4 - Concurrency/3. Sequences of coincidence.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ source.Window(3)
{
var id = windowIdx++;
Console.WriteLine("--Starting new window");
var windowName = "Window" + thisWindowIdx;
var windowName = "Window" + windowIdx;
window.Subscribe(
value => Console.WriteLine("{0} : {1}", windowName, value),
ex => Console.WriteLine("{0} : {1}", windowName, ex),
Expand Down

0 comments on commit 4f1ea02

Please sign in to comment.