diff --git a/src/applications/fact/extract/PhabricatorFactUpdateIterator.php b/src/applications/fact/extract/PhabricatorFactUpdateIterator.php index b7c59a4499..be93cf62c1 100644 --- a/src/applications/fact/extract/PhabricatorFactUpdateIterator.php +++ b/src/applications/fact/extract/PhabricatorFactUpdateIterator.php @@ -1,86 +1,91 @@ object = $object; + $this->set = new LiskDAOSet(); + $this->object = $object->putInSet($this->set); $this->position = '0:0'; } public function setPosition($position) { $this->position = $position; return $this; } protected function didRewind() { $this->cursor = $this->position; } protected function getCursorFromObject($object) { return $object->getDateModified().':'.$object->getID(); } public function key() { return $this->getCursorFromObject($this->current()); } protected function loadPage() { list($after_epoch, $after_id) = explode(':', $this->cursor); + $this->set->clearSet(); + // NOTE: We ignore recent updates because once we process an update we'll // never process rows behind it again. We need to read only rows which // we're sure no new rows will be inserted behind. If we read a row that // was updated on the current second, another update later on in this second // could affect an object with a lower ID, and we'd skip that update. To // avoid this, just ignore any rows which have been updated in the last few // seconds. This also reduces the amount of work we need to do if an object // is repeatedly updated; we will just look at the end state without // processing the intermediate states. Finally, this gives us reasonable // protections against clock skew between the machine the daemon is running // on and any machines performing writes. 
$page = $this->object->loadAllWhere( '((dateModified > %d) OR (dateModified = %d AND id > %d)) AND (dateModified < %d - %d) ORDER BY dateModified ASC, id ASC LIMIT %d', $after_epoch, $after_epoch, $after_id, time(), $this->ignoreUpdatesDuration, $this->getPageSize()); if ($page) { $this->cursor = $this->getCursorFromObject(end($page)); } return $page; } } diff --git a/src/infrastructure/storage/lisk/LiskDAOSet.php b/src/infrastructure/storage/lisk/LiskDAOSet.php index 1b343b96a4..6d8ebc4483 100644 --- a/src/infrastructure/storage/lisk/LiskDAOSet.php +++ b/src/infrastructure/storage/lisk/LiskDAOSet.php @@ -1,85 +1,106 @@ addToSet($author); * foreach ($reviewers as $reviewer) { * $users->addToSet($reviewer); * } * foreach ($ccs as $cc) { * $users->addToSet($cc); * } * // Preload e-mails of all involved users and return e-mails of author. * $author_emails = $author->loadRelatives( * new PhabricatorUserEmail(), * 'userPHID', * 'getPHID'); */ final class LiskDAOSet { private $daos = array(); private $relatives = array(); + private $subsets = array(); public function addToSet(LiskDAO $dao) { $this->daos[] = $dao; $dao->putInSet($this); return $this; } + /** + * Remove all objects and cached relatives from this set and, recursively, + * from every subset created by loadRelatives(), then forget those subsets. + * + * The main purpose of this method is to break the cyclic dependency between + * a set and its members (addToSet() hands each DAO this set via putInSet()). + */ + public function clearSet() { + $this->daos = array(); + $this->relatives = array(); + foreach ($this->subsets as $set) { + $set->clearSet(); + } + // Forget the emptied subsets as well; otherwise each loadRelatives() call + // made after a clear keeps appending to this array without bound. + $this->subsets = array(); + return $this; + } + /** * See @{method:LiskDAO::loadRelatives}. 
*/ public function loadRelatives( LiskDAO $object, $foreign_column, $key_method = 'getID', $where = '') { $relatives = &$this->relatives[ get_class($object)."-{$foreign_column}-{$key_method}-{$where}"]; if ($relatives === null) { $ids = array(); foreach ($this->daos as $dao) { $id = $dao->$key_method(); if ($id !== null) { $ids[$id] = $id; } } if (!$ids) { $relatives = array(); } else { $set = new LiskDAOSet(); + $this->subsets[] = $set; $relatives = $object->putInSet($set)->loadAllWhere( '%C IN (%Ls) %Q', $foreign_column, $ids, ($where != '' ? 'AND '.$where : '')); $relatives = mgroup($relatives, 'get'.$foreign_column); } } return $relatives; } }