diff --git a/src/applications/feed/worker/FeedPublisherWorker.php b/src/applications/feed/worker/FeedPublisherWorker.php index b2d8860679..9d27722b59 100644 --- a/src/applications/feed/worker/FeedPublisherWorker.php +++ b/src/applications/feed/worker/FeedPublisherWorker.php @@ -1,39 +1,39 @@ loadFeedStory(); $uris = PhabricatorEnv::getEnvConfig('feed.http-hooks'); foreach ($uris as $uri) { - PhabricatorWorker::scheduleTask( + $this->queueTask( 'FeedPublisherHTTPWorker', array( 'key' => $story->getChronologicalKey(), 'uri' => $uri, )); } $argv = array( array(), ); // Find and schedule all the enabled Doorkeeper publishers. $doorkeeper_workers = id(new PhutilSymbolLoader()) ->setAncestorClass('DoorkeeperFeedWorker') ->loadObjects($argv); foreach ($doorkeeper_workers as $worker) { if (!$worker->isEnabled()) { continue; } - PhabricatorWorker::scheduleTask( + $this->queueTask( get_class($worker), array( 'key' => $story->getChronologicalKey(), )); } } } diff --git a/src/applications/harbormaster/worker/HarbormasterTargetWorker.php b/src/applications/harbormaster/worker/HarbormasterTargetWorker.php index ef83fcab0f..7fb13a54d2 100644 --- a/src/applications/harbormaster/worker/HarbormasterTargetWorker.php +++ b/src/applications/harbormaster/worker/HarbormasterTargetWorker.php @@ -1,65 +1,65 @@ getTaskData(); $id = idx($data, 'targetID'); $target = id(new HarbormasterBuildTargetQuery()) ->withIDs(array($id)) ->setViewer($this->getViewer()) ->executeOne(); if (!$target) { throw new PhabricatorWorkerPermanentFailureException( pht( 'Bad build target ID "%d".', $id)); } return $target; } public function doWork() { $target = $this->loadBuildTarget(); $build = $target->getBuild(); $viewer = $this->getViewer(); try { $implementation = $target->getImplementation(); $implementation->execute($build, $target); $target->setTargetStatus(HarbormasterBuildTarget::STATUS_PASSED); $target->save(); } catch (Exception $ex) { phlog($ex); try { $log = $build->createLog($target, 'core', 'exception'); $start = $log->start(); $log->append((string)$ex); $log->finalize($start); } catch (Exception $log_ex) { phlog($log_ex); } $target->setTargetStatus(HarbormasterBuildTarget::STATUS_FAILED); $target->save(); } id(new HarbormasterBuildEngine()) ->setViewer($viewer) ->setBuild($build) ->continueBuild(); } } diff --git a/src/applications/repository/worker/PhabricatorRepositoryCommitHeraldWorker.php b/src/applications/repository/worker/PhabricatorRepositoryCommitHeraldWorker.php index cdf4087196..da084da429 100644 --- a/src/applications/repository/worker/PhabricatorRepositoryCommitHeraldWorker.php +++ b/src/applications/repository/worker/PhabricatorRepositoryCommitHeraldWorker.php @@ -1,474 +1,474 @@ applyHeraldRules($repository, $commit); $commit->writeImportStatusFlag( PhabricatorRepositoryCommit::IMPORTED_HERALD); return $result; } private function applyHeraldRules( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit) { $commit->attachRepository($repository); // Don't take any actions on an importing repository. Principally, this // avoids generating thousands of audits or emails when you import an // established repository on an existing install. if ($repository->isImporting()) { return; } if ($repository->getDetail('herald-disabled')) { return; } $data = id(new PhabricatorRepositoryCommitData())->loadOneWhere( 'commitID = %d', $commit->getID()); if (!$data) { throw new PhabricatorWorkerPermanentFailureException( pht( 'Unable to load commit data. The data for this task is invalid '. 'or no longer exists.')); } $adapter = id(new HeraldCommitAdapter()) ->setCommit($commit); $rules = id(new HeraldRuleQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withContentTypes(array($adapter->getAdapterContentType())) ->withDisabled(false) ->needConditionsAndActions(true) ->needAppliedToPHIDs(array($adapter->getPHID())) ->needValidateAuthors(true) ->execute(); $engine = new HeraldEngine(); $effects = $engine->applyRules($rules, $adapter); $engine->applyEffects($effects, $adapter, $rules); $xscript = $engine->getTranscript(); $audit_phids = $adapter->getAuditMap(); $cc_phids = $adapter->getAddCCMap(); if ($audit_phids || $cc_phids) { $this->createAudits($commit, $audit_phids, $cc_phids, $rules); } HarbormasterBuildable::applyBuildPlans( $commit->getPHID(), $repository->getPHID(), $adapter->getBuildPlans()); $explicit_auditors = $this->createAuditsFromCommitMessage($commit, $data); $this->publishFeedStory($repository, $commit, $data); $herald_targets = $adapter->getEmailPHIDs(); $email_phids = array_unique( array_merge( $explicit_auditors, array_keys($cc_phids), $herald_targets)); if (!$email_phids) { return; } $revision = $adapter->loadDifferentialRevision(); if ($revision) { $name = $revision->getTitle(); } else { $name = $data->getSummary(); } $author_phid = $data->getCommitDetail('authorPHID'); $reviewer_phid = $data->getCommitDetail('reviewerPHID'); $phids = array_filter( array( $author_phid, $reviewer_phid, $commit->getPHID(), )); $handles = id(new PhabricatorHandleQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withPHIDs($phids) ->execute(); $commit_handle = $handles[$commit->getPHID()]; $commit_name = $commit_handle->getName(); if ($author_phid) { $author_name = $handles[$author_phid]->getName(); } else { $author_name = $data->getAuthorName(); } if ($reviewer_phid) { $reviewer_name = $handles[$reviewer_phid]->getName(); } else { $reviewer_name = null; } $who = implode(', ', array_filter(array($author_name, $reviewer_name))); $description = $data->getCommitMessage(); $commit_uri = PhabricatorEnv::getProductionURI($commit_handle->getURI()); $differential = $revision ? PhabricatorEnv::getProductionURI('/D'.$revision->getID()) : 'No revision.'; $files = $adapter->loadAffectedPaths(); sort($files); $files = implode("\n", $files); $xscript_id = $xscript->getID(); $why_uri = '/herald/transcript/'.$xscript_id.'/'; $reply_handler = PhabricatorAuditCommentEditor::newReplyHandlerForCommit( $commit); $template = new PhabricatorMetaMTAMail(); $inline_patch_text = $this->buildPatch($template, $repository, $commit); $body = new PhabricatorMetaMTAMailBody(); $body->addRawSection($description); $body->addTextSection(pht('DETAILS'), $commit_uri); // TODO: This should be integrated properly once we move to // ApplicationTransactions. $field_list = PhabricatorCustomField::getObjectFields( $commit, PhabricatorCustomField::ROLE_APPLICATIONTRANSACTIONS); $field_list ->setViewer(PhabricatorUser::getOmnipotentUser()) ->readFieldsFromStorage($commit); foreach ($field_list->getFields() as $field) { try { $field->buildApplicationTransactionMailBody( new DifferentialTransaction(), // Bogus object to satisfy typehint. $body); } catch (Exception $ex) { // Log the exception and continue. phlog($ex); } } $body->addTextSection(pht('DIFFERENTIAL REVISION'), $differential); $body->addTextSection(pht('AFFECTED FILES'), $files); $body->addReplySection($reply_handler->getReplyHandlerInstructions()); $body->addHeraldSection($why_uri); $body->addRawSection($inline_patch_text); $body = $body->render(); $prefix = PhabricatorEnv::getEnvConfig('metamta.diffusion.subject-prefix'); $threading = PhabricatorAuditCommentEditor::getMailThreading( $repository, $commit); list($thread_id, $thread_topic) = $threading; $template->setRelatedPHID($commit->getPHID()); $template->setSubject("{$commit_name}: {$name}"); $template->setSubjectPrefix($prefix); $template->setVarySubjectPrefix("[Commit]"); $template->setBody($body); $template->setThreadID($thread_id, $is_new = true); $template->addHeader('Thread-Topic', $thread_topic); $template->setIsBulk(true); $template->addHeader('X-Herald-Rules', $xscript->getXHeraldRulesHeader()); if ($author_phid) { $template->setFrom($author_phid); } // TODO: We should verify that each recipient can actually see the // commit before sending them email (T603). $mails = $reply_handler->multiplexMail( $template, id(new PhabricatorHandleQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withPHIDs($email_phids) ->execute(), array()); foreach ($mails as $mail) { $mail->saveAndSend(); } } private function createAudits( PhabricatorRepositoryCommit $commit, array $map, array $ccmap, array $rules) { assert_instances_of($rules, 'HeraldRule'); $requests = id(new PhabricatorRepositoryAuditRequest())->loadAllWhere( 'commitPHID = %s', $commit->getPHID()); $requests = mpull($requests, null, 'getAuditorPHID'); $rules = mpull($rules, null, 'getID'); $maps = array( PhabricatorAuditStatusConstants::AUDIT_REQUIRED => $map, PhabricatorAuditStatusConstants::CC => $ccmap, ); foreach ($maps as $status => $map) { foreach ($map as $phid => $rule_ids) { $request = idx($requests, $phid); if ($request) { continue; } $reasons = array(); foreach ($rule_ids as $id) { $rule_name = '?'; if ($rules[$id]) { $rule_name = $rules[$id]->getName(); } if ($status == PhabricatorAuditStatusConstants::AUDIT_REQUIRED) { $reasons[] = pht( '%s Triggered Audit', "H{$id} {$rule_name}"); } else { $reasons[] = pht( '%s Triggered CC', "H{$id} {$rule_name}"); } } $request = new PhabricatorRepositoryAuditRequest(); $request->setCommitPHID($commit->getPHID()); $request->setAuditorPHID($phid); $request->setAuditStatus($status); $request->setAuditReasons($reasons); $request->save(); } } $commit->updateAuditStatus($requests); $commit->save(); } /** * Find audit requests in the "Auditors" field if it is present and trigger * explicit audit requests. */ private function createAuditsFromCommitMessage( PhabricatorRepositoryCommit $commit, PhabricatorRepositoryCommitData $data) { $message = $data->getCommitMessage(); $matches = null; if (!preg_match('/^Auditors:\s*(.*)$/im', $message, $matches)) { return array(); } $phids = id(new PhabricatorObjectListQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->setAllowPartialResults(true) ->setAllowedTypes( array( PhabricatorPeoplePHIDTypeUser::TYPECONST, PhabricatorProjectPHIDTypeProject::TYPECONST, )) ->setObjectList($matches[1]) ->execute(); if (!$phids) { return array(); } $requests = id(new PhabricatorRepositoryAuditRequest())->loadAllWhere( 'commitPHID = %s', $commit->getPHID()); $requests = mpull($requests, null, 'getAuditorPHID'); foreach ($phids as $phid) { if (isset($requests[$phid])) { continue; } $request = new PhabricatorRepositoryAuditRequest(); $request->setCommitPHID($commit->getPHID()); $request->setAuditorPHID($phid); $request->setAuditStatus( PhabricatorAuditStatusConstants::AUDIT_REQUESTED); $request->setAuditReasons( array( 'Requested by Author', )); $request->save(); $requests[$phid] = $request; } $commit->updateAuditStatus($requests); $commit->save(); return $phids; } private function publishFeedStory( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit, PhabricatorRepositoryCommitData $data) { if (time() > $commit->getEpoch() + (24 * 60 * 60)) { // Don't publish stories that are more than 24 hours old, to avoid // ridiculous levels of feed spam if a repository is imported without // disabling feed publishing. return; } $author_phid = $commit->getAuthorPHID(); $committer_phid = $data->getCommitDetail('committerPHID'); $publisher = new PhabricatorFeedStoryPublisher(); $publisher->setStoryType(PhabricatorFeedStoryTypeConstants::STORY_COMMIT); $publisher->setStoryData( array( 'commitPHID' => $commit->getPHID(), 'summary' => $data->getSummary(), 'authorName' => $data->getAuthorName(), 'authorPHID' => $author_phid, 'committerName' => $data->getCommitDetail('committer'), 'committerPHID' => $committer_phid, )); $publisher->setStoryTime($commit->getEpoch()); $publisher->setRelatedPHIDs( array_filter( array( $author_phid, $committer_phid, ))); if ($author_phid) { $publisher->setStoryAuthorPHID($author_phid); } $publisher->publish(); } private function buildPatch( PhabricatorMetaMTAMail $template, PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit) { $attach_key = 'metamta.diffusion.attach-patches'; $inline_key = 'metamta.diffusion.inline-patches'; $attach_patches = PhabricatorEnv::getEnvConfig($attach_key); $inline_patches = PhabricatorEnv::getEnvConfig($inline_key); if (!$attach_patches && !$inline_patches) { return; } $encoding = $repository->getDetail('encoding', 'UTF-8'); $result = null; $patch_error = null; try { $raw_patch = $this->loadRawPatchText($repository, $commit); if ($attach_patches) { $commit_name = $repository->formatCommitName( $commit->getCommitIdentifier()); $template->addAttachment( new PhabricatorMetaMTAAttachment( $raw_patch, $commit_name.'.patch', 'text/x-patch; charset='.$encoding)); } } catch (Exception $ex) { phlog($ex); $patch_error = 'Unable to generate: '.$ex->getMessage(); } if ($patch_error) { $result = $patch_error; } else if ($inline_patches) { $len = substr_count($raw_patch, "\n"); if ($len <= $inline_patches) { // We send email as utf8, so we need to convert the text to utf8 if // we can. if ($encoding) { $raw_patch = phutil_utf8_convert($raw_patch, 'UTF-8', $encoding); } $result = phutil_utf8ize($raw_patch); } } if ($result) { $result = "PATCH\n\n{$result}\n"; } return $result; } private function loadRawPatchText( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit) { $drequest = DiffusionRequest::newFromDictionary( array( 'user' => PhabricatorUser::getOmnipotentUser(), 'initFromConduit' => false, 'repository' => $repository, 'commit' => $commit->getCommitIdentifier(), )); $raw_query = DiffusionRawDiffQuery::newFromDiffusionRequest($drequest); $raw_query->setLinesOfContext(3); $time_key = 'metamta.diffusion.time-limit'; $byte_key = 'metamta.diffusion.byte-limit'; $time_limit = PhabricatorEnv::getEnvConfig($time_key); $byte_limit = PhabricatorEnv::getEnvConfig($byte_key); if ($time_limit) { $raw_query->setTimeout($time_limit); } $raw_diff = $raw_query->loadRawDiff(); $size = strlen($raw_diff); if ($byte_limit && $size > $byte_limit) { $pretty_size = phabricator_format_bytes($size); $pretty_limit = phabricator_format_bytes($byte_limit); throw new Exception( "Patch size of {$pretty_size} exceeds configured byte size limit of ". "{$pretty_limit}."); } return $raw_diff; } } diff --git a/src/applications/repository/worker/PhabricatorRepositoryCommitOwnersWorker.php b/src/applications/repository/worker/PhabricatorRepositoryCommitOwnersWorker.php index 50dd90348b..9a94281471 100644 --- a/src/applications/repository/worker/PhabricatorRepositoryCommitOwnersWorker.php +++ b/src/applications/repository/worker/PhabricatorRepositoryCommitOwnersWorker.php @@ -1,143 +1,138 @@ triggerOwnerAudits($repository, $commit); $commit->writeImportStatusFlag( PhabricatorRepositoryCommit::IMPORTED_OWNERS); if ($this->shouldQueueFollowupTasks()) { - PhabricatorWorker::scheduleTask( + $this->queueTask( 'PhabricatorRepositoryCommitHeraldWorker', array( 'commitID' => $commit->getID(), )); } } private function triggerOwnerAudits( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit) { if ($repository->getDetail('herald-disabled')) { return; } $affected_paths = PhabricatorOwnerPathQuery::loadAffectedPaths( $repository, $commit, PhabricatorUser::getOmnipotentUser()); $affected_packages = PhabricatorOwnersPackage::loadAffectedPackages( $repository, $affected_paths); if ($affected_packages) { $requests = id(new PhabricatorRepositoryAuditRequest()) ->loadAllWhere( 'commitPHID = %s', $commit->getPHID()); $requests = mpull($requests, null, 'getAuditorPHID'); foreach ($affected_packages as $package) { $request = idx($requests, $package->getPHID()); if ($request) { // Don't update request if it exists already. continue; } if ($package->getAuditingEnabled()) { $reasons = $this->checkAuditReasons($commit, $package); if ($reasons) { $audit_status = PhabricatorAuditStatusConstants::AUDIT_REQUIRED; } else { $audit_status = PhabricatorAuditStatusConstants::AUDIT_NOT_REQUIRED; } } else { $reasons = array(); $audit_status = PhabricatorAuditStatusConstants::NONE; } $relationship = new PhabricatorRepositoryAuditRequest(); $relationship->setAuditorPHID($package->getPHID()); $relationship->setCommitPHID($commit->getPHID()); $relationship->setAuditReasons($reasons); $relationship->setAuditStatus($audit_status); $relationship->save(); $requests[$package->getPHID()] = $relationship; } $commit->updateAuditStatus($requests); $commit->save(); } } private function checkAuditReasons( PhabricatorRepositoryCommit $commit, PhabricatorOwnersPackage $package) { $data = id(new PhabricatorRepositoryCommitData())->loadOneWhere( 'commitID = %d', $commit->getID()); $reasons = array(); if ($data->getCommitDetail('vsDiff')) { $reasons[] = "Changed After Revision Was Accepted"; } $commit_author_phid = $data->getCommitDetail('authorPHID'); if (!$commit_author_phid) { $reasons[] = "Commit Author Not Recognized"; } $revision_id = $data->getCommitDetail('differential.revisionID'); $revision_author_phid = null; $commit_reviewedby_phid = null; if ($revision_id) { // TODO: (T603) This is probably safe to use an omnipotent user on, // but check things more closely. $revision = id(new DifferentialRevision())->load($revision_id); if ($revision) { $revision_author_phid = $revision->getAuthorPHID(); $commit_reviewedby_phid = $data->getCommitDetail('reviewerPHID'); if ($revision_author_phid !== $commit_author_phid) { $reasons[] = "Author Not Matching with Revision"; } } else { $reasons[] = "Revision Not Found"; } } else { $reasons[] = "No Revision Specified"; } $owners_phids = PhabricatorOwnersOwner::loadAffiliatedUserPHIDs( array($package->getID())); if (!($commit_author_phid && in_array($commit_author_phid, $owners_phids) || $commit_reviewedby_phid && in_array($commit_reviewedby_phid, $owners_phids))) { $reasons[] = "Owners Not Involved"; } return $reasons; } } diff --git a/src/applications/repository/worker/commitchangeparser/PhabricatorRepositoryCommitChangeParserWorker.php b/src/applications/repository/worker/commitchangeparser/PhabricatorRepositoryCommitChangeParserWorker.php index 0fd91e83b7..69ed92d750 100644 --- a/src/applications/repository/worker/commitchangeparser/PhabricatorRepositoryCommitChangeParserWorker.php +++ b/src/applications/repository/worker/commitchangeparser/PhabricatorRepositoryCommitChangeParserWorker.php @@ -1,157 +1,157 @@ getCommitIdentifier(); $callsign = $repository->getCallsign(); $full_name = 'r'.$callsign.$identifier; $this->log("Parsing %s...\n", $full_name); if ($this->isBadCommit($full_name)) { $this->log("This commit is marked bad!"); return; } $results = $this->parseCommitChanges($repository, $commit); if ($results) { $this->writeCommitChanges($repository, $commit, $results); } $this->finishParse(); } public function parseChangesForUnitTest( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit) { return $this->parseCommitChanges($repository, $commit); } public static function lookupOrCreatePaths(array $paths) { $repository = new PhabricatorRepository(); $conn_w = $repository->establishConnection('w'); $result_map = self::lookupPaths($paths); $missing_paths = array_fill_keys($paths, true); $missing_paths = array_diff_key($missing_paths, $result_map); $missing_paths = array_keys($missing_paths); if ($missing_paths) { foreach (array_chunk($missing_paths, 128) as $path_chunk) { $sql = array(); foreach ($path_chunk as $path) { $sql[] = qsprintf($conn_w, '(%s, %s)', $path, md5($path)); } queryfx( $conn_w, 'INSERT IGNORE INTO %T (path, pathHash) VALUES %Q', PhabricatorRepository::TABLE_PATH, implode(', ', $sql)); } $result_map += self::lookupPaths($missing_paths); } return $result_map; } private static function lookupPaths(array $paths) { $repository = new PhabricatorRepository(); $conn_w = $repository->establishConnection('w'); $result_map = array(); foreach (array_chunk($paths, 128) as $path_chunk) { $chunk_map = queryfx_all( $conn_w, 'SELECT path, id FROM %T WHERE pathHash IN (%Ls)', PhabricatorRepository::TABLE_PATH, array_map('md5', $path_chunk)); foreach ($chunk_map as $row) { $result_map[$row['path']] = $row['id']; } } return $result_map; } protected function finishParse() { $commit = $this->commit; $commit->writeImportStatusFlag( PhabricatorRepositoryCommit::IMPORTED_CHANGE); id(new PhabricatorSearchIndexer()) ->queueDocumentForIndexing($commit->getPHID()); PhabricatorOwnersPackagePathValidator::updateOwnersPackagePaths($commit); if ($this->shouldQueueFollowupTasks()) { - PhabricatorWorker::scheduleTask( + $this->queueTask( 'PhabricatorRepositoryCommitOwnersWorker', array( 'commitID' => $commit->getID(), )); } } private function writeCommitChanges( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit, array $changes) { $repository_id = (int)$repository->getID(); $commit_id = (int)$commit->getID(); // NOTE: This SQL is being built manually instead of with qsprintf() // because some SVN changes affect an enormous number of paths (millions) // and this showed up as significantly slow on a profile at some point. $changes_sql = array(); foreach ($changes as $change) { $values = array( $repository_id, (int)$change->getPathID(), $commit_id, nonempty((int)$change->getTargetPathID(), 'null'), nonempty((int)$change->getTargetCommitID(), 'null'), (int)$change->getChangeType(), (int)$change->getFileType(), (int)$change->getIsDirect(), (int)$change->getCommitSequence(), ); $changes_sql[] = '('.implode(', ', $values).')'; } $conn_w = $repository->establishConnection('w'); queryfx( $conn_w, 'DELETE FROM %T WHERE commitID = %d', PhabricatorRepository::TABLE_PATHCHANGE, $commit_id); foreach (PhabricatorLiskDAO::chunkSQL($changes_sql) as $chunk) { queryfx( $conn_w, 'INSERT INTO %T (repositoryID, pathID, commitID, targetPathID, targetCommitID, changeType, fileType, isDirect, commitSequence) VALUES %Q', PhabricatorRepository::TABLE_PATHCHANGE, $chunk); } } } diff --git a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryGitCommitMessageParserWorker.php b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryGitCommitMessageParserWorker.php index 62c7572678..0789080a1b 100644 --- a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryGitCommitMessageParserWorker.php +++ b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryGitCommitMessageParserWorker.php @@ -1,26 +1,26 @@ setRepository($repository) ->withIdentifier($commit->getCommitIdentifier()) ->execute(); $this->updateCommitData($ref); if ($this->shouldQueueFollowupTasks()) { - PhabricatorWorker::scheduleTask( + $this->queueTask( 'PhabricatorRepositoryGitCommitChangeParserWorker', array( 'commitID' => $commit->getID(), )); } } } diff --git a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php index dedf14db82..a5bc1784a1 100644 --- a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php +++ b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php @@ -1,26 +1,26 @@ setRepository($repository) ->withIdentifier($commit->getCommitIdentifier()) ->execute(); $this->updateCommitData($ref); if ($this->shouldQueueFollowupTasks()) { - PhabricatorWorker::scheduleTask( + $this->queueTask( 'PhabricatorRepositoryMercurialCommitChangeParserWorker', array( 'commitID' => $commit->getID(), )); } } } diff --git a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositorySvnCommitMessageParserWorker.php b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositorySvnCommitMessageParserWorker.php index d15613850d..dad025dcc4 100644 --- a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositorySvnCommitMessageParserWorker.php +++ b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositorySvnCommitMessageParserWorker.php @@ -1,26 +1,26 @@ setRepository($repository) ->withIdentifier($commit->getCommitIdentifier()) ->execute(); $this->updateCommitData($ref); if ($this->shouldQueueFollowupTasks()) { - PhabricatorWorker::scheduleTask( + $this->queueTask( 'PhabricatorRepositorySvnCommitChangeParserWorker', array( 'commitID' => $commit->getID(), )); } } } diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php index 4a25cf9f8c..8d8cfc61ee 100644 --- a/src/infrastructure/daemon/workers/PhabricatorWorker.php +++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php @@ -1,197 +1,223 @@ data = $data; } final protected function getTaskData() { return $this->data; } final public function executeTask() { $this->doWork(); } final public static function scheduleTask($task_class, $data) { $task = id(new PhabricatorWorkerActiveTask()) ->setTaskClass($task_class) ->setData($data); if (self::$runAllTasksInProcess) { // Do the work in-process. $worker = newv($task_class, array($data)); $worker->doWork(); // Now, save a task row and immediately archive it so we can return an // object with a valid ID. $task->openTransaction(); $task->save(); $archived = $task->archiveTask( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, 0); $task->saveTransaction(); return $archived; } else { $task->save(); return $task; } } /** * Wait for tasks to complete. If tasks are not leased by other workers, they * will be executed in this process while waiting. * * @param list List of queued task IDs to wait for. * @return void */ final public static function waitForTasks(array $task_ids) { if (!$task_ids) { return; } $task_table = new PhabricatorWorkerActiveTask(); $waiting = array_fuse($task_ids); while ($waiting) { $conn_w = $task_table->establishConnection('w'); // Check if any of the tasks we're waiting on are still queued. If they // are not, we're done waiting. $row = queryfx_one( $conn_w, 'SELECT COUNT(*) N FROM %T WHERE id IN (%Ld)', $task_table->getTableName(), $waiting); if (!$row['N']) { // Nothing is queued anymore. Stop waiting. break; } $tasks = id(new PhabricatorWorkerLeaseQuery()) ->withIDs($waiting) ->setLimit(1) ->execute(); if (!$tasks) { // We were not successful in leasing anything. Sleep for a bit and // see if we have better luck later. sleep(1); continue; } $task = head($tasks)->executeTask(); $ex = $task->getExecutionException(); if ($ex) { throw $ex; } } $tasks = id(new PhabricatorWorkerArchiveTask())->loadAllWhere( 'id IN (%Ld)', $task_ids); foreach ($tasks as $task) { if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) { throw new Exception( pht("Task %d failed!", $task->getID())); } } } public function renderForDisplay(PhabricatorUser $viewer) { $data = PhutilReadableSerializer::printableValue($this->data); return phutil_tag('pre', array(), $data); } /** * Set this flag to execute scheduled tasks synchronously, in the same * process. This is useful for debugging, and otherwise dramatically worse * in every way imaginable. */ public static function setRunAllTasksInProcess($all) { self::$runAllTasksInProcess = $all; } protected function log($pattern /* $args */) { $console = PhutilConsole::getConsole(); $argv = func_get_args(); call_user_func_array(array($console, 'writeLog'), $argv); return $this; } + + /** + * Queue a task to be executed after this one suceeds. + * + * The followup task will be queued only if this task completes cleanly. + * + * @param string Task class to queue. + * @param array Data for the followup task. + * @return this + */ + protected function queueTask($class, array $data) { + $this->queuedTasks[] = array($class, $data); + return $this; + } + + + /** + * Get tasks queued as followups by @{method:queueTask}. + * + * @return list> Queued task specifications. + */ + public function getQueuedTasks() { + return $this->queuedTasks; + } + } diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php index 507a8b6c44..7d21cc5c29 100644 --- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php @@ -1,230 +1,236 @@ ids = $ids; return $this; } public function setLimit($limit) { $this->limit = $limit; return $this; } public function execute() { if (!$this->limit) { throw new Exception("You must setLimit() when leasing tasks."); } $task_table = new PhabricatorWorkerActiveTask(); $taskdata_table = new PhabricatorWorkerTaskData(); $lease_ownership_name = $this->getLeaseOwnershipName(); $conn_w = $task_table->establishConnection('w'); // Try to satisfy the request from new, unleased tasks first. If we don't // find enough tasks, try tasks with expired leases (i.e., tasks which have // previously failed). $phases = array( self::PHASE_UNLEASED, self::PHASE_EXPIRED, ); $limit = $this->limit; $leased = 0; foreach ($phases as $phase) { // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query // goes very, very slowly. The `ORDER BY` triggers this, although we get // the same apparent results without it. Without the ORDER BY, binary // read slaves complain that the query isn't repeatable. To avoid both // problems, do a SELECT and then an UPDATE. $rows = queryfx_all( $conn_w, 'SELECT id, leaseOwner FROM %T %Q %Q %Q', $task_table->getTableName(), $this->buildWhereClause($conn_w, $phase), $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit - $leased)); // NOTE: Sometimes, we'll race with another worker and they'll grab // this task before we do. We could reduce how often this happens by // selecting more tasks than we need, then shuffling them and trying // to lock only the number we're actually after. However, the amount // of time workers spend here should be very small relative to their // total runtime, so keep it simple for the moment. if ($rows) { queryfx( $conn_w, 'UPDATE %T task SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d %Q', $task_table->getTableName(), $lease_ownership_name, - self::DEFAULT_LEASE_DURATION, + self::getDefaultLeaseDuration(), $this->buildUpdateWhereClause($conn_w, $phase, $rows)); $leased += $conn_w->getAffectedRows(); if ($leased == $limit) { break; } } } if (!$leased) { return array(); } $data = queryfx_all( $conn_w, 'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime FROM %T task LEFT JOIN %T taskdata ON taskdata.id = task.dataID WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP() %Q %Q', $task_table->getTableName(), $taskdata_table->getTableName(), $lease_ownership_name, $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit)); $tasks = $task_table->loadAllFromArray($data); $tasks = mpull($tasks, null, 'getID'); foreach ($data as $row) { $tasks[$row['id']]->setServerTime($row['_serverTime']); if ($row['_taskData']) { $task_data = json_decode($row['_taskData'], true); } else { $task_data = null; } $tasks[$row['id']]->setData($task_data); } return $tasks; } private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { $where = array(); switch ($phase) { case self::PHASE_UNLEASED: $where[] = 'leaseOwner IS NULL'; break; case self::PHASE_EXPIRED: $where[] = 'leaseExpires < UNIX_TIMESTAMP()'; break; default: throw new Exception("Unknown phase '{$phase}'!"); } if ($this->ids) { $where[] = qsprintf( $conn_w, 'id IN (%Ld)', $this->ids); } return $this->formatWhereClause($where); } private function buildUpdateWhereClause( AphrontDatabaseConnection $conn_w, $phase, array $rows) { $where = array(); // NOTE: This is basically working around the MySQL behavior that // `IN (NULL)` doesn't match NULL. switch ($phase) { case self::PHASE_UNLEASED: $where[] = qsprintf( $conn_w, 'leaseOwner IS NULL'); $where[] = qsprintf( $conn_w, 'id IN (%Ld)', ipull($rows, 'id')); break; case self::PHASE_EXPIRED: $in = array(); foreach ($rows as $row) { $in[] = qsprintf( $conn_w, '(id = %d AND leaseOwner = %s)', $row['id'], $row['leaseOwner']); } $where[] = qsprintf( $conn_w, '(%Q)', implode(' OR ', $in)); break; default: throw new Exception("Unknown phase '{$phase}'!"); } return $this->formatWhereClause($where); } private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) { switch ($phase) { case self::PHASE_UNLEASED: // When selecting new tasks, we want to consume them in roughly // FIFO order, so we order by the task ID. return qsprintf($conn_w, 'ORDER BY id ASC'); case self::PHASE_EXPIRED: // When selecting failed tasks, we want to consume them in roughly // FIFO order of their failures, which is not necessarily their original // queue order. // Particularly, this is important for tasks which use soft failures to // indicate that they are waiting on other tasks to complete: we need to // push them to the end of the queue after they fail, at least on // average, so we don't deadlock retrying the same blocked task over // and over again. return qsprintf($conn_w, 'ORDER BY leaseExpires ASC'); default: throw new Exception(pht('Unknown phase "%s"!', $phase)); } } private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) { return qsprintf($conn_w, 'LIMIT %d', $limit); } private function getLeaseOwnershipName() { static $sequence = 0; $parts = array( getmypid(), time(), php_uname('n'), ++$sequence, ); return implode(':', $parts); } } diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php index 124ff71996..36181852f4 100644 --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php @@ -1,154 +1,165 @@ self::IDS_COUNTER, self::CONFIG_TIMESTAMPS => false, ) + parent::getConfiguration(); } public function setServerTime($server_time) { $this->serverTime = $server_time; $this->localTime = time(); return $this; } public function setLeaseDuration($lease_duration) { $this->checkLease(); $server_lease_expires = $this->serverTime + $lease_duration; $this->setLeaseExpires($server_lease_expires); // NOTE: This is primarily to allow unit tests to set negative lease // durations so they don't have to wait around for leases to expire. We // check that the lease is valid above. return $this->forceSaveWithoutLease(); } public function save() { $this->checkLease(); return $this->forceSaveWithoutLease(); } public function forceSaveWithoutLease() { $is_new = !$this->getID(); if ($is_new) { $this->failureCount = 0; } if ($is_new && ($this->getData() !== null)) { $data = new PhabricatorWorkerTaskData(); $data->setData($this->getData()); $data->save(); $this->setDataID($data->getID()); } return parent::save(); } protected function checkLease() { if ($this->leaseOwner) { $current_server_time = $this->serverTime + (time() - $this->localTime); if ($current_server_time >= $this->leaseExpires) { $id = $this->getID(); $class = $this->getTaskClass(); throw new Exception( "Trying to update Task {$id} ({$class}) after lease expiration!"); } } } public function delete() { throw new Exception( "Active tasks can not be deleted directly. ". "Use archiveTask() to move tasks to the archive."); } public function archiveTask($result, $duration) { if ($this->getID() === null) { throw new Exception( "Attempting to archive a task which hasn't been save()d!"); } $this->checkLease(); $archive = id(new PhabricatorWorkerArchiveTask()) ->setID($this->getID()) ->setTaskClass($this->getTaskClass()) ->setLeaseOwner($this->getLeaseOwner()) ->setLeaseExpires($this->getLeaseExpires()) ->setFailureCount($this->getFailureCount()) ->setDataID($this->getDataID()) ->setResult($result) ->setDuration($duration); // NOTE: This deletes the active task (this object)! $archive->save(); return $archive; } public function executeTask() { // We do this outside of the try .. catch because we don't have permission // to release the lease otherwise. $this->checkLease(); + $did_succeed = false; try { $worker = $this->getWorkerInstance(); $maximum_failures = $worker->getMaximumRetryCount(); if ($maximum_failures !== null) { if ($this->getFailureCount() > $maximum_failures) { $id = $this->getID(); throw new PhabricatorWorkerPermanentFailureException( "Task {$id} has exceeded the maximum number of failures ". "({$maximum_failures})."); } } $lease = $worker->getRequiredLeaseTime(); if ($lease !== null) { $this->setLeaseDuration($lease); } $t_start = microtime(true); $worker->executeTask(); $t_end = microtime(true); $duration = (int)(1000000 * ($t_end - $t_start)); $result = $this->archiveTask( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, $duration); + $did_succeed = true; } catch (PhabricatorWorkerPermanentFailureException $ex) { $result = $this->archiveTask( PhabricatorWorkerArchiveTask::RESULT_FAILURE, 0); $result->setExecutionException($ex); } catch (Exception $ex) { $this->setExecutionException($ex); $this->setFailureCount($this->getFailureCount() + 1); $this->setFailureTime(time()); $retry = $worker->getWaitBeforeRetry($this); $retry = coalesce( $retry, - PhabricatorWorkerLeaseQuery::DEFAULT_LEASE_DURATION); + PhabricatorWorkerLeaseQuery::getDefaultWaitBeforeRetry()); // NOTE: As a side effect, this saves the object. $this->setLeaseDuration($retry); $result = $this; } + // NOTE: If this throws, we don't want it to cause the task to fail again, + // so execute it out here and just let the exception escape. + if ($did_succeed) { + foreach ($worker->getQueuedTasks() as $task) { + list($class, $data) = $task; + PhabricatorWorker::scheduleTask($class, $data); + } + } + return $result; } }