diff --git a/resources/sql/autopatches/09082015.workeryield.1.sql b/resources/sql/autopatches/09082015.workeryield.1.sql new file mode 100644 --- /dev/null +++ b/resources/sql/autopatches/09082015.workeryield.1.sql @@ -0,0 +1,2 @@ +ALTER TABLE {$NAMESPACE}_worker.worker_activetask + ADD continuationData LONGTEXT COLLATE {$COLLATE_TEXT} NOT NULL; diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php --- a/src/infrastructure/daemon/workers/PhabricatorWorker.php +++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php @@ -8,6 +8,7 @@ private $data; private static $runAllTasksInProcess = false; private $queuedTasks = array(); + private $continuationData; // NOTE: Lower priority numbers execute first. The priority numbers have to // have the same ordering that IDs do (lowest first) so MySQL can use a @@ -91,6 +92,15 @@ $this->doWork(); } + final public function setContinuationData($continuation_data) { + $this->continuationData = $continuation_data; + return $this; + } + + final protected function getContinuationData() { + return $this->continuationData; + } + final public static function scheduleTask( $task_class, $data, @@ -110,10 +120,12 @@ if (self::$runAllTasksInProcess) { // Do the work in-process. + $continuation_data = array(); $worker = newv($task_class, array($data)); while (true) { try { + $worker->setContinuationData($continuation_data); $worker->doWork(); foreach ($worker->getQueuedTasks() as $queued_task) { list($queued_class, $queued_data, $queued_priority) = $queued_task; @@ -128,6 +140,7 @@ $task_class, $ex->getDuration())); sleep($ex->getDuration()); + $continuation_data = $ex->getContinuationData(); } } diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php --- a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php +++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php @@ -30,6 +30,17 @@ case 'fail-permanent': throw new PhabricatorWorkerPermanentFailureException( pht('Permanent failure!')); + case 'yield-with-continuation': + $test = idx($this->getContinuationData(), 'test', 0); + switch ($test) { + case 0: + throw new PhabricatorWorkerYieldException(1, array('test' => 1)); + case 1: + throw new PhabricatorWorkerYieldException(1, array('test' => 2)); + case 2: + throw new PhabricatorWorkerYieldException(1, array('test' => 3)); + } + break; default: return; } diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php --- a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php +++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php @@ -143,6 +143,19 @@ $this->assertTrue(($task->getLeaseExpires() - time()) > 1000); } + public function testYieldWithContinuation() { + $task = $this->scheduleAndExecuteTask( + array( + 'doWork' => 'yield-with-continuation', + )); + + $this->assertEqual(1, idx($task->getContinuationData(), 'test')); + $task->executeTask(); + $this->assertEqual(2, idx($task->getContinuationData(), 'test')); + $task->executeTask(); + $this->assertEqual(3, idx($task->getContinuationData(), 'test')); + } + public function testLeasedIsOldestFirst() { $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); diff --git a/src/infrastructure/daemon/workers/exception/PhabricatorWorkerYieldException.php b/src/infrastructure/daemon/workers/exception/PhabricatorWorkerYieldException.php --- a/src/infrastructure/daemon/workers/exception/PhabricatorWorkerYieldException.php +++ b/src/infrastructure/daemon/workers/exception/PhabricatorWorkerYieldException.php @@ -9,9 +9,11 @@ final class PhabricatorWorkerYieldException extends Exception { private $duration; + private $continuationData; - public function __construct($duration) { + public function __construct($duration, $data = array()) { $this->duration = $duration; + $this->continuationData = $data; parent::__construct(); } @@ -19,4 +21,8 @@ return $this->duration; } + public function getContinuationData() { + return $this->continuationData; + } + } diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php @@ -3,6 +3,7 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask { protected $failureTime; + protected $continuationData = array(); private $serverTime; private $localTime; @@ -36,11 +37,16 @@ ) + $parent[self::CONFIG_KEY_SCHEMA], ); + $config[self::CONFIG_SERIALIZATION] = array( + 'continuationData' => self::SERIALIZATION_JSON, + ); + $config[self::CONFIG_COLUMN_SCHEMA] = array( // T6203/NULLABILITY // This isn't nullable in the archive table, so at a minimum these // should be the same. 'dataID' => 'uint32?', + 'continuationData' => 'text', ) + $parent[self::CONFIG_COLUMN_SCHEMA]; return $config + $parent; @@ -141,6 +147,7 @@ $worker = null; try { $worker = $this->getWorkerInstance(); + $worker->setContinuationData($this->getContinuationData()); $maximum_failures = $worker->getMaximumRetryCount(); if ($maximum_failures !== null) { @@ -174,6 +181,7 @@ $result->setExecutionException($ex); } catch (PhabricatorWorkerYieldException $ex) { $this->setExecutionException($ex); + $this->setContinuationData($ex->getContinuationData()); $retry = $ex->getDuration(); $retry = max($retry, 5);