Gearman воркер для скачивания файлов по URl

Gearman, сервер организации и распределения задач, более подробно можно узнать на оффициальном сайте http://gearman.org/

Данную статью я пишу для тех кто уже знает для чего он нужен и как его устанавливать.  В одном из моих проектов gearman воркеры скачивают файлы по ссылке. Также они во время работы отсылают статусы о прогрессе. Вы можете запустить нужно количество воркеров в зависимости от доступного интернет канала и конфигурации серверов.

Собственно сам класс воркера

<?php 
 
class FetchWorker extends GearmanWorker {
 
	public $job;
	public $downloaded;
	public $tmp_file_name;
	public $tmp_file_pointer;
	public $download_size;
	public $username;
	public $pswd;
	public $url;
	public $referrer;
	public $last_part_time;
 
 
	private function resetvalues() {
		$this->download_size = 0;
		$this->downloaded = 0;
		$this->username = '';
		$this->pswd = '';
		$this->referrer = '';
 
	}
	private function closefp() {
		@fclose($this->tmp_file_pointer);
	}
 
	private	function remote_filesize()
	{	
		ob_start();
		$ch = curl_init($this->url);
		curl_setopt($ch, CURLOPT_HEADER, 1);
		curl_setopt($ch, CURLOPT_NOBODY, 1);
	    curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
	    curl_setopt($ch, CURLOPT_REFERER, $this->referrer);
 
		if(!empty($this->username) && !empty($this->pswd)) {
			curl_setopt($ch, CURLOPT_USERPWD, $this->username.':'.$this->pswd);
		}
 
		$ok = curl_exec($ch);
 
		curl_close($ch);
		$head = ob_get_contents();
		ob_end_clean();
 
		$regex = '/Content-Length:\s([0-9].+?)\s/';
		$count = preg_match($regex, $head, $matches);
 
		return isset($matches[1]) ? $matches[1] : 0;
	}
 
 
	function fetchfile($job) {
		$this->resetvalues();
 
		$this->job = $job;
		$workload= $this->job->workload();
 
		echo "Received job: " . $this->job->handle() . "\n";
 
		$args = json_decode($workload);
 
		if(!isset($args->url) || !isset($args->tmp_file)) {
			print "Url and tmp_file are required";
			return;
		}
 
		$this->url = $args->url;
		$this->pswd = (isset($args->password))? $args->password : '';
		$this->username = (isset($args->username))? $args->username : '';
		$this->referrer = (isset($args->referrer)) ? $args->referrer : $this->url;
 
		$this->download_size = $this->remote_filesize();
 
		if($this->download_size==0) {
			$this->resetvalues();
			print "Can't get size header of the remote file";
			return;
		}
 
		$this->tmp_file_name = $args->tmp_file;
		$this->tmp_file_pointer = fopen($this->tmp_file_name, "wb");
 
		if($this->tmp_file_pointer)
			$this->fetchurl();
		else {
				$this->resetvalues();
				print("Couldn't open tmp file");
				return;
			 }
	}
 
	function percent($num_amount, $num_total) {
		$count1 = $num_amount / $num_total;
		$count2 = $count1 * 100;
		$count = ceil($count2);
		return $count;
	}
 
	function fetchurl() {
 
		$ch = curl_init($this->url);	
		curl_setopt($ch, CURLOPT_NOPROGRESS, true);
		curl_setopt($ch, CURLOPT_WRITEFUNCTION, array($this, 'fetchcallback'));
		curl_setopt($ch, CURLOPT_REFERER, $this->referrer);
 
		curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
 
		if(!empty($this->username) && !empty($this->pswd)) {
			curl_setopt($ch, CURLOPT_USERPWD, $this->username.':'.$this->pswd);
		}
 
		curl_setopt($ch, CURLOPT_HEADERFUNCTION, array($this, 'readHeader'));
		curl_exec($ch);
		curl_close($ch);
	}
 
	function readHeader($ch, $headers_text) {
		return strlen($headers_text);
	}
 
	function fetchcallback($ch, $str) {
			$length =  strlen($str);
 
			$this->downloaded +=$length;
			if(!fwrite($this->tmp_file_pointer,$str)) {
				$this->resetvalues();
				print "Can't write to tmp file\n";
				return;
			} 
 
			if($this->downloaded >= $this->download_size) {
				 $this->resetvalues();
				 $this->job->sendStatus(100,100);
				 $this->job->sendComplete();
 
				 $this->closefp();			
				 print "100% Done\n";	
				 return;
			}		
 
			if($this->last_part_time > time()- 2) // 2 seconds
			return $length;
 
			$this->last_part_time = time();
			$this->job->sendStatus($this->percent($this->downloaded, $this->download_size) ,100);
 
			print $this->percent($this->downloaded, $this->download_size)."%...";
			return $length;	
	}
 
}

Как видите для скачивания используется curl.
Далее прилагаю код демона, который поднимает вышеуказанный класс. У меня это реализовано как консольная комманда Yii фреймворка, но вы можете реализовать демон как угодно.

Версия с использованием Yii

<?php 
//FetchCommand.php |  placed in commands folder of Yii protected dir
class FetchCommand extends CConsoleCommand
{
	public function run($args)
	{
 
		$worker = new FetchWorker();
		$worker->addServer(); 
		$worker->addFunction("fetchfile", array($worker, "fetchfile")); 
 
 
		while (1)
		{
		  print "Waiting for job...\n";
 
		  $ret= $worker->work();
		  if ($worker->returnCode() != GEARMAN_SUCCESS)
		    break;
		}
 
	}
 
}

Standalone версия

 
<?php 
//worker.php
 
		$worker = new FetchWorker();
		$worker->addServer(); 
		$worker->addFunction("fetchfile", array($worker, "fetchfile")); 
 
 
		while (1)
		{
		  print "Waiting for job...\n";
 
		  $ret= $worker->work();
		  if ($worker->returnCode() != GEARMAN_SUCCESS)
		    break;
		}

Запуск standalone версии

 
php worker.php

Запус версии с использованием Yii

php dispatch.php Fetch

Ну и наконец чтобы статья была исчерпывающей я приведу скрипт (точку входа) dispatch.php для Yii. В принципе он очень похож на обычный index.php для Yii application

require_once(dirname(__FILE__).'/framework/yii.php');
$config=dirname(__FILE__).'/protected/config/console.php';
 
// remove the following line when in production mode
defined('YII_DEBUG') or define('YII_DEBUG',true);
 
Yii::createConsoleApplication($config)->run();

Нужно только создать отдельный конфиг для консоли. Как минимум скопируйте содержимое main config
Все, будем считать что наши демоны :) запущены и ждут камманды от gearman jоb сервера чтобы начать работу.

Даем комманду job серверу

<?php 
//index.php
 
# Create our client object.
$client= new GearmanClient();
 
# Add default server (localhost).
$client->addServer();
 
if(isset($_GET['handle'])){
	$handle = $_GET['handle'];
	$status = $client->jobStatus($handle);
	print_r($status);
	die;
}
 
$params = array(
		'referrer'=>'',
		'tmp_file'=>dirname(__FILE__).'/test.file',
		'url'=>'http://dl1.overload.in.ua/files/2050/SOD3z9jF3LiXdxO/file.zip',
		);
 
$params = json_encode($params);
$handle = $client->doBackground("fetchfile", $params);
header('Location: index.php?handle='.urlencode($handle));

В примере идет подключение к job серверу, дается фоновая команда с параметрами. Далее работа скрипта заканчивается и идет редирект с параметром именем задачи job-handle
Далее по этому параметру скрипт может следить за прогрессом выполнения именно этого таска.

Конец.