поскольку я иногда брутально выкачиваю всякие сайты (и нет, это не всегда порнуха), то автоматизирую я это дишечкой, понятно. и не секрет, что иногда выкачивать можно быстрее, если сначала построить список файлов, а потом сливать их несколькими потоками. по этому поводу я покажу вам, как удобно делается мультипоточная качалка на D.
код неполный, но представление о том, что я хочу показать, даст.
// ////////////////////////////////////////////////////////////////////////// //
import core.atomic;
__gshared string[] urlList; // it is protected by `synchronized`
shared usize urlDone;
void downloadThread (usize tnum, Tid ownerTid) {
bool done = false;
while (!done) {
string url;
usize utotal;
receive(
// usize: url index to download
(usize unum) {
synchronized {
if (unum >= urlList.length) {
// url index too big? done with it all
done = true;
cursorToInfoLine(tnum);
write(«\r\e[0;1;31mDONE\e[0m\e[K»);
} else {
url = urlList[unum];
utotal = urlList.length;
}
}
},
);
// download file
if (!done) {
import std.exception : collectException;
import std.file : mkdirRecurse;
import std.path : baseName;
string line;
{
import std.conv : to;
line ~= to!string(tnum)~»: [«;
auto cs = to!string(atomicLoad(urlDone)+1);
auto ts = to!string(utotal);
foreach (; cs.length..ts.length) line ~= ' ';
line ~= cs~"/"~ts~"] "~nameNorm(url.baseName)~" ... ";
}
auto pbar = PBar2(line, atomicLoad(urlDone), utotal);
int oldPrc = -1, oldPos = -1;
auto conn = HTTP();
conn.setUserAgent("Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)");
conn.onProgress = (scope usize dlTotal, scope usize dlNow, scope usize ulTotal, scope usize ulNow) {
if (dlTotal > 0) {
pbar.setTotal0(dlTotal);
pbar[0] = dlNow;
}
synchronized {
pbar[1] = urlDone;
cursorToInfoLine(tnum);
pbar.draw();
}
return 0;
};
collectException(mkdirRecurse(destDir));
string fname = destDir~"/"~nameNorm(url.baseName);
download(url, fname, conn);
}
// signal parent that we are idle
ownerTid.send(tnum);
}
}
// ////////////////////////////////////////////////////////////////////////// //
struct ThreadInfo {
Tid tid;
bool idle;
}
ThreadInfo[6] threads;
void startThreads () {
foreach (immutable usize idx; 0..threads.length) {
synchronized ++threadCount;
threads[idx].idle = true;
threads[idx].tid = spawn(&downloadThread, idx, thisTid);
}
}
void stopThreads () {
foreach (immutable usize idx; 0..threads.length) {
threads[idx].idle = false;
threads[idx].tid.send(usize.max); // 'stop' signal
}
for (;;) {
usize idleCount = 0;
foreach (ref trd; threads) if (trd.idle) ++idleCount;
if (idleCount == threads.length) break;
receive(
(uint tnum) { threads[tnum].idle = true; }
);
}
}
// ////////////////////////////////////////////////////////////////////////// //
// и, в общем-то, фрагмент `main()`:
writeln("downloading index page...");
string page;
{
auto conn = HTTP();
conn.setUserAgent("Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)");
page = cast(string)get(pageURL, conn);
}
writeln("parsing page...");
urlList = collectUrls(page);
if (urlList.length == 0) {
import core.exception : ExitException;
writeln("FATAL: no urls found!");
throw new ExitException();
}
destDir = getOutDir(urlList[0], pathPfx);
if (!forceDown && destDir != "_down") {
import std.file : exists;
if (destDir.exists) {
import core.exception : ExitException;
writeln("ERROR: duplicate download: '", destDir, "'");
throw new ExitException();
}
}
writeln("downloading to '", destDir, "'...");
startThreads();
while (urlDone < urlList.length) {
// find idle thread and send it url index
usize tnum;
for (;;) {
for (tnum = 0; tnum < threads.length; ++tnum) if (threads[tnum].idle) break;
if (tnum < threads.length) break;
// no idle thread found, wait for completion message
receive(
(uint tnum) { threads[tnum].idle = true; }
);
// and try again
}
usize uidx = atomicLoad(urlDone);
atomicOp!"+="(urlDone, 1);
threads[tnum].idle = false;
threads[tnum].tid.send(uidx);
}
// all downloads sheduled; wait for completion
stopThreads();
addProcessedUrl(pageURL);
removeInfoLines();
writeln(atomicLoad(urlDone), " images downloaded");
понятно, что можно не крутиться в цикле, занимаясь поиском бездельничающего потока, потому что мы и так в сообщении получаем его номер, и вообше код чуть почистить. но идея была не в этом, а в том, чтобы показать, как прельстиво делать такую фигню с message passing concurrency.
конечно, в стандартной библиотеке есть намного более продвинутые вещи для многопоточности: в принципе, можно было обойтись простым foreach по списку url и сгрузить остальное на автораспараллеливатель, но во-первых, я об этом забыл, а во-вторых, это выглядит не так заумно. |