diff --git a/resources/sql/autopatches/20141123.taskpriority.1.sql b/resources/sql/autopatches/20141123.taskpriority.1.sql new file mode 100644 index 0000000000..b7790e711e --- /dev/null +++ b/resources/sql/autopatches/20141123.taskpriority.1.sql @@ -0,0 +1,2 @@ +UPDATE {$NAMESPACE}_worker.worker_activetask + SET priority = 5000 - priority; diff --git a/resources/sql/autopatches/20141123.taskpriority.2.sql b/resources/sql/autopatches/20141123.taskpriority.2.sql new file mode 100644 index 0000000000..91b8979112 --- /dev/null +++ b/resources/sql/autopatches/20141123.taskpriority.2.sql @@ -0,0 +1,2 @@ +UPDATE {$NAMESPACE}_worker.worker_archivetask + SET priority = 5000 - priority; diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php index 699afdfc50..6d3fcb52e9 100644 --- a/src/infrastructure/daemon/workers/PhabricatorWorker.php +++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php @@ -1,251 +1,255 @@ data = $data; } final protected function getTaskData() { return $this->data; } final public function executeTask() { $this->doWork(); } final public static function scheduleTask( $task_class, $data, $priority = null) { if ($priority === null) { $priority = self::PRIORITY_DEFAULT; } $task = id(new PhabricatorWorkerActiveTask()) ->setTaskClass($task_class) ->setData($data) ->setPriority($priority); if (self::$runAllTasksInProcess) { // Do the work in-process. $worker = newv($task_class, array($data)); while (true) { try { $worker->doWork(); foreach ($worker->getQueuedTasks() as $queued_task) { list($queued_class, $queued_data, $queued_priority) = $queued_task; self::scheduleTask($queued_class, $queued_data, $queued_priority); } break; } catch (PhabricatorWorkerYieldException $ex) { phlog( pht( 'In-process task "%s" yielded for %s seconds, sleeping...', $task_class, $ex->getDuration())); sleep($ex->getDuration()); } } // Now, save a task row and immediately archive it so we can return an // object with a valid ID. $task->openTransaction(); $task->save(); $archived = $task->archiveTask( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, 0); $task->saveTransaction(); return $archived; } else { $task->save(); return $task; } } /** * Wait for tasks to complete. If tasks are not leased by other workers, they * will be executed in this process while waiting. * * @param list List of queued task IDs to wait for. * @return void */ final public static function waitForTasks(array $task_ids) { if (!$task_ids) { return; } $task_table = new PhabricatorWorkerActiveTask(); $waiting = array_fuse($task_ids); while ($waiting) { $conn_w = $task_table->establishConnection('w'); // Check if any of the tasks we're waiting on are still queued. If they // are not, we're done waiting. $row = queryfx_one( $conn_w, 'SELECT COUNT(*) N FROM %T WHERE id IN (%Ld)', $task_table->getTableName(), $waiting); if (!$row['N']) { // Nothing is queued anymore. Stop waiting. break; } $tasks = id(new PhabricatorWorkerLeaseQuery()) ->withIDs($waiting) ->setLimit(1) ->execute(); if (!$tasks) { // We were not successful in leasing anything. Sleep for a bit and // see if we have better luck later. sleep(1); continue; } $task = head($tasks)->executeTask(); $ex = $task->getExecutionException(); if ($ex) { throw $ex; } } $tasks = id(new PhabricatorWorkerArchiveTask())->loadAllWhere( 'id IN (%Ld)', $task_ids); foreach ($tasks as $task) { if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) { throw new Exception(pht('Task %d failed!', $task->getID())); } } } public function renderForDisplay(PhabricatorUser $viewer) { $data = PhutilReadableSerializer::printableValue($this->data); return phutil_tag('pre', array(), $data); } /** * Set this flag to execute scheduled tasks synchronously, in the same * process. This is useful for debugging, and otherwise dramatically worse * in every way imaginable. */ public static function setRunAllTasksInProcess($all) { self::$runAllTasksInProcess = $all; } final protected function log($pattern /* , ... */) { $console = PhutilConsole::getConsole(); $argv = func_get_args(); call_user_func_array(array($console, 'writeLog'), $argv); return $this; } /** * Queue a task to be executed after this one succeeds. * * The followup task will be queued only if this task completes cleanly. * * @param string Task class to queue. * @param array Data for the followup task. * @param int|null Priority for the followup task. * @return this */ final protected function queueTask($class, array $data, $priority = null) { $this->queuedTasks[] = array($class, $data, $priority); return $this; } /** * Get tasks queued as followups by @{method:queueTask}. * * @return list> Queued task specifications. */ final public function getQueuedTasks() { return $this->queuedTasks; } } diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php index 659a83821e..c5c9b0959f 100644 --- a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php +++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php @@ -1,212 +1,212 @@ true, ); } public function testLeaseTask() { $task = $this->scheduleTask(); $this->expectNextLease($task, 'Leasing should work.'); } public function testMultipleLease() { $task = $this->scheduleTask(); $this->expectNextLease($task); $this->expectNextLease( null, 'We should not be able to lease a task multiple times.'); } public function testOldestFirst() { $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $this->expectNextLease( $task1, 'Older tasks should lease first, all else being equal.'); $this->expectNextLease($task2); } public function testNewBeforeLeased() { $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $task1->setLeaseOwner('test'); $task1->setLeaseExpires(time() - 100000); $task1->forceSaveWithoutLease(); $this->expectNextLease( $task2, 'Tasks not previously leased should lease before previously '. 'leased tasks.'); $this->expectNextLease($task1); } public function testExecuteTask() { $task = $this->scheduleAndExecuteTask(); $this->assertEqual(true, $task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, $task->getResult()); } public function testPermanentTaskFailure() { $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-permanent', )); $this->assertEqual(true, $task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_FAILURE, $task->getResult()); } public function testTemporaryTaskFailure() { $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-temporary', )); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); } public function testTooManyTaskFailures() { // Expect temporary failures, then a permanent failure. $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-temporary', 'getMaximumRetryCount' => 3, 'getWaitBeforeRetry' => -60, )); // Temporary... $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(1, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(2, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(3, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(4, $task->getFailureCount()); // Permanent. $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertTrue($task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_FAILURE, $task->getResult()); } public function testWaitBeforeRetry() { $task = $this->scheduleTask( array( 'doWork' => 'fail-temporary', 'getWaitBeforeRetry' => 1000000, )); $this->expectNextLease($task)->executeTask(); $this->expectNextLease(null); } public function testRequiredLeaseTime() { $task = $this->scheduleAndExecuteTask( array( 'getRequiredLeaseTime' => 1000000, )); $this->assertTrue(($task->getLeaseExpires() - time()) > 1000); } public function testLeasedIsOldestFirst() { $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $task1->setLeaseOwner('test'); $task1->setLeaseExpires(time() - 100000); $task1->forceSaveWithoutLease(); $task2->setLeaseOwner('test'); $task2->setLeaseExpires(time() - 200000); $task2->forceSaveWithoutLease(); $this->expectNextLease( $task2, 'Tasks which expired earlier should lease first, all else being equal.'); $this->expectNextLease($task1); } - public function testLeasedIsHighestPriority() { - $task1 = $this->scheduleTask(array(), 1); - $task2 = $this->scheduleTask(array(), 1); - $task3 = $this->scheduleTask(array(), 2); + public function testLeasedIsLowestPriority() { + $task1 = $this->scheduleTask(array(), 2); + $task2 = $this->scheduleTask(array(), 2); + $task3 = $this->scheduleTask(array(), 1); $this->expectNextLease( $task3, - 'Tasks with a higher priority should be scheduled first.'); + 'Tasks with a lower priority should be scheduled first.'); $this->expectNextLease( $task1, 'Tasks with the same priority should be FIFO.'); $this->expectNextLease($task2); } private function expectNextLease($task, $message = null) { $leased = id(new PhabricatorWorkerLeaseQuery()) ->setLimit(1) ->execute(); if ($task === null) { $this->assertEqual(0, count($leased), $message); return null; } else { $this->assertEqual(1, count($leased), $message); $this->assertEqual( (int)head($leased)->getID(), (int)$task->getID(), $message); return head($leased); } } private function scheduleAndExecuteTask( array $data = array(), $priority = null) { $task = $this->scheduleTask($data, $priority); $task = $this->expectNextLease($task); $task = $task->executeTask(); return $task; } private function scheduleTask(array $data = array(), $priority = null) { return PhabricatorWorker::scheduleTask( 'PhabricatorTestWorker', $data, $priority); } } diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php index fff5e85fa2..ea8e74e1be 100644 --- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php @@ -1,253 +1,253 @@ skipLease = $skip; return $this; } public function withIDs(array $ids) { $this->ids = $ids; return $this; } public function setLimit($limit) { $this->limit = $limit; return $this; } public function execute() { if (!$this->limit) { throw new Exception('You must setLimit() when leasing tasks.'); } $task_table = new PhabricatorWorkerActiveTask(); $taskdata_table = new PhabricatorWorkerTaskData(); $lease_ownership_name = $this->getLeaseOwnershipName(); $conn_w = $task_table->establishConnection('w'); // Try to satisfy the request from new, unleased tasks first. If we don't // find enough tasks, try tasks with expired leases (i.e., tasks which have // previously failed). $phases = array( self::PHASE_UNLEASED, self::PHASE_EXPIRED, ); $limit = $this->limit; $leased = 0; $task_ids = array(); foreach ($phases as $phase) { // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query // goes very, very slowly. The `ORDER BY` triggers this, although we get // the same apparent results without it. Without the ORDER BY, binary // read slaves complain that the query isn't repeatable. To avoid both // problems, do a SELECT and then an UPDATE. $rows = queryfx_all( $conn_w, 'SELECT id, leaseOwner FROM %T %Q %Q %Q', $task_table->getTableName(), $this->buildWhereClause($conn_w, $phase), $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit - $leased)); // NOTE: Sometimes, we'll race with another worker and they'll grab // this task before we do. We could reduce how often this happens by // selecting more tasks than we need, then shuffling them and trying // to lock only the number we're actually after. However, the amount // of time workers spend here should be very small relative to their // total runtime, so keep it simple for the moment. if ($rows) { if ($this->skipLease) { $leased += count($rows); $task_ids += array_fuse(ipull($rows, 'id')); } else { queryfx( $conn_w, 'UPDATE %T task SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d %Q', $task_table->getTableName(), $lease_ownership_name, self::getDefaultLeaseDuration(), $this->buildUpdateWhereClause($conn_w, $phase, $rows)); $leased += $conn_w->getAffectedRows(); } if ($leased == $limit) { break; } } } if (!$leased) { return array(); } if ($this->skipLease) { $selection_condition = qsprintf( $conn_w, 'task.id IN (%Ld)', $task_ids); } else { $selection_condition = qsprintf( $conn_w, 'task.leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()', $lease_ownership_name); } $data = queryfx_all( $conn_w, 'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime FROM %T task LEFT JOIN %T taskdata ON taskdata.id = task.dataID WHERE %Q %Q %Q', $task_table->getTableName(), $taskdata_table->getTableName(), $selection_condition, $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit)); $tasks = $task_table->loadAllFromArray($data); $tasks = mpull($tasks, null, 'getID'); foreach ($data as $row) { $tasks[$row['id']]->setServerTime($row['_serverTime']); if ($row['_taskData']) { $task_data = json_decode($row['_taskData'], true); } else { $task_data = null; } $tasks[$row['id']]->setData($task_data); } return $tasks; } private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { $where = array(); switch ($phase) { case self::PHASE_UNLEASED: $where[] = 'leaseOwner IS NULL'; break; case self::PHASE_EXPIRED: $where[] = 'leaseExpires < UNIX_TIMESTAMP()'; break; default: throw new Exception("Unknown phase '{$phase}'!"); } if ($this->ids) { $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids); } return $this->formatWhereClause($where); } private function buildUpdateWhereClause( AphrontDatabaseConnection $conn_w, $phase, array $rows) { $where = array(); // NOTE: This is basically working around the MySQL behavior that // `IN (NULL)` doesn't match NULL. switch ($phase) { case self::PHASE_UNLEASED: $where[] = qsprintf($conn_w, 'leaseOwner IS NULL'); $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id')); break; case self::PHASE_EXPIRED: $in = array(); foreach ($rows as $row) { $in[] = qsprintf( $conn_w, '(id = %d AND leaseOwner = %s)', $row['id'], $row['leaseOwner']); } $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in)); break; default: throw new Exception("Unknown phase '{$phase}'!"); } return $this->formatWhereClause($where); } private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) { switch ($phase) { case self::PHASE_UNLEASED: // When selecting new tasks, we want to consume them in order of - // decreasing priority (and then FIFO). - return qsprintf($conn_w, 'ORDER BY priority DESC, id ASC'); + // increasing priority (and then FIFO). + return qsprintf($conn_w, 'ORDER BY priority ASC, id ASC'); case self::PHASE_EXPIRED: // When selecting failed tasks, we want to consume them in roughly // FIFO order of their failures, which is not necessarily their original // queue order. // Particularly, this is important for tasks which use soft failures to // indicate that they are waiting on other tasks to complete: we need to // push them to the end of the queue after they fail, at least on // average, so we don't deadlock retrying the same blocked task over // and over again. return qsprintf($conn_w, 'ORDER BY leaseExpires ASC'); default: throw new Exception(pht('Unknown phase "%s"!', $phase)); } } private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) { return qsprintf($conn_w, 'LIMIT %d', $limit); } private function getLeaseOwnershipName() { static $sequence = 0; $parts = array( getmypid(), time(), php_uname('n'), ++$sequence, ); return implode(':', $parts); } }