加载容器

- bin/hyperf.php

  ```php
  require BASE_PATH . '/config/container.php'
  ```

- vendor/hyperf/di/src/Definition/DefinitionSourceFactory.php:46

  ```php
  ProviderConfig::load()
  ```

寻找服务提供者配置

- vendor/hyperf/config/src/ProviderConfig.php:37

```php
Composer::getMergedExtra('hyperf')['config'] ?? []
```
找的时候就去composer.lock读几个特定的键(恭喜composer.lock有了额外功能)
``` php
// Abort when $path is empty: without a composer.lock there is nothing to read.
if (! $path) {
throw new \RuntimeException('composer.lock not found.');
}
// Decode composer.lock as an associative array and keep it on the class.
self::$content = collect(json_decode(file_get_contents($path), true));
$packages = self::$content->offsetGet('packages') ?? [];
$packagesDev = self::$content->offsetGet('packages-dev') ?? [];
// Regular and dev packages are handled identically: selected fields are indexed by package name.
foreach (array_merge($packages, $packagesDev) as $package) {
$packageName = '';
foreach ($package ?? [] as $key => $value) {
if ($key === 'name') {
$packageName = $value;
continue;
}
// A field is only stored once a non-empty name has been seen, so this relies on
// "name" preceding the other keys inside each package entry.
switch ($key) {
case 'extra':
$packageName && self::$extra[$packageName] = $value;
break;
case 'scripts':
$packageName && self::$scripts[$packageName] = $value;
break;
case 'version':
$packageName && self::$versions[$packageName] = $value;
break;
}
}
}
```
composer.lock 配合一下
``` json
{
"name": "hyperf/db-connection",
"extra": {
"branch-alias": {
"dev-master": "1.1-dev"
},
"hyperf": {
"config": "Hyperf\\DbConnection\\ConfigProvider"
}
},

},
```

结果

```
[
0 => "Hyperf\AsyncQueue\ConfigProvider"
1 => "Hyperf\Cache\ConfigProvider"
2 => "Hyperf\Config\ConfigProvider"
3 => "Hyperf\Constants\ConfigProvider"
4 => "Hyperf\Crontab\ConfigProvider"
5 => "Hyperf\DbConnection\ConfigProvider"
// 省略...
]
```
DB相关配置

use Hyperf\Database\ConnectionResolverInterface;
use Hyperf\Database\Connectors\ConnectionFactory;
use Hyperf\Database\Connectors\MySqlConnector;
use Hyperf\Database\Migrations\MigrationRepositoryInterface;
use Hyperf\DbConnection\Listener\RegisterConnectionResolverListener;
use Hyperf\DbConnection\Pool\PoolFactory;

// Excerpt of the configuration array for the database component.
[
// Maps an interface, class name or string alias to the class that provides it.
'dependencies' => [
PoolFactory::class => PoolFactory::class,
ConnectionFactory::class => ConnectionFactory::class,
// NOTE(review): ConnectionResolver is not imported in this excerpt; presumably it
// resolves from the current namespace -- confirm against the original file.
ConnectionResolverInterface::class => ConnectionResolver::class,
'db.connector.mysql' => MySqlConnector::class,
// NOTE(review): DatabaseMigrationRepositoryFactory is likewise not imported here.
MigrationRepositoryInterface::class => DatabaseMigrationRepositoryFactory::class,
],
// Listener classes to register.
'listeners' => [
RegisterConnectionResolverListener::class,
],
// ...
];

应用启动时初始化数据库连接解析器 Register::setConnectionResolver

vendor/hyperf/db-connection/src/Listener/RegisterConnectionResolverListener.php:32

/**
 * Events this listener subscribes to.
 *
 * @return array list of event class names
 */
public function listen(): array
{
    return [BootApplication::class];
}

/**
 * Hand the container's connection resolver to Register, if one is bound.
 *
 * @param object $event the dispatched event (not inspected here)
 */
public function process(object $event)
{
    // Nothing to register when the container has no resolver binding.
    if (! $this->container->has(ConnectionResolverInterface::class)) {
        return;
    }

    $resolver = $this->container->get(ConnectionResolverInterface::class);
    Register::setConnectionResolver($resolver);
}

在Model上调用Query时,通过一连串调用,初始化一堆东西,包括数据库连接

vendor/hyperf/database/src/Model/Model.php:939

/**
 * Begin a query by instantiating the called class and delegating to newQuery().
 */
public static function query()
{
    $model = new static();

    return $model->newQuery();
}

/**
 * Build a query without scopes, then pass it through registerGlobalScopes().
 */
public function newQuery()
{
    $builder = $this->newQueryWithoutScopes();

    return $this->registerGlobalScopes($builder);
}

/**
 * Build a model query and apply the model's $with and $withCount settings to it.
 */
public function newQueryWithoutScopes()
{
    $builder = $this->newModelQuery();
    $builder = $builder->with($this->with);

    return $builder->withCount($this->withCount);
}

/**
 * Wrap a fresh base query builder in a model builder bound to this model.
 */
public function newModelQuery()
{
    $baseQuery = $this->newBaseQueryBuilder();

    return $this->newModelBuilder($baseQuery)->setModel($this);
}

/**
 * Create a QueryBuilder from the model's connection, using that connection's
 * query grammar and post processor.
 */
protected function newBaseQueryBuilder()
{
    $connection = $this->getConnection();
    $grammar = $connection->getQueryGrammar();
    $processor = $connection->getPostProcessor();

    return new QueryBuilder($connection, $grammar, $processor);
}

/**
 * Resolve the database connection for this model.
 *
 * This is the point where the database connection gets initialized.
 */
public function getConnection(): ConnectionInterface
{
    $name = $this->getConnectionName();

    return Register::resolveConnection($name);
}
初始化的具体步骤,由上面应用启动时通过 `Register::setConnectionResolver` 注册的解析器包办,见 vendor/hyperf/database/src/Model/Register.php:38

/**
 * Ask the registered resolver for a connection.
 *
 * @param mixed $connection connection name, passed through to the resolver unchanged
 */
public static function resolveConnection($connection = null)
{
    $resolver = static::$resolver;

    return $resolver->connection($connection);
}
上面说的是 `Hyperf\Database\Model\Model` 的实现,而 `Hyperf\DbConnection\Model\Model` 继承了它,并且覆盖了 `getConnection` 方法,变成了下面这样(vendor/hyperf/db-connection/src/Model/Model.php:32):

/**
 * Resolve this model's connection through the resolver fetched from the container.
 */
public function getConnection(): ConnectionInterface
{
    return $this->getContainer()
        ->get(ConnectionResolverInterface::class)
        ->connection($this->getConnectionName());
}
虽然做的事跟基类是一样的,不过变成了自己一把梭(直接从容器里取解析器),应用启动时通过 `Register::setConnectionResolver` 注册的解析器被无视了

实际获取一个数据库连接

vendor/hyperf/db-connection/src/ConnectionResolver.php:52

/**
 * Get the connection for the given name, reusing the one already stored in the
 * current Context when there is one.
 *
 * @param mixed $name connection name; the default connection is used when null
 */
public function connection($name = null)
{
    if ($name === null) {
        $name = $this->getDefaultConnection();
    }

    $contextKey = $this->getContextKey($name);
    $connection = Context::has($contextKey) ? Context::get($contextKey) : null;

    // A connection is already bound to this context: reuse it.
    if ($connection instanceof ConnectionInterface) {
        return $connection;
    }

    // Take a Hyperf\DbConnection\Connection out of the pool.
    $connection = $this->factory->getPool($name)->get();
    try {
        // PDO is initialized as an anonymous function, so there is no IO exception,
        // but if other exceptions are thrown, the connection will not return to the connection pool properly.
        // This call checks whether the connection has timed out and reconnects if it has.
        $connection = $connection->getConnection();
        // Remember the connection used by the current coroutine.
        Context::set($contextKey, $connection);
    } finally {
        if (Coroutine::inCoroutine()) {
            defer(function () use ($connection) {
                $connection->release();
            });
        }
    }

    return $connection;
}
vendor/hyperf/pool/src/Pool.php:58

/**
 * Hand out a connection and update the usage-frequency bookkeeping.
 */
public function get(): ConnectionInterface
{
    $connection = $this->getConnection();

    // Record one acquisition from the pool.
    if ($this->frequency instanceof FrequencyInterface) {
        $this->frequency->hit();
    }

    // When connections are requested infrequently, release some of them.
    $isLowFrequency = $this->frequency instanceof LowFrequencyInterface
        && $this->frequency->isLowFrequency();
    if ($isLowFrequency) {
        $this->flush();
    }

    return $connection;
}

/**
 * Obtain a connection: create a new one when nothing is idle and the pool is
 * below its limit, otherwise wait for one from the channel.
 *
 * @throws \RuntimeException when the channel does not yield a connection
 *                           (presumably because the wait timed out)
 * @throws Throwable when creating a new connection fails
 */
private function getConnection(): ConnectionInterface
{
    // Number of connections currently available in the channel.
    $num = $this->getConnectionsInChannel();

    try {
        // Create a connection when none is idle and the limit has not been reached.
        if ($num === 0 && $this->currentConnections < $this->option->getMaxConnections()) {
            ++$this->currentConnections;
            return $this->createConnection();
        }
    } catch (Throwable $throwable) {
        // Creation failed: give the reserved slot back before re-throwing.
        --$this->currentConnections;
        throw $throwable;
    }

    // Pop a connection from the channel, waiting up to the configured timeout.
    $connection = $this->channel->pop($this->option->getWaitTimeout());
    if (! $connection instanceof ConnectionInterface) {
        // Fail with a descriptive error instead of an opaque TypeError from the return type.
        throw new \RuntimeException('Connection pool exhausted. Cannot establish new connection before wait_timeout.');
    }

    return $connection;
}

/**
 * Instantiate a new pooled Connection bound to this pool and its config.
 */
protected function createConnection(): ConnectionInterface
{
    $connection = new Connection($this->container, $this, $this->config);

    return $connection;
}
vendor/hyperf/db-connection/src/Connection.php:84

/**
 * Return this connection, reconnecting first when check() reports it as expired.
 *
 * @throws ConnectionException when reconnect() reports failure
 */
public function getActiveConnection(): DbConnectionInterface
{
    // reconnect() is only attempted when the idle check fails.
    if (! $this->check() && ! $this->reconnect()) {
        throw new ConnectionException('Connection reconnect failed.');
    }

    return $this;
}

/**
 * Timeout check: compare the time of last use against the configured max idle time.
 *
 * @return bool false when the connection has been idle longer than the max idle time;
 *              otherwise the last-use time is refreshed and true is returned
 */
public function check(): bool
{
    $maxIdleTime = $this->pool->getOption()->getMaxIdleTime();
    $now = microtime(true);
    $expiresAt = $maxIdleTime + $this->lastUseTime;

    if ($now > $expiresAt) {
        return false;
    }

    $this->lastUseTime = $now;

    return true;
}

/**
 * Replace the wrapped connection with a freshly made one and re-attach its collaborators.
 *
 * @return bool always true in the code shown
 */
public function reconnect(): bool
{
    // This connection carries the closure that connects to the database;
    // the closure runs when the database is actually operated on.
    $fresh = $this->factory->make($this->config);
    $this->connection = $fresh;

    if ($fresh instanceof \Hyperf\Database\Connection) {
        // Reset event dispatcher after db reconnect.
        if ($this->container->has(EventDispatcherInterface::class)) {
            $fresh->setEventDispatcher($this->container->get(EventDispatcherInterface::class));
        }

        // Reset reconnector after db reconnect.
        // The reconnector is a closure that reconnects to the database: through refresh()
        // it rebuilds the connection closure above and runs it to re-establish the link.
        $fresh->setReconnector(function ($stale) {
            $this->refresh($stale);
        });
    }

    $this->lastUseTime = microtime(true);

    return true;
}

/**
 * Swap the PDO handles of the given connection for those of a newly made one,
 * then log a warning.
 *
 * @param \Hyperf\Database\Connection $connection the connection to refresh in place
 */
protected function refresh(\Hyperf\Database\Connection $connection)
{
    $replacement = $this->factory->make($this->config);

    if ($replacement instanceof \Hyperf\Database\Connection) {
        $connection->disconnect();

        $pdo = $replacement->getPdo();
        $connection->setPdo($pdo);

        $readPdo = $replacement->getReadPdo();
        $connection->setReadPdo($readPdo);
    }

    // Logged whether or not the handles were swapped.
    $this->logger->warning('Database connection refreshed.');
}
vendor/hyperf/database/src/Connectors/ConnectionFactory.php:45

/**
 * Build a connection from configuration.
 *
 * A read/write connection is created when the parsed config has a "read" entry,
 * otherwise a single connection.
 *
 * @param array $config connection configuration
 * @param mixed $name   passed to parseConfig() together with the config
 */
public function make(array $config, $name = null)
{
    $config = $this->parseConfig($config, $name);

    return isset($config['read'])
        ? $this->createReadWriteConnection($config)
        : $this->createSingleConnection($config);
}
连接数据库的闭包长这样(vendor/hyperf/database/src/Connectors/ConnectionFactory.php:196):

/**
 * Create a new Closure that resolves to a PDO instance by trying the configured
 * hosts in shuffled order until one connects.
 *
 * The closure re-throws the last PDOException when every host fails, and throws
 * an InvalidArgumentException when there is no host to try at all.
 *
 * @return \Closure
 */
protected function createPdoResolverWithHosts(array $config)
{
    return function () use ($config) {
        $lastException = null;

        foreach (Arr::shuffle($this->parseHosts($config)) as $host) {
            $config['host'] = $host;

            try {
                return $this->createConnector($config)->connect($config);
            } catch (PDOException $e) {
                // Remember the failure and move on to the next host.
                $lastException = $e;
            }
        }

        if ($lastException !== null) {
            throw $lastException;
        }

        // The loop never ran, so there is no exception to re-throw.
        throw new \InvalidArgumentException('Database hosts array is empty.');
    };
}

/**
 * Normalize the "host" configuration item into an array via Arr::wrap().
 *
 * @return array
 */
protected function parseHosts(array $config)
{
    $host = $config['host'];

    return Arr::wrap($host);
}

/**
 * Create a new Closure that resolves to a PDO instance where there is no configured host.
 *
 * The closure builds a connector for the config and connects with it.
 *
 * @return \Closure
 */
protected function createPdoResolverWithoutHosts(array $config)
{
    return function () use ($config) {
        $connector = $this->createConnector($config);

        return $connector->connect($config);
    };
}
闭包里的 `Connector` 是这样连数据库的(vendor/hyperf/database/src/Connectors/Connector.php:105):

/**
 * Create the PDO instance for the given DSN and credentials.
 *
 * PDOConnection is used when that class exists and the options do not request a
 * persistent connection; otherwise a plain PDO is created.
 */
protected function createPdoConnection($dsn, $username, $password, $options)
{
    // Short-circuit keeps isPersistentConnection() from running when the class is missing.
    if (! class_exists(PDOConnection::class) || $this->isPersistentConnection($options)) {
        return new PDO($dsn, $username, $password, $options);
    }

    return new PDOConnection($dsn, $username, $password, $options);
}
真实执行数据库操作(比如 select)时,会通过 pdo 属性的值获取连接;此时如果 pdo 还是个闭包,就会执行闭包,建立到数据库的网络连接,并把 pdo 的值从闭包更新成 `PDO` 对象

/**
 * Run a select statement and return all fetched rows.
 *
 * @param string $query      the SQL to prepare
 * @param array  $bindings   values bound to the prepared statement
 * @param bool   $useReadPdo forwarded to getPdoForSelect()
 * @return array an empty array while pretending, otherwise the fetchAll() result
 */
public function select(string $query, array $bindings = [], bool $useReadPdo = true): array
{
    $execute = function ($query, $bindings) use ($useReadPdo) {
        if ($this->pretending()) {
            return [];
        }

        // Prepare the statement on the PDO chosen for selects, bind the values,
        // execute it and hand back the complete result set.
        $pdo = $this->getPdoForSelect($useReadPdo);
        $statement = $this->prepared($pdo->prepare($query));

        $this->bindValues($statement, $this->prepareBindings($bindings));

        $statement->execute();

        return $statement->fetchAll();
    };

    return $this->run($query, $bindings, $execute);
}

/**
 * Get the current PDO connection.
 *
 * While the pdo property still holds a Closure, the closure is invoked and its
 * result is stored back on the property before being returned.
 */
public function getPdo()
{
    if (! $this->pdo instanceof Closure) {
        return $this->pdo;
    }

    $this->pdo = call_user_func($this->pdo);

    return $this->pdo;
}

/**
 * Get the current PDO connection used for reading.
 *
 * Falls back to getPdo() inside a transaction, when records were modified and the
 * "sticky" option is set, or when no read PDO is available.
 *
 * @return \PDO
 */
public function getReadPdo()
{
    $mustUseWritePdo = $this->transactions > 0
        || ($this->recordsModified && $this->getConfig('sticky'));
    if ($mustUseWritePdo) {
        return $this->getPdo();
    }

    // Resolve the read PDO on first use and keep the result on the property.
    if ($this->readPdo instanceof Closure) {
        $this->readPdo = call_user_func($this->readPdo);

        return $this->readPdo;
    }

    return $this->readPdo ?: $this->getPdo();
}

/**
 * Run a query callback, delegating QueryException failures to handleQueryException().
 *
 * @param mixed $query    forwarded to the callback runner
 * @param mixed $bindings forwarded to the callback runner
 */
protected function run($query, $bindings, Closure $callback)
{
    // Checks whether the PDO is missing (null) and reconnects if it is.
    $this->reconnectIfMissingConnection();

    // Run the query. If it fails, the handler decides whether a lost connection
    // caused the failure; if so it reconnects and re-runs the query.
    try {
        return $this->runQueryCallback($query, $bindings, $callback);
    } catch (QueryException $e) {
        return $this->handleQueryException($e, $query, $bindings, $callback);
    }
}

/**
 * Handle a query exception by delegating to tryAgainIfCausedByLostConnection().
 */
protected function handleQueryException($e, $query, $bindings, Closure $callback)
{
    // (part of the method is omitted in this excerpt)
    return $this->tryAgainIfCausedByLostConnection($e, $query, $bindings, $callback);
}

/**
 * Retry the query once after reconnecting when the failure was caused by a lost connection.
 *
 * @param string $query
 * @param array $bindings
 * @throws QueryException re-thrown when the failure was not caused by a lost connection
 */
protected function tryAgainIfCausedByLostConnection(QueryException $e, $query, $bindings, Closure $callback)
{
    // Any other kind of failure is passed straight back to the caller.
    if (! $this->causedByLostConnection($e->getPrevious())) {
        throw $e;
    }

    $this->reconnect();

    return $this->runQueryCallback($query, $bindings, $callback);
}

/**
 * Reconnect to the database when the pdo property is null.
 */
protected function reconnectIfMissingConnection()
{
    if ($this->pdo !== null) {
        return;
    }

    $this->reconnect();
}