Transcript
Page 1: Getting deeper with TPL & async (Spanish version)

TPLAsync

Irán Reyes Fleitas

Page 2: Getting deeper with TPL & async (Spanish version)

TPL, Async, Axum

Temas:• TPL

• Paralelización de código imperativo.

• Programación Paralela con tareas(Task).

• Colecciones de Concurrencia y Pipelines.

• Estructuras para la coordinación de los datos.

• Async

Page 3: Getting deeper with TPL & async (Spanish version)

TPL

Evolución C#

Page 4: Getting deeper with TPL & async (Spanish version)

TPL

• Paralelización de código imperativo• Parallel.Invoke, Parallel Loops, Cancelando, Excepciones, Particionando .

• Programación Paralela con tareas(Task)• Task, TimeOuts, Cancelando, Excepciones, Retornando valores.

• Colecciones de Concurrencia y Pipelines• ConcurrentQueue, ConcurrentStack, ConcurrentBag, BlockingCollection,

ConcurrentDictionary.

Page 5: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Parallel Class( System.Threading.Tasks )

Parallel.ForParallel.ForEach

Parallel.Invoke

Page 6: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Parallel.Invoke

La manera más simple de paralelizar varios métodos.

Sintaxis:Invoke(  Action [] )Invoke( ParallelOptions, Action [] )

GoToRiver GoToPark GoToZoo GoToPlainArea

Parallel.Invoke(Walk.GoToPark, Walk.GoToRiver, Walk.GoToZoo, Walk.GoToPlainArea);

Parallel.Invoke( () => Walk.GoToPark("Santiago"), Walk.GoToRiver, delegate() { Walk.GoToZoo("26st"); }, Walk.GoToPlainArea);

IMPORTANTENo se tiene garantía de orden.

No retorna hasta que cada invocación no hay finalizado.

Patrón Fork/Joi

n

Page 7: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

3 posibles escenarios de paralelismoEscenario Ideal

1er escenario

2do escenario

3er escenario

Ejemplo hipotético con una arquitectura con 4 núcleos lógicos.

1era ejecuciónGoToZooGoToRiverGoToParkGoToPlainArea

2da ejecuciónGoToParkGoToRiverGoToPlainAreaGoToZoo

3era ejecuciónGoToZooGoToPlainAreaGoToRiverGoToPark

Page 8: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Ventajas y Desventajas

1. Métodos con notables diferencias en cuanto al tiempo de ejecución.

2. Cada llamada crea una sobrecarga antes de correr los métodos.

3. Como todo código en paralelo, esta expuesto a existencias de interdependencia e incontrolables interacciones.

4. No tiene garantía de orden.

1. Es un método muy simple de lograr paralelismo sin tareas, ni hilos.

Ventajas

Desventajas

Page 9: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Análisis de tiempo con respecto al secuencial

Parallel.Invoke(Walk.GoToPark, Walk.GoToRiver, Walk.GoToZoo, Walk.GoToPlainArea);

Walk.GoToPark();Walk.GoToRiver();Walk.GoToZoo();Walk.GoToPlainArea();

Secuencial

Paralelo

Ejemplo hipotético (figuras)con una arquitectura con 4 núcleos lógicos y mediciones con 2 núcleos lógicos

Page 10: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Parallel.For

Versión paralelizada del clásico for.

Sintaxis:For( Int32, Int32, Action<Int32> )For( Int32, Int32, Action<Int32, ParallelLoopState> )

List<string> data = new List<string>(){"Estamos","paralelizando","el","for","y","el","foreach"};

for (int i = 0; i < data.Count; i++){ Console.Write(i);}

Parallel.For(0, data.Count, x => { Console.Write(x); });

Tradicional Paralelizado

IMPORTANTENo tiene por que cumplirse el orden.

Tener en cuenta si los elementos estan relacionados entre si.Desde LowerBound a UpperBound.

IMPORTANTEEl primer parámetro es inclusivo, el segundo exclusivo.

Load-Balanc

e

Pequeños bodies.

Page 11: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Análisis de tiempo con respecto al secuencial(RayTracing)

void Render(Scene scene, Color[,] rgb) {

for (int y = 0; y < screenHeight; y++) { for (int x = 0; x < screenWidth; x++) rgb[x,y] = TraceRay(new Ray(scene,x,y));

} }

void Render(Scene scene, Color[,] rgb) {

Parallel.For(0, screenHeight, delegate(int y) {for (int x = 0; x < screenWidth; x++) rgb[x,y] = TraceRay(new Ray(scene,x,y));

}); }

Secuencial

Paralelo

Ocho núcleos y 350 x 350

Secuencial: 1.7 fpsParalelo : 12 fps

Dos núcleos y 350 x 350

Secuencial: 1.0 fpsParalelo : 2.0 fps

Dos núcleos y 578 x 485

Secuencial: 0.5 fpsParalelo : 1.0 fps

Page 12: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Análisis de tiempo con respecto al secuencial(Primos)List<int> primes = new List<int>();int cotaSup = 50000;

for (int i = 2; i < cotaSup; i++){ if (isPrime(i)) primes.Add(i);}

Secuencial

Paralelo

Parallel.For(2, cotaSup, (i) =>{ if (isPrime(i)) primes.Add(i);});

0.5 segundos

0.2 segundos

0.5/0.2 = 2.5x

Ejemplo hipotético (figuras)con una arquitectura con 4 núcleos lógicos y mediciones con 2 núcleos lógicos

Page 13: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

F#

let sentences = [|"Estamos"; "paralelizando"; "el"; "for"; "y"; "el"; "foreach"|]

for index=0 to sentences.Length do printfn "%d" index

printfn ""

let accion indice = printfn "%d" indice

Parallel.For(0,sentences.Length, new Action<int>(accion))

Console.ReadKey();

Page 14: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Parallel.ForEach

Versión paralelizada del clásico foreach.

Sintaxis:ForEach <TSource>( IEnumerable <TSource>, Action <TSource> )

List<string> data = new List<string>(){"Estamos","paralelizando","el","for","y","el","foreach"};

foreach (var items in data){ Console.Write(items + " ");}

Tradicional Paralelizado

ForEach <TSource>( IEnumerable <TSource>, Action <TSource, ParallelLoopState> )

Parallel.ForEach(data, x => { Console.Write(x + " "); });

Page 15: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Análisis de tiempo con respecto al secuencialEjemplo hipotético (figuras)con una arquitectura con 4 núcleos lógicos y mediciones con 2 núcleos lógicos

Secuencial

Paralelo

foreach (var i in inputData){ if (isPrime(i)) resultData[indice] = i; indice++;}

28 segundos

14 segundos

28/14 = 2x

var op = Partitioner.Create(inputData);

Parallel.ForEach(op, (item, loopState, index) =>{ if (isPrime(item)) resultData[index] = item;});

Page 16: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

¿Como paramos los ciclos?(Cancelando)

ParallelLoopResult loopResult1 = Parallel.For(0, 10, (x, state) =>

{ if (x < 5) Console.WriteLine(x); else state.Stop();});

ParallelLoopResult loopResult2 = Parallel.ForEach(data, (x, state) =>

{ if (!x.Equals("y")) Console.WriteLine(x); else state.Break();});

Console.WriteLine(loopResult1.LowestBreakIteration);Console.WriteLine(loopResult1.IsCompleted);

Console.WriteLine(loopResult2.LowestBreakIteration);Console.WriteLine(loopResult2.IsCompleted);

ParallelLoopState ParallelLoopResult

Page 17: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Manejo de Excepciones

try{ ..... .....}catch (AggregateException aggEx){ foreach (Exception ex in aggEx.InnerExceptions) { Console.WriteLine(string.Format("Caught exception '{0}'",ex.Message)); }}

Formato:

AggregateException

Page 18: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Manejo de Excepciones

Ejemplo:try{

ParallelLoopResult loopResult = Parallel.For(0, 10, (x, state) =>{

if (x < 5)Console.WriteLine(x);

else{

var ex = "Excepción en el índice " + x;throw new InvalidDataException(ex);

}});

Console.WriteLine("Ciclo for completado: {0}", loopResult.IsCompleted);}catch (AggregateException aggEx){

foreach (var innerException in aggEx.InnerExceptions){

//Pueden haber 2 excepciones a causa del paralelismo.Console.WriteLine("Excepcion capturada: " + innerException.Message);

}}

Page 19: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

ParallelOptions

ParallelOptions.MaxDegreeOfParallelismParallelOptions.TaskSchedulerParallelOptions.CancellationToken

Se utilizan en los métodos de Parallel.

var source = Enumerable.Range(8, 2000).ToArray();

double[] result = new double[source.Length];

ParallelOptions parallelOptions = new ParallelOptions();parallelOptions.MaxDegreeOfParallelism = Environment.ProcessorCount*2; //Ejemplo

Parallel.ForEach(Partitioner.Create(8, source.Length),parallelOptions, range => {

for (int i = range.Item1; i < range.Item2; i++) result[i] = source[i]*Math.E; });

Page 20: Getting deeper with TPL & async (Spanish version)

TPL - Paralelización de código imperativo

Particionando

Partitioner.Create(1,40)

Partición por rangos Partición por bloques

Parallel.ForEach(Partitioner.Create(10, 200), range => { Console.WriteLine("{0},{1}",range.Item1,range.Item2); for (int i = range.Item1; i < range.Item2; i++) { data[i] = data[i]*i; } });

Optimizando el particionado según el número de núcleos.

Partitioner.Create(1,40, ((numeroDeElementos /numeroDeNucleos)+1))

System.Environment.ProcessorCount

Sintaxis:Create <TSource >( IEnumerable<TSource > )Create ( Int32, Int32)

Create ( Int32, Int32, Int32)

Page 21: Getting deeper with TPL & async (Spanish version)

TPL

• Paralelización de código imperativo• Parallel.Invoke, Parallel Loops, Cancelando, Excepciones, Particionando .

• Programación Paralela con tareas(Task)• Task, TimeOuts, Cancelando, Excepciones, Retornando valores.

• Colecciones de Concurrencia y Pipelines• ConcurrentQueue, ConcurrentStack, ConcurrentBag, BlockingCollection,

ConcurrentDictionary.

• Parallel Linq (PLinq)• Operadores, Cancelando, Agregaciones, Excepciones.

Page 22: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Task

Page 23: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Task - Scheduling

Page 24: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Ciclo de vida y estado de una tarea

Enum TaskStatus

Miembros:

CreatedWaitingForActivationWaitingToRunRunningWaitingForChildrenToCompleteRanToCompletionCanceledFaulted

Page 25: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Invocando Tareas

GenerateSomething GenerateNothing

Parallel.Invoke(GenerateSomething,() => GenerateNothing());

//Los métodos no están corriendo todavía, pero las tareas están listas para empezar.//El estado para ambas tareas es TaskStatus.Created.var task1 = new Task(GenerateSomething);var task2 = new Task(() => GenerateNothing());task1.Start();task2.Start();Task.WaitAll(task1, task2);

var task1 = Task.Factory.StartNew(() => GenerateNothing());

Page 26: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

TimeOuts

var task1 = new Task(GenerateSomethingTimeOut);var task2 = new Task(() => GenerateNothing());

task1.Start();task2.Start();

if(!Task.WaitAll(new Task[]{task1,task2},300)){ Console.WriteLine("GenerateSomething y GenerateNothing han tardado más de 300ms");}if(!task1.Wait(300)){ Console.WriteLine("GenerateSomething ha tardado más de 300ms");}

Page 27: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Manejando excepciones con las Task

static void GenerateSomethingCancel(CancellationToken cancellationToken){ cancellationToken.ThrowIfCancellationRequested(); Console.WriteLine("GenerateSomething"); Thread.Sleep(3000);

if (sw.Elapsed.Seconds > 1) throw new TimeoutException("La tarea se demoró mas de 1 segundos");

cancellationToken.ThrowIfCancellationRequested();}

try{ // Espera por que todas las tareas finalicen en menos de 3 segundos if (!Task.WaitAll(new Task[] { task1, task2 }, 3000)) { Console.WriteLine("GenerateSomething y GenerateNothing han tardado más de 300ms en terminar"); Console.WriteLine(task1.Status.ToString()); Console.WriteLine(task2.Status.ToString()); }}catch (AggregateException ex){ foreach (Exception innerEx in ex.InnerExceptions) { Console.WriteLine(innerEx.ToString()); }}

Page 28: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Retornando valores desde las tareas

static List<string> GenerateSomethingReturn(){ Console.WriteLine("GenerateSomething"); Thread.Sleep(3000);

return new List<string>{"Estoy","retornando","una","lista","de","strings."};}

var task1 = Task.Factory.StartNew(() => GenerateSomethingReturn());

try { task1.Wait(); } catch (AggregateException ex) { foreach (Exception innerEx in ex.InnerExceptions) { Console.WriteLine(innerEx.ToString()); } }

var task2 = Task.Factory.StartNew(() => { foreach (var result in task1.Result) { Console.WriteLine(result); } });

Page 29: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Cancelando Tareas usando Tokens

CancellationToken cancellationToken

CancellationTokenSource

Se pasa como parámetro

Controla la cancelación desde el método principal

static void GenerateSomethingCancel(CancellationToken cancellationToken){ cancellationToken.ThrowIfCancellationRequested(); Console.WriteLine("GenerateSomething"); Thread.Sleep(3000); cancellationToken.ThrowIfCancellationRequested();}

var cts = new CancellationTokenSource();var ct = cts.Token;

var task1 = Task.Factory.StartNew(() => GenerateNothingCancel(ct),ct);cts.Cancel();

if (task1.IsCanceled) { Console.WriteLine("La Tarea GenerateSomethingCancel que estaba en ejecucion fue cancelada"); }

Page 30: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

TaskCreationOptions

TaskCreationOptions.AttachedToParentTaskCreationOptions.NoneTaskCreationOptions.LongRunningTaskCreationOptions.PreferFairness

var task2 = Task.Factory.StartNew(() =>{foreach (var result in task1.Result) { Console.WriteLine(result); }

},TaskCreationOptions.PreferFairness);

Optimizando el código

Ayudar al Scheduler

Page 31: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Concatenando múltiples tareas usando Continuación

var task1 = Task.Factory.StartNew(() => GenerateSomethingCancelReturn(ct), ct);var task2 = task1.ContinueWith(t => { foreach (var result in t.Result) { Console.WriteLine(result); } });

try{

task1.Wait();}catch (AggregateException ex){

foreach (Exception innerEx in ex.InnerExceptions)

{ Console.WriteLine(innerEx.ToString());

}}

var task2 = Task.Factory.StartNew(() =>{

foreach (var result in task1.Result){

Console.WriteLine(result);}

});

Page 32: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

var f = Task.Factory; var build1 = f.StartNew(() => Build(project1)); var build2 = f.StartNew(() => Build(project2));var build3 = f.StartNew(() => Build(project3));

var build4 = build1.ContinueWith(() => Build(project4)); var build5 = f.ContinueWhenAll(new[] { build1, build2, build3 }, () => Build(project5)); var build6 = f.ContinueWhenAll(new[] { build3, build4 }, () => Build(project6)); var build7 = f.ContinueWhenAll(new[] { build5, build6 }, () => Build(project7)); var build8 = build5.ContinueWith(() => Build(project8));

Task.WaitAll(build1, build2, build3, build4, build5, build6, build7, build8);

DAG

Page 33: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Mezclando paralelismo y código secuencial con Continuación

Page 34: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

TaskContinuationOptions

TaskContinuationOptions.AttachedToParentTaskContinuationOptions.ExecuteSynchronouslyTaskContinuationOptions.LongRunningTaskContinuationOptions.PreferFairnessTaskContinuationOptions.None

TaskContinuationOptions.NotOnCanceledTaskContinuationOptions.NotOnFaultedTaskContinuationOptions.NotOnRanToCompletionTaskContinuationOptions.OnlyOnCanceledTaskContinuationOptions.OnlyOnFaultedTaskContinuationOptions.OnlyOnRanToCompletion

var task2 = task1.ContinueWith(t => { foreach (var result in t.Result) { Console.WriteLine(result); } },TaskContinuationOptions.None);

Especificando el comportamiento de la próxima tarea

Condicionando la próxima tarea

Page 35: Getting deeper with TPL & async (Spanish version)

TPL - Programación Paralela con tareas

Análisis de tiempo con respecto a los ThreadEjemplo hipotético con una arquitectura con 2 núcleos lógicos.

64 Threads versus 64 Tasks

3. Los primos hasta el 5000 : Thread 15 segundos. Tasks 13 segundos.

1. Los primos hasta el 50 : Thread 0.9 segundos. Tasks 0.2 segundos.

4. Los primos hasta el 50000: Thread 116 segundos. Tasks 104 segundos.

2. Los primos hasta el 500 : Thread 2 segundos. Tasks 1 segundo.

Page 36: Getting deeper with TPL & async (Spanish version)

TPL

• Paralelización de código imperativo• Parallel.Invoke, Parallel Loops, Cancelando, Excepciones, Particionando .

• Programación Paralela con tareas(Task)• Task, TimeOuts, Cancelando, Excepciones, Retornando valores.

• Colecciones de Concurrencia y Pipelines• ConcurrentQueue, ConcurrentStack, ConcurrentBag, BlockingCollection,

ConcurrentDictionary.

Page 37: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

var data = new List<int>();

Parallel.ForEach(Partitioner.Create(0, 200), range =>{

for (int i = range.Item1; i < range.Item2; i++) lock (data) data.Add(i);});

data.ForEach(x => Console.Write(x + " "));

Solución

BlockingCollection<T>

ConcurrentBag<T>

ConcurrentQueue<T>ConcurrentStack<T>

ConcurrentDictionary<T>

Page 38: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

Colecciones Thread-Unsafe:

System.CollectionsSystem.Collections.Generic

Colecciones Thread-Safe:

System.collections.Concurrent

ConcurrentQueue<T>ConcurrentStack<T>ConcurrentBag<T>ConcurrentDictionary<TKey, TValue>BlockingCollection<T>

IProducerConsumerCollection<T>

Page 39: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

Colecciones de concurrencia ideales para escenarios productor-consumidor.

Page 40: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentQueue<T>

Métodos Importantes:EnqueueTryDequeueTryPeek

Lock-Free

Page 41: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentQueue<T>

-Esta coleccion es completamente libre de lock (lock-free)

-Usa compare and swap (CAS)

-Cuando falla una operacion CAS se pone en estado de contencion.

Page 42: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentQueue<T>

-Produce (over-head).

-Mejora el rendimiento de la cola y otras colecciones thread-unsafe, en determinados escenarios.

-Nos facilita el trabajo con la concurrencia.

Page 43: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentQueue<T>

Caracteristicas importantes:

concurrentQueue.Enqueue(item);

if (concurrentQueue.TryPeek(out item)) {

DoSomething(item); }

if (concurrentQueue.TryDequeue(out item)) {

DoSomething(item); }

Page 44: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Métodos Importantes:PushTryPopTryPeek

Page 45: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Otros Métodos:

PushRangePopRange

Page 46: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

-Similar a la coleccion ConcurrentQueue.

-Es una coleccion LIFO.

-Atomicidad en los metodos PushRange y PopRange reduce la cantidad de insersiones y extracciones concurrentes en la coleccion.

Page 47: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Caracteristicas importantes:

concurrentStack.Push(item);

if (concurrentStack.TryPeek(out item)){

DoSomething(item);}

if (concurrentStack.TryPop(out item)){

DoSomething(item);}

Page 48: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Caracteristicas importantes:

Parallel.ForEach(Partitioner.Create(0, partCount), p => { concurrentStack.PushRange

( numberArray, p.Item1, p.Item2 - p.Item1 );

});

Page 49: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Sintaxis:

count = s.TryPopRange(numberArray, 0, numberArray.Length);

count = s.TryPopRange(numberArray);

Count sera la cantidad de objetos que fueron sacados del tope de la cola e insertados el el array.

Page 50: Getting deeper with TPL & async (Spanish version)

Atomiciadad o costo…PushRange

TryPopRange

Push TryPop

No memoria adicional

No Over-Head

Igual concurrencia

Buen rendimiento

Menor concurrencia

Atomiciadad

Memoria adicional

Over-Head

TPL - Colecciones de Concurrencia y Pipelines

Page 51: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentBag<T>

Métodos Importantes:AddTryTakeTryPeek

Nota:Colección donde el orden no importa.

Ideal para escenarios Productor – Consumidor.

Page 52: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentBag<T>

-Ideal para ciertos escenarios productor-consumidor.

-No es completamente lock-free.

-Bastante ineficiente, donde el hilo productor es distinto al consumidor.

-Mantiene una cola local para cada hilo que accede a ella.

Page 53: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentBag<T>

Caracteristicas importantes:

sentencesBag.Add(s.ToString());

string sentence; if (_sentencesBag.TryTake(out sentence)) {

_capWordsInSentencesBag.Add (

CapitalizeWords(delimiterChars, sentence, '\\' )); }

Page 54: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Métodos Importantes:AddTryAddTakeTryTakeCompleteAddingGetConsumerEnumerable

Ofrece soporte para Bounding y Blocking.

Ideal para escenarios Productor – Consumidor.

Ideal para implementaciones de pipelines.

Es un wrapper para una interfaz del tipo IProducerConsumerCollection<T>

Capacidad máxima opcional.

Permite cancelación a través de tokens.

Existen 2 tipos de enumeraciones con foreach:1. Enumeración de solo lectura.2. Enumeración que elimina los elementos que han sido enumerados(P-C).

Page 55: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes:

BlockingCollection<int> stackBC = new BlockingCollection<int>(new ConcurrentStack<int>());

BlockingCollection<int> bagBC = new BlockingCollection<int>(new ConcurrentBag<int>());

BlockingCollection<int> bc = new BlockingCollection<int>(count);

Page 56: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes:

IsCompleted, IsAddingCompleted.

string aux; while (!sentences.IsCompleted) { if (sentences.TryTake(out aux)) upperSentences.Add(aux.ToUpper()); } upperSentences.CompleteAdding();

Page 57: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes:

GetConsumingEnumerable()

foreach (var item in upperSentences.GetConsumingEnumerable()) { finalSentences.Add(item.Replace("U", "")); } upperSentences.CompleteAdding();

Page 58: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes: if (!_sentencesBC.TryAdd(newSentence, 2000, cancToken)) { throw new TimeoutException( "_sentencesBC took more than 2 seconds to add an item"); } catch (OperationCanceledException ex) { // The operation was cancelled break; }

Page 59: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes:

BlockingCollection<TOutput>.AddToAny(Output, result, _token);

BlockingCollection<TOutput>.TryAddToAny(Output, result, timeOut, _token);

Page 60: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes: BlockingCollection<TOutput>.AddToAny(array, item, _token);

BlockingCollection<TOutput>.TryAddToAny(array, item, timeOut, _token);

Estos metodos devuelven el indice de la coleccion, en el array de colecciones, a la cual se le agrego el elemento.

Page 61: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes: BlockingCollection<TOutput>.TakeFromAny(array, out item, _token);

BlockingCollection<TOutput>.TakeFromAny(array, out item, timeOut, _token);

Estos metodos devuelven el indice de la coleccion, en el array de colecciones, de la cual se elimino el elemento.

Page 62: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

-Facilita el trabajo con las colecciones thread-safe.

-Produce Over-Head.

-Disminuye la cantidad y simplfica la complejidad del codigo.

-Ideal para la implementacion de pipelines(Ejemplo)

Page 63: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentDictionary<T>

Métodos Importantes:AddOrUpdateGetEnumeratorGetOrAddTryAddTryGetValueTryRemoveTryUpdate

Lock-Free para

operaciones de lectura

Sintaxis:ConcurrentDictionary<TKey, TValue >()ConcurrentDictionary<TKey, TValue >(Int32, Int32)

int initialCapacity = 100;

int concurrencyLevel = Environment.ProcessorCount * 2;

ConcurrentDictionary<int, int> cd = new ConcurrentDictionary<int, int>(concurrencyLevel, initialCapacity);

for (int i = 0; i < 64; i++) cd[i] = i * i;

Console.WriteLine(“23² is {0} (should be {1})", cd[23], 23 * 23);

Page 64: Getting deeper with TPL & async (Spanish version)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentDictionary<Tkey, TValue>

Caracteristicas importantes: _rectanglesDict.AddOrUpdate( newKey, newRect, (key, existingRect) => {if (existingRect != newRect) { lock (existingRect) { existingRect.Update( newRect.Location, newRect.Size); } return existingRect; } else { return existingRect; } });

Page 65: Getting deeper with TPL & async (Spanish version)

Async - Programación Asincrónica

C# 5.0 - Async

Page 66: Getting deeper with TPL & async (Spanish version)

Asynchronous

Programming

Model(APM)

Event-based

Asynchronous

Pattern(EAP)

Task Asynchrono

us Pattern(TAP

)

Async - Programación Asincrónica

Patrones estándares

TaskFactory.FromAsync TaskCompletionSource

Idea

Sincrónico = Asincrónico

TAP

APM

EAP

Async

Page 67: Getting deeper with TPL & async (Spanish version)

Async - Programación Asincrónica

Microsoft Visual Studio Async Community Technology Preview (CTP). ( Visual Studio Async CTP )

Nuevas keywords:

async:

await:

Marca a métodos o expresiones lambdas como asincrónicas.Retiene el control hasta que la operación asincrónica termine.

Objetivo:

Programación asincrónica = Programación sincrónica.

Fin de los métodos callback.

Escribir códigos simples y fáciles de entender.

Page 68: Getting deeper with TPL & async (Spanish version)

Async - Programación Asincrónica

public int SumPageSizes(IList<Uri> uris){

int total = 0;foreach (var uri in uris){

statusText.Text = string.Format("Found {0} bytes ...", total);var data = new WebClient().DownloadData(uri);total += data.Length;

}statusText.Text = string.Format("Found {0} bytes total", total);return total;

}

Versión Sincrónica

Problemas:Bloquea la interfaz de usuario.

No nos va enseñando el estado de la descarga.

Solución Versión Asincrónica

Page 69: Getting deeper with TPL & async (Spanish version)

public void SumPageSizesAsync(IList<Uri> uris){ SumPageSizesAsyncHelper(uris.GetEnumerator(), 0);}

private void SumPageSizesAsyncHelper(IEnumerator<Uri> enumerator, int total){

if (enumerator.MoveNext()){

statusText.Text = string.Format("Found {0} bytes ...", total);var client = new WebClient();client.DownloadDataCompleted += (sender, e) => SumPageSizesAsyncHelper(enumerator, total + e.Result.Length);client.DownloadDataAsync(enumerator.Current);

}else{

statusText.Text = string.Format("Found {0} bytes total", total);enumerator.Dispose();

}}

Problemas:Hay que romper el foreach.

En cada llamado se ancla un evento.El código es recursivo.

No retorna el total una ves calculado.

Versión Asíncrona con EAP

Async - Programación Asincrónica

Page 70: Getting deeper with TPL & async (Spanish version)

ConclusionesEl método anterior es asincrónico con una sola llamada asincrónica y una sola estructura de control alrededor de esta. Imagínense más llamadas asincrónicas y más estructuras de control, sería un verdadero caos.

Primera solución: Utilizar APM o EAP con las nuevas clases de TPL.

Segunda solución: Utilizar TAP(junto a async).

public async Task<int> SumPageSizesAsyncBest(IList<Uri> uris){

int total = 0;foreach (var uri in uris){

statusText.Text = string.Format("Found {0} bytes ...", total);var data = await new WebClient().DownloadDataTaskAsync(uri);total += data.Length;

}statusText.Text = string.Format("Found {0} bytes total", total);listBox1.Items.Add(total.ToString());return total;

}

Async - Programación Asincrónica

Page 71: Getting deeper with TPL & async (Spanish version)

Async - Programación Asincrónica

let asyncProcessFile (filePath : string) = async { printfn "Procesando fichero [%s]" (Path.GetFileName(filePath)) use fileStream = new FileStream(filePath,FileMode.Open) let bytesToRead = int fileStream.Length let! data = fileStream.AsyncRead(bytesToRead) //Returna un objeto Async<byte[]> printfn “Se leyeron [%d] bytes" data.Length use resultFile = new FileStream(filePath + ".results", FileMode.Create) do! resultFile.AsyncWrite(data,0,data.Length) printfn "Finalizado el procesamiento del archivo [%s]" <| Path.GetFileName(filePath) } |> Async.Start

asyncProcessFile "./testAsync.txt"Console.ReadKey();

async en F#

Page 72: Getting deeper with TPL & async (Spanish version)

Async - Programación Asincrónica

Retorno

public async void SumPageSizesAsyncBestOther(IList<Uri> uris){int total = 0;foreach (var uri in uris){

statusText.Text = string.Format("Found {0} bytes ...", total);var data = await new WebClient().DownloadDataTaskAsync(uri);total += data.Length;

}statusText.Text = string.Format("Found {0} bytes total", total);listBox1.Items.Add(total.ToString());

}

Task Void( Fire and forget )

private async void sumButton_Click(object sender, RoutedEventArgs e) {    sumButton.IsEnabled = false;    await SumPageSizesAsync(GetUrls()));    sumButton.IsEnabled = true;}

( Incluyendo genéricas)

Page 73: Getting deeper with TPL & async (Spanish version)

TPL - Async - Programación asincrónica

Código final propuesto

public async Task<int> SumPageSizesAsyncBetter(IList<Uri> uris){

var tasks = from uri in uris select new WebClient().DownloadDataTaskAsync(uri);var data = await TaskEx.WhenAll(tasks);return await TaskEx.Run(() =>data.Sum(s => s.Length));

}

Nota:Se propone incluir Run() y WhenAll() en la clase Task cuando async arrive a su versión final; mientras este en CTP se alojarán en una clase de prueba llamada TaskEx.

Page 74: Getting deeper with TPL & async (Spanish version)

TPL - Async - Programación asincrónica

¿Como funciona async?

public static async Task DoSum(int from,int to){

int result = await Sum(from, to);string param = result.ToString() + "\r\n";File.AppendAllText(@"./result.txt", param);

}

public static Task<int> Sum(int from, int to){

Task<int> sum = TaskEx.Run(() =>{

int result = 0;for (int i = from; i <= to; i++){

TaskEx.Delay(500);result += i;

}return result;

});return sum;

}

public static Task DoSum(int from,int to){var task1 = Task.Factory.StartNew(() => Sum(from,to));

return task1.ContinueWith((antecedentTask) =>{string param = antecedentTask.Result.Result.ToString() + \r\n";File.AppendAllText(@"./result.txt", param);});

}

public static Task<int> Sum(int from, int to){

Task<int> sum = TaskEx.Run(() =>{

int result = 0;for (int i = from; i <= to; i++){

TaskEx.Delay(500);result += i;

}return result;

});return sum;

}

Original Transformado por el compilador

Page 75: Getting deeper with TPL & async (Spanish version)

TPL - Async - Programación asincrónica

Como funciona async

public static async Task DoSum(int from,int to){

int result = await Sum(from, to);string param = result.ToString() + "\r\n";File.AppendAllText(@"./result.txt", param);

}

static void Main(string[] args){

int number;string input;Task myTask = new Task(Console.WriteLine);

while (true){

Console.WriteLine("Entre un número: ");input = Console.ReadLine();if (string.Empty == input)break;number = int.Parse(input);myTask = DoSum(1, number);

} myTask.Wait();}

Callback En cuanto la tarea finalice.

Continua la ejecución.

Retorna una tarea

Page 76: Getting deeper with TPL & async (Spanish version)

TPL - Async - Programación asincrónica

public async Task<int> SumPageSizesAsyncBest(IList<Uri> uris){

int total = 0;foreach (var uri in uris){

statusText.Text = string.Format("Found {0} bytes ...", total);var data = await new WebClient().DownloadDataTaskAsync(uri);total += data.Length;

}statusText.Text = string.Format("Found {0} bytes total", total);listBox1.Items.Add(total.ToString());return total;

}

Retornando valores desde async

Page 77: Getting deeper with TPL & async (Spanish version)

TPL - Async - Programación asincrónica

Cancelación desde async

public async void Inicio(Program program) { cts = new CancellationTokenSource(); program.Hola(cts.Token); Thread.Sleep(1000); if (cts != null) cts.Cancel(); }

public async Task Hola(CancellationToken ct) { Console.WriteLine("Before await"); await TaskEx.Delay(5000); ct.ThrowIfCancellationRequested(); Console.WriteLine("After await"); }

let cancelableTask = async { printfn "Waiting 10 seconds..." for i = 1 to 10 do printfn "%d..." i do! Async.Sleep(1000) printfn "Finished!" }

// Callback used when the operation is canceledlet cancelHandler (ex : OperationCanceledException) = printfn "The task has been canceled."

Async.TryCancelled(cancelableTask,cancelHandler)|>Async.StartThread.Sleep(2000)Async.CancelDefaultToken()

C# F#

Page 78: Getting deeper with TPL & async (Spanish version)

TPL - Async - Programación asincrónica

Excepciones desde async

try{ string txt = await w.DownloadStringTaskAsync(url);}catch(WebException x){ --- Handle exception.}

let asyncOperation = async { try // ... with | :? IOException as ioe -> printfn "IOException: %s" ioe.Message | :? ArgumentException as ae -> printfn "ArgumentException: %s" ae.Message }

Las excepciones se manejan igual que de

manera sincrónica.