Getting deeper with TPL & async (Spanish version)

  • View
    83

  • Download
    1

  • Category

    Science

Preview:

DESCRIPTION

Presentation about the Task Parallel Library and the new async paradigm in .NET Framework.

Citation preview

TPLAsync

Irán Reyes Fleitas

TPL, Async, Axum

Temas:• TPL

• Paralelización de código imperativo.

• Programación Paralela con tareas(Task).

• Colecciones de Concurrencia y Pipelines.

• Estructuras para la coordinación de los datos.

• Async

TPL

Evolución C#

TPL

• Paralelización de código imperativo• Parallel.Invoke, Parallel Loops, Cancelando, Excepciones, Particionando .

• Programación Paralela con tareas(Task)• Task, TimeOuts, Cancelando, Excepciones, Retornando valores.

• Colecciones de Concurrencia y Pipelines• ConcurrentQueue, ConcurrentStack, ConcurrentBag, BlockingCollection,

ConcurrentDictionary.

TPL - Paralelización de código imperativo

Parallel Class( System.Threading.Tasks )

Parallel.ForParallel.ForEach

Parallel.Invoke

TPL - Paralelización de código imperativo

Parallel.Invoke

La manera más simple de paralelizar varios métodos.

Sintaxis:Invoke(  Action [] )Invoke( ParallelOptions, Action [] )

GoToRiver GoToPark GoToZoo GoToPlainArea

Parallel.Invoke(Walk.GoToPark, Walk.GoToRiver, Walk.GoToZoo, Walk.GoToPlainArea);

Parallel.Invoke( () => Walk.GoToPark("Santiago"), Walk.GoToRiver, delegate() { Walk.GoToZoo("26st"); }, Walk.GoToPlainArea);

IMPORTANTENo se tiene garantía de orden.

No retorna hasta que cada invocación no hay finalizado.

Patrón Fork/Joi

n

TPL - Paralelización de código imperativo

3 posibles escenarios de paralelismoEscenario Ideal

1er escenario

2do escenario

3er escenario

Ejemplo hipotético con una arquitectura con 4 núcleos lógicos.

1era ejecuciónGoToZooGoToRiverGoToParkGoToPlainArea

2da ejecuciónGoToParkGoToRiverGoToPlainAreaGoToZoo

3era ejecuciónGoToZooGoToPlainAreaGoToRiverGoToPark

TPL - Paralelización de código imperativo

Ventajas y Desventajas

1. Métodos con notables diferencias en cuanto al tiempo de ejecución.

2. Cada llamada crea una sobrecarga antes de correr los métodos.

3. Como todo código en paralelo, esta expuesto a existencias de interdependencia e incontrolables interacciones.

4. No tiene garantía de orden.

1. Es un método muy simple de lograr paralelismo sin tareas, ni hilos.

Ventajas

Desventajas

TPL - Paralelización de código imperativo

Análisis de tiempo con respecto al secuencial

Parallel.Invoke(Walk.GoToPark, Walk.GoToRiver, Walk.GoToZoo, Walk.GoToPlainArea);

Walk.GoToPark();Walk.GoToRiver();Walk.GoToZoo();Walk.GoToPlainArea();

Secuencial

Paralelo

Ejemplo hipotético (figuras)con una arquitectura con 4 núcleos lógicos y mediciones con 2 núcleos lógicos

TPL - Paralelización de código imperativo

Parallel.For

Versión paralelizada del clásico for.

Sintaxis:For( Int32, Int32, Action<Int32> )For( Int32, Int32, Action<Int32, ParallelLoopState> )

List<string> data = new List<string>(){"Estamos","paralelizando","el","for","y","el","foreach"};

for (int i = 0; i < data.Count; i++){ Console.Write(i);}

Parallel.For(0, data.Count, x => { Console.Write(x); });

Tradicional Paralelizado

IMPORTANTENo tiene por que cumplirse el orden.

Tener en cuenta si los elementos estan relacionados entre si.Desde LowerBound a UpperBound.

IMPORTANTEEl primer parámetro es inclusivo, el segundo exclusivo.

Load-Balanc

e

Pequeños bodies.

TPL - Paralelización de código imperativo

Análisis de tiempo con respecto al secuencial(RayTracing)

void Render(Scene scene, Color[,] rgb) {

for (int y = 0; y < screenHeight; y++) { for (int x = 0; x < screenWidth; x++) rgb[x,y] = TraceRay(new Ray(scene,x,y));

} }

void Render(Scene scene, Color[,] rgb) {

Parallel.For(0, screenHeight, delegate(int y) {for (int x = 0; x < screenWidth; x++) rgb[x,y] = TraceRay(new Ray(scene,x,y));

}); }

Secuencial

Paralelo

Ocho núcleos y 350 x 350

Secuencial: 1.7 fpsParalelo : 12 fps

Dos núcleos y 350 x 350

Secuencial: 1.0 fpsParalelo : 2.0 fps

Dos núcleos y 578 x 485

Secuencial: 0.5 fpsParalelo : 1.0 fps

TPL - Paralelización de código imperativo

Análisis de tiempo con respecto al secuencial(Primos)List<int> primes = new List<int>();int cotaSup = 50000;

for (int i = 2; i < cotaSup; i++){ if (isPrime(i)) primes.Add(i);}

Secuencial

Paralelo

Parallel.For(2, cotaSup, (i) =>{ if (isPrime(i)) primes.Add(i);});

0.5 segundos

0.2 segundos

0.5/0.2 = 2.5x

Ejemplo hipotético (figuras)con una arquitectura con 4 núcleos lógicos y mediciones con 2 núcleos lógicos

TPL - Paralelización de código imperativo

F#

let sentences = [|"Estamos"; "paralelizando"; "el"; "for"; "y"; "el"; "foreach"|]

for index=0 to sentences.Length do printfn "%d" index

printfn ""

let accion indice = printfn "%d" indice

Parallel.For(0,sentences.Length, new Action<int>(accion))

Console.ReadKey();

TPL - Paralelización de código imperativo

Parallel.ForEach

Versión paralelizada del clásico foreach.

Sintaxis:ForEach <TSource>( IEnumerable <TSource>, Action <TSource> )

List<string> data = new List<string>(){"Estamos","paralelizando","el","for","y","el","foreach"};

foreach (var items in data){ Console.Write(items + " ");}

Tradicional Paralelizado

ForEach <TSource>( IEnumerable <TSource>, Action <TSource, ParallelLoopState> )

Parallel.ForEach(data, x => { Console.Write(x + " "); });

TPL - Paralelización de código imperativo

Análisis de tiempo con respecto al secuencialEjemplo hipotético (figuras)con una arquitectura con 4 núcleos lógicos y mediciones con 2 núcleos lógicos

Secuencial

Paralelo

foreach (var i in inputData){ if (isPrime(i)) resultData[indice] = i; indice++;}

28 segundos

14 segundos

28/14 = 2x

var op = Partitioner.Create(inputData);

Parallel.ForEach(op, (item, loopState, index) =>{ if (isPrime(item)) resultData[index] = item;});

TPL - Paralelización de código imperativo

¿Como paramos los ciclos?(Cancelando)

ParallelLoopResult loopResult1 = Parallel.For(0, 10, (x, state) =>

{ if (x < 5) Console.WriteLine(x); else state.Stop();});

ParallelLoopResult loopResult2 = Parallel.ForEach(data, (x, state) =>

{ if (!x.Equals("y")) Console.WriteLine(x); else state.Break();});

Console.WriteLine(loopResult1.LowestBreakIteration);Console.WriteLine(loopResult1.IsCompleted);

Console.WriteLine(loopResult2.LowestBreakIteration);Console.WriteLine(loopResult2.IsCompleted);

ParallelLoopState ParallelLoopResult

TPL - Paralelización de código imperativo

Manejo de Excepciones

try{ ..... .....}catch (AggregateException aggEx){ foreach (Exception ex in aggEx.InnerExceptions) { Console.WriteLine(string.Format("Caught exception '{0}'",ex.Message)); }}

Formato:

AggregateException

TPL - Paralelización de código imperativo

Manejo de Excepciones

Ejemplo:try{

ParallelLoopResult loopResult = Parallel.For(0, 10, (x, state) =>{

if (x < 5)Console.WriteLine(x);

else{

var ex = "Excepción en el índice " + x;throw new InvalidDataException(ex);

}});

Console.WriteLine("Ciclo for completado: {0}", loopResult.IsCompleted);}catch (AggregateException aggEx){

foreach (var innerException in aggEx.InnerExceptions){

//Pueden haber 2 excepciones a causa del paralelismo.Console.WriteLine("Excepcion capturada: " + innerException.Message);

}}

TPL - Programación Paralela con tareas

ParallelOptions

ParallelOptions.MaxDegreeOfParallelismParallelOptions.TaskSchedulerParallelOptions.CancellationToken

Se utilizan en los métodos de Parallel.

var source = Enumerable.Range(8, 2000).ToArray();

double[] result = new double[source.Length];

ParallelOptions parallelOptions = new ParallelOptions();parallelOptions.MaxDegreeOfParallelism = Environment.ProcessorCount*2; //Ejemplo

Parallel.ForEach(Partitioner.Create(8, source.Length),parallelOptions, range => {

for (int i = range.Item1; i < range.Item2; i++) result[i] = source[i]*Math.E; });

TPL - Paralelización de código imperativo

Particionando

Partitioner.Create(1,40)

Partición por rangos Partición por bloques

Parallel.ForEach(Partitioner.Create(10, 200), range => { Console.WriteLine("{0},{1}",range.Item1,range.Item2); for (int i = range.Item1; i < range.Item2; i++) { data[i] = data[i]*i; } });

Optimizando el particionado según el número de núcleos.

Partitioner.Create(1,40, ((numeroDeElementos /numeroDeNucleos)+1))

System.Environment.ProcessorCount

Sintaxis:Create <TSource >( IEnumerable<TSource > )Create ( Int32, Int32)

Create ( Int32, Int32, Int32)

TPL

• Paralelización de código imperativo• Parallel.Invoke, Parallel Loops, Cancelando, Excepciones, Particionando .

• Programación Paralela con tareas(Task)• Task, TimeOuts, Cancelando, Excepciones, Retornando valores.

• Colecciones de Concurrencia y Pipelines• ConcurrentQueue, ConcurrentStack, ConcurrentBag, BlockingCollection,

ConcurrentDictionary.

• Parallel Linq (PLinq)• Operadores, Cancelando, Agregaciones, Excepciones.

TPL - Programación Paralela con tareas

Task

TPL - Programación Paralela con tareas

Task - Scheduling

TPL - Programación Paralela con tareas

Ciclo de vida y estado de una tarea

Enum TaskStatus

Miembros:

CreatedWaitingForActivationWaitingToRunRunningWaitingForChildrenToCompleteRanToCompletionCanceledFaulted

TPL - Programación Paralela con tareas

Invocando Tareas

GenerateSomething GenerateNothing

Parallel.Invoke(GenerateSomething,() => GenerateNothing());

//Los métodos no están corriendo todavía, pero las tareas están listas para empezar.//El estado para ambas tareas es TaskStatus.Created.var task1 = new Task(GenerateSomething);var task2 = new Task(() => GenerateNothing());task1.Start();task2.Start();Task.WaitAll(task1, task2);

var task1 = Task.Factory.StartNew(() => GenerateNothing());

TPL - Programación Paralela con tareas

TimeOuts

var task1 = new Task(GenerateSomethingTimeOut);var task2 = new Task(() => GenerateNothing());

task1.Start();task2.Start();

if(!Task.WaitAll(new Task[]{task1,task2},300)){ Console.WriteLine("GenerateSomething y GenerateNothing han tardado más de 300ms");}if(!task1.Wait(300)){ Console.WriteLine("GenerateSomething ha tardado más de 300ms");}

TPL - Programación Paralela con tareas

Manejando excepciones con las Task

static void GenerateSomethingCancel(CancellationToken cancellationToken){ cancellationToken.ThrowIfCancellationRequested(); Console.WriteLine("GenerateSomething"); Thread.Sleep(3000);

if (sw.Elapsed.Seconds > 1) throw new TimeoutException("La tarea se demoró mas de 1 segundos");

cancellationToken.ThrowIfCancellationRequested();}

try{ // Espera por que todas las tareas finalicen en menos de 3 segundos if (!Task.WaitAll(new Task[] { task1, task2 }, 3000)) { Console.WriteLine("GenerateSomething y GenerateNothing han tardado más de 300ms en terminar"); Console.WriteLine(task1.Status.ToString()); Console.WriteLine(task2.Status.ToString()); }}catch (AggregateException ex){ foreach (Exception innerEx in ex.InnerExceptions) { Console.WriteLine(innerEx.ToString()); }}

TPL - Programación Paralela con tareas

Retornando valores desde las tareas

static List<string> GenerateSomethingReturn(){ Console.WriteLine("GenerateSomething"); Thread.Sleep(3000);

return new List<string>{"Estoy","retornando","una","lista","de","strings."};}

var task1 = Task.Factory.StartNew(() => GenerateSomethingReturn());

try { task1.Wait(); } catch (AggregateException ex) { foreach (Exception innerEx in ex.InnerExceptions) { Console.WriteLine(innerEx.ToString()); } }

var task2 = Task.Factory.StartNew(() => { foreach (var result in task1.Result) { Console.WriteLine(result); } });

TPL - Programación Paralela con tareas

Cancelando Tareas usando Tokens

CancellationToken cancellationToken

CancellationTokenSource

Se pasa como parámetro

Controla la cancelación desde el método principal

static void GenerateSomethingCancel(CancellationToken cancellationToken){ cancellationToken.ThrowIfCancellationRequested(); Console.WriteLine("GenerateSomething"); Thread.Sleep(3000); cancellationToken.ThrowIfCancellationRequested();}

var cts = new CancellationTokenSource();var ct = cts.Token;

var task1 = Task.Factory.StartNew(() => GenerateNothingCancel(ct),ct);cts.Cancel();

if (task1.IsCanceled) { Console.WriteLine("La Tarea GenerateSomethingCancel que estaba en ejecucion fue cancelada"); }

TPL - Programación Paralela con tareas

TaskCreationOptions

TaskCreationOptions.AttachedToParentTaskCreationOptions.NoneTaskCreationOptions.LongRunningTaskCreationOptions.PreferFairness

var task2 = Task.Factory.StartNew(() =>{foreach (var result in task1.Result) { Console.WriteLine(result); }

},TaskCreationOptions.PreferFairness);

Optimizando el código

Ayudar al Scheduler

TPL - Programación Paralela con tareas

Concatenando múltiples tareas usando Continuación

var task1 = Task.Factory.StartNew(() => GenerateSomethingCancelReturn(ct), ct);var task2 = task1.ContinueWith(t => { foreach (var result in t.Result) { Console.WriteLine(result); } });

try{

task1.Wait();}catch (AggregateException ex){

foreach (Exception innerEx in ex.InnerExceptions)

{ Console.WriteLine(innerEx.ToString());

}}

var task2 = Task.Factory.StartNew(() =>{

foreach (var result in task1.Result){

Console.WriteLine(result);}

});

TPL - Programación Paralela con tareas

var f = Task.Factory; var build1 = f.StartNew(() => Build(project1)); var build2 = f.StartNew(() => Build(project2));var build3 = f.StartNew(() => Build(project3));

var build4 = build1.ContinueWith(() => Build(project4)); var build5 = f.ContinueWhenAll(new[] { build1, build2, build3 }, () => Build(project5)); var build6 = f.ContinueWhenAll(new[] { build3, build4 }, () => Build(project6)); var build7 = f.ContinueWhenAll(new[] { build5, build6 }, () => Build(project7)); var build8 = build5.ContinueWith(() => Build(project8));

Task.WaitAll(build1, build2, build3, build4, build5, build6, build7, build8);

DAG

TPL - Programación Paralela con tareas

Mezclando paralelismo y código secuencial con Continuación

TPL - Programación Paralela con tareas

TaskContinuationOptions

TaskContinuationOptions.AttachedToParentTaskContinuationOptions.ExecuteSynchronouslyTaskContinuationOptions.LongRunningTaskContinuationOptions.PreferFairnessTaskContinuationOptions.None

TaskContinuationOptions.NotOnCanceledTaskContinuationOptions.NotOnFaultedTaskContinuationOptions.NotOnRanToCompletionTaskContinuationOptions.OnlyOnCanceledTaskContinuationOptions.OnlyOnFaultedTaskContinuationOptions.OnlyOnRanToCompletion

var task2 = task1.ContinueWith(t => { foreach (var result in t.Result) { Console.WriteLine(result); } },TaskContinuationOptions.None);

Especificando el comportamiento de la próxima tarea

Condicionando la próxima tarea

TPL - Programación Paralela con tareas

Análisis de tiempo con respecto a los ThreadEjemplo hipotético con una arquitectura con 2 núcleos lógicos.

64 Threads versus 64 Tasks

3. Los primos hasta el 5000 : Thread 15 segundos. Tasks 13 segundos.

1. Los primos hasta el 50 : Thread 0.9 segundos. Tasks 0.2 segundos.

4. Los primos hasta el 50000: Thread 116 segundos. Tasks 104 segundos.

2. Los primos hasta el 500 : Thread 2 segundos. Tasks 1 segundo.

TPL

• Paralelización de código imperativo• Parallel.Invoke, Parallel Loops, Cancelando, Excepciones, Particionando .

• Programación Paralela con tareas(Task)• Task, TimeOuts, Cancelando, Excepciones, Retornando valores.

• Colecciones de Concurrencia y Pipelines• ConcurrentQueue, ConcurrentStack, ConcurrentBag, BlockingCollection,

ConcurrentDictionary.

TPL - Colecciones de Concurrencia y Pipelines

var data = new List<int>();

Parallel.ForEach(Partitioner.Create(0, 200), range =>{

for (int i = range.Item1; i < range.Item2; i++) lock (data) data.Add(i);});

data.ForEach(x => Console.Write(x + " "));

Solución

BlockingCollection<T>

ConcurrentBag<T>

ConcurrentQueue<T>ConcurrentStack<T>

ConcurrentDictionary<T>

TPL - Colecciones de Concurrencia y Pipelines

Colecciones Thread-Unsafe:

System.CollectionsSystem.Collections.Generic

Colecciones Thread-Safe:

System.collections.Concurrent

ConcurrentQueue<T>ConcurrentStack<T>ConcurrentBag<T>ConcurrentDictionary<TKey, TValue>BlockingCollection<T>

IProducerConsumerCollection<T>

TPL - Colecciones de Concurrencia y Pipelines

Colecciones de concurrencia ideales para escenarios productor-consumidor.

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentQueue<T>

Métodos Importantes:EnqueueTryDequeueTryPeek

Lock-Free

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentQueue<T>

-Esta coleccion es completamente libre de lock (lock-free)

-Usa compare and swap (CAS)

-Cuando falla una operacion CAS se pone en estado de contencion.

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentQueue<T>

-Produce (over-head).

-Mejora el rendimiento de la cola y otras colecciones thread-unsafe, en determinados escenarios.

-Nos facilita el trabajo con la concurrencia.

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentQueue<T>

Caracteristicas importantes:

concurrentQueue.Enqueue(item);

if (concurrentQueue.TryPeek(out item)) {

DoSomething(item); }

if (concurrentQueue.TryDequeue(out item)) {

DoSomething(item); }

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Métodos Importantes:PushTryPopTryPeek

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Otros Métodos:

PushRangePopRange

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

-Similar a la coleccion ConcurrentQueue.

-Es una coleccion LIFO.

-Atomicidad en los metodos PushRange y PopRange reduce la cantidad de insersiones y extracciones concurrentes en la coleccion.

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Caracteristicas importantes:

concurrentStack.Push(item);

if (concurrentStack.TryPeek(out item)){

DoSomething(item);}

if (concurrentStack.TryPop(out item)){

DoSomething(item);}

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Caracteristicas importantes:

Parallel.ForEach(Partitioner.Create(0, partCount), p => { concurrentStack.PushRange

( numberArray, p.Item1, p.Item2 - p.Item1 );

});

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentStack<T>

Sintaxis:

count = s.TryPopRange(numberArray, 0, numberArray.Length);

count = s.TryPopRange(numberArray);

Count sera la cantidad de objetos que fueron sacados del tope de la cola e insertados el el array.

Atomiciadad o costo…PushRange

TryPopRange

Push TryPop

No memoria adicional

No Over-Head

Igual concurrencia

Buen rendimiento

Menor concurrencia

Atomiciadad

Memoria adicional

Over-Head

TPL - Colecciones de Concurrencia y Pipelines

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentBag<T>

Métodos Importantes:AddTryTakeTryPeek

Nota:Colección donde el orden no importa.

Ideal para escenarios Productor – Consumidor.

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentBag<T>

-Ideal para ciertos escenarios productor-consumidor.

-No es completamente lock-free.

-Bastante ineficiente, donde el hilo productor es distinto al consumidor.

-Mantiene una cola local para cada hilo que accede a ella.

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentBag<T>

Caracteristicas importantes:

sentencesBag.Add(s.ToString());

string sentence; if (_sentencesBag.TryTake(out sentence)) {

_capWordsInSentencesBag.Add (

CapitalizeWords(delimiterChars, sentence, '\\' )); }

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Métodos Importantes:AddTryAddTakeTryTakeCompleteAddingGetConsumerEnumerable

Ofrece soporte para Bounding y Blocking.

Ideal para escenarios Productor – Consumidor.

Ideal para implementaciones de pipelines.

Es un wrapper para una interfaz del tipo IProducerConsumerCollection<T>

Capacidad máxima opcional.

Permite cancelación a través de tokens.

Existen 2 tipos de enumeraciones con foreach:1. Enumeración de solo lectura.2. Enumeración que elimina los elementos que han sido enumerados(P-C).

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes:

BlockingCollection<int> stackBC = new BlockingCollection<int>(new ConcurrentStack<int>());

BlockingCollection<int> bagBC = new BlockingCollection<int>(new ConcurrentBag<int>());

BlockingCollection<int> bc = new BlockingCollection<int>(count);

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes:

IsCompleted, IsAddingCompleted.

string aux; while (!sentences.IsCompleted) { if (sentences.TryTake(out aux)) upperSentences.Add(aux.ToUpper()); } upperSentences.CompleteAdding();

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes:

GetConsumingEnumerable()

foreach (var item in upperSentences.GetConsumingEnumerable()) { finalSentences.Add(item.Replace("U", "")); } upperSentences.CompleteAdding();

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes: if (!_sentencesBC.TryAdd(newSentence, 2000, cancToken)) { throw new TimeoutException( "_sentencesBC took more than 2 seconds to add an item"); } catch (OperationCanceledException ex) { // The operation was cancelled break; }

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes:

BlockingCollection<TOutput>.AddToAny(Output, result, _token);

BlockingCollection<TOutput>.TryAddToAny(Output, result, timeOut, _token);

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes: BlockingCollection<TOutput>.AddToAny(array, item, _token);

BlockingCollection<TOutput>.TryAddToAny(array, item, timeOut, _token);

Estos metodos devuelven el indice de la coleccion, en el array de colecciones, a la cual se le agrego el elemento.

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

Caracteristicas importantes: BlockingCollection<TOutput>.TakeFromAny(array, out item, _token);

BlockingCollection<TOutput>.TakeFromAny(array, out item, timeOut, _token);

Estos metodos devuelven el indice de la coleccion, en el array de colecciones, de la cual se elimino el elemento.

TPL - Colecciones de Concurrencia y Pipelines

BlockingCollection<T>

-Facilita el trabajo con las colecciones thread-safe.

-Produce Over-Head.

-Disminuye la cantidad y simplfica la complejidad del codigo.

-Ideal para la implementacion de pipelines(Ejemplo)

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentDictionary<T>

Métodos Importantes:AddOrUpdateGetEnumeratorGetOrAddTryAddTryGetValueTryRemoveTryUpdate

Lock-Free para

operaciones de lectura

Sintaxis:ConcurrentDictionary<TKey, TValue >()ConcurrentDictionary<TKey, TValue >(Int32, Int32)

int initialCapacity = 100;

int concurrencyLevel = Environment.ProcessorCount * 2;

ConcurrentDictionary<int, int> cd = new ConcurrentDictionary<int, int>(concurrencyLevel, initialCapacity);

for (int i = 0; i < 64; i++) cd[i] = i * i;

Console.WriteLine(“23² is {0} (should be {1})", cd[23], 23 * 23);

TPL - Colecciones de Concurrencia y Pipelines

ConcurrentDictionary<Tkey, TValue>

Caracteristicas importantes: _rectanglesDict.AddOrUpdate( newKey, newRect, (key, existingRect) => {if (existingRect != newRect) { lock (existingRect) { existingRect.Update( newRect.Location, newRect.Size); } return existingRect; } else { return existingRect; } });

Async - Programación Asincrónica

C# 5.0 - Async

Asynchronous

Programming

Model(APM)

Event-based

Asynchronous

Pattern(EAP)

Task Asynchrono

us Pattern(TAP

)

Async - Programación Asincrónica

Patrones estándares

TaskFactory.FromAsync TaskCompletionSource

Idea

Sincrónico = Asincrónico

TAP

APM

EAP

Async

Async - Programación Asincrónica

Microsoft Visual Studio Async Community Technology Preview (CTP). ( Visual Studio Async CTP )

Nuevas keywords:

async:

await:

Marca a métodos o expresiones lambdas como asincrónicas.Retiene el control hasta que la operación asincrónica termine.

Objetivo:

Programación asincrónica = Programación sincrónica.

Fin de los métodos callback.

Escribir códigos simples y fáciles de entender.

Async - Programación Asincrónica

public int SumPageSizes(IList<Uri> uris){

int total = 0;foreach (var uri in uris){

statusText.Text = string.Format("Found {0} bytes ...", total);var data = new WebClient().DownloadData(uri);total += data.Length;

}statusText.Text = string.Format("Found {0} bytes total", total);return total;

}

Versión Sincrónica

Problemas:Bloquea la interfaz de usuario.

No nos va enseñando el estado de la descarga.

Solución Versión Asincrónica

public void SumPageSizesAsync(IList<Uri> uris){ SumPageSizesAsyncHelper(uris.GetEnumerator(), 0);}

private void SumPageSizesAsyncHelper(IEnumerator<Uri> enumerator, int total){

if (enumerator.MoveNext()){

statusText.Text = string.Format("Found {0} bytes ...", total);var client = new WebClient();client.DownloadDataCompleted += (sender, e) => SumPageSizesAsyncHelper(enumerator, total + e.Result.Length);client.DownloadDataAsync(enumerator.Current);

}else{

statusText.Text = string.Format("Found {0} bytes total", total);enumerator.Dispose();

}}

Problemas:Hay que romper el foreach.

En cada llamado se ancla un evento.El código es recursivo.

No retorna el total una ves calculado.

Versión Asíncrona con EAP

Async - Programación Asincrónica

ConclusionesEl método anterior es asincrónico con una sola llamada asincrónica y una sola estructura de control alrededor de esta. Imagínense más llamadas asincrónicas y más estructuras de control, sería un verdadero caos.

Primera solución: Utilizar APM o EAP con las nuevas clases de TPL.

Segunda solución: Utilizar TAP(junto a async).

public async Task<int> SumPageSizesAsyncBest(IList<Uri> uris){

int total = 0;foreach (var uri in uris){

statusText.Text = string.Format("Found {0} bytes ...", total);var data = await new WebClient().DownloadDataTaskAsync(uri);total += data.Length;

}statusText.Text = string.Format("Found {0} bytes total", total);listBox1.Items.Add(total.ToString());return total;

}

Async - Programación Asincrónica

Async - Programación Asincrónica

let asyncProcessFile (filePath : string) = async { printfn "Procesando fichero [%s]" (Path.GetFileName(filePath)) use fileStream = new FileStream(filePath,FileMode.Open) let bytesToRead = int fileStream.Length let! data = fileStream.AsyncRead(bytesToRead) //Returna un objeto Async<byte[]> printfn “Se leyeron [%d] bytes" data.Length use resultFile = new FileStream(filePath + ".results", FileMode.Create) do! resultFile.AsyncWrite(data,0,data.Length) printfn "Finalizado el procesamiento del archivo [%s]" <| Path.GetFileName(filePath) } |> Async.Start

asyncProcessFile "./testAsync.txt"Console.ReadKey();

async en F#

Async - Programación Asincrónica

Retorno

public async void SumPageSizesAsyncBestOther(IList<Uri> uris){int total = 0;foreach (var uri in uris){

statusText.Text = string.Format("Found {0} bytes ...", total);var data = await new WebClient().DownloadDataTaskAsync(uri);total += data.Length;

}statusText.Text = string.Format("Found {0} bytes total", total);listBox1.Items.Add(total.ToString());

}

Task Void( Fire and forget )

private async void sumButton_Click(object sender, RoutedEventArgs e) {    sumButton.IsEnabled = false;    await SumPageSizesAsync(GetUrls()));    sumButton.IsEnabled = true;}

( Incluyendo genéricas)

TPL - Async - Programación asincrónica

Código final propuesto

public async Task<int> SumPageSizesAsyncBetter(IList<Uri> uris){

var tasks = from uri in uris select new WebClient().DownloadDataTaskAsync(uri);var data = await TaskEx.WhenAll(tasks);return await TaskEx.Run(() =>data.Sum(s => s.Length));

}

Nota:Se propone incluir Run() y WhenAll() en la clase Task cuando async arrive a su versión final; mientras este en CTP se alojarán en una clase de prueba llamada TaskEx.

TPL - Async - Programación asincrónica

¿Como funciona async?

public static async Task DoSum(int from,int to){

int result = await Sum(from, to);string param = result.ToString() + "\r\n";File.AppendAllText(@"./result.txt", param);

}

public static Task<int> Sum(int from, int to){

Task<int> sum = TaskEx.Run(() =>{

int result = 0;for (int i = from; i <= to; i++){

TaskEx.Delay(500);result += i;

}return result;

});return sum;

}

public static Task DoSum(int from,int to){var task1 = Task.Factory.StartNew(() => Sum(from,to));

return task1.ContinueWith((antecedentTask) =>{string param = antecedentTask.Result.Result.ToString() + \r\n";File.AppendAllText(@"./result.txt", param);});

}

public static Task<int> Sum(int from, int to){

Task<int> sum = TaskEx.Run(() =>{

int result = 0;for (int i = from; i <= to; i++){

TaskEx.Delay(500);result += i;

}return result;

});return sum;

}

Original Transformado por el compilador

TPL - Async - Programación asincrónica

Como funciona async

public static async Task DoSum(int from,int to){

int result = await Sum(from, to);string param = result.ToString() + "\r\n";File.AppendAllText(@"./result.txt", param);

}

static void Main(string[] args){

int number;string input;Task myTask = new Task(Console.WriteLine);

while (true){

Console.WriteLine("Entre un número: ");input = Console.ReadLine();if (string.Empty == input)break;number = int.Parse(input);myTask = DoSum(1, number);

} myTask.Wait();}

Callback En cuanto la tarea finalice.

Continua la ejecución.

Retorna una tarea

TPL - Async - Programación asincrónica

public async Task<int> SumPageSizesAsyncBest(IList<Uri> uris){

int total = 0;foreach (var uri in uris){

statusText.Text = string.Format("Found {0} bytes ...", total);var data = await new WebClient().DownloadDataTaskAsync(uri);total += data.Length;

}statusText.Text = string.Format("Found {0} bytes total", total);listBox1.Items.Add(total.ToString());return total;

}

Retornando valores desde async

TPL - Async - Programación asincrónica

Cancelación desde async

public async void Inicio(Program program) { cts = new CancellationTokenSource(); program.Hola(cts.Token); Thread.Sleep(1000); if (cts != null) cts.Cancel(); }

public async Task Hola(CancellationToken ct) { Console.WriteLine("Before await"); await TaskEx.Delay(5000); ct.ThrowIfCancellationRequested(); Console.WriteLine("After await"); }

let cancelableTask = async { printfn "Waiting 10 seconds..." for i = 1 to 10 do printfn "%d..." i do! Async.Sleep(1000) printfn "Finished!" }

// Callback used when the operation is canceledlet cancelHandler (ex : OperationCanceledException) = printfn "The task has been canceled."

Async.TryCancelled(cancelableTask,cancelHandler)|>Async.StartThread.Sleep(2000)Async.CancelDefaultToken()

C# F#

TPL - Async - Programación asincrónica

Excepciones desde async

try{ string txt = await w.DownloadStringTaskAsync(url);}catch(WebException x){ --- Handle exception.}

let asyncOperation = async { try // ... with | :? IOException as ioe -> printfn "IOException: %s" ioe.Message | :? ArgumentException as ae -> printfn "ArgumentException: %s" ae.Message }

Las excepciones se manejan igual que de

manera sincrónica.