// No usable connection was found for this context key yet: borrow one from the
// pool registered under $name and remember it under $id.
if (! $connection instanceof ConnectionInterface) {
    $pool = $this->factory->getPool($name);

    // Borrow a pooled connection wrapper
    // (per the original note: a Hyperf\DbConnection\Connection instance — TODO confirm).
    $connection = $pool->get();

    try {
        // PDO is initialized as an anonymous function, so there is no IO exception,
        // but if other exceptions are thrown, the connection will not return to the connection pool properly.
        //
        // NOTE(review): per the original note, getConnection() checks whether the
        // connection has timed out and reconnects if so — confirm in getConnection().
        $connection = $connection->getConnection();

        // Store the connection under $id so later lookups in the same context reuse it.
        Context::set($id, $connection);
    } finally {
        // The finally block runs whether or not the try body threw, so the release
        // is scheduled in both cases. It is only scheduled when running inside a
        // coroutine; outside a coroutine nothing is released here.
        if (Coroutine::inCoroutine()) {
            defer(function () use ($connection) {
                $connection->release();
            });
        }
    }
}
// Re-attach the hooks that a freshly created database connection does not carry yet.
if ($this->connection instanceof \Hyperf\Database\Connection) {
    // Reset event dispatcher after db reconnect.
    // Only done when the container actually has a dispatcher binding.
    if ($this->container->has(EventDispatcherInterface::class)) {
        $dispatcher = $this->container->get(EventDispatcherInterface::class);
        $this->connection->setEventDispatcher($dispatcher);
    }

    // Reset reconnector after db reconnect.
    // The reconnector is a closure that receives the connection and delegates
    // to $this->refresh().
    // NOTE(review): per the original note, refresh() is expected to rebuild the
    // connection's PDO resolver closure and run it to re-establish the link —
    // confirm against refresh().
    $this->connection->setReconnector(function ($connection) {
        $this->refresh($connection);
    });
}
/**
 * Parse the hosts configuration item into an array.
 *
 * Reads $config['host'] unconditionally and passes it through Arr::wrap().
 * NOTE(review): presumably Arr::wrap() turns a single host string into a
 * one-element array and leaves arrays untouched — confirm against Arr::wrap().
 *
 * @param array $config connection configuration; assumed to contain a 'host' key
 * @return array
 */
protected function parseHosts(array $config)
{
    return Arr::wrap($config['host']);
}
/**
 * Create a new Closure that resolves to a PDO instance where there is no configured host.
 *
 * Nothing is connected when this method is called: the returned Closure
 * captures $config by value and, each time it is invoked, builds a connector
 * via createConnector($config) and returns the result of its connect($config).
 *
 * @param array $config connection configuration handed to the connector
 * @return \Closure
 */
protected function createPdoResolverWithoutHosts(array $config)
{
    return function () use ($config) {
        return $this->createConnector($config)->connect($config);
    };
}
// For select statements, we'll simply execute the query and return an array
// of the database result set. Each element in the array will be a single
// row from the database table, and will either be an array or objects.
//
// The PDO handle is chosen by getPdoForSelect($useReadPdo); the statement it
// prepares is then passed through prepared() before being stored.
$statement = $this->prepared(
    $this->getPdoForSelect($useReadPdo)->prepare($query)
);
/**
 * Get the current PDO connection used for reading.
 *
 * Returns $this->getPdo() instead of the read connection while
 * $this->transactions is greater than zero, or when records have been
 * modified and the 'sticky' config option is truthy. A read PDO that is
 * still stored as a Closure is invoked and the result is written back to
 * $this->readPdo before being returned.
 *
 * @return \PDO
 */
public function getReadPdo()
{
    if ($this->transactions > 0) {
        return $this->getPdo();
    }

    if ($this->recordsModified && $this->getConfig('sticky')) {
        return $this->getPdo();
    }

    if ($this->readPdo instanceof Closure) {
        // Resolve the lazily-defined connection and keep the result on the property.
        return $this->readPdo = call_user_func($this->readPdo);
    }
    // NOTE(review): the visible excerpt stops here; the method's remaining fallback and closing brace are not shown.
// Here we will run this query. If an exception occurs we'll determine if it was
// caused by a connection that has been lost. If that is the cause, we'll try
// to re-establish connection and re-run the query with a fresh connection.
try {
    $result = $this->runQueryCallback($query, $bindings, $callback);
} catch (QueryException $e) {
    // Only QueryException is intercepted here. Per the original note, the handler
    // also decides whether the failure came from a lost connection and, if so,
    // reconnects — confirm in handleQueryException().
    $result = $this->handleQueryException(
        $e,
        $query,
        $bindings,
        $callback
    );
}
/**
 * Reconnect to the database if a PDO connection is missing.
 *
 * "Missing" means $this->pdo is null; any non-null value (including a
 * not-yet-resolved value) leaves the connection untouched.
 *
 * @return void
 */
protected function reconnectIfMissingConnection()
{
    if (is_null($this->pdo)) {
        $this->reconnect();
    }
}