Многопоточное программирование на C++
Реферат, 24 Января 2012, автор: пользователь скрыл имя
Краткое описание
Для начала поговорим о подводных камнях C++ применительно к MT программированию. На первый взгляд может показаться, что написанные на C++ потоки, не изменяющие одни и те же данные одновременно, могут работать абсолютно независимо и параллельно. К сожалению, это не совсем так: основной проблемой является стандартный распределитель памяти, т.е. операторы new/delete.
Прикрепленные файлы: 1 файл
СПО.docx
— 67.57 Кб (Скачать документ)С другой стороны, толковым программистам уже известно чем чреваты глобальные объекты, защищенные mutex-ами, а некоторые даже знают о том, что грамотный MT дизайн категорически не приветствует подобного рода решений!
Концептуально правильное решение проблемы получения итоговой статистики состоит в том, что каждый из рабочих потоков ведет свою собственную личную статистику, которую он может изменять абсолютно независимо и параллельно. А итоговая статистика получается в конце работы программы путем сложения их всех -- и никаких тебе mutex-ов!
Для создания личных
аргументов потока предназначена функция MainTask
Отмечу, что для хранения Stat аргументов потоков специально используется список list<Stat>, а не более привычный vector<Stat>. Дело в том, что в процессе добавления элементов в вектор используемая им память неоднократно перераспределяется, так что указатели на объекты Stat, ранее возвращенные функцией MainTask::proc_arg() инвалидируются. А воспользоваться вызовом vector::reserve() мы не можем в силу того, что MainTask ничего не должна знать о количестве рабочих потоков.
Сообщение о возникших ошибках, как последнее сообщение программы.
А вот еще одна серьезная задача, когда пресловутого mutex-а, казалось бы, уж точно не избежать! Суть проблемы в том, что если сразу же выводить на экран возникающие в процессе работы ошибки, то они вполне могут затеряться в выводе других потоков, продолжающих обработку своих сообщений. Решением задачи является запись сообщения об ошибке в специальный глобальный буфер, чтобы после окончания работы MainTask::proc() функция main() смогла его вывести последним.
В этом случае действительно имеет смысл завести общий для всех потоков буфер MainTask::gerr, распределенный с помощью глобального пула MainTask::gmp, и разграничить к нему доступ посредством привычной блокировки. Мы, конечно, могли бы завести по буферу в каждом аргументе потока и объединить их в конце работы точно так же, как мы объединяем статистику, но в данном случае усложнение кода программы неоправданно, т.к. потокам не нужно постоянно обращаться к этим данным и никаких дополнительных накладных расходов из-за данной блокировки не возникает.
Как ни крути, но очевидное решение с mutex-ом напрашивается само-собой и избежать его нам поможет только наблюдение о том, что однопоточный вариант работы программы ни в каких mutex-ах не нуждается, и даже более того: у нас должна быть возможность слинковать его с обычными, однопоточными версиями библиотек, никаких ссылок на mutex-ы, очевидно, не приемлющих. Так что прямое использование mutex-а отпадает... Как же быть?
К счастью, решение есть и оно заключается в использовании интерфейса task_opers, передаваемого в виде аргумента to в функцию MainTask::proc() thread_pool-ом. Он предоставляет функцию invoke(), которая позволяет вызывать переданный указатель на функцию с применением необходимой блокировки, т.е. необходимый (или нежелательный!) mutex автоматически обеспечивается самим thread_pool-ом:
void MainTask::proc(mem_pool& mp, const dq_vec& dqv, void* arg, task_opers& to)
{
data_queue& dq=*dqv[0];
Stat& st=*static_cast<Stat*>(arg);
приводим аргумент потока к его настоящему типу Stat
for (;;) {
shException exc(mp, 0);
try {
for (MsgIO mio(mp, dq); ; ) {
sh_ptr<Msg> msg=mio.read();
if (!msg.get()) break;
switch (msg->getType()) {
case Msg::FindFiles: {
doFindFiles(mp, dq, st, msg->to<FindFilesMsg>());
break;
}
}
}
return;
}
catch (shException she) { exc=she; }
catch
(...) { exc=recatchException(mp, _FLINE_); }
ErrData ed(gerr, toTextAll(exc));
to.invoke(addError, &ed);
заполняем ErrData и вызываем функцию addError() для добавления сообщения об ошибке
dq.set_intr(true);
break;
}
}
Конвертация исходного кода
Программа предназначена для рекурсивной конвертации файлов исходного кода в DOS/UNIX формат и/или удаления пробелов в конце строки.
Параметры командной строки: mtcnvsrc [-nt2] [d|u][e] mask[,mask2...]
| [-ntN] | количество потоков, 2 по умолчанию |
| [d] | конвертировать в DOS формат |
| [e] | удалять пробелы в конце строки |
| [u] | конвертировать в UNIX формат |
| mask[,mask2...] | маски для поиска файлов, могут содержать * и ? символы |
Примеры использования:
mtcnvsrc.exe ue *.h,*.hpp,*.c,*.cpp
mtcnvsrc.exe -nt4 e *.?pp
Как можно видеть, прямым назначением программы является исправление ошибок, найденных mtcksrc. При этом, автоматически исправляется только то, что не требует человеческого вмешательства.
Ну а если
говорить о сложности, то данная программа
скорее является небольшим подготовительным
этапом перед следующей, чем усложнением
предыдущей. Единственный момент, на который
можно обратить внимание -- это аргумент
потока в виде более общей структуры Arg.
На этот раз аргументы хранятся в векторе vector<sh_ptr<Arg> >, но это не приводит
к описанным выше проблемам, т.к. в процессе
их создания копируются и перераспределяются
элементы sh_ptr<Arg>, а не сами тяжеловесные Arg
структуры.
Сравнение директорий
Программа предназначена для рекурсивного сравнения директорий.
Параметры командной строки: mtdirdiff [-nt2] old_dir new_dir diff_dir
| [-ntN] | количество потоков, 2 по умолчанию |
| old_dir | директория со старыми файлами |
| new_dir | директория с новыми файлами |
| diff_dir | директория для копирования изменений в виде add, del и mod поддиректорий |
Примеры использования:
mtdirdiff.exe src\mtprog.sav
src\mtprog src\mtprog.diff
А здесь мы уже
имеем дело с небольшой, но достаточно
нетривиальной многопоточной
Прежде всего отмечу, что это первая программа, многопоточная работа которой разбита на несколько этапов, т.е. функция thread_pool::exec() вызывается неоднократно:
sh_thread_pool tp=(clp.numThr>1) ? new_thread_pool(mp, clp.numThr-1) :
new_thread_pool(mp);
MainTask
mt(clp.oldDir, clp.newDir, clp.diffDir);
sh_data_queue dq=new_data_queue(mp);
dq_vec
dqv(1, dq.get());
MsgIO mio(mp, *dq);
{
FindFilesMsg m1(&mt.oldDir), m2(&mt.newDir);
mio.write(m1);
mio.write(m2);
}
tp->exec(mt,
dqv);
{
CompareMsg m1(true), m2(false);
mio.write(m1);
mio.write(m2);
}
mt.argno=0;
tp->exec(mt, dqv);
И, как следствие, устройство функции proc_arg() немного усложнено:
void* MainTask::proc_arg()
{
assert(argno<=int(args.size()
if (argno==int(args.size()))
args.push_back(newArg(gmp));
return args[argno++].get();
}
Необходимость
разбиения на этапы возникает
из желания эффективным и
Сначала не более
двух рабочих потоков производят
поиск файлов и директорий с занесением
результатов в объекты mt.
Кстати сказать,
проверка данной гипотезы является неплохим
упражнением для
Затем стартует сравнение просканированных директорий, причем потокам не нужно синхронизировать свой доступ к hash_vec-рам с именам, т.к. их содержимое уже никем не изменяется.
Как можно видеть, без разбиения на этапы потокам пришлось бы синхронизировать свой доступ к hash_vec-рам, что неприемлемо с точки зрения правильного MT дизайна.
Структура DirCont использует свой собственный пул для имен директорий и файлов:
struct DirCont {
mem_pool ownmp;
sh_text name;
hash_vec<sh_text, char> dirs;
hash_vec<sh_text, unsigned long> files;
DirCont(const ch_rng& nm) : name(nt(ownmp, nm)), dirs(101), files(1001)
{}
};
Здесь мы имеем тот редкий случай, когда в одной точке приложения одновременно сходятся несколько объектов mem_pool, принадлежащих разным потокам:
объект ownmp, принадлежащий соответствующей структуре mt.oldDir или mt.newDir
привычный объект mp, являющийся временным личным объектом рабочего потока, создаваемым thread_pool-ом на время работы функции MainTask::proc()