Django 数据库连接池

Page content

数据库长连接

长连接是指程序之间的连接在建立之后,就一直打开,被后续程序重用。使用长连接的初衷是减少连接的开销。
先看看官方文档是怎么讲Django长连接的。 翻译得可能不太得体,原文参见Django databases

Django长连接

长连接(Persistent connections)是为了避免在每个请求中都重新建立数据库连接的开销。 在Django中,数据库连接由CONN_MAX_AGE控制,这个参数定义了每个连接的最长寿命。可以为每个DB单独设置CONN_MAX_AGE

CONN_MAX_AGE的默认值是0,在每个请求结束时,将关闭数据库连接。 要启用长连接,请将CONN_MAX_AGE设置为正数秒。对于不限时的长连接,请将其设置为None

连接管理

Django在首次进行数据库查询时会建立与数据库的连接。它保持此连接打开,并在后续请求中重用它。 连接一旦超过CONN_MAX_AGE定义的最大寿命或不再可用,Django就会关闭连接。

具体来讲,没有连接分两种情况:1、这是第一个连接,2、或者先前的连接已关闭。 如果Django没有连接,它会在需要时自动建立与数据库的连接。

在每个请求开始时,Django会关闭那些达到其最大寿命的连接。 如果你的数据库在一段时间后会关闭空闲连接,则应将CONN_MAX_AGE设置为较小的值,以便Django不会试图使用已由DB终止的连接。(这个问题可能只会影响流量非常低的网站。)

在每个请求结束时,Django会在连接达到最大寿命或者处于不可恢复的错误状态时关闭连接。如果在处理请求时发生任何数据库错误,Django会检查连接是否仍然有效,如果没有则关闭它。 因此,数据库错误最多只影响一个请求;如果DB连接变得不可用,则下一个请求将获取新连接。

注意

由于每个线程都维护自己的连接,因此你的数据库必须至少支持与工作线程一样多的并发连接。

有时,大多数views都不会访问数据库,例如,因为它是外部系统的数据库,或者归功于缓存。 在这种情况下,应该将CONN_MAX_AGE设置为较小的值甚至0,因为维护不太可能重用的连接没有意义。这将有助于将与数据库的并发连接数量保持在较小的值。

开发模式的Server为它处理的每个请求创建一个新线程,无视长连接的作用。在开发过程中不需要启用长连接。

当Django建立与数据库的连接时,它会根据所使用的后端设置恰当的参数。 如果启用长连接,则不会再对每个请求重复设置。 如果修改连接的隔离级别或时区等参数,则应在每个请求结束时恢复Django的默认值,在每个请求开始时强制使用适当的值,或者禁用长连接。

数据库连接池

为啥Django不支持连接池

我们看看Google Group里各路大神的讨论吧。

  • 第三方工具已经提供了,更专注做得更好。Django并不需要做全栈。
  • 用从pool里取连接代替新建连接,向pool归还连接代替关闭连接,然后在worker在整个请求期间都持有连接并不是真正的连接池。 这需要跟worker数一样多的数据库连接,除了能在各个worker循环使用外,基本跟长连接是等效的。 长连接也有自己的优点,消除了新建连接的开销,避免的池化的复杂性,适用于不需要手动管理事务的中小型站点。
  • 首先要操心的不是数据库,AWS之类的云计算已经很牛了,按需扩容,多关注下缓存吧。
  • MySQL的连接非常轻量和高效,大量的Web应用都没有使用连接池。
  • ……

连接池的优点

数据库连接池(Connection pooling)的核心思想是连接复用,通过建立一个数据库连接池以及一套连接使用、分配和管理策略, 使得该连接池中的连接可以得到高效、安全的复用。主要有以下优势:

  1. 减少资源开销:减少连接的创建,避免了数据库连接初始化和释放过程的时间和资源开销,加快系统的响应速度
  2. 统一连接管理:预先设定超时时间、连接数量,避免数据库连接操作中可能出现的资源泄露,增强系统的稳定性

其实,我们在连接池里建立的连接生存周期也可以比较长,这样能充分利用长连接的优点减少连接的开销; 同时,连接池又可以帮助我们更方便管理DB连接,减少后端服务到MySQL的连接数。

为何要在Django中用连接池

说了这么多,有两点是确定的:

  1. Django原生支持长连接,但不支持连接池
  2. 连接池还是有很多优点的,也有很多成熟的三方库支持

Django服务,一般情况下每个线程都维护自己的连接,有多少线程就会就有多少连接;如果采用分布式部署,线程数较多,则会建立较多的连接。不仅非常消耗资源,还可能出现MySQL连接数不够用的情况。

在使用Gunicorn作为Django的前置服务时,对于数万的异步请求,Gunicorn有自己的worker pool,每个异步worker进程会创建若干协程来处理请求。协程比线程和进程更加轻量,Gunicorn因此能很好的处理大量并行的请求。(Gunicorn如果配置同步的worker,每一个进程只需要一个DB连接,这样总的连接数会变少,但是并发能力就上不去了。)

但是,对于每一个协程转发的请求,Django ORM都需要创建并维护一个数据库连接。成千上万个协程,意味着成千上万个连接,MySQL数据库的连接很快就耗尽。 这样就会出现大量创建数据库连接失败,从而导致请求失败。在客户端与服务端之间的网络缓慢时,情况会更加严重。

举个例子:2个服务 * 100个容器 * 10个进程 * 20个协程 = 40000个连接(Mysql5.5,Mysql5.6,Mysql5.7:默认的最大连接数都是151,理论上限为:100000;实际干到2W就很不容易了)。

假设我们能让20个线程,复用一个size=10的连接池,这样就能减少一半的数据库连接。这对支持服务水平扩展,降低数据库负载是非常有帮助的。 那么我们接下来看看,如何在Django中引入DB连接池。

Django连接池方案

SQLAlchemy Patch

SQLAlchemy有一个成熟的连接池实现,支持Django使用连接池,首先考虑的就是SQLAlchemy

Github上的轮子:

从源码看,patch主要做了三件事情:

  1. 创建和返回新建的SQLAlchemy pool
  2. connection pool取connect
  3. hack掉Django自己的connect方法

所以,patch主要在是connect()方法上做了文章,实际使用的依然是Django的ORM,而不是SQLAlchemy的ORM。 知道了原理,那我们可以对照着自己造个轮子,动手撸一个简化版本的连接池patch吧。

自己造轮子

新建一个db_pool_patch.py

# -*- coding: utf-8 -*-

from django.conf import settings
from sqlalchemy.pool import manage

POOL_PESSIMISTIC_MODE = getattr(settings, "DJ_ORM_POOL_PESSIMISTIC", False)
POOL_SETTINGS = getattr(settings, 'DJ_ORM_POOL_OPTIONS', {})
POOL_SETTINGS.setdefault("recycle", 3600)


def is_iterable(value):
    """Check if value is iterable."""
    try:
        _ = iter(value)
        return True
    except TypeError:
        return False


class HashableDict(dict):
    def __hash__(self):
        items = [(n, tuple(v)) for n, v in self.items() if is_iterable(v)]
        return hash(tuple(items))


class ManagerProxy(object):
    def __init__(self, manager):
        self.manager = manager

    def __getattr__(self, key):
        return getattr(self.manager, key)

    def connect(self, *args, **kwargs):
        if 'conv' in kwargs:
            kwargs['conv'] = HashableDict(kwargs['conv'])

        if 'ssl' in kwargs:
            kwargs['ssl'] = HashableDict(kwargs['ssl'])

        return self.manager.connect(*args, **kwargs)


def patch_mysql():
    from django.db.backends.mysql import base as mysql_base

    if not hasattr(mysql_base, "_Database"):
        mysql_base._Database = mysql_base.Database
        manager = manage(mysql_base._Database, **POOL_SETTINGS)
        mysql_base.Database = ManagerProxy(manager)


def patch_sqlite3():
    from django.db.backends.sqlite3 import base as sqlite3_base

    if not hasattr(sqlite3_base, "_Database"):
        sqlite3_base._Database = sqlite3_base.Database
        sqlite3_base.Database = manage(sqlite3_base._Database, **POOL_SETTINGS)


def install_patch():
    patch_mysql()
    patch_sqlite3()

这里,我们用不到60行代码,就实现了对mysqlsqlite3的patch。

为了方便跟踪connection pool的工作情况,还可以建一个db_pool_listen.py添加一些监听函数并打印log。

# -*- coding: utf-8 -*-

from django.conf import settings
from sqlalchemy import event, exc
from sqlalchemy.pool import Pool

from log import logger


@event.listens_for(Pool, "checkout")
def _on_checkout(dbapi_connection, connection_record, connection_proxy):
    logger.debug("connection retrieved from pool")

    if settings.POOL_PESSIMISTIC_MODE:
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute("SELECT 1")
        except:
            # raise DisconnectionError - pool will try
            # connecting again up to three times before raising.
            raise exc.DisconnectionError()
        finally:
            cursor.close()


@event.listens_for(Pool, "checkin")
def _on_checkin(*args, **kwargs):
    logger.debug("connection returned to pool")


@event.listens_for(Pool, "connect")
def _on_connect(*args, **kwargs):
    logger.debug("connection created")

settings.py中配置一下pool的参数:

DJ_ORM_POOL_OPTIONS = {
    "pool_size": 20,
    "max_overflow": 0,
    "recycle": 3600,  # the default value
}
POOL_PESSIMISTIC_MODE = True

在Django项目的wsgi.py中安装我们新建的patch:

import db_pool_patch
db_pool_patch.install_patch()

if settings.DEBUG:
    import db_pool_listen

application = get_wsgi_application()

测试结果及改进

然后访问接口http://127.0.0.1:8000/book/get?book_id=1,就可以观察到下面的log啦。

2019-08-29 22:17:10,140 db_pool_patch - db_pool_listen.py - DEBUG - connection retrieved from pool
(0.000) SELECT "book_tab"."book_id", "book_tab"."title", "book_tab"."author" FROM "book_tab" WHERE "book_tab"."book_id" = 1; args=(1,)
2019-08-29 22:17:10,141 db_pool_patch - views.py - INFO - some shit code begin to run ...
2019-08-29 22:17:15,144 db_pool_patch - views.py - INFO - hmm, shit code finished, it took 5 seconds
[29/Aug/2019 22:17:15] "GET /book/get?book_id=1 HTTP/1.1" 200 62
2019-08-29 22:17:15,146 db_pool_patch - db_pool_listen.py - DEBUG - connection returned to pool

nice, 我们的连接池已经可以正常工作了。但是从log里可以看出,我们的patch有个问题,在整个请求处理期间一直持有连接,容易导致pool里的连接不够用。

def test_view(request):
    obj = TestModel.objects.get(id=1)
    result = do_something_that_takes_1000_seconds(obj)  # 这时DB连接依然会被占用1000s,但实际已经不需要了
    return result

在test_view()中有三个step:

  • Step1:访问db取到数据
  • Step2:处理业务
  • Step3:返回结果

实际上Step1中取完数据后,就不需要继续占用DB连接了。在访问完数据库后,可以马上归还连接,而不是必须在请求完成时再归还。

如前文提到的那样:

用从pool里取连接代替新建连接,向pool归还连接代替关闭连接,然后在worker在整个请求期间都持有连接并算不上真正的连接池。 这需要跟worker数一样多的数据库连接,除了能在各个worker循环使用外,基本跟长连接是等效的。

那么,我们怎么解决这个问题呢?如何实现:在访问完数据库后,马上归还连接,而不是在请求完成时再归还。 因为我们使用的依然是Django ORM,在请求期间持有连接本质上是Django ORM的行为。 所以可以patch一下Django ORM,执行完数据库操作后,及时释放连接。

from django.db.models.sql import compiler
from django.db.models.sql.constants import MULTI


def install_django_orm_patch():
    execute_sql = compiler.SQLCompiler.execute_sql

    # Django 1.11
    def patched_execute_sql(self, result_type=MULTI, chunked_fetch=False):
        result = execute_sql(self, result_type, chunked_fetch)
        if not self.connection.in_atomic_block:
            self.connection.close()  # return connection to pool by db_pool_patch
        return result

    compiler.SQLCompiler.execute_sql = patched_execute_sql

    insert_execute_sql = compiler.SQLInsertCompiler.execute_sql

    def patched_insert_execute_sql(self, return_id=False):
        result = insert_execute_sql(self, return_id)
        if not self.connection.in_atomic_block:
            self.connection.close()  # return connection to pool by db_pool_patch
        return result

    compiler.SQLInsertCompiler.execute_sql = patched_insert_execute_sql

再次访问接口,打印的log如下:

2019-08-29 22:13:22,373 db_pool_patch - db_pool_listen.py - DEBUG - connection retrieved from pool
(0.000) SELECT "book_tab"."book_id", "book_tab"."title", "book_tab"."author" FROM "book_tab" WHERE "book_tab"."book_id" = 1; args=(1,)
2019-08-29 22:13:22,374 db_pool_patch - db_pool_listen.py - DEBUG - connection returned to pool
2019-08-29 22:13:22,374 db_pool_patch - views.py - INFO - some shit code begin to run ...
2019-08-29 22:13:27,378 db_pool_patch - views.py - INFO - hmm, shit code finished, it took 5 seconds
[29/Aug/2019 22:13:27] "GET /book/get?book_id=1 HTTP/1.1" 200 62

不错,可以看出在添加patch之后,view层在处理请求时,DB连接在shit code开始运行前,就已经归还给连接池了;这样就降低了单个连接的持有时间,能显著提高连接的使用率和pool的性能。

连接池溢出的问题

问题:每次执行完SQL后会尝试将连接释放,但是如果execute_sql执行失败,不会立即释放连接,等到垃圾回收时才将连接放回了连接池。最终可能导致连接池溢出。

  • 解决方法:在已有的补丁函数中稍作调整:捕获execute_sql/insert_execute_sql异常,释放connection。
from django.db.models.sql import compiler
from django.db.models.sql.constants import MULTI


def install_django_orm_patch():
    execute_sql = compiler.SQLCompiler.execute_sql

    # Django 1.11
    def patched_execute_sql(self, result_type=MULTI, chunked_fetch=False):
        try:
			result = execute_sql(self, return_id)
		except Exception as e:
			log.error('[execute_sql]execute_sql_error| exception = %s', e)
			self.connection.close()
			raise
        
        if not self.connection.in_atomic_block:
            self.connection.close()  # return connection to pool by db_pool_patch
        return result

    compiler.SQLCompiler.execute_sql = patched_execute_sql

    insert_execute_sql = compiler.SQLInsertCompiler.execute_sql

    def patched_insert_execute_sql(self, return_id=False):
        try:
			result = insert_execute_sql(self, return_id)
		except Exception as e:
			log.error('[insert_execute_sql]insert_execute_sql_error| exception = %s', e)
			self.connection.close()
			raise
        
        if not self.connection.in_atomic_block:
            self.connection.close()  # return connection to pool by db_pool_patch
        return result

    compiler.SQLInsertCompiler.execute_sql = patched_insert_execute_sql

小结

本文主要介绍了Django长连接,比较了长连接和连接池,实现了基于SQLAlchemy Patch的Django数据库连接池方案,并做了一定的改进。