crypt of decay - D, мультипоточная качалка [entries|archive|friends|userinfo]
ketmar

[ userinfo | ljr userinfo ]
[ archive | journal archive ]

D, мультипоточная качалка [Apr. 1st, 2015|10:43 am]
Previous Entry Add to Memories Tell A Friend Next Entry
[Tags|]

поскольку я иногда брутально выкачиваю всякие сайты (и нет, это не всегда порнуха), то автоматизирую я это дишечкой, понятно. и не секрет, что иногда выкачивать можно быстрее, если сначала построить список файлов, а потом сливать их несколькими потоками. по этому поводу я покажу вам, как удобно делается мультипоточная качалка на D.

код неполный, но представление о том, что я хочу показать, даст.
// ////////////////////////////////////////////////////////////////////////// //
import core.atomic;

__gshared string[] urlList; // it is protected by `synchronized`
shared usize urlDone;


void downloadThread (usize tnum, Tid ownerTid) {
  bool done = false;
  while (!done) {
    string url;
    usize utotal;
    receive(
      // usize: url index to download
      (usize unum) {
        synchronized {
          if (unum >= urlList.length) {
            // url index too big? done with it all
            done = true;
            cursorToInfoLine(tnum);
            write(«\r\e[0;1;31mDONE\e[0m\e[K»);
          } else {
            url = urlList[unum];
            utotal = urlList.length;
          }
        }
      },
    );
    // download file
    if (!done) {
      import std.exception : collectException;
      import std.file : mkdirRecurse;
      import std.path : baseName;
      string line;
      {
        import std.conv : to;
        line ~= to!string(tnum)~»: [«;
        auto cs = to!string(atomicLoad(urlDone)+1);
        auto ts = to!string(utotal);
        foreach (; cs.length..ts.length) line ~= ' ';
        line ~= cs~"/"~ts~"] "~nameNorm(url.baseName)~" ... ";
      }
      auto pbar = PBar2(line, atomicLoad(urlDone), utotal);
      int oldPrc = -1, oldPos = -1;
      auto conn = HTTP();
      conn.setUserAgent("Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)");
      conn.onProgress = (scope usize dlTotal, scope usize dlNow, scope usize ulTotal, scope usize ulNow) {
        if (dlTotal > 0) {
          pbar.setTotal0(dlTotal);
          pbar[0] = dlNow;
        }
        synchronized {
          pbar[1] = urlDone;
          cursorToInfoLine(tnum);
          pbar.draw();
        }
        return 0;
      };
      collectException(mkdirRecurse(destDir));
      string fname = destDir~"/"~nameNorm(url.baseName);
      download(url, fname, conn);
    }
    // signal parent that we are idle
    ownerTid.send(tnum);
  }
}


// ////////////////////////////////////////////////////////////////////////// //
struct ThreadInfo {
  Tid tid;
  bool idle;
}

ThreadInfo[6] threads;


void startThreads () {
  foreach (immutable usize idx; 0..threads.length) {
    synchronized ++threadCount;
    threads[idx].idle = true;
    threads[idx].tid = spawn(&downloadThread, idx, thisTid);
  }
}


void stopThreads () {
  foreach (immutable usize idx; 0..threads.length) {
    threads[idx].idle = false;
    threads[idx].tid.send(usize.max); // 'stop' signal
  }
  for (;;) {
    usize idleCount = 0;
    foreach (ref trd; threads) if (trd.idle) ++idleCount;
    if (idleCount == threads.length) break;
    receive(
      (uint tnum) { threads[tnum].idle = true; }
    );
  }
}


// ////////////////////////////////////////////////////////////////////////// //
// и, в общем-то, фрагмент `main()`:
    writeln("downloading index page...");
    string page;
    {
      auto conn = HTTP();
      conn.setUserAgent("Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)");
      page = cast(string)get(pageURL, conn);
    }
    writeln("parsing page...");
    urlList = collectUrls(page);
    if (urlList.length == 0) {
      import core.exception : ExitException;
      writeln("FATAL: no urls found!");
      throw new ExitException();
    }
    destDir = getOutDir(urlList[0], pathPfx);
    if (!forceDown && destDir != "_down") {
      import std.file : exists;
      if (destDir.exists) {
        import core.exception : ExitException;
        writeln("ERROR: duplicate download: '", destDir, "'");
        throw new ExitException();
      }
    }
    writeln("downloading to '", destDir, "'...");
    startThreads();
    while (urlDone < urlList.length) {
      // find idle thread and send it url index
      usize tnum;
      for (;;) {
        for (tnum = 0; tnum < threads.length; ++tnum) if (threads[tnum].idle) break;
        if (tnum < threads.length) break;
        // no idle thread found, wait for completion message
        receive(
          (uint tnum) { threads[tnum].idle = true; }
        );
        // and try again
      }
      usize uidx = atomicLoad(urlDone);
      atomicOp!"+="(urlDone, 1);
      threads[tnum].idle = false;
      threads[tnum].tid.send(uidx);
    }
    // all downloads sheduled; wait for completion
    stopThreads();
    addProcessedUrl(pageURL);
    removeInfoLines();
    writeln(atomicLoad(urlDone), " images downloaded");

понятно, что можно не крутиться в цикле, занимаясь поиском бездельничающего потока, потому что мы и так в сообщении получаем его номер, и вообше код чуть почистить. но идея была не в этом, а в том, чтобы показать, как прельстиво делать такую фигню с message passing concurrency.

конечно, в стандартной библиотеке есть намного более продвинутые вещи для многопоточности: в принципе, можно было обойтись простым foreach по списку url и сгрузить остальное на автораспараллеливатель, но во-первых, я об этом забыл, а во-вторых, это выглядит не так заумно.
Linkmeow!

Comments:
From:(Anonymous)
Date:April 1st, 2015 - 10:03 am
(Link)
поебота этот message passing, если нет никакой хитрожопой коммуникации потоков, всегда можно тупо в стандартный TaskPool задания загнать. или же в твоём случае потоки не должны пассивно ждать, пока их ссылками снабдят для скачивания, а пусть сами же и читают их в цикле из общего списка, защищённого обычным мьютексом.

не убедил, короче.
[User Picture]
From:[info]ketmar
Date:April 1st, 2015 - 10:08 am
(Link)
тяжело не уметь читать, да?
From:(Anonymous)
Date:April 1st, 2015 - 10:13 am
(Link)
> я покажу вам, как удобно делается мультипоточная качалка на D
>> портянка с какими-то пиздовывертами
>>> удобно
меня этот момент смущает
[User Picture]
From:[info]ketmar
Date:April 1st, 2015 - 10:19 am
(Link)
попробуй иногда дочитывать пост до конца. говорят, во многих случаях это помогает не выглядеть дураком.
From:(Anonymous)
Date:April 1st, 2015 - 12:22 pm
(Link)
осталось отобразить скачанное и личный браузер готов!
так победим
From:(Anonymous)
Date:April 1st, 2015 - 12:28 pm
(Link)
На нормальном языке это всё, наверное, пять строчек займет.
[User Picture]
From:[info]ketmar
Date:April 1st, 2015 - 12:32 pm
(Link)
ещё один дебил. у вас тут перепись тех, кто не умеет дочитывать, что ли?
[User Picture]
From:[info]newfoundland
Date:April 1st, 2015 - 05:01 pm
(Link)
а нахуй это кому-то дочитывать? жизнь она одна, всякого спама вокруг и так дохуя.
[User Picture]
From:[info]ketmar
Date:April 1st, 2015 - 05:05 pm
(Link)
а ещё я в неё ем!
From:(Anonymous)
Date:April 1st, 2015 - 11:25 pm
(Link)
1. юзерагент порадовал. но зачем калекой прикидываться-то? =)
2. отчего нет обработки редиректов, куков, рефереров и всяких прочих плюшек вроде созданения оригинального имени файла и таймстампа?
[User Picture]
From:[info]ketmar
Date:April 2nd, 2015 - 06:07 am
(Link)
1. потому что мне лень было выяснять, что там именно сайт хочет. взял первый под руку попавшийся.

2. потому что «оригинальный таймстамп» тебе никто не возвращает, и он нахуй не нужен. «оригинальное имя файла» точно так же нахуй ненужно. всё остальное обрабатывает libcurl.