diff --git a/src/aphront/storage/connection/AphrontDatabaseConnection.php b/src/aphront/storage/connection/AphrontDatabaseConnection.php index 5252b2fd..e1a8bfd0 100644 --- a/src/aphront/storage/connection/AphrontDatabaseConnection.php +++ b/src/aphront/storage/connection/AphrontDatabaseConnection.php @@ -1,258 +1,268 @@ lastActiveEpoch = $epoch; + return $this; + } + + final public function getLastActiveEpoch() { + return $this->lastActiveEpoch; + } + public function queryData($pattern/* , $arg, $arg, ... */) { $args = func_get_args(); array_unshift($args, $this); return call_user_func_array('queryfx_all', $args); } public function query($pattern/* , $arg, $arg, ... */) { $args = func_get_args(); array_unshift($args, $this); return call_user_func_array('queryfx', $args); } public function supportsAsyncQueries() { return false; } public function supportsParallelQueries() { return false; } public function setReadOnly($read_only) { $this->readOnly = $read_only; return $this; } public function getReadOnly() { return $this->readOnly; } public function setQueryTimeout($query_timeout) { $this->queryTimeout = $query_timeout; return $this; } public function getQueryTimeout() { return $this->queryTimeout; } public function asyncQuery($raw_query) { throw new Exception(pht('Async queries are not supported.')); } public static function resolveAsyncQueries(array $conns, array $asyncs) { throw new Exception(pht('Async queries are not supported.')); } /* -( Global Locks )------------------------------------------------------- */ public function rememberLock($lock) { if (isset($this->locks[$lock])) { throw new Exception( pht( 'Trying to remember lock "%s", but this lock has already been '. 'remembered.', $lock)); } $this->locks[$lock] = true; return $this; } public function forgetLock($lock) { if (empty($this->locks[$lock])) { throw new Exception( pht( 'Trying to forget lock "%s", but this connection does not remember '. 'that lock.', $lock)); } unset($this->locks[$lock]); return $this; } public function forgetAllLocks() { $this->locks = array(); return $this; } public function isHoldingAnyLock() { return (bool)$this->locks; } /* -( Transaction Management )--------------------------------------------- */ /** * Begin a transaction, or set a savepoint if the connection is already * transactional. * * @return this * @task xaction */ public function openTransaction() { $state = $this->getTransactionState(); $point = $state->getSavepointName(); $depth = $state->getDepth(); $new_transaction = ($depth == 0); if ($new_transaction) { $this->query('START TRANSACTION'); } else { $this->query('SAVEPOINT '.$point); } $state->increaseDepth(); return $this; } /** * Commit a transaction, or stage a savepoint for commit once the entire * transaction completes if inside a transaction stack. * * @return this * @task xaction */ public function saveTransaction() { $state = $this->getTransactionState(); $depth = $state->decreaseDepth(); if ($depth == 0) { $this->query('COMMIT'); } return $this; } /** * Rollback a transaction, or unstage the last savepoint if inside a * transaction stack. * * @return this */ public function killTransaction() { $state = $this->getTransactionState(); $depth = $state->decreaseDepth(); if ($depth == 0) { $this->query('ROLLBACK'); } else { $this->query('ROLLBACK TO SAVEPOINT '.$state->getSavepointName()); } return $this; } /** * Returns true if the connection is transactional. * * @return bool True if the connection is currently transactional. * @task xaction */ public function isInsideTransaction() { $state = $this->getTransactionState(); return ($state->getDepth() > 0); } /** * Get the current @{class:AphrontDatabaseTransactionState} object, or create * one if none exists. * * @return AphrontDatabaseTransactionState Current transaction state. * @task xaction */ protected function getTransactionState() { if (!$this->transactionState) { $this->transactionState = new AphrontDatabaseTransactionState(); } return $this->transactionState; } /** * @task xaction */ public function beginReadLocking() { $this->getTransactionState()->beginReadLocking(); return $this; } /** * @task xaction */ public function endReadLocking() { $this->getTransactionState()->endReadLocking(); return $this; } /** * @task xaction */ public function isReadLocking() { return $this->getTransactionState()->isReadLocking(); } /** * @task xaction */ public function beginWriteLocking() { $this->getTransactionState()->beginWriteLocking(); return $this; } /** * @task xaction */ public function endWriteLocking() { $this->getTransactionState()->endWriteLocking(); return $this; } /** * @task xaction */ public function isWriteLocking() { return $this->getTransactionState()->isWriteLocking(); } } diff --git a/src/phage/agent/PhagePHPAgent.php b/src/phage/agent/PhagePHPAgent.php index 67552ec8..39a74e85 100644 --- a/src/phage/agent/PhagePHPAgent.php +++ b/src/phage/agent/PhagePHPAgent.php @@ -1,103 +1,104 @@ stdin = $stdin; } public function execute() { while (true) { if ($this->exec) { $iterator = new FutureIterator($this->exec); $iterator->setUpdateInterval(0.050); foreach ($iterator as $key => $future) { if ($future === null) { break; } $this->resolveFuture($key, $future); break; } } else { PhutilChannel::waitForAny(array($this->getMaster())); } $this->processInput(); } } private function getMaster() { if (!$this->master) { $raw_channel = new PhutilSocketChannel( $this->stdin, fopen('php://stdout', 'w')); $json_channel = new PhutilJSONProtocolChannel($raw_channel); $this->master = $json_channel; } return $this->master; } private function processInput() { $channel = $this->getMaster(); $open = $channel->update(); if (!$open) { throw new Exception(pht('Channel closed!')); } while (true) { $command = $channel->read(); if ($command === null) { break; } $this->processCommand($command); } } private function processCommand(array $spec) { switch ($spec['type']) { case 'EXEC': $key = $spec['key']; $cmd = $spec['command']; $future = new ExecFuture('%C', $cmd); $this->exec[$key] = $future; break; case 'EXIT': $this->terminateAgent(); break; } } private function resolveFuture($key, Future $future) { $result = $future->resolve(); $master = $this->getMaster(); $master->write( array( 'type' => 'RSLV', 'key' => $key, 'err' => $result[0], 'stdout' => $result[1], 'stderr' => $result[2], )); + unset($this->exec[$key]); } public function __destruct() { $this->terminateAgent(); } private function terminateAgent() { foreach ($this->exec as $key => $future) { $future->resolveKill(); } exit(0); } } diff --git a/src/xsprintf/queryfx.php b/src/xsprintf/queryfx.php index 52071118..e88c47e8 100644 --- a/src/xsprintf/queryfx.php +++ b/src/xsprintf/queryfx.php @@ -1,25 +1,27 @@ setLastActiveEpoch(time()); $conn->executeRawQuery($query); } function queryfx_all(AphrontDatabaseConnection $conn, $sql /* , ... */) { $argv = func_get_args(); call_user_func_array('queryfx', $argv); return $conn->selectAllResults(); } function queryfx_one(AphrontDatabaseConnection $conn, $sql /* , ... */) { $argv = func_get_args(); $ret = call_user_func_array('queryfx_all', $argv); if (count($ret) > 1) { throw new AphrontCountQueryException( pht('Query returned more than one row.')); } else if (count($ret)) { return reset($ret); } return null; }