PDA

Просмотр полной версии : [Статья] Программистов параллельных тред


Yukikaze
16.11.2012, 05:33
Здравствуйте, жуковцы. Пришла пора для еще одной познавательной статьи, и так начнем.
Думаю каждый из вас сталкивался с необходимостью распаралелить и синхронизировать потоки и заодно получить результат их работы, некоторые справлялись с этой задачей, некоторые нет, а некоторые использовали костыли которые страшно представить. В этой статье я покажу способ который в некоторой степени упростит вам работу с потоками и поможет распаралелить выполнение ваших задач.

Meet the class Task<T, TResult>
{
internal delegate TResult MethodHandler(T arg);
internal delegate void CompleteHandler(object sender, TResult e);

private bool isComplite;
private bool inProgress;
private readonly AsyncCallback _callback;
public T Argument { get; set; }
public MethodHandler Handler { get; set; }
public event CompleteHandler Complete;
public bool IsComplete { get { return isComplite; } }
public bool InProgress { get { return inProgress; } }

public Task(){ }
public Task(MethodHandler func, T arg)
{
this._callback = callback;
this.Argument = arg;
this.Handler = func;
}

private void callback(IAsyncResult ar)
{
var sender = (Task<T, TResult>)ar.AsyncState;
TResult result = sender.Handler.EndInvoke(ar);
OnComplete(result);
}

public void Execute()
{
if (Handler == null)
throw new NullReferenceException("Handler property can't be null");
isComplite = false;
inProgress = true;
Handler.BeginInvoke(Argument, _callback, this);
}

private void OnComplete(TResult e)
{
isComplite = true;
inProgress = false;
CompleteHandler handler = Complete;
if (handler != null) handler(this, e);
}
}
Данный класс очень прост в использовании, достаточно создать экземпляр с генерик параметрами типов входных и выходных данных.
Пример:
Task<int, string> task = new Task<int, string>(i => i.ToString("X2"), 57005);
int в треугольных скобках обозначает тип входящих данных, в примере это число 57005, а string это тип возвращаемый функцией переданной в первый i => i.ToString("X2").
Теперь можно запустить выполнение методом Execute():
Task<int, string> task = new Task<int, string>(i => i.ToString("X2"), 57005);
task.Execute();
Если вы запустили вышеприведенные строки то наверное уже заметили, что нихрена не происходит, это все потому что метод выполнился асинхронно и по окончанию вычислений спровоцировал событие, что-бы увидеть результат нужно подписаться на это событие.
Пример:
static void Main(string[] args)
{
Task<int, string> task = new Task<int, string>(i => i.ToString("X2"), 57005);
task.Complete += TaskComplete;
task.Execute();
Console.ReadKey(true);
}

private static void TaskComplete(object sender, string s)
{
Thread.Sleep(3000);
Console.WriteLine("0x" + s);
}

Теперь запустив приложение, через 3 секунды, мы увидим результат - надпись 0xDEAD. При этом приложение не повиснет на время выполнения кода.

Пример:
static void Main(string[] args)
{
Task<string, bool>(Func, "login1;password666");
task.Complete += TaskComplete;
task.Execute();
Console.ReadKey(true);
}

private static void TaskComplete(object sender, bool b)
{
var loginpassword = ((Task<string, bool>) sender).Argument;
Console.WriteLine("User: {0} Login successful: {1}", loginpassword, b);
}

private static bool Func(string s)
{
string[] pair = s.Split(';');
WebClient client = new WebClient();
string page = client.DownloadString(string.Format("[Ссылки могут видеть только зарегистрированные и активированные пользователи]{0}&password={1}", pair[0], pair[1]));
return page.Contains("Login successful");
}


class TaskQueue<T, TResult>
{
internal delegate void CompleteHandler(object sender, TResult e);

public event CompleteHandler Complete;

private int _limit;
private readonly object _sync = new object();
private readonly AutoResetEvent wait = new AutoResetEvent(false);
private readonly List<Task<T, TResult>> inProgress = new List<Task<T, TResult>>();
public Queue<Task<T, TResult>> Tasks;
public int ThreadLimit
{
get { return _limit + 1; }
set { _limit = value <= 0 ? 0 : value - 1; }
}

public TaskQueue(IEnumerable<Task<T, TResult>> tasks)
{
Tasks = new Queue<Task<T, TResult>>(tasks);
}

public TaskQueue(params Task<T, TResult>[] tasks)
{
Tasks = new Queue<Task<T, TResult>>(tasks);
}

public void Begin()
{
while (Tasks.Count > 0)
{
Task<T, TResult> temp = Tasks.Dequeue();
temp.Complete += TempOnComplete;
lock (_sync) inProgress.Add(temp);
temp.Execute();
if (inProgress.Count > 1) wait.WaitOne();
}
}

private void TempOnComplete(object sender, TResult result)
{
wait.Set();
lock (_sync) inProgress.Remove((Task<T, TResult>)sender);
OnComplete(sender, result);
}

protected virtual void OnComplete(object sender, TResult e)
{
CompleteHandler handler = Complete;
if (handler != null) handler(sender, e);
}
} в догонку
На вход подаем коллекцию тасков, подписываемся на событие Complete, задаем кол-во потоков и запускаем.
Пример:
static void Main()
{
List<Task<string, bool>> t = new List<Task<string, bool>>();
for (int i = 0; i < 100; i++)
{
Task<string, bool> temp = new Task<string, bool>(Func, "login;password" + i);
t.Add(temp);
}
TaskQueue<string, bool> h = new TaskQueue<string, bool>(t);
h.Complete += TaskComplete;
h.ThreadLimit = 4;
h.Begin();
Console.ReadKey(true);
}

private static void TaskComplete(object sender, bool b)
{
var loginpassword = ((Task<string, bool>) sender).Argument;
Console.WriteLine("User: {0} Login successful: {1}", loginpassword, b);
}

private static bool Func(string s)
{
string[] pair = s.Split(';');
WebClient client = new WebClient();
string page = client.DownloadString(string.Format("[Ссылки могут видеть только зарегистрированные и активированные пользователи]{0}&password={1}", pair[0], pair[1]));
return page.Contains("Login successful");
}

Пользуйтесь на здоровье, добра вам/md

Upadate #1
AsyncTask - безопасное выполнение в STA потоке
delegate TResult ThreadStart<in T, out TResult>(T arg);
class AsyncTask<T, TResult>
{
public event EventHandler<ExecuteCompletedEventArgs<TResult>> Complete;

public bool IsBusy { get; private set; }
public ThreadStart<T, TResult> Handler
{
get { return _threadStart; }
set { _threadStart = value; }
}

private ThreadStart<T, TResult> _threadStart;
private readonly SendOrPostCallback _completed;
private AsyncOperation _asyncOperation;

public AsyncTask(ThreadStart<T, TResult> func)
{
_threadStart = func;
_completed = AsyncOperationCompleted;
}

private void AsyncOperationCompleted(object state)
{
IsBusy = false;
OnComplete((ExecuteCompletedEventArgs<TResult>) state);
}

public bool Start(T arg)
{
if (IsBusy) return false;
IsBusy = true;
_asyncOperation = AsyncOperationManager.CreateOperation(arg);
_threadStart.BeginInvoke(arg, Callback, this);
return true;
}

private void Callback(IAsyncResult ar)
{
TResult result = _threadStart.EndInvoke(ar);
ExecuteCompletedEventArgs<TResult> e = new ExecuteCompletedEventArgs<TResult>(result);
_asyncOperation.PostOperationCompleted(_completed, e);
}

protected virtual void OnComplete(ExecuteCompletedEventArgs<TResult> e)
{
EventHandler<ExecuteCompletedEventArgs<TResult>> handler = Complete;
if (handler != null) handler(this, e);
}
}

class ExecuteEventArgs<T, TResult> : EventArgs
{
public T Argument { get; private set; }
public TResult Result { get; set; }
public ExecuteEventArgs(T arg)
{
this.Argument = arg;
}
}

public sealed class ExecuteCompletedEventArgs<T> : EventArgs
{
public T Result { get; private set; }
public ExecuteCompletedEventArgs(T arg)
{
this.Result = arg;
}
}

Sinyss
16.11.2012, 11:28
всего 1 поток задержался, рандом был благосклонен /dgs
к TaskComplete надо отдельно потокобезопасность прикручивать или она гарантируется таском?

warl0ck
16.11.2012, 14:31
спасибо, хороший класс. надо бы сохранить в блокнотик, что-бы потом с памяти не писать.

Yukikaze
16.11.2012, 16:54
потокобезопасность прикручивать или она гарантируется таском?
Таск не может быть потокобезопасным, так же как не может быть и не безопасным, это то же самое, что спросить - этот поток потокобезопасен? Класс Таск это обертка над потоком, по этому использование MethodInvoker mi = () => listBox1.Items.Add(s);
if (listBox1.InvokeRequired)
listBox1.Invoke(mi);
else
listBox1.Items.Add(s);
обязательно из за небезопасности контролов и формы

Добавлено через 4 часа 36 минут
Всерьез задумался над тем, как все таки BackgroundWorker вызывает событие в STAThread не спровоцировал исключение, в общем покопался в сорсах и узнал пару интересных вещей. Результат в первом посте.

Если в кратце то асинхронное выполнение проходит через экземпляр класса AsyncOperation который в свою очередь перенаправляет выполнение в главный поток

Sinyss
18.11.2012, 02:06
Я понимаю что это создавалось исключительно в самообразовательных целях, но все же стоит почитать:
Метод KISS ([Ссылки могут видеть только зарегистрированные и активированные пользователи] %29).
Как два программиста хлеб пекли ([Ссылки могут видеть только зарегистрированные и активированные пользователи]).

PS: О, напишу цикл статей про паттерны и где их надо использовать...

Yukikaze
18.11.2012, 02:19
Куда же еще больше упрощать, это же и так максимально упрощенный генерик бекграундворкер с некоторыми модификациями.
ЗЫ Как по мне использование шаблонов это хорошая практика для предотвращения постоянного анбоксинга при касте <тип> <-> object <-> <тип>

Sinyss
18.11.2012, 02:24
Куда же еще больше упрощать, это же и так максимально упрощенный генерик бекграундворкер с некоторыми модификациями.
А ведь можно было наследовать от него и дописать нужные реализации событий, ну и добавить в нужном моменте вызов события ReportProgress... по сути он будет всегда в одном и том же месте...

Yukikaze
18.11.2012, 02:40
Sinyss, ну это невозможно как минимум потому, что нельзя перегрузить события, а это убивает всю задумку

Sinyss
18.11.2012, 03:10
Sinyss, ну это невозможно как минимум потому, что нельзя перегрузить события, а это убивает всю задумку
Почему нельзя перезагрузить события? Там все нужные функции virtual, да и конкретную реализацию нужного события мы всегда можем переписать на нужный нам метод...

backgroundWorker1.ProgressChanged += new ProgressChangedEventHandler( backgroundWorker1_ProgressChanged);
// какие то действия
backgroundWorker1.ProgressChanged += new ProgressChangedEventHandler( backgroundWorker1_New_ProgressChanged);

Вот мы и перегрузили событие...

ИМХО событие это просто ссылка на функцию, которую мы можем в любой момент заменить на новую реализацию не удаляя старую

Yukikaze
18.11.2012, 03:46
Sinyss, вообще-то в приведенном примере ты подписался на событие, а виртуалом отмечены только евент инвокаторы которые практически всегда одинаковые, с заглушкой от NullReferenceExeption'а
protected virtual void OnComplete(EventArgs e)
{
SomeDelegate handler = Complete;
if (handler != null) handler(this, e);
}

А вот переписать событие это уже другая ситуация, например имеем
delegate void SomeDelegate(object sender, EventArgs e);
следовательно событие будет выглядеть так
event SomeDelegate SomeEvent;
а делегат SomeDeleagte мне по какой то причине не подходит, например потому, что принимает в параметр только EventArgs, а мне нужен int.

В общем возможно добавить только новые события и вызывать их старыми инвокаторами.
Короче с таким же успехом можно сказать : "Используй ThreadPool вместо BackgroundWorker'а зачем усложнять" ведь BackgroundWorker это обертка над Delegate.BeginInvoke который в свою очередь передает выполнение функции в ThreadPool. ASyncTask и BackgroundWorker используют разные реализации одних и тех же действий.

Sinyss
18.11.2012, 13:46
Ну в принципе можно написать myEventArg наследуемый от EventArgs и передавать его через приведение типов... но да, это будет не элегантно...

lcd1232
17.04.2014, 01:07
А что если у меня функция требует несколько входных параметров? Как быть?

Hermein
17.04.2014, 05:15
функция требует несколько входных параметров?
Я лично использую лямбды в таких случаях.
ps: Экспериментировал со всеми вариациями распараллеливания и работает лучше всех Parallel, результаты тестов привести не смогу, ибо делались специфические с огромными математическими задачами(от 300кк итераций), но, возможно для небольших задач подойдут Thread или Task