diff --git a/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php b/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php index ae6f941c14..e8c58ed52f 100644 --- a/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php +++ b/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php @@ -1,719 +1,734 @@ repairMode = $repair_mode; return $this; } public function getRepairMode() { return $this->repairMode; } /** * @task discovery */ public function discoverCommits() { $repository = $this->getRepository(); $lock = $this->newRepositoryLock($repository, 'repo.look', false); try { $lock->lock(); } catch (PhutilLockException $ex) { throw new DiffusionDaemonLockException( pht( 'Another process is currently discovering repository "%s", '. 'skipping discovery.', $repository->getDisplayName())); } try { $result = $this->discoverCommitsWithLock(); } catch (Exception $ex) { $lock->unlock(); throw $ex; } $lock->unlock(); return $result; } private function discoverCommitsWithLock() { $repository = $this->getRepository(); $viewer = $this->getViewer(); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: $refs = $this->discoverSubversionCommits(); break; case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: $refs = $this->discoverMercurialCommits(); break; case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: $refs = $this->discoverGitCommits(); break; default: throw new Exception(pht("Unknown VCS '%s'!", $vcs)); } if ($this->isInitialImport($refs)) { $this->log( pht( 'Discovered more than %s commit(s) in an empty repository, '. 'marking repository as importing.', new PhutilNumber(PhabricatorRepository::IMPORT_THRESHOLD))); $repository->markImporting(); } // Clear the working set cache. $this->workingSet = array(); // Record discovered commits and mark them in the cache. foreach ($refs as $ref) { $this->recordCommit( $repository, $ref->getIdentifier(), $ref->getEpoch(), $ref->getCanCloseImmediately(), $ref->getParents()); $this->commitCache[$ref->getIdentifier()] = true; } $version = $this->getObservedVersion($repository); if ($version !== null) { id(new DiffusionRepositoryClusterEngine()) ->setViewer($viewer) ->setRepository($repository) ->synchronizeWorkingCopyAfterDiscovery($version); } return $refs; } /* -( Discovering Git Repositories )--------------------------------------- */ /** * @task git */ private function discoverGitCommits() { $repository = $this->getRepository(); if (!$repository->isHosted()) { $this->verifyGitOrigin($repository); } $heads = id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) ->execute(); if (!$heads) { // This repository has no heads at all, so we don't need to do // anything. Generally, this means the repository is empty. return array(); } $heads = $this->sortRefs($heads); $head_commits = mpull($heads, 'getCommitIdentifier'); $this->log( pht( 'Discovering commits in repository "%s".', $repository->getDisplayName())); $this->fillCommitCache($head_commits); $refs = array(); foreach ($heads as $ref) { $name = $ref->getShortName(); $commit = $ref->getCommitIdentifier(); $this->log(pht('Examining ref "%s", at "%s".', $name, $commit)); if (!$repository->shouldTrackRef($ref)) { $this->log(pht('Skipping, ref is untracked.')); continue; } if ($this->isKnownCommit($commit)) { $this->log(pht('Skipping, HEAD is known.')); continue; } $this->log(pht('Looking for new commits.')); $head_refs = $this->discoverStreamAncestry( new PhabricatorGitGraphStream($repository, $commit), $commit, $repository->shouldAutocloseRef($ref)); $this->didDiscoverRefs($head_refs); $refs[] = $head_refs; } return array_mergev($refs); } /* -( Discovering Subversion Repositories )-------------------------------- */ /** * @task svn */ private function discoverSubversionCommits() { $repository = $this->getRepository(); if (!$repository->isHosted()) { $this->verifySubversionRoot($repository); } $upper_bound = null; $limit = 1; $refs = array(); do { // Find all the unknown commits on this path. Note that we permit // importing an SVN subdirectory rather than the entire repository, so // commits may be nonsequential. if ($upper_bound === null) { $at_rev = 'HEAD'; } else { $at_rev = ($upper_bound - 1); } try { list($xml, $stderr) = $repository->execxRemoteCommand( 'log --xml --quiet --limit %d %s', $limit, $repository->getSubversionBaseURI($at_rev)); } catch (CommandException $ex) { $stderr = $ex->getStdErr(); if (preg_match('/(path|File) not found/', $stderr)) { // We've gone all the way back through history and this path was not // affected by earlier commits. break; } throw $ex; } $xml = phutil_utf8ize($xml); $log = new SimpleXMLElement($xml); foreach ($log->logentry as $entry) { $identifier = (int)$entry['revision']; $epoch = (int)strtotime((string)$entry->date[0]); $refs[$identifier] = id(new PhabricatorRepositoryCommitRef()) ->setIdentifier($identifier) ->setEpoch($epoch) ->setCanCloseImmediately(true); if ($upper_bound === null) { $upper_bound = $identifier; } else { $upper_bound = min($upper_bound, $identifier); } } // Discover 2, 4, 8, ... 256 logs at a time. This allows us to initially // import large repositories fairly quickly, while pulling only as much // data as we need in the common case (when we've already imported the // repository and are just grabbing one commit at a time). $limit = min($limit * 2, 256); } while ($upper_bound > 1 && !$this->isKnownCommit($upper_bound)); krsort($refs); while ($refs && $this->isKnownCommit(last($refs)->getIdentifier())) { array_pop($refs); } $refs = array_reverse($refs); $this->didDiscoverRefs($refs); return $refs; } private function verifySubversionRoot(PhabricatorRepository $repository) { list($xml) = $repository->execxRemoteCommand( 'info --xml %s', $repository->getSubversionPathURI()); $xml = phutil_utf8ize($xml); $xml = new SimpleXMLElement($xml); $remote_root = (string)($xml->entry[0]->repository[0]->root[0]); $expect_root = $repository->getSubversionPathURI(); $normal_type_svn = PhabricatorRepositoryURINormalizer::TYPE_SVN; $remote_normal = id(new PhabricatorRepositoryURINormalizer( $normal_type_svn, $remote_root))->getNormalizedPath(); $expect_normal = id(new PhabricatorRepositoryURINormalizer( $normal_type_svn, $expect_root))->getNormalizedPath(); if ($remote_normal != $expect_normal) { throw new Exception( pht( 'Repository "%s" does not have a correctly configured remote URI. '. 'The remote URI for a Subversion repository MUST point at the '. 'repository root. The root for this repository is "%s", but the '. 'configured URI is "%s". To resolve this error, set the remote URI '. 'to point at the repository root. If you want to import only part '. 'of a Subversion repository, use the "Import Only" option.', $repository->getDisplayName(), $remote_root, $expect_root)); } } /* -( Discovering Mercurial Repositories )--------------------------------- */ /** * @task hg */ private function discoverMercurialCommits() { $repository = $this->getRepository(); $branches = id(new DiffusionLowLevelMercurialBranchesQuery()) ->setRepository($repository) ->execute(); $this->fillCommitCache(mpull($branches, 'getCommitIdentifier')); $refs = array(); foreach ($branches as $branch) { // NOTE: Mercurial branches may have multiple heads, so the names may // not be unique. $name = $branch->getShortName(); $commit = $branch->getCommitIdentifier(); $this->log(pht('Examining branch "%s" head "%s".', $name, $commit)); if (!$repository->shouldTrackBranch($name)) { $this->log(pht('Skipping, branch is untracked.')); continue; } if ($this->isKnownCommit($commit)) { $this->log(pht('Skipping, this head is a known commit.')); continue; } $this->log(pht('Looking for new commits.')); $branch_refs = $this->discoverStreamAncestry( new PhabricatorMercurialGraphStream($repository, $commit), $commit, $close_immediately = true); $this->didDiscoverRefs($branch_refs); $refs[] = $branch_refs; } return array_mergev($refs); } /* -( Internals )---------------------------------------------------------- */ private function discoverStreamAncestry( PhabricatorRepositoryGraphStream $stream, $commit, $close_immediately) { $discover = array($commit); $graph = array(); $seen = array(); // Find all the reachable, undiscovered commits. Build a graph of the // edges. while ($discover) { $target = array_pop($discover); if (empty($graph[$target])) { $graph[$target] = array(); } $parents = $stream->getParents($target); foreach ($parents as $parent) { if ($this->isKnownCommit($parent)) { continue; } $graph[$target][$parent] = true; if (empty($seen[$parent])) { $seen[$parent] = true; $discover[] = $parent; } } } // Now, sort them topographically. $commits = $this->reduceGraph($graph); $refs = array(); foreach ($commits as $commit) { $refs[] = id(new PhabricatorRepositoryCommitRef()) ->setIdentifier($commit) ->setEpoch($stream->getCommitDate($commit)) ->setCanCloseImmediately($close_immediately) ->setParents($stream->getParents($commit)); } return $refs; } private function reduceGraph(array $edges) { foreach ($edges as $commit => $parents) { $edges[$commit] = array_keys($parents); } $graph = new PhutilDirectedScalarGraph(); $graph->addNodes($edges); $commits = $graph->getTopographicallySortedNodes(); // NOTE: We want the most ancestral nodes first, so we need to reverse the // list we get out of AbstractDirectedGraph. $commits = array_reverse($commits); return $commits; } private function isKnownCommit($identifier) { if (isset($this->commitCache[$identifier])) { return true; } if (isset($this->workingSet[$identifier])) { return true; } if ($this->repairMode) { // In repair mode, rediscover the entire repository, ignoring the // database state. We can hit the local cache above, but if we miss it // stop the script from going to the database cache. return false; } $this->fillCommitCache(array($identifier)); return isset($this->commitCache[$identifier]); } private function fillCommitCache(array $identifiers) { if (!$identifiers) { return; } // When filling the cache we ignore commits which have been marked as // unreachable, treating them as though they do not exist. When recording // commits later we'll revive commits that exist but are unreachable. $commits = id(new PhabricatorRepositoryCommit())->loadAllWhere( 'repositoryID = %d AND commitIdentifier IN (%Ls) AND (importStatus & %d) != %d', $this->getRepository()->getID(), $identifiers, PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE, PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE); foreach ($commits as $commit) { $this->commitCache[$commit->getCommitIdentifier()] = true; } while (count($this->commitCache) > self::MAX_COMMIT_CACHE_SIZE) { array_shift($this->commitCache); } } /** * Sort branches so we process closeable branches first. This makes the * whole import process a little cheaper, since we can close these commits * the first time through rather than catching them in the refs step. * * @task internal * * @param list List of refs. * @return list Sorted list of refs. */ private function sortRefs(array $refs) { $repository = $this->getRepository(); $head_refs = array(); $tail_refs = array(); foreach ($refs as $ref) { if ($repository->shouldAutocloseRef($ref)) { $head_refs[] = $ref; } else { $tail_refs[] = $ref; } } return array_merge($head_refs, $tail_refs); } private function recordCommit( PhabricatorRepository $repository, $commit_identifier, $epoch, $close_immediately, array $parents) { $commit = new PhabricatorRepositoryCommit(); $conn_w = $repository->establishConnection('w'); // First, try to revive an existing unreachable commit (if one exists) by // removing the "unreachable" flag. If we succeed, we don't need to do // anything else: we already discovered this commit some time ago. queryfx( $conn_w, 'UPDATE %T SET importStatus = (importStatus & ~%d) WHERE repositoryID = %d AND commitIdentifier = %s', $commit->getTableName(), PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE, $repository->getID(), $commit_identifier); if ($conn_w->getAffectedRows()) { + $commit = $commit->loadOneWhere( + 'repositoryID = %d AND commitIdentifier = %s', + $repository->getID(), + $commit_identifier); + + // After reviving a commit, schedule new daemons for it. + $this->didDiscoverCommit($repository, $commit, $epoch); return; } - $commit->setRepositoryID($repository->getID()); $commit->setCommitIdentifier($commit_identifier); $commit->setEpoch($epoch); if ($close_immediately) { $commit->setImportStatus(PhabricatorRepositoryCommit::IMPORTED_CLOSEABLE); } $data = new PhabricatorRepositoryCommitData(); try { // If this commit has parents, look up their IDs. The parent commits // should always exist already. $parent_ids = array(); if ($parents) { $parent_rows = queryfx_all( $conn_w, 'SELECT id, commitIdentifier FROM %T WHERE commitIdentifier IN (%Ls) AND repositoryID = %d', $commit->getTableName(), $parents, $repository->getID()); $parent_map = ipull($parent_rows, 'id', 'commitIdentifier'); foreach ($parents as $parent) { if (empty($parent_map[$parent])) { throw new Exception( pht('Unable to identify parent "%s"!', $parent)); } $parent_ids[] = $parent_map[$parent]; } } else { // Write an explicit 0 so we can distinguish between "really no // parents" and "data not available". if (!$repository->isSVN()) { $parent_ids = array(0); } } $commit->openTransaction(); $commit->save(); $data->setCommitID($commit->getID()); $data->save(); foreach ($parent_ids as $parent_id) { queryfx( $conn_w, 'INSERT IGNORE INTO %T (childCommitID, parentCommitID) VALUES (%d, %d)', PhabricatorRepository::TABLE_PARENTS, $commit->getID(), $parent_id); } $commit->saveTransaction(); - $this->insertTask($repository, $commit); - - queryfx( - $conn_w, - 'INSERT INTO %T (repositoryID, size, lastCommitID, epoch) - VALUES (%d, 1, %d, %d) - ON DUPLICATE KEY UPDATE - size = size + 1, - lastCommitID = - IF(VALUES(epoch) > epoch, VALUES(lastCommitID), lastCommitID), - epoch = IF(VALUES(epoch) > epoch, VALUES(epoch), epoch)', - PhabricatorRepository::TABLE_SUMMARY, - $repository->getID(), - $commit->getID(), - $epoch); + $this->didDiscoverCommit($repository, $commit, $epoch); if ($this->repairMode) { // Normally, the query should throw a duplicate key exception. If we // reach this in repair mode, we've actually performed a repair. $this->log(pht('Repaired commit "%s".', $commit_identifier)); } PhutilEventEngine::dispatchEvent( new PhabricatorEvent( PhabricatorEventType::TYPE_DIFFUSION_DIDDISCOVERCOMMIT, array( 'repository' => $repository, 'commit' => $commit, ))); } catch (AphrontDuplicateKeyQueryException $ex) { $commit->killTransaction(); // Ignore. This can happen because we discover the same new commit // more than once when looking at history, or because of races or // data inconsistency or cosmic radiation; in any case, we're still // in a good state if we ignore the failure. } } + private function didDiscoverCommit( + PhabricatorRepository $repository, + PhabricatorRepositoryCommit $commit, + $epoch) { + + $this->insertTask($repository, $commit); + + // Update the repository summary table. + queryfx( + $commit->establishConnection('w'), + 'INSERT INTO %T (repositoryID, size, lastCommitID, epoch) + VALUES (%d, 1, %d, %d) + ON DUPLICATE KEY UPDATE + size = size + 1, + lastCommitID = + IF(VALUES(epoch) > epoch, VALUES(lastCommitID), lastCommitID), + epoch = IF(VALUES(epoch) > epoch, VALUES(epoch), epoch)', + PhabricatorRepository::TABLE_SUMMARY, + $repository->getID(), + $commit->getID(), + $epoch); + } + private function didDiscoverRefs(array $refs) { foreach ($refs as $ref) { $this->workingSet[$ref->getIdentifier()] = true; } } private function insertTask( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit, $data = array()) { $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: $class = 'PhabricatorRepositoryGitCommitMessageParserWorker'; break; case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: $class = 'PhabricatorRepositorySvnCommitMessageParserWorker'; break; case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: $class = 'PhabricatorRepositoryMercurialCommitMessageParserWorker'; break; default: throw new Exception(pht("Unknown repository type '%s'!", $vcs)); } $data['commitID'] = $commit->getID(); PhabricatorWorker::scheduleTask($class, $data); } private function isInitialImport(array $refs) { $commit_count = count($refs); if ($commit_count <= PhabricatorRepository::IMPORT_THRESHOLD) { // If we fetched a small number of commits, assume it's an initial // commit or a stack of a few initial commits. return false; } $viewer = $this->getViewer(); $repository = $this->getRepository(); $any_commits = id(new DiffusionCommitQuery()) ->setViewer($viewer) ->withRepository($repository) ->setLimit(1) ->execute(); if ($any_commits) { // If the repository already has commits, this isn't an import. return false; } return true; } private function getObservedVersion(PhabricatorRepository $repository) { if ($repository->isHosted()) { return null; } if ($repository->isGit()) { return $this->getGitObservedVersion($repository); } return null; } private function getGitObservedVersion(PhabricatorRepository $repository) { $refs = id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) ->execute(); if (!$refs) { return null; } // In Git, the observed version is the most recently discovered commit // at any repository HEAD. It's possible for this to regress temporarily // if a branch is pushed and then deleted. This is acceptable because it // doesn't do anything meaningfully bad and will fix itself on the next // push. $ref_identifiers = mpull($refs, 'getCommitIdentifier'); $ref_identifiers = array_fuse($ref_identifiers); $version = queryfx_one( $repository->establishConnection('w'), 'SELECT MAX(id) version FROM %T WHERE repositoryID = %d AND commitIdentifier IN (%Ls)', id(new PhabricatorRepositoryCommit())->getTableName(), $repository->getID(), $ref_identifiers); if (!$version) { return null; } return (int)$version['version']; } } diff --git a/src/applications/repository/worker/PhabricatorRepositoryCommitHeraldWorker.php b/src/applications/repository/worker/PhabricatorRepositoryCommitHeraldWorker.php index 9ad8ef2b66..4bd9344409 100644 --- a/src/applications/repository/worker/PhabricatorRepositoryCommitHeraldWorker.php +++ b/src/applications/repository/worker/PhabricatorRepositoryCommitHeraldWorker.php @@ -1,134 +1,144 @@ shouldSkipImportStep()) { + // This worker has no followup tasks, so we can just bail out + // right away without queueing anything. + return; + } + // Reload the commit to pull commit data and audit requests. $commit = id(new DiffusionCommitQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withIDs(array($commit->getID())) ->needCommitData(true) ->needAuditRequests(true) ->executeOne(); $data = $commit->getCommitData(); if (!$data) { throw new PhabricatorWorkerPermanentFailureException( pht( 'Unable to load commit data. The data for this task is invalid '. 'or no longer exists.')); } $commit->attachRepository($repository); $content_source = $this->newContentSource(); $committer_phid = $data->getCommitDetail('committerPHID'); $author_phid = $data->getCommitDetail('authorPHID'); $acting_as_phid = nonempty( $committer_phid, $author_phid, id(new PhabricatorDiffusionApplication())->getPHID()); $editor = id(new PhabricatorAuditEditor()) ->setActor(PhabricatorUser::getOmnipotentUser()) ->setActingAsPHID($acting_as_phid) ->setContinueOnMissingFields(true) ->setContinueOnNoEffect(true) ->setContentSource($content_source); $xactions = array(); $xactions[] = id(new PhabricatorAuditTransaction()) ->setTransactionType(PhabricatorAuditTransaction::TYPE_COMMIT) ->setDateCreated($commit->getEpoch()) ->setNewValue(array( 'description' => $data->getCommitMessage(), 'summary' => $data->getSummary(), 'authorName' => $data->getAuthorName(), 'authorPHID' => $commit->getAuthorPHID(), 'committerName' => $data->getCommitDetail('committer'), 'committerPHID' => $data->getCommitDetail('committerPHID'), )); $reverts_refs = id(new DifferentialCustomFieldRevertsParser()) ->parseCorpus($data->getCommitMessage()); $reverts = array_mergev(ipull($reverts_refs, 'monograms')); if ($reverts) { $reverted_commits = id(new DiffusionCommitQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withRepository($repository) ->withIdentifiers($reverts) ->execute(); $reverted_commit_phids = mpull($reverted_commits, 'getPHID', 'getPHID'); // NOTE: Skip any write attempts if a user cleverly implies a commit // reverts itself. unset($reverted_commit_phids[$commit->getPHID()]); $reverts_edge = DiffusionCommitRevertsCommitEdgeType::EDGECONST; $xactions[] = id(new PhabricatorAuditTransaction()) ->setTransactionType(PhabricatorTransactions::TYPE_EDGE) ->setMetadataValue('edge:type', $reverts_edge) ->setNewValue(array('+' => array_fuse($reverted_commit_phids))); } try { $raw_patch = $this->loadRawPatchText($repository, $commit); } catch (Exception $ex) { $raw_patch = pht('Unable to generate patch: %s', $ex->getMessage()); } $editor->setRawPatch($raw_patch); return $editor->applyTransactions($commit, $xactions); } private function loadRawPatchText( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit) { $drequest = DiffusionRequest::newFromDictionary( array( 'user' => PhabricatorUser::getOmnipotentUser(), 'repository' => $repository, 'commit' => $commit->getCommitIdentifier(), )); $raw_query = DiffusionRawDiffQuery::newFromDiffusionRequest($drequest); $raw_query->setLinesOfContext(3); $time_key = 'metamta.diffusion.time-limit'; $byte_key = 'metamta.diffusion.byte-limit'; $time_limit = PhabricatorEnv::getEnvConfig($time_key); $byte_limit = PhabricatorEnv::getEnvConfig($byte_key); if ($time_limit) { $raw_query->setTimeout($time_limit); } $raw_diff = $raw_query->loadRawDiff(); $size = strlen($raw_diff); if ($byte_limit && $size > $byte_limit) { $pretty_size = phutil_format_bytes($size); $pretty_limit = phutil_format_bytes($byte_limit); throw new Exception(pht( 'Patch size of %s exceeds configured byte size limit (%s) of %s.', $pretty_size, $byte_key, $pretty_limit)); } return $raw_diff; } } diff --git a/src/applications/repository/worker/PhabricatorRepositoryCommitOwnersWorker.php b/src/applications/repository/worker/PhabricatorRepositoryCommitOwnersWorker.php index 3f41226b9f..8f94df90be 100644 --- a/src/applications/repository/worker/PhabricatorRepositoryCommitOwnersWorker.php +++ b/src/applications/repository/worker/PhabricatorRepositoryCommitOwnersWorker.php @@ -1,168 +1,172 @@ triggerOwnerAudits($repository, $commit); - - $commit->writeImportStatusFlag( - PhabricatorRepositoryCommit::IMPORTED_OWNERS); + if (!$this->shouldSkipImportStep()) { + $this->triggerOwnerAudits($repository, $commit); + $commit->writeImportStatusFlag($this->getImportStepFlag()); + } if ($this->shouldQueueFollowupTasks()) { $this->queueTask( 'PhabricatorRepositoryCommitHeraldWorker', array( 'commitID' => $commit->getID(), )); } } private function triggerOwnerAudits( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit) { if (!$repository->shouldPublish()) { return; } $affected_paths = PhabricatorOwnerPathQuery::loadAffectedPaths( $repository, $commit, PhabricatorUser::getOmnipotentUser()); $affected_packages = PhabricatorOwnersPackage::loadAffectedPackages( $repository, $affected_paths); if (!$affected_packages) { return; } $data = id(new PhabricatorRepositoryCommitData())->loadOneWhere( 'commitID = %d', $commit->getID()); $commit->attachCommitData($data); $author_phid = $data->getCommitDetail('authorPHID'); $revision_id = $data->getCommitDetail('differential.revisionID'); if ($revision_id) { $revision = id(new DifferentialRevisionQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withIDs(array($revision_id)) ->needReviewerStatus(true) ->executeOne(); } else { $revision = null; } $requests = id(new PhabricatorRepositoryAuditRequest()) ->loadAllWhere( 'commitPHID = %s', $commit->getPHID()); $requests = mpull($requests, null, 'getAuditorPHID'); foreach ($affected_packages as $package) { $request = idx($requests, $package->getPHID()); if ($request) { // Don't update request if it exists already. continue; } if ($package->isArchived()) { // Don't trigger audits if the package is archived. continue; } if ($package->getAuditingEnabled()) { $reasons = $this->checkAuditReasons( $commit, $package, $author_phid, $revision); if ($reasons) { $audit_status = PhabricatorAuditStatusConstants::AUDIT_REQUIRED; } else { $audit_status = PhabricatorAuditStatusConstants::AUDIT_NOT_REQUIRED; } } else { $reasons = array(); $audit_status = PhabricatorAuditStatusConstants::NONE; } $relationship = new PhabricatorRepositoryAuditRequest(); $relationship->setAuditorPHID($package->getPHID()); $relationship->setCommitPHID($commit->getPHID()); $relationship->setAuditReasons($reasons); $relationship->setAuditStatus($audit_status); $relationship->save(); $requests[$package->getPHID()] = $relationship; } $commit->updateAuditStatus($requests); $commit->save(); } private function checkAuditReasons( PhabricatorRepositoryCommit $commit, PhabricatorOwnersPackage $package, $author_phid, $revision) { $owner_phids = PhabricatorOwnersOwner::loadAffiliatedUserPHIDs( array( $package->getID(), )); $owner_phids = array_fuse($owner_phids); $reasons = array(); if (!$author_phid) { $reasons[] = pht('Commit Author Not Recognized'); } else if (isset($owner_phids[$author_phid])) { return $reasons; } if (!$revision) { $reasons[] = pht('No Revision Specified'); return $reasons; } $accepted_statuses = array( DifferentialReviewerStatus::STATUS_ACCEPTED, DifferentialReviewerStatus::STATUS_ACCEPTED_OLDER, ); $accepted_statuses = array_fuse($accepted_statuses); $found_accept = false; foreach ($revision->getReviewerStatus() as $reviewer) { $reviewer_phid = $reviewer->getReviewerPHID(); // If this reviewer isn't a package owner, just ignore them. if (empty($owner_phids[$reviewer_phid])) { continue; } // If this reviewer accepted the revision and owns the package, we're // all clear and do not need to trigger an audit. if (isset($accepted_statuses[$reviewer->getStatus()])) { $found_accept = true; break; } } if (!$found_accept) { $reasons[] = pht('Owners Not Involved'); } return $reasons; } } diff --git a/src/applications/repository/worker/PhabricatorRepositoryCommitParserWorker.php b/src/applications/repository/worker/PhabricatorRepositoryCommitParserWorker.php index aa2aaa270b..f6e2661b87 100644 --- a/src/applications/repository/worker/PhabricatorRepositoryCommitParserWorker.php +++ b/src/applications/repository/worker/PhabricatorRepositoryCommitParserWorker.php @@ -1,80 +1,124 @@ commit) { return $this->commit; } $commit_id = idx($this->getTaskData(), 'commitID'); if (!$commit_id) { throw new PhabricatorWorkerPermanentFailureException( pht('No "%s" in task data.', 'commitID')); } $commit = id(new DiffusionCommitQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withIDs(array($commit_id)) ->executeOne(); if (!$commit) { throw new PhabricatorWorkerPermanentFailureException( pht('Commit "%s" does not exist.', $commit_id)); } + if ($commit->isUnreachable()) { + throw new PhabricatorWorkerPermanentFailureException( + pht( + 'Commit "%s" has been deleted: it is no longer reachable from '. + 'any ref.', + $commit_id)); + } + $this->commit = $commit; return $commit; } final protected function doWork() { $commit = $this->loadCommit(); $repository = $commit->getRepository(); $this->repository = $repository; $this->parseCommit($repository, $this->commit); } final protected function shouldQueueFollowupTasks() { return !idx($this->getTaskData(), 'only'); } + protected function getImportStepFlag() { + return null; + } + + final protected function shouldSkipImportStep() { + // If this step has already been performed and this is a "natural" task + // which was queued by the normal daemons, decline to do the work again. + // This mitigates races if commits are rapidly deleted and revived. + $flag = $this->getImportStepFlag(); + if (!$flag) { + // This step doesn't have an associated flag. + return false; + } + + $commit = $this->commit; + if (!$commit->isPartiallyImported($flag)) { + // This commit doesn't have the flag set yet. + return false; + } + + + if (!$this->shouldQueueFollowupTasks()) { + // This task was queued by administrative tools, so do the work even + // if it duplicates existing work. + return false; + } + + $this->log( + "%s\n", + pht( + 'Skipping import step; this step was previously completed for '. + 'this commit.')); + + return true; + } + abstract protected function parseCommit( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit); protected function isBadCommit(PhabricatorRepositoryCommit $commit) { $repository = new PhabricatorRepository(); $bad_commit = queryfx_one( $repository->establishConnection('w'), 'SELECT * FROM %T WHERE fullCommitName = %s', PhabricatorRepository::TABLE_BADCOMMIT, $commit->getMonogram()); return (bool)$bad_commit; } public function renderForDisplay(PhabricatorUser $viewer) { $suffix = parent::renderForDisplay($viewer); $commit = id(new DiffusionCommitQuery()) ->setViewer($viewer) ->withIDs(array(idx($this->getTaskData(), 'commitID'))) ->executeOne(); if (!$commit) { return $suffix; } $link = DiffusionView::linkCommit( $commit->getRepository(), $commit->getCommitIdentifier()); return array($link, $suffix); } } diff --git a/src/applications/repository/worker/commitchangeparser/PhabricatorRepositoryCommitChangeParserWorker.php b/src/applications/repository/worker/commitchangeparser/PhabricatorRepositoryCommitChangeParserWorker.php index 6a0161fd06..f58856d614 100644 --- a/src/applications/repository/worker/commitchangeparser/PhabricatorRepositoryCommitChangeParserWorker.php +++ b/src/applications/repository/worker/commitchangeparser/PhabricatorRepositoryCommitChangeParserWorker.php @@ -1,151 +1,155 @@ log("%s\n", pht('Parsing "%s"...', $commit->getMonogram())); if ($this->isBadCommit($commit)) { $this->log(pht('This commit is marked bad!')); return; } - $results = $this->parseCommitChanges($repository, $commit); - if ($results) { - $this->writeCommitChanges($repository, $commit, $results); + if (!$this->shouldSkipImportStep()) { + $results = $this->parseCommitChanges($repository, $commit); + if ($results) { + $this->writeCommitChanges($repository, $commit, $results); + } + + $commit->writeImportStatusFlag($this->getImportStepFlag()); + + PhabricatorSearchWorker::queueDocumentForIndexing($commit->getPHID()); } $this->finishParse(); } public function parseChangesForUnitTest( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit) { return $this->parseCommitChanges($repository, $commit); } public static function lookupOrCreatePaths(array $paths) { $repository = new PhabricatorRepository(); $conn_w = $repository->establishConnection('w'); $result_map = self::lookupPaths($paths); $missing_paths = array_fill_keys($paths, true); $missing_paths = array_diff_key($missing_paths, $result_map); $missing_paths = array_keys($missing_paths); if ($missing_paths) { foreach (array_chunk($missing_paths, 128) as $path_chunk) { $sql = array(); foreach ($path_chunk as $path) { $sql[] = qsprintf($conn_w, '(%s, %s)', $path, md5($path)); } queryfx( $conn_w, 'INSERT IGNORE INTO %T (path, pathHash) VALUES %Q', PhabricatorRepository::TABLE_PATH, implode(', ', $sql)); } $result_map += self::lookupPaths($missing_paths); } return $result_map; } private static function lookupPaths(array $paths) { $repository = new PhabricatorRepository(); $conn_w = $repository->establishConnection('w'); $result_map = array(); foreach (array_chunk($paths, 128) as $path_chunk) { $chunk_map = queryfx_all( $conn_w, 'SELECT path, id FROM %T WHERE pathHash IN (%Ls)', PhabricatorRepository::TABLE_PATH, array_map('md5', $path_chunk)); foreach ($chunk_map as $row) { $result_map[$row['path']] = $row['id']; } } return $result_map; } protected function finishParse() { $commit = $this->commit; - - $commit->writeImportStatusFlag( - PhabricatorRepositoryCommit::IMPORTED_CHANGE); - - PhabricatorSearchWorker::queueDocumentForIndexing($commit->getPHID()); - if ($this->shouldQueueFollowupTasks()) { $this->queueTask( 'PhabricatorRepositoryCommitOwnersWorker', array( 'commitID' => $commit->getID(), )); } } private function writeCommitChanges( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit, array $changes) { $repository_id = (int)$repository->getID(); $commit_id = (int)$commit->getID(); // NOTE: This SQL is being built manually instead of with qsprintf() // because some SVN changes affect an enormous number of paths (millions) // and this showed up as significantly slow on a profile at some point. $changes_sql = array(); foreach ($changes as $change) { $values = array( $repository_id, (int)$change->getPathID(), $commit_id, nonempty((int)$change->getTargetPathID(), 'null'), nonempty((int)$change->getTargetCommitID(), 'null'), (int)$change->getChangeType(), (int)$change->getFileType(), (int)$change->getIsDirect(), (int)$change->getCommitSequence(), ); $changes_sql[] = '('.implode(', ', $values).')'; } $conn_w = $repository->establishConnection('w'); queryfx( $conn_w, 'DELETE FROM %T WHERE commitID = %d', PhabricatorRepository::TABLE_PATHCHANGE, $commit_id); foreach (PhabricatorLiskDAO::chunkSQL($changes_sql) as $chunk) { queryfx( $conn_w, 'INSERT INTO %T (repositoryID, pathID, commitID, targetPathID, targetCommitID, changeType, fileType, isDirect, commitSequence) VALUES %Q', PhabricatorRepository::TABLE_PATHCHANGE, $chunk); } } } diff --git a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryCommitMessageParserWorker.php b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryCommitMessageParserWorker.php index 4e92a0f32e..e28cd78cc7 100644 --- a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryCommitMessageParserWorker.php +++ b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryCommitMessageParserWorker.php @@ -1,354 +1,364 @@ shouldSkipImportStep()) { + $viewer = PhabricatorUser::getOmnipotentUser(); - $refs_raw = DiffusionQuery::callConduitWithDiffusionRequest( - $viewer, - DiffusionRequest::newFromDictionary( + $refs_raw = DiffusionQuery::callConduitWithDiffusionRequest( + $viewer, + DiffusionRequest::newFromDictionary( + array( + 'repository' => $repository, + 'user' => $viewer, + )), + 'diffusion.querycommits', array( - 'repository' => $repository, - 'user' => $viewer, - )), - 'diffusion.querycommits', - array( - 'repositoryPHID' => $repository->getPHID(), - 'phids' => array($commit->getPHID()), - 'bypassCache' => true, - 'needMessages' => true, - )); - - if (empty($refs_raw['data'])) { - throw new Exception( - pht( - 'Unable to retrieve details for commit "%s"!', - $commit->getPHID())); - } + 'repositoryPHID' => $repository->getPHID(), + 'phids' => array($commit->getPHID()), + 'bypassCache' => true, + 'needMessages' => true, + )); + + if (empty($refs_raw['data'])) { + throw new Exception( + pht( + 'Unable to retrieve details for commit "%s"!', + $commit->getPHID())); + } - $ref = DiffusionCommitRef::newFromConduitResult(head($refs_raw['data'])); + $ref = DiffusionCommitRef::newFromConduitResult(head($refs_raw['data'])); + $this->updateCommitData($ref); + } - $this->parseCommitWithRef($repository, $commit, $ref); + if ($this->shouldQueueFollowupTasks()) { + $this->queueTask( + $this->getFollowupTaskClass(), + array( + 'commitID' => $commit->getID(), + )); + } } final protected function updateCommitData(DiffusionCommitRef $ref) { $commit = $this->commit; $author = $ref->getAuthor(); $message = $ref->getMessage(); $committer = $ref->getCommitter(); $hashes = $ref->getHashes(); $data = id(new PhabricatorRepositoryCommitData())->loadOneWhere( 'commitID = %d', $commit->getID()); if (!$data) { $data = new PhabricatorRepositoryCommitData(); } $data->setCommitID($commit->getID()); $data->setAuthorName(id(new PhutilUTF8StringTruncator()) ->setMaximumBytes(255) ->truncateString((string)$author)); $data->setCommitDetail('authorEpoch', $ref->getAuthorEpoch()); $data->setCommitDetail('authorName', $ref->getAuthorName()); $data->setCommitDetail('authorEmail', $ref->getAuthorEmail()); $data->setCommitDetail( 'authorPHID', $this->resolveUserPHID($commit, $author)); $data->setCommitMessage($message); if (strlen($committer)) { $data->setCommitDetail('committer', $committer); $data->setCommitDetail('committerName', $ref->getCommitterName()); $data->setCommitDetail('committerEmail', $ref->getCommitterEmail()); $data->setCommitDetail( 'committerPHID', $this->resolveUserPHID($commit, $committer)); } $repository = $this->repository; $author_phid = $data->getCommitDetail('authorPHID'); $committer_phid = $data->getCommitDetail('committerPHID'); $user = new PhabricatorUser(); if ($author_phid) { $user = $user->loadOneWhere( 'phid = %s', $author_phid); } $differential_app = 'PhabricatorDifferentialApplication'; $revision_id = null; $low_level_query = null; if (PhabricatorApplication::isClassInstalled($differential_app)) { $low_level_query = id(new DiffusionLowLevelCommitFieldsQuery()) ->setRepository($repository) ->withCommitRef($ref); $field_values = $low_level_query->execute(); $revision_id = idx($field_values, 'revisionID'); if (!empty($field_values['reviewedByPHIDs'])) { $data->setCommitDetail( 'reviewerPHID', reset($field_values['reviewedByPHIDs'])); } $data->setCommitDetail('differential.revisionID', $revision_id); } if ($author_phid != $commit->getAuthorPHID()) { $commit->setAuthorPHID($author_phid); } $commit->setSummary($data->getSummary()); $commit->save(); // Figure out if we're going to try to "autoclose" related objects (e.g., // close linked tasks and related revisions) and, if not, record why we // aren't. Autoclose can be disabled for various reasons at the repository // or commit levels. $force_autoclose = idx($this->getTaskData(), 'forceAutoclose', false); if ($force_autoclose) { $autoclose_reason = PhabricatorRepository::BECAUSE_AUTOCLOSE_FORCED; } else { $autoclose_reason = $repository->shouldSkipAutocloseCommit($commit); } $data->setCommitDetail('autocloseReason', $autoclose_reason); $should_autoclose = $force_autoclose || $repository->shouldAutocloseCommit($commit); // When updating related objects, we'll act under an omnipotent user to // ensure we can see them, but take actions as either the committer or // author (if we recognize their accounts) or the Diffusion application // (if we do not). $actor = PhabricatorUser::getOmnipotentUser(); $acting_as_phid = nonempty( $committer_phid, $author_phid, id(new PhabricatorDiffusionApplication())->getPHID()); $conn_w = id(new DifferentialRevision())->establishConnection('w'); // NOTE: The `differential_commit` table has a unique ID on `commitPHID`, // preventing more than one revision from being associated with a commit. // Generally this is good and desirable, but with the advent of hash // tracking we may end up in a situation where we match several different // revisions. We just kind of ignore this and pick one, we might want to // revisit this and do something differently. (If we match several revisions // someone probably did something very silly, though.) $revision = null; if ($revision_id) { $revision_query = id(new DifferentialRevisionQuery()) ->withIDs(array($revision_id)) ->setViewer($actor) ->needReviewerStatus(true) ->needActiveDiffs(true); $revision = $revision_query->executeOne(); if ($revision) { if (!$data->getCommitDetail('precommitRevisionStatus')) { $data->setCommitDetail( 'precommitRevisionStatus', $revision->getStatus()); } $commit_drev = DiffusionCommitHasRevisionEdgeType::EDGECONST; id(new PhabricatorEdgeEditor()) ->addEdge($commit->getPHID(), $commit_drev, $revision->getPHID()) ->save(); queryfx( $conn_w, 'INSERT IGNORE INTO %T (revisionID, commitPHID) VALUES (%d, %s)', DifferentialRevision::TABLE_COMMIT, $revision->getID(), $commit->getPHID()); $status_closed = ArcanistDifferentialRevisionStatus::CLOSED; $should_close = ($revision->getStatus() != $status_closed) && $should_autoclose; if ($should_close) { $commit_close_xaction = id(new DifferentialTransaction()) ->setTransactionType(DifferentialTransaction::TYPE_ACTION) ->setNewValue(DifferentialAction::ACTION_CLOSE) ->setMetadataValue('isCommitClose', true); $commit_close_xaction->setMetadataValue( 'commitPHID', $commit->getPHID()); $commit_close_xaction->setMetadataValue( 'committerPHID', $committer_phid); $commit_close_xaction->setMetadataValue( 'committerName', $data->getCommitDetail('committer')); $commit_close_xaction->setMetadataValue( 'authorPHID', $author_phid); $commit_close_xaction->setMetadataValue( 'authorName', $data->getAuthorName()); if ($low_level_query) { $commit_close_xaction->setMetadataValue( 'revisionMatchData', $low_level_query->getRevisionMatchData()); $data->setCommitDetail( 'revisionMatchData', $low_level_query->getRevisionMatchData()); } $extraction_engine = id(new DifferentialDiffExtractionEngine()) ->setViewer($actor) ->setAuthorPHID($acting_as_phid); $content_source = $this->newContentSource(); $update_data = $extraction_engine->updateRevisionWithCommit( $revision, $commit, array( $commit_close_xaction, ), $content_source); foreach ($update_data as $key => $value) { $data->setCommitDetail($key, $value); } } } } if ($should_autoclose) { $this->closeTasks( $actor, $acting_as_phid, $repository, $commit, $message); } $data->save(); $commit->writeImportStatusFlag( PhabricatorRepositoryCommit::IMPORTED_MESSAGE); } private function resolveUserPHID( PhabricatorRepositoryCommit $commit, $user_name) { return id(new DiffusionResolveUserQuery()) ->withCommit($commit) ->withName($user_name) ->execute(); } private function closeTasks( PhabricatorUser $actor, $acting_as, PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit, $message) { $maniphest = 'PhabricatorManiphestApplication'; if (!PhabricatorApplication::isClassInstalled($maniphest)) { return; } $prefixes = ManiphestTaskStatus::getStatusPrefixMap(); $suffixes = ManiphestTaskStatus::getStatusSuffixMap(); $matches = id(new ManiphestCustomFieldStatusParser()) ->parseCorpus($message); $task_statuses = array(); foreach ($matches as $match) { $prefix = phutil_utf8_strtolower($match['prefix']); $suffix = phutil_utf8_strtolower($match['suffix']); $status = idx($suffixes, $suffix); if (!$status) { $status = idx($prefixes, $prefix); } foreach ($match['monograms'] as $task_monogram) { $task_id = (int)trim($task_monogram, 'tT'); $task_statuses[$task_id] = $status; } } if (!$task_statuses) { return; } $tasks = id(new ManiphestTaskQuery()) ->setViewer($actor) ->withIDs(array_keys($task_statuses)) ->needProjectPHIDs(true) ->execute(); foreach ($tasks as $task_id => $task) { $xactions = array(); $edge_type = ManiphestTaskHasCommitEdgeType::EDGECONST; $edge_xaction = id(new ManiphestTransaction()) ->setTransactionType(PhabricatorTransactions::TYPE_EDGE) ->setMetadataValue('edge:type', $edge_type) ->setNewValue( array( '+' => array( $commit->getPHID() => $commit->getPHID(), ), )); $status = $task_statuses[$task_id]; if ($status) { if ($task->getStatus() != $status) { $xactions[] = id(new ManiphestTransaction()) ->setTransactionType(ManiphestTransaction::TYPE_STATUS) ->setMetadataValue('commitPHID', $commit->getPHID()) ->setNewValue($status); $edge_xaction->setMetadataValue('commitPHID', $commit->getPHID()); } } $xactions[] = $edge_xaction; $content_source = $this->newContentSource(); $editor = id(new ManiphestTransactionEditor()) ->setActor($actor) ->setActingAsPHID($acting_as) ->setContinueOnNoEffect(true) ->setContinueOnMissingFields(true) ->setUnmentionablePHIDMap( array($commit->getPHID() => $commit->getPHID())) ->setContentSource($content_source); $editor->applyTransactions($task, $xactions); } } } diff --git a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryGitCommitMessageParserWorker.php b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryGitCommitMessageParserWorker.php index b41f4c421c..0c83a27fab 100644 --- a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryGitCommitMessageParserWorker.php +++ b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryGitCommitMessageParserWorker.php @@ -1,22 +1,10 @@ updateCommitData($ref); - - if ($this->shouldQueueFollowupTasks()) { - $this->queueTask( - 'PhabricatorRepositoryGitCommitChangeParserWorker', - array( - 'commitID' => $commit->getID(), - )); - } + protected function getFollowupTaskClass() { + return 'PhabricatorRepositoryGitCommitChangeParserWorker'; } } diff --git a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php index 72d187d896..654083e736 100644 --- a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php +++ b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php @@ -1,22 +1,10 @@ updateCommitData($ref); - - if ($this->shouldQueueFollowupTasks()) { - $this->queueTask( - 'PhabricatorRepositoryMercurialCommitChangeParserWorker', - array( - 'commitID' => $commit->getID(), - )); - } + protected function getFollowupTaskClass() { + return 'PhabricatorRepositoryMercurialCommitChangeParserWorker'; } } diff --git a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositorySvnCommitMessageParserWorker.php b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositorySvnCommitMessageParserWorker.php index 648c8adaaf..7006957b50 100644 --- a/src/applications/repository/worker/commitmessageparser/PhabricatorRepositorySvnCommitMessageParserWorker.php +++ b/src/applications/repository/worker/commitmessageparser/PhabricatorRepositorySvnCommitMessageParserWorker.php @@ -1,22 +1,10 @@ updateCommitData($ref); - - if ($this->shouldQueueFollowupTasks()) { - $this->queueTask( - 'PhabricatorRepositorySvnCommitChangeParserWorker', - array( - 'commitID' => $commit->getID(), - )); - } + protected function getFollowupTaskClass() { + return 'PhabricatorRepositorySvnCommitChangeParserWorker'; } }