Многопоточное программирование на C++

Реферат, 24 Января 2012, автор: пользователь скрыл имя

Краткое описание


Для начала поговорим о подводных камнях C++ применительно к MT программированию. На первый взгляд может показаться, что написанные на C++ потоки, не изменяющие одни и те же данные одновременно, могут работать абсолютно независимо и параллельно. К сожалению, это не совсем так: основной проблемой является стандартный распределитель памяти, т.е. операторы new/delete.

Прикрепленные файлы: 1 файл

СПО.docx

— 67.57 Кб (Скачать документ)

1. Многопоточное программирование на C++

  Работа с памятью

Для начала поговорим  о подводных камнях C++ применительно  к MT программированию. На первый взгляд может показаться, что написанные на C++ потоки, не изменяющие одни и те же данные одновременно, могут работать абсолютно независимо и параллельно. К сожалению, это не совсем так: основной проблемой является стандартный  распределитель памяти, т.е. операторы new/delete. Суть проблемы в том, что приложение получает свободные блоки памяти от системы и выдает их части потокам  по требованию, т.е. в процессе своей  работы потоки вынуждены изменять одни и те же структуры данных (глобальные цепочки свободных блоков) и, следовательно, синхронизация их работы неизбежна.

Прямым решением данной проблемы является повсеместное явное использование собственного распределителя памяти, кэширующего  полученные от глобального распределителя блоки. Т.е. своего объекта mem_pool для каждого отдельного потока (как минимум). Конечно, с точки зрения удобства кодирования повсеместное мелькание ссылок mem_pool& трудно назвать приемлемым -- стоит ли овчинка выделки? Давайте разберемся с помощью следующего примера:

example1/main.cpp
#include <list>

#include <vector>

#include <stdio.h>

#include <time.h>

#include <ders/stl_alloc.hpp>

#include <ders/thread.hpp> 

using namespace std;

using namespace ders; 

const int N=1000;

const int M=10000; 

void start_std(void*)

{

list<int> lst;

for (int i=0; i<N; i++) {

     for (int j=0; j<M; j++) lst.push_back(j);

     for (int j=0; j<M; j++) lst.pop_front();

}

} 

void start_ders(void*)

{

mem_pool mp;

stl_allocator<int> alloc(mp);

 

list<int, stl_allocator<int> > lst(alloc);

for (int i=0; i<N; i++) {

     for (int j=0; j<M; j++) lst.push_back(j);

     for (int j=0; j<M; j++) lst.pop_front();

}

} 

int main(int argc, char** argv)

{

if (argc!=3) {

    m1:

    fprintf(stderr, "main num_threads std|ders");

    return 1;

} 

int numThr=atoi(argv[1]);

if ( !(numThr>=1 && numThr<=100) ) {

    fprintf(stderr, "num_threads must be in [1, 100]");

    return 1;

} 

void (*start)(void*);

if (strcmp(argv[2], "std")==0) start=start_std;

else if (strcmp(argv[2], "ders")==0) start=start_ders;

else goto m1; 

clock_t c1=clock(); 

mem_pool mp;

vector<sh_thread> vthr;

for (int i=0; i<numThr; i++) vthr.push_back(new_thread(mp, start, 0));

for (int i=0; i<numThr; i++) vthr[i]->join(); 

clock_t c2=clock();

printf("%d\t%d\t%s\n", numThr, int(c2-c1), argv[2]); 

 return 0;

}

Программа запускает  заданное в командной строке количество потоков и в каждом из них выполнят фиксированное количество вставок/удалений элементов в стандартный список. Отличие функции start_ders состоит в том, что вместо стандартного аллокатора по умолчанию lst использует аллокатор на основе mem_pool.

  Работа с файлами

Второй пример более приближен к жизни и предназначен для измерения относительной производительности MT приложения, работающего с файлами. Работа с файлами главным образом "упирается" в производительность файловой подсистемы ОС, так что прикладной код, использующий системные вызовы ввода/вывода, вряд ли покажет существенную разницу производительности...

Или все же покажет?!

Как известно, стандартные потоки ввода/вывода C++ работают существенно медленнее потоков ввода/вывода C. Но, к сожалению, даже C-шные потоки FILE не подходят для MT приложений, т.к. все операции над ними обязаны быть thread-safe по умолчанию, а быстрые варианты функций getc() и putc(), не использующие пресловутые mutex-ы, имеют другие имена: getc_unlocked() и putc_unlocked() соответственно. Тем самым, написание кода, одинаково хорошо работающего как в ST, так и в MT окружении становится невозможным.

Главным образом, для решения этой проблемы и был  создан класс file, не использующий блокировок:

example2/main.cpp
#include <memory>

#include <vector>

#include <stdio.h>

#include <time.h>

#include <ders/file.hpp>

#include <ders/text_buf.hpp>

#include <ders/thread.hpp> 

using namespace std;

using namespace ders; 

const int BUF_SIZE=64*1024; 

struct MainData {

       const char* fname;

       MainData(const char* fn) : fname(fn) {}

}; 

struct ThreadData {

       MainData* md;

       int n; 

       ThreadData(MainData* md_, int n_) : md(md_), n(n_) {}

}; 

void start_std(void* arg)

{

ThreadData* td=(ThreadData*)arg;

auto_ptr<ThreadData> guard(td);

 

mem_pool mp;

file err(mp, fd::err); 

FILE* fin=fopen(td->md->fname, "rb");

if (!fin) {

    err.write(text_buf(mp)+"Can't open "+td->md->fname+'\n');

    return;

} 

 sh_text oname=text_buf(mp)+td->md->fname+'.'+td->n;

FILE* fout=fopen(oname->c_str(), "wb");

if (!fout) {

    err.write(text_buf(mp)+"Can't create "+oname+'\n');

    fclose(fin);

    return;

} 

setvbuf(fin, 0, _IOFBF, BUF_SIZE);

setvbuf(fout, 0, _IOFBF, BUF_SIZE); 

for (int ch; (ch=fgetc(fin))!=EOF; )

     fputc(ch, fout); 

fclose(fout);

fclose(fin);

} 

void start_ders(void* arg)

{

ThreadData* td=(ThreadData*)arg;

auto_ptr<ThreadData> guard(td);

 

mem_pool mp;

file err(mp, fd::err); 

file fin(mp);

if (!fin.open(td->md->fname, file::rdo, 0)) {

    err.write(text_buf(mp)+"Can't open "+td->md->fname+'\n');

    return;

} 

 sh_text oname=text_buf(mp)+td->md->fname+'.'+td->n;

file fout(mp);

if (!fout.open(oname, file::wro, file::crt|file::trnc)) {

    err.write(text_buf(mp)+"Can't create "+oname+'\n');

    return;

} 

buf_reader br(mp, fin, BUF_SIZE);

buf_writer bw(mp, fout, BUF_SIZE);

 

for (int ch; (ch=br.read())!=-1; )

     bw.write(ch);

} 

int main(int argc, char** argv)

{

mem_pool mp;

file err(mp, fd::err);

file out(mp, fd::out);

 

if (argc!=4) {

    m1:

    err.write("main file num_threads std|ders");

    return 1;

} 

int numThr=atoi(argv[2]);

if ( !(numThr>=1 && numThr<=100) ) {

    err.write("num_threads must be in [1, 100]");

    return 1;

} 

void (*start)(void*);

if (strcmp(argv[3], "std")==0) start=start_std;

else if (strcmp(argv[3], "ders")==0) start=start_ders;

else goto m1; 

MainData md(argv[1]); 

clock_t c1=clock(); 

vector<sh_thread> vthr;

for (int i=0; i<numThr; i++)

     vthr.push_back(new_thread(mp, start, new ThreadData(&md, i)));

for (int i=0; i<numThr; i++) vthr[i]->join(); 

clock_t c2=clock();

out.write(text_buf(mp)+numThr+'\t'+int(c2-c1)+'\t'+argv[3]+'\n'); 

 return 0;

}

Пример запускает  заданное количество потоков, в каждом из которых создается копия указанного в командной строке файла посредством  выбранной функции: start_std() или start_ders(). Потоками используется посимвольное копирование через буфер размера BUF_SIZE.  

  Многопоточные программы 

Коль  скоро с примерами уже покончено -- самое время перейти к программам. Удивительно, но факт: все из них (кроме mtftext, естественно) я ежедневно использую на практике! Т.е. даже этот функционально неполный инструментарий derslib, специально написанный для данной статьи, вполне подходит для создания реально полезных программ!

Пример

А теперь поговорим  о разработке примера, иллюстрирующего  суть технически грамотного подхода  к MT программированию. Нашей целью  является создание такой реализации, которая выполняет свою задачу вне  зависимости от количества доступных  рабочих потоков. Т.е. наличие дополнительного  потока может ускорить обработку  данных, но необходимым не является.

Более того, согласно замыслу прикладной код ничего не должен знать о количестве одновременно работающих потоков, да и вообще каким  бы то ни было образом ссылаться  на объекты типа mutex и thread.

Основная идея состоит в том, что общая задача приложения разбивается на подзадачи, которые могут быть выполнены  параллельно: тем самым мы разрешаем  параллельное одновременное выполнение, но ни в коем случае его не требуем! На первый взгляд фраза о том, что  мы не требуем параллельного выполнения может показаться избыточной, но это не так. Она имеет вполне определенный смысл, и он заключается в том, что отсутствие параллельно работающих потоков не приведет к остановке всего приложения, как это неизбежно происходит в стандартном примере производитель/потребитель с ограниченным буфером, когда отсутствие одного из них означает бесконечное ожидание оставшегося.

Учебной задачей  является все тот же поиск текста в файлах, но только многопоточный. Таким образом, суть задачи сводится к выполнению следующих операций:

Просматриваем все имена заданной директории.

Если найденное  имя является директорией, то нужно  произвести и ее просмотр.

Если найденное  имя является файлом и удовлетворяет  маске, то нужно просмотреть содержимое данного файла для поиска подходящих строк.

Операции 2 и 3, очевидно, можно выполнять параллельно  операции 1, а сам алгоритм, при  этом, будет выглядеть следующим  образом:

Помещаем в  очередь сообщений первое сообщение  типа FindFiles, содержащее имя корневой директории поиска.

С помощью одного или нескольких потоков начинаем обрабатывать очередь, а именно:

Пока очередь  не пуста и не находится в "прерванном" состоянии, начинаем извлекать сообщения.

Если извлечено  сообщение типа FindFiles -- обрабатываем директорию, порождая сообщения FindFiles и ScanFile.

Если извлечено  сообщение типа ScanFile -- обрабатываем файл, выводя найденные строки.

В случае возникновения  ошибки -- переводим очередь в "прерванное" состояние.

Проверяем состояние  очереди и завершаем работу с  соответствующим кодом возврата.

А теперь самое  время заглянуть в исходный код:

/** @file

* Main file of mtftext program.

*/ 

#include <vector>

#include <stdlib.h>

#include <ders/dir.hpp>

#include <ders/file.hpp>

#include <ders/text_buf.hpp>

#include <ders/thread_pool.hpp>

#include <ders/wldcrd_mtchr.hpp>

#include "msg.hpp" 

namespace mtftext {  // ::mtftext

Весь исходный код, кроме функции main(), естественно, заключен в namespace, совпадающий с именем программы.

using namespace ders; 

struct CmdLineParser {

Для разбора  аргументов командной строки используется специальная структура, чьи поля приближены к именам соответствующих  параметров. Следующая ниже структура MainTask содержит, в принципе, те же самые данные, но их именование отражает их смысл использования, а не видимый пользователю интерфейс командной строки.

       bool isS;

       int numThr;

       sh_text word;

       sh_text mask; 

       CmdLineParser(mem_pool& mp) : isS(false), numThr(0), word(nt(mp)), mask(

         nt(mp)) {} 

       void parse(int argc, char** argv);

}; 

struct MainTask : public task {

Главная структура  программы, параллельно выполняющая  свою функцию proc() с помощью класса ders::thread_pool.

       bool exitOnErr;

       sh_text srchPatt;

       wldcrd_mtchr fileMchr; 

       MainTask(bool eoe, sh_text sp, sh_text mk) : exitOnErr(eoe),

         srchPatt(sp), fileMchr(sp.pool(), mk) {} 

       virtual void destroy(mem_pool& mp2) { destroy_this(this, mp2); }

Типичная реализация чистой виртуальной функции destroy, косвенно унаследованной от интерфейса ders::destroyable.

       virtual void proc(mem_pool& mp, const dq_vec& dqv, void* arg, task_opers&

         to); 

       void doFindFiles(mem_pool& mp, data_queue& dq, const FindFilesMsg& msg);

       void doScanFile(mem_pool& mp, data_queue& dq, const ScanFileMsg& msg);

}; 

void CmdLineParser::parse(int, char** argv)

{

 const char* usage="mtftext [-s] num_threads word mask";

Командная строка имеет необязательный параметр -s (stop), предписывающий сразу же завершать работу при обнаружении ошибок. По умолчанию же в stderr записывается сообщение об ошибке и работа продолжается.

 mem_pool& mp=word.pool(); 

 char** it=argv;

 if (!*++it) throw newExitMsgException(mp, _FLINE_, usage, 1);

Возбуждаем исключение ExitMsgException в случае ошибки, которое используется для выхода из программы с заданным кодом возврата и, возможно, текстом сообщения. Похожего результата можно добиться и с помощью пары fprintf()/exit(), но в этом случае не будут вызваны деструкторы локальных объектов, что, вообще говоря, неприемлемо.

Информация о работе Многопоточное программирование на C++