diff --git a/src/applications/drydock/storage/DrydockLease.php b/src/applications/drydock/storage/DrydockLease.php index c5834f0136..af0b322b62 100644 --- a/src/applications/drydock/storage/DrydockLease.php +++ b/src/applications/drydock/storage/DrydockLease.php @@ -1,396 +1,406 @@ releaseOnDestruction = true; return $this; } public function __destruct() { if (!$this->releaseOnDestruction) { return; } if (!$this->canRelease()) { return; } $actor = PhabricatorUser::getOmnipotentUser(); $drydock_phid = id(new PhabricatorDrydockApplication())->getPHID(); $command = DrydockCommand::initializeNewCommand($actor) ->setTargetPHID($this->getPHID()) ->setAuthorPHID($drydock_phid) ->setCommand(DrydockCommand::COMMAND_RELEASE) ->save(); $this->scheduleUpdate(); } public function getLeaseName() { return pht('Lease %d', $this->getID()); } protected function getConfiguration() { return array( self::CONFIG_AUX_PHID => true, self::CONFIG_SERIALIZATION => array( 'attributes' => self::SERIALIZATION_JSON, ), self::CONFIG_COLUMN_SCHEMA => array( 'status' => 'text32', 'until' => 'epoch?', 'resourceType' => 'text128', 'ownerPHID' => 'phid?', 'resourcePHID' => 'phid?', ), self::CONFIG_KEY_SCHEMA => array( 'key_resource' => array( 'columns' => array('resourcePHID', 'status'), ), ), ) + parent::getConfiguration(); } public function setAttribute($key, $value) { $this->attributes[$key] = $value; return $this; } public function getAttribute($key, $default = null) { return idx($this->attributes, $key, $default); } public function generatePHID() { return PhabricatorPHID::generateNewPHID(DrydockLeasePHIDType::TYPECONST); } public function getInterface($type) { return $this->getResource()->getInterface($this, $type); } public function getResource() { return $this->assertAttached($this->resource); } public function attachResource(DrydockResource $resource = null) { $this->resource = $resource; return $this; } public function hasAttachedResource() { return ($this->resource !== null); } public 
function getUnconsumedCommands() {
    // Returns whatever was provided via attachUnconsumedCommands().
    return $this->assertAttached($this->unconsumedCommands);
  }

  /**
   * Attach the list of commands which have not been consumed yet.
   *
   * @param array Unconsumed commands targeting this lease.
   * @return this
   */
  public function attachUnconsumedCommands(array $commands) {
    $this->unconsumedCommands = $commands;
    return $this;
  }

  /**
   * Test if any attached unconsumed command is a release command.
   *
   * @return bool True if a release command is pending.
   */
  public function isReleasing() {
    foreach ($this->getUnconsumedCommands() as $command) {
      if ($command->getCommand() == DrydockCommand::COMMAND_RELEASE) {
        return true;
      }
    }

    return false;
  }

  /**
   * Save this new lease in the pending state and schedule an allocator
   * task for it.
   *
   * Throws if the lease already has an ID: only leases which have never
   * been saved may be queued.
   *
   * @return this
   */
  public function queueForActivation() {
    if ($this->getID()) {
      throw new Exception(
        pht('Only new leases may be queued for activation!'));
    }

    $this
      ->setStatus(DrydockLeaseStatus::STATUS_PENDING)
      ->save();

    // The scheduled task object is not needed here, so it is not retained.
    PhabricatorWorker::scheduleTask(
      'DrydockAllocatorWorker',
      array(
        'leasePHID' => $this->getPHID(),
      ),
      array(
        'objectPHID' => $this->getPHID(),
      ));

    return $this;
  }

  /**
   * Test if this lease is pending or acquired (not yet active).
   *
   * @return bool
   */
  public function isActivating() {
    switch ($this->getStatus()) {
      case DrydockLeaseStatus::STATUS_PENDING:
      case DrydockLeaseStatus::STATUS_ACQUIRED:
        return true;
    }

    return false;
  }

  /**
   * Test if this lease has the active status.
   *
   * @return bool
   */
  public function isActive() {
    switch ($this->getStatus()) {
      case DrydockLeaseStatus::STATUS_ACTIVE:
        return true;
    }

    return false;
  }

  /**
   * Block until this lease becomes active, reloading it once per second.
   *
   * Returns when the reloaded lease is active. Throws if the lease can not
   * be reloaded, if it has been released, destroyed or broken, or if it has
   * a status this method does not recognize.
   *
   * @return void
   */
  public function waitUntilActive() {
    while (true) {
      $lease = $this->reload();
      if (!$lease) {
        throw new Exception(pht('Failed to reload lease.'));
      }

      $status = $lease->getStatus();

      switch ($status) {
        case DrydockLeaseStatus::STATUS_ACTIVE:
          return;
        case DrydockLeaseStatus::STATUS_RELEASED:
          throw new Exception(pht('Lease has already been released!'));
        case DrydockLeaseStatus::STATUS_DESTROYED:
          throw new Exception(pht('Lease has already been destroyed!'));
        case DrydockLeaseStatus::STATUS_BROKEN:
          throw new Exception(pht('Lease has been broken!'));
        case DrydockLeaseStatus::STATUS_PENDING:
        case DrydockLeaseStatus::STATUS_ACQUIRED:
          // Still activating: keep waiting.
          break;
        default:
          throw new Exception(
            pht(
              'Lease has unknown status "%s".',
              $status));
      }

      sleep(1);
    }
  }

  /**
   * Choose whether acquiring this lease should also activate it.
   *
   * When the flag is set, acquireOnResource() moves the lease directly to
   * the active status instead of the acquired status.
   *
   * @param bool True to activate the lease as soon as it is acquired.
   * @return this
   */
  public function setActivateWhenAcquired($activate) {
    // Store the caller's value; previously this was hard-coded to `true`,
    // so passing `false` could never turn the behavior off.
    $this->activateWhenAcquired = $activate;
    return $this;
  }

  /**
   * Record a slot lock key to be acquired together with this lease.
   *
   * @param string Slot lock key.
   * @return this
   */
  public function needSlotLock($key) {
    $this->slotLocks[] = $key;
    return $this;
  }

  public
function acquireOnResource(DrydockResource $resource) { $expect_status = DrydockLeaseStatus::STATUS_PENDING; $actual_status = $this->getStatus(); if ($actual_status != $expect_status) { throw new Exception( pht( 'Trying to acquire a lease on a resource which is in the wrong '. 'state: status must be "%s", actually "%s".', $expect_status, $actual_status)); } if ($this->activateWhenAcquired) { $new_status = DrydockLeaseStatus::STATUS_ACTIVE; } else { $new_status = DrydockLeaseStatus::STATUS_ACQUIRED; } if ($new_status == DrydockLeaseStatus::STATUS_ACTIVE) { if ($resource->getStatus() == DrydockResourceStatus::STATUS_PENDING) { throw new Exception( pht( 'Trying to acquire an active lease on a pending resource. '. 'You can not immediately activate leases on resources which '. 'need time to start up.')); } } $this->openTransaction(); $this ->setResourcePHID($resource->getPHID()) ->setStatus($new_status) ->save(); DrydockSlotLock::acquireLocks($this->getPHID(), $this->slotLocks); $this->slotLocks = array(); $this->saveTransaction(); $this->isAcquired = true; if ($new_status == DrydockLeaseStatus::STATUS_ACTIVE) { $this->didActivate(); } return $this; } public function isAcquiredLease() { return $this->isAcquired; } public function activateOnResource(DrydockResource $resource) { $expect_status = DrydockLeaseStatus::STATUS_ACQUIRED; $actual_status = $this->getStatus(); if ($actual_status != $expect_status) { throw new Exception( pht( 'Trying to activate a lease which has the wrong status: status '. 'must be "%s", actually "%s".', $expect_status, $actual_status)); } if ($resource->getStatus() == DrydockResourceStatus::STATUS_PENDING) { // TODO: Be stricter about this? 
throw new Exception( pht( 'Trying to activate a lease on a pending resource.')); } $this->openTransaction(); $this ->setStatus(DrydockLeaseStatus::STATUS_ACTIVE) ->save(); DrydockSlotLock::acquireLocks($this->getPHID(), $this->slotLocks); $this->slotLocks = array(); $this->saveTransaction(); $this->isActivated = true; $this->didActivate(); return $this; } public function isActivatedLease() { return $this->isActivated; } public function canRelease() { if (!$this->getID()) { return false; } switch ($this->getStatus()) { case DrydockLeaseStatus::STATUS_RELEASED: case DrydockLeaseStatus::STATUS_DESTROYED: return false; default: return true; } } public function canUpdate() { switch ($this->getStatus()) { case DrydockLeaseStatus::STATUS_ACTIVE: return true; default: return false; } } public function scheduleUpdate($epoch = null) { PhabricatorWorker::scheduleTask( 'DrydockLeaseUpdateWorker', array( 'leasePHID' => $this->getPHID(), 'isExpireTask' => ($epoch !== null), ), array( 'objectPHID' => $this->getPHID(), - 'delayUntil' => $epoch, + 'delayUntil' => ($epoch ? 
(int)$epoch : null), )); } + public function setAwakenTaskIDs(array $ids) { + $this->setAttribute('internal.awakenTaskIDs', $ids); + return $this; + } + private function didActivate() { $viewer = PhabricatorUser::getOmnipotentUser(); $need_update = false; $commands = id(new DrydockCommandQuery()) ->setViewer($viewer) ->withTargetPHIDs(array($this->getPHID())) ->withConsumed(false) ->execute(); if ($commands) { $need_update = true; } if ($need_update) { $this->scheduleUpdate(); } $expires = $this->getUntil(); if ($expires) { $this->scheduleUpdate($expires); } + + $awaken_ids = $this->getAttribute('internal.awakenTaskIDs'); + if (is_array($awaken_ids) && $awaken_ids) { + PhabricatorWorker::awakenTaskIDs($awaken_ids); + } } /* -( PhabricatorPolicyInterface )----------------------------------------- */ public function getCapabilities() { return array( PhabricatorPolicyCapability::CAN_VIEW, PhabricatorPolicyCapability::CAN_EDIT, ); } public function getPolicy($capability) { if ($this->getResource()) { return $this->getResource()->getPolicy($capability); } // TODO: Implement reasonable policies. 
return PhabricatorPolicies::getMostOpenPolicy(); } public function hasAutomaticCapability($capability, PhabricatorUser $viewer) { if ($this->getResource()) { return $this->getResource()->hasAutomaticCapability($capability, $viewer); } return false; } public function describeAutomaticCapability($capability) { return pht('Leases inherit policies from the resources they lease.'); } } diff --git a/src/applications/drydock/storage/DrydockResource.php b/src/applications/drydock/storage/DrydockResource.php index ab968c2249..ab2230ce32 100644 --- a/src/applications/drydock/storage/DrydockResource.php +++ b/src/applications/drydock/storage/DrydockResource.php @@ -1,282 +1,282 @@ true, self::CONFIG_SERIALIZATION => array( 'attributes' => self::SERIALIZATION_JSON, 'capabilities' => self::SERIALIZATION_JSON, ), self::CONFIG_COLUMN_SCHEMA => array( 'name' => 'text255', 'ownerPHID' => 'phid?', 'status' => 'text32', 'type' => 'text64', 'until' => 'epoch?', ), self::CONFIG_KEY_SCHEMA => array( 'key_type' => array( 'columns' => array('type', 'status'), ), 'key_blueprint' => array( 'columns' => array('blueprintPHID', 'status'), ), ), ) + parent::getConfiguration(); } public function generatePHID() { return PhabricatorPHID::generateNewPHID(DrydockResourcePHIDType::TYPECONST); } public function getAttribute($key, $default = null) { return idx($this->attributes, $key, $default); } public function getAttributesForTypeSpec(array $attribute_names) { return array_select_keys($this->attributes, $attribute_names); } public function setAttribute($key, $value) { $this->attributes[$key] = $value; return $this; } public function getCapability($key, $default = null) { /* Look the key up in the JSON-serialized "capabilities" map declared in getConfiguration(); the property name was previously misspelled. */ return idx($this->capabilities, $key, $default); } public function getInterface(DrydockLease $lease, $type) { return $this->getBlueprint()->getInterface($this, $lease, $type); } public function getBlueprint() { return $this->assertAttached($this->blueprint); } public function attachBlueprint(DrydockBlueprint $blueprint) { 
$this->blueprint = $blueprint; return $this; } public function getUnconsumedCommands() { return $this->assertAttached($this->unconsumedCommands); } public function attachUnconsumedCommands(array $commands) { $this->unconsumedCommands = $commands; return $this; } public function isReleasing() { foreach ($this->getUnconsumedCommands() as $command) { if ($command->getCommand() == DrydockCommand::COMMAND_RELEASE) { return true; } } return false; } public function setActivateWhenAllocated($activate) { $this->activateWhenAllocated = $activate; return $this; } public function needSlotLock($key) { $this->slotLocks[] = $key; return $this; } public function allocateResource() { if ($this->getID()) { throw new Exception( pht( 'Trying to allocate a resource which has already been persisted. '. 'Only new resources may be allocated.')); } $expect_status = DrydockResourceStatus::STATUS_PENDING; $actual_status = $this->getStatus(); if ($actual_status != $expect_status) { throw new Exception( pht( 'Trying to allocate a resource from the wrong status. Status must '. 
'be "%s", actually "%s".', $expect_status, $actual_status)); } if ($this->activateWhenAllocated) { $new_status = DrydockResourceStatus::STATUS_ACTIVE; } else { $new_status = DrydockResourceStatus::STATUS_PENDING; } $this->openTransaction(); $this ->setStatus($new_status) ->save(); DrydockSlotLock::acquireLocks($this->getPHID(), $this->slotLocks); $this->slotLocks = array(); $this->saveTransaction(); $this->isAllocated = true; if ($new_status == DrydockResourceStatus::STATUS_ACTIVE) { $this->didActivate(); } return $this; } public function isAllocatedResource() { return $this->isAllocated; } public function activateResource() { if (!$this->getID()) { throw new Exception( pht( 'Trying to activate a resource which has not yet been persisted.')); } $expect_status = DrydockResourceStatus::STATUS_PENDING; $actual_status = $this->getStatus(); if ($actual_status != $expect_status) { throw new Exception( pht( 'Trying to activate a resource from the wrong status. Status must '. 'be "%s", actually "%s".', $expect_status, $actual_status)); } $this->openTransaction(); $this ->setStatus(DrydockResourceStatus::STATUS_ACTIVE) ->save(); DrydockSlotLock::acquireLocks($this->getPHID(), $this->slotLocks); $this->slotLocks = array(); $this->saveTransaction(); $this->isActivated = true; $this->didActivate(); return $this; } public function isActivatedResource() { return $this->isActivated; } public function canRelease() { switch ($this->getStatus()) { case DrydockResourceStatus::STATUS_RELEASED: case DrydockResourceStatus::STATUS_DESTROYED: return false; default: return true; } } public function scheduleUpdate($epoch = null) { PhabricatorWorker::scheduleTask( 'DrydockResourceUpdateWorker', array( 'resourcePHID' => $this->getPHID(), 'isExpireTask' => ($epoch !== null), ), array( 'objectPHID' => $this->getPHID(), - 'delayUntil' => $epoch, + 'delayUntil' => ($epoch ? 
(int)$epoch : null), )); } private function didActivate() { $viewer = PhabricatorUser::getOmnipotentUser(); $need_update = false; $commands = id(new DrydockCommandQuery()) ->setViewer($viewer) ->withTargetPHIDs(array($this->getPHID())) ->withConsumed(false) ->execute(); if ($commands) { $need_update = true; } if ($need_update) { $this->scheduleUpdate(); } $expires = $this->getUntil(); if ($expires) { $this->scheduleUpdate($expires); } } public function canUpdate() { switch ($this->getStatus()) { case DrydockResourceStatus::STATUS_ACTIVE: return true; default: return false; } } /* -( PhabricatorPolicyInterface )----------------------------------------- */ public function getCapabilities() { return array( PhabricatorPolicyCapability::CAN_VIEW, PhabricatorPolicyCapability::CAN_EDIT, ); } public function getPolicy($capability) { return $this->getBlueprint()->getPolicy($capability); } public function hasAutomaticCapability($capability, PhabricatorUser $viewer) { return $this->getBlueprint()->hasAutomaticCapability( $capability, $viewer); } public function describeAutomaticCapability($capability) { return pht('Resources inherit the policies of their blueprints.'); } } diff --git a/src/applications/harbormaster/step/HarbormasterBuildStepImplementation.php b/src/applications/harbormaster/step/HarbormasterBuildStepImplementation.php index 744ad2474f..b47f1d00d1 100644 --- a/src/applications/harbormaster/step/HarbormasterBuildStepImplementation.php +++ b/src/applications/harbormaster/step/HarbormasterBuildStepImplementation.php @@ -1,274 +1,284 @@ currentWorkerTaskID = $id; + return $this; + } + + public function getCurrentWorkerTaskID() { + return $this->currentWorkerTaskID; + } public static function getImplementations() { return id(new PhutilClassMapQuery()) ->setAncestorClass(__CLASS__) ->execute(); } public static function getImplementation($class) { $base = idx(self::getImplementations(), $class); if ($base) { return (clone $base); } return null; } public static 
function requireImplementation($class) { if (!$class) { throw new Exception(pht('No implementation is specified!')); } $implementation = self::getImplementation($class); if (!$implementation) { throw new Exception(pht('No such implementation "%s" exists!', $class)); } return $implementation; } /** * The name of the implementation. */ abstract public function getName(); public function getBuildStepGroupKey() { return HarbormasterOtherBuildStepGroup::GROUPKEY; } /** * The generic description of the implementation. */ public function getGenericDescription() { return ''; } /** * The description of the implementation, based on the current settings. */ public function getDescription() { return $this->getGenericDescription(); } /** * Run the build target against the specified build. */ abstract public function execute( HarbormasterBuild $build, HarbormasterBuildTarget $build_target); /** * Gets the settings for this build step. */ public function getSettings() { return $this->settings; } public function getSetting($key, $default = null) { return idx($this->settings, $key, $default); } /** * Loads the settings for this build step implementation from a build * step or target. */ final public function loadSettings($build_object) { $this->settings = $build_object->getDetails(); return $this; } /** * Return the name of artifacts produced by this command. * * Something like: * * return array( * 'some_name_input_by_user' => 'host'); * * Future steps will calculate all available artifact mappings * before them and filter on the type. * * @return array The mappings of artifact names to their types. 
*/ public function getArtifactInputs() { return array(); } public function getArtifactOutputs() { return array(); } public function getDependencies(HarbormasterBuildStep $build_step) { $dependencies = $build_step->getDetail('dependsOn', array()); $inputs = $build_step->getStepImplementation()->getArtifactInputs(); $inputs = ipull($inputs, null, 'key'); $artifacts = $this->getAvailableArtifacts( $build_step->getBuildPlan(), $build_step, null); foreach ($artifacts as $key => $type) { if (!array_key_exists($key, $inputs)) { unset($artifacts[$key]); } } $artifact_steps = ipull($artifacts, 'step'); $artifact_steps = mpull($artifact_steps, 'getPHID'); $dependencies = array_merge($dependencies, $artifact_steps); return $dependencies; } /** * Returns a list of all artifacts made available in the build plan. */ public static function getAvailableArtifacts( HarbormasterBuildPlan $build_plan, $current_build_step, $artifact_type) { $steps = id(new HarbormasterBuildStepQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withBuildPlanPHIDs(array($build_plan->getPHID())) ->execute(); $artifacts = array(); $artifact_arrays = array(); foreach ($steps as $step) { if ($current_build_step !== null && $step->getPHID() === $current_build_step->getPHID()) { continue; } $implementation = $step->getStepImplementation(); $array = $implementation->getArtifactOutputs(); $array = ipull($array, 'type', 'key'); foreach ($array as $name => $type) { if ($type !== $artifact_type && $artifact_type !== null) { continue; } $artifacts[$name] = array('type' => $type, 'step' => $step); } } return $artifacts; } /** * Convert a user-provided string with variables in it, like: * * ls ${dirname} * * ...into a string with variables merged into it safely: * * ls 'dir with spaces' * * @param string Name of a `vxsprintf` function, like @{function:vcsprintf}. * @param string User-provided pattern string containing `${variables}`. * @param dict List of available replacement variables. 
* @return string String with variables replaced safely into it.
   */
  protected function mergeVariables($function, $pattern, array $variables) {
    // The named group "name" captures the variable name inside `${...}`; it
    // is read back below as $matches['name'].
    $regexp = '/\\$\\{(?P<name>[a-z\\.]+)\\}/';

    $matches = null;
    preg_match_all($regexp, $pattern, $matches);

    // Collect replacement values in the order the variables appear.
    $argv = array();
    foreach ($matches['name'] as $name) {
      if (!array_key_exists($name, $variables)) {
        throw new Exception(pht("No such variable '%s'!", $name));
      }
      $argv[] = $variables[$name];
    }

    // Escape literal "%" first, so the only conversions left in the pattern
    // are the "%s" placeholders substituted for each `${variable}`.
    $pattern = str_replace('%', '%%', $pattern);
    $pattern = preg_replace($regexp, '%s', $pattern);

    return call_user_func($function, $pattern, $argv);
  }

  /**
   * Describe the configurable fields for this step. The base implementation
   * declares none.
   *
   * @return array Field specifications.
   */
  public function getFieldSpecifications() {
    return array();
  }

  /**
   * Format the value of a setting for display in a description.
   *
   * @param string Setting key.
   * @param wild Value to use if the setting is not present.
   * @return wild Tag produced by formatValueForDescription().
   */
  protected function formatSettingForDescription($key, $default = null) {
    return $this->formatValueForDescription($this->getSetting($key, $default));
  }

  /**
   * Format a value for display: non-empty values are rendered in a "strong"
   * tag, empty values as an emphasized "(null)" placeholder.
   *
   * @param string Value to format.
   * @return wild Result of phutil_tag().
   */
  protected function formatValueForDescription($value) {
    if (strlen($value)) {
      return phutil_tag('strong', array(), $value);
    } else {
      return phutil_tag('em', array(), pht('(null)'));
    }
  }

  /**
   * Whether this step type can wait for an external message. Off by default.
   *
   * @return bool
   */
  public function supportsWaitForMessage() {
    return false;
  }

  /**
   * Test if a target should wait for a message: the implementation must
   * support waiting and the target must have the
   * "builtin.wait-for-message" detail set.
   *
   * @param HarbormasterBuildTarget Target to test.
   * @return bool
   */
  public function shouldWaitForMessage(HarbormasterBuildTarget $target) {
    if (!$this->supportsWaitForMessage()) {
      return false;
    }

    return (bool)$target->getDetail('builtin.wait-for-message');
  }

  /**
   * Test if the target belongs to a different build generation than the
   * build currently has.
   *
   * @return bool True if the generations differ.
   */
  protected function shouldAbort(
    HarbormasterBuild $build,
    HarbormasterBuildTarget $target) {

    return $build->getBuildGeneration() !== $target->getBuildGeneration();
  }

  /**
   * Iterate over futures, periodically checking whether the build should be
   * aborted.
   *
   * Whenever the iterator yields a null future, the build is reloaded and
   * a HarbormasterBuildAbortedException is thrown if shouldAbort() is true.
   *
   * @param HarbormasterBuild Build being executed.
   * @param HarbormasterBuildTarget Target being executed.
   * @param array Futures to iterate over.
   * @return void
   */
  protected function resolveFutures(
    HarbormasterBuild $build,
    HarbormasterBuildTarget $target,
    array $futures) {

    $futures = new FutureIterator($futures);
    foreach ($futures->setUpdateInterval(5) as $key => $future) {
      if ($future === null) {
        $build->reload();
        if ($this->shouldAbort($build, $target)) {
          throw new HarbormasterBuildAbortedException();
        }
      }
    }
  }


/* -(  Automatic Targets  )-------------------------------------------------- */


  public function getBuildStepAutotargetStepKey() {
    return null;
  }

  public function getBuildStepAutotargetPlanKey() {
    throw new PhutilMethodNotImplementedException();
  }

  public
function shouldRequireAutotargeting() { return false; } } diff --git a/src/applications/harbormaster/step/HarbormasterLeaseWorkingCopyBuildStepImplementation.php b/src/applications/harbormaster/step/HarbormasterLeaseWorkingCopyBuildStepImplementation.php index 1f5b139008..b01ae26da6 100644 --- a/src/applications/harbormaster/step/HarbormasterLeaseWorkingCopyBuildStepImplementation.php +++ b/src/applications/harbormaster/step/HarbormasterLeaseWorkingCopyBuildStepImplementation.php @@ -1,106 +1,111 @@ getSettings(); // TODO: We should probably have a separate temporary storage area for // execution stuff that doesn't step on configuration state? $lease_phid = $build_target->getDetail('exec.leasePHID'); if ($lease_phid) { $lease = id(new DrydockLeaseQuery()) ->setViewer($viewer) ->withPHIDs(array($lease_phid)) ->executeOne(); if (!$lease) { throw new PhabricatorWorkerPermanentFailureException( pht( 'Lease "%s" could not be loaded.', $lease_phid)); } } else { $working_copy_type = id(new DrydockWorkingCopyBlueprintImplementation()) ->getType(); $lease = id(new DrydockLease()) ->setResourceType($working_copy_type) ->setOwnerPHID($build_target->getPHID()); $variables = $build_target->getVariables(); $repository_phid = idx($variables, 'repository.phid'); $commit = idx($variables, 'repository.commit'); $lease ->setAttribute('repositoryPHID', $repository_phid) ->setAttribute('commit', $commit); + $task_id = $this->getCurrentWorkerTaskID(); + if ($task_id) { + $lease->setAwakenTaskIDs(array($task_id)); + } + $lease->queueForActivation(); $build_target ->setDetail('exec.leasePHID', $lease->getPHID()) ->save(); } if ($lease->isActivating()) { // TODO: Smart backoff? throw new PhabricatorWorkerYieldException(15); } if (!$lease->isActive()) { // TODO: We could just forget about this lease and retry? 
throw new PhabricatorWorkerPermanentFailureException( pht( 'Lease "%s" never activated.', $lease->getPHID())); } $artifact = $build_target->createArtifact( $viewer, $settings['name'], HarbormasterWorkingCopyArtifact::ARTIFACTCONST, array( 'drydockLeasePHID' => $lease->getPHID(), )); } public function getArtifactOutputs() { return array( array( 'name' => pht('Working Copy'), 'key' => $this->getSetting('name'), 'type' => HarbormasterWorkingCopyArtifact::ARTIFACTCONST, ), ); } public function getFieldSpecifications() { return array( 'name' => array( 'name' => pht('Artifact Name'), 'type' => 'text', 'required' => true, ), ); } } diff --git a/src/applications/harbormaster/worker/HarbormasterTargetWorker.php b/src/applications/harbormaster/worker/HarbormasterTargetWorker.php index db355b145c..0f4d4092fa 100644 --- a/src/applications/harbormaster/worker/HarbormasterTargetWorker.php +++ b/src/applications/harbormaster/worker/HarbormasterTargetWorker.php @@ -1,113 +1,114 @@ loadBuildTarget(); } catch (Exception $ex) { return null; } return $viewer->renderHandle($target->getPHID()); } private function loadBuildTarget() { $data = $this->getTaskData(); $id = idx($data, 'targetID'); $target = id(new HarbormasterBuildTargetQuery()) ->withIDs(array($id)) ->setViewer($this->getViewer()) ->executeOne(); if (!$target) { throw new PhabricatorWorkerPermanentFailureException( pht( 'Bad build target ID "%d".', $id)); } return $target; } protected function doWork() { $target = $this->loadBuildTarget(); $build = $target->getBuild(); $viewer = $this->getViewer(); $target->setDateStarted(time()); try { if ($target->getBuildGeneration() !== $build->getBuildGeneration()) { throw new HarbormasterBuildAbortedException(); } $status_pending = HarbormasterBuildTarget::STATUS_PENDING; if ($target->getTargetStatus() == $status_pending) { $target->setTargetStatus(HarbormasterBuildTarget::STATUS_BUILDING); $target->save(); } $implementation = $target->getImplementation(); + 
$implementation->setCurrentWorkerTaskID($this->getCurrentWorkerTaskID()); $implementation->execute($build, $target); $next_status = HarbormasterBuildTarget::STATUS_PASSED; if ($implementation->shouldWaitForMessage($target)) { $next_status = HarbormasterBuildTarget::STATUS_WAITING; } $target->setTargetStatus($next_status); if ($target->isComplete()) { $target->setDateCompleted(PhabricatorTime::getNow()); } $target->save(); } catch (PhabricatorWorkerYieldException $ex) { // If the target wants to yield, let that escape without further // processing. We'll resume after the task retries. throw $ex; } catch (HarbormasterBuildFailureException $ex) { // A build step wants to fail explicitly. $target->setTargetStatus(HarbormasterBuildTarget::STATUS_FAILED); $target->setDateCompleted(PhabricatorTime::getNow()); $target->save(); } catch (HarbormasterBuildAbortedException $ex) { // A build step is aborting because the build has been restarted. $target->setTargetStatus(HarbormasterBuildTarget::STATUS_ABORTED); $target->setDateCompleted(PhabricatorTime::getNow()); $target->save(); } catch (Exception $ex) { phlog($ex); try { $log = $build->createLog($target, 'core', 'exception'); $start = $log->start(); $log->append((string)$ex); $log->finalize($start); } catch (Exception $log_ex) { phlog($log_ex); } $target->setTargetStatus(HarbormasterBuildTarget::STATUS_FAILED); $target->setDateCompleted(time()); $target->save(); } id(new HarbormasterBuildEngine()) ->setViewer($viewer) ->setBuild($build) ->continueBuild(); } } diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php index 4e71440604..c448e7221a 100644 --- a/src/infrastructure/daemon/workers/PhabricatorWorker.php +++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php @@ -1,211 +1,286 @@ currentWorkerTask = $task; + return $this; + } + + public function getCurrentWorkerTask() { + return $this->currentWorkerTask; + } + + public function 
getCurrentWorkerTaskID() { + $task = $this->getCurrentWorkerTask(); + if (!$task) { + return null; + } + return $task->getID(); + } + abstract protected function doWork(); final public function __construct($data) { $this->data = $data; } final protected function getTaskData() { return $this->data; } final protected function getTaskDataValue($key, $default = null) { $data = $this->getTaskData(); if (!is_array($data)) { throw new PhabricatorWorkerPermanentFailureException( pht('Expected task data to be a dictionary.')); } return idx($data, $key, $default); } final public function executeTask() { $this->doWork(); } final public static function scheduleTask( $task_class, $data, $options = array()) { + PhutilTypeSpec::checkMap( + $options, + array( + 'priority' => 'optional int|null', + 'objectPHID' => 'optional string|null', + 'delayUntil' => 'optional int|null', + )); + $priority = idx($options, 'priority'); if ($priority === null) { $priority = self::PRIORITY_DEFAULT; } $object_phid = idx($options, 'objectPHID'); $task = id(new PhabricatorWorkerActiveTask()) ->setTaskClass($task_class) ->setData($data) ->setPriority($priority) ->setObjectPHID($object_phid); $delay = idx($options, 'delayUntil'); if ($delay) { $task->setLeaseExpires($delay); } if (self::$runAllTasksInProcess) { // Do the work in-process. $worker = newv($task_class, array($data)); while (true) { try { $worker->doWork(); foreach ($worker->getQueuedTasks() as $queued_task) { list($queued_class, $queued_data, $queued_priority) = $queued_task; $queued_options = array('priority' => $queued_priority); self::scheduleTask($queued_class, $queued_data, $queued_options); } break; } catch (PhabricatorWorkerYieldException $ex) { phlog( pht( 'In-process task "%s" yielded for %s seconds, sleeping...', $task_class, $ex->getDuration())); sleep($ex->getDuration()); } } // Now, save a task row and immediately archive it so we can return an // object with a valid ID. 
$task->openTransaction(); $task->save(); $archived = $task->archiveTask( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, 0); $task->saveTransaction(); return $archived; } else { $task->save(); return $task; } } public function renderForDisplay(PhabricatorUser $viewer) { return null; } /** * Set this flag to execute scheduled tasks synchronously, in the same * process. This is useful for debugging, and otherwise dramatically worse * in every way imaginable. */ public static function setRunAllTasksInProcess($all) { self::$runAllTasksInProcess = $all; } final protected function log($pattern /* , ... */) { $console = PhutilConsole::getConsole(); $argv = func_get_args(); call_user_func_array(array($console, 'writeLog'), $argv); return $this; } /** * Queue a task to be executed after this one succeeds. * * The followup task will be queued only if this task completes cleanly. * * @param string Task class to queue. * @param array Data for the followup task. * @param int|null Priority for the followup task. * @return this */ final protected function queueTask($class, array $data, $priority = null) { $this->queuedTasks[] = array($class, $data, $priority); return $this; } /** * Get tasks queued as followups by @{method:queueTask}. * * @return list> Queued task specifications. */ final public function getQueuedTasks() { return $this->queuedTasks; } + + /** + * Awaken tasks that have yielded. + * + * Reschedules the specified tasks if they are currently queued in a yielded, + * unleased, unretried state so they'll execute sooner. This can let the + * queue avoid unnecessary waits. + * + * This method does not provide any assurances about when these tasks will + * execute, or even guarantee that it will have any effect at all. + * + * @param list List of task IDs to try to awaken. 
+ * @return void + */ + final public static function awakenTaskIDs(array $ids) { + if (!$ids) { + return; + } + + $table = new PhabricatorWorkerActiveTask(); + $conn_w = $table->establishConnection('w'); + + // NOTE: At least for now, we're keeping these tasks yielded, just + // pretending that they threw a shorter yield than they really did. + + // Overlap the windows here to handle minor client/server time differences + // and because it's likely correct to push these tasks to the head of their + // respective priorities. There is a good chance they are ready to execute. + $window = phutil_units('1 hour in seconds'); + $epoch_ago = (PhabricatorTime::getNow() - $window); + + queryfx( + $conn_w, + 'UPDATE %T SET leaseExpires = %d + WHERE id IN (%Ld) + AND leaseOwner = %s + AND leaseExpires > %d + AND failureCount = 0', + $table->getTableName(), + $epoch_ago, + $ids, + self::YIELD_OWNER, + $epoch_ago); + } + } diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php index 0705a27780..99b0ee76ea 100644 --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php @@ -1,222 +1,225 @@ self::IDS_COUNTER, self::CONFIG_TIMESTAMPS => false, self::CONFIG_KEY_SCHEMA => array( 'dataID' => array( 'columns' => array('dataID'), 'unique' => true, ), 'taskClass' => array( 'columns' => array('taskClass'), ), 'leaseExpires' => array( 'columns' => array('leaseExpires'), ), 'leaseOwner' => array( 'columns' => array('leaseOwner(16)'), ), 'key_failuretime' => array( 'columns' => array('failureTime'), ), 'leaseOwner_2' => array( 'columns' => array('leaseOwner', 'priority', 'id'), ), ) + $parent[self::CONFIG_KEY_SCHEMA], ); $config[self::CONFIG_COLUMN_SCHEMA] = array( // T6203/NULLABILITY // This isn't nullable in the archive table, so at a minimum these // should be the same. 
'dataID' => 'uint32?',
    ) + $parent[self::CONFIG_COLUMN_SCHEMA];

    return $config + $parent;
  }

  /**
   * Record the server's clock reading together with the local time at which
   * it was recorded. @{method:checkLease} uses the pair to estimate the
   * current server time without querying the server again.
   *
   * @param int Server time to record.
   * @return this
   */
  public function setServerTime($server_time) {
    $this->serverTime = $server_time;
    $this->localTime = time();
    return $this;
  }

  /**
   * Move the lease expiry to the recorded server time plus a duration, then
   * save the task.
   *
   * @param int Duration to add to the recorded server time.
   * @return wild Result of @{method:forceSaveWithoutLease}.
   * @throws Exception If the current lease has already expired.
   */
  public function setLeaseDuration($lease_duration) {
    $this->checkLease();
    $server_lease_expires = $this->serverTime + $lease_duration;
    $this->setLeaseExpires($server_lease_expires);

    // NOTE: This is primarily to allow unit tests to set negative lease
    // durations so they don't have to wait around for leases to expire. We
    // check that the lease is valid above.
    return $this->forceSaveWithoutLease();
  }

  /**
   * Save the task after verifying that its lease has not expired.
   *
   * @return wild Result of @{method:forceSaveWithoutLease}.
   * @throws Exception If the current lease has already expired.
   */
  public function save() {
    $this->checkLease();
    return $this->forceSaveWithoutLease();
  }

  /**
   * Save the task without checking the lease.
   *
   * A task with no ID yet is treated as new: its failure count is reset to
   * zero and, if it carries data, the data is written to a separate
   * @{class:PhabricatorWorkerTaskData} row whose ID is stored on the task.
   *
   * @return wild Result of the parent `save()`.
   */
  public function forceSaveWithoutLease() {
    $is_new = !$this->getID();
    if ($is_new) {
      $this->failureCount = 0;
    }

    if ($is_new && ($this->getData() !== null)) {
      $data = new PhabricatorWorkerTaskData();
      $data->setData($this->getData());
      $data->save();

      $this->setDataID($data->getID());
    }

    return parent::save();
  }

  /**
   * Verify that the lease on this task, if any, is still valid.
   *
   * Tasks without a lease owner are not checked. Otherwise the current server
   * time is estimated from the recorded server time plus the local time
   * elapsed since it was recorded.
   *
   * @return void
   * @throws Exception If the estimated server time has reached or passed the
   *   lease expiry.
   */
  protected function checkLease() {
    if ($this->leaseOwner) {
      $current_server_time = $this->serverTime + (time() - $this->localTime);
      if ($current_server_time >= $this->leaseExpires) {
        throw new Exception(
          pht(
            'Trying to update Task %d (%s) after lease expiration!',
            $this->getID(),
            $this->getTaskClass()));
      }
    }
  }

  /**
   * Always throws: active tasks must be archived rather than deleted.
   *
   * @throws Exception Always.
   */
  public function delete() {
    throw new Exception(
      pht(
        'Active tasks can not be deleted directly. '.
        'Use %s to move tasks to the archive.',
        'archiveTask()'));
  }

  /**
   * Copy this task into a @{class:PhabricatorWorkerArchiveTask} carrying the
   * given result and duration, and save the archive row.
   *
   * @param wild Result value to record on the archive task.
   * @param int Duration to record on the archive task.
   * @return PhabricatorWorkerArchiveTask The saved archive task.
   * @throws Exception If this task has never been saved, or if its lease has
   *   already expired.
   */
  public function archiveTask($result, $duration) {
    if ($this->getID() === null) {
      throw new Exception(
        pht("Attempting to archive a task which hasn't been saved!"));
    }

    $this->checkLease();

    $archive = id(new PhabricatorWorkerArchiveTask())
      ->setID($this->getID())
      ->setTaskClass($this->getTaskClass())
      ->setLeaseOwner($this->getLeaseOwner())
      ->setLeaseExpires($this->getLeaseExpires())
      ->setFailureCount($this->getFailureCount())
      ->setDataID($this->getDataID())
      ->setPriority($this->getPriority())
      ->setObjectPHID($this->getObjectPHID())
      ->setResult($result)
      ->setDuration($duration);

    // NOTE: This deletes the active task (this object)!
    $archive->save();

    return $archive;
  }

  /**
   * Run the worker for this task and record the outcome.
   *
   * Outcomes:
   *
   *   - Success: the task is archived with `RESULT_SUCCESS` and the elapsed
   *     time in microseconds, then any tasks the worker queued are scheduled
   *     at this task's priority.
   *   - @{class:PhabricatorWorkerPermanentFailureException}: the task is
   *     archived with `RESULT_FAILURE` and the exception is attached to the
   *     archive task.
   *   - @{class:PhabricatorWorkerYieldException}: the lease is handed to
   *     `PhabricatorWorker::YIELD_OWNER` and extended by the requested
   *     duration, with a floor of 5.
   *   - Any other exception: the failure count and failure time are updated
   *     and the lease is extended by the worker's retry wait, or by the
   *     default wait if the worker gives none.
   *
   * @return PhabricatorWorkerArchiveTask|this The archive task when the task
   *   was archived, otherwise this task.
   * @throws Exception If the lease has expired before execution starts.
   */
  public function executeTask() {
    // We do this outside of the try .. catch because we don't have permission
    // to release the lease otherwise.
    $this->checkLease();

    $did_succeed = false;
    $worker = null;
    try {
      $worker = $this->getWorkerInstance();
      $worker->setCurrentWorkerTask($this);

      $maximum_failures = $worker->getMaximumRetryCount();
      if ($maximum_failures !== null) {
        if ($this->getFailureCount() > $maximum_failures) {
          // Both conversions are "%d". The message previously began with a
          // bare "%", which is not a valid conversion for the task ID.
          throw new PhabricatorWorkerPermanentFailureException(
            pht(
              'Task %d has exceeded the maximum number of failures (%d).',
              $this->getID(),
              $maximum_failures));
        }
      }

      $lease = $worker->getRequiredLeaseTime();
      if ($lease !== null) {
        $this->setLeaseDuration($lease);
      }

      $t_start = microtime(true);
        $worker->executeTask();
      $t_end = microtime(true);

      // microtime(true) yields fractional seconds; store whole microseconds.
      $duration = (int)(1000000 * ($t_end - $t_start));

      $result = $this->archiveTask(
        PhabricatorWorkerArchiveTask::RESULT_SUCCESS,
        $duration);
      $did_succeed = true;
    } catch (PhabricatorWorkerPermanentFailureException $ex) {
      $result = $this->archiveTask(
        PhabricatorWorkerArchiveTask::RESULT_FAILURE,
        0);
      $result->setExecutionException($ex);
    } catch (PhabricatorWorkerYieldException $ex) {
      $this->setExecutionException($ex);

      $this->setLeaseOwner(PhabricatorWorker::YIELD_OWNER);

      $retry = $ex->getDuration();
      $retry = max($retry, 5);

      // NOTE: As a side effect, this saves the object.
      $this->setLeaseDuration($retry);

      $result = $this;
    } catch (Exception $ex) {
      $this->setExecutionException($ex);
      $this->setFailureCount($this->getFailureCount() + 1);
      $this->setFailureTime(time());

      // The worker may be null here if constructing it was what failed.
      $retry = null;
      if ($worker) {
        $retry = $worker->getWaitBeforeRetry($this);
      }

      $retry = coalesce(
        $retry,
        PhabricatorWorkerLeaseQuery::getDefaultWaitBeforeRetry());

      // NOTE: As a side effect, this saves the object.
      $this->setLeaseDuration($retry);

      $result = $this;
    }

    // NOTE: If this throws, we don't want it to cause the task to fail again,
    // so execute it out here and just let the exception escape.
    if ($did_succeed) {
      foreach ($worker->getQueuedTasks() as $task) {
        list($class, $data) = $task;
        PhabricatorWorker::scheduleTask(
          $class,
          $data,
          array(
            'priority' => $this->getPriority(),
          ));
      }
    }

    return $result;
  }

}