Django ORM

Django ORM

Basic information

  • Django version: 2.1

Introduction

下述是一個例子,查詢在User table中所有User的資訊

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from django.contrib.auth import get_user_model
from django.http import HttpResponse
from django.views.generic import View
User = get_user_model()
class MyView(View):
def get(self, request, *args, **kwargs):
users = User.objects.all()
for user in users:
print("id: {}, username: {}".format(user.id, user.username))
return HttpResponse(status=200)

Model Manager

User變量是settings.AUTH_USER_MODEL中指定的Model,預設為'auth.User',即繼承自AbstractUser的User class @django/contrib/auth/models.py。
AbstractUser的父類AbstractBaseUser @django/contrib/auth/base_user.py 繼承自models.Model;換言之,AbstractUser本質上是models.Model的子類。
在AbstractUser中明確指定objects為UserManager class。

@django/contrib/auth/models.py

1
2
3
4
5
6
class AbstractUser(AbstractBaseUser, PermissionsMixin):
objects = UserManager()
# ...
class UserManager(BaseUserManager):
# ...

@django/contrib/auth/base_user.py

1
2
class AbstractBaseUser(models.Model):
# ...

但若在其他自定義的Model子類中,不指定objects,那麼在使用Model時,objects會是什麼呢?
查看models.Model @django/db/models/base.py
可以得知若沒有指定任何Manager,會在ModelBase的_prepare()方法中使用Manager @django/db/models/manager.py作為預設的objects,即為代碼中的manager變量
同時Manager class也為上述UserManager的父類

@django/db/models/base.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class ModelBase(type):
# ...
def _prepare(cls):
# ...
if not opts.managers:
if any(f.name == 'objects' for f in opts.fields):
raise ValueError(
"Model %s must specify a custom Manager, because it has a "
"field named 'objects'." % cls.__name__
)
manager = Manager()
manager.auto_created = True
cls.add_to_class('objects', manager)
class Model(metaclass=ModelBase):
# ...
@classmethod
def from_db(cls, db, field_names, values):
if len(values) != len(cls._meta.concrete_fields):
values_iter = iter(values)
values = [
next(values_iter) if f.attname in field_names else DEFERRED
for f in cls._meta.concrete_fields
]
new = cls(*values)
new._state.adding = False
new._state.db = db
return new

@django/db/models/manager.py

1
2
class Manager(BaseManager.from_queryset(QuerySet)):
pass

透過上述的代碼追蹤,可以得知:

  • 若未自行指定Manager,Model會自動擁有objects變量,其為Manager的instance;自行指定時(例如UserManager),則為Manager子類的instance

Manager Object Initialization Flow

Manager會繼承一個動態生成的class,而BaseManager.from_queryset(QuerySet)就是返回動態生成class的方法。
在from_queryset中使用Python內建的type(…),動態創建名稱為class_name變量值的Class object,以cls(此處即BaseManager)為父類,並透過_get_queryset_methods(…)方法將QuerySet的方法添加到該class中

@django/db/models/manager.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class BaseManager:
@classmethod
def _get_queryset_methods(cls, queryset_class):
def create_method(name, method):
def manager_method(self, *args, **kwargs):
return getattr(self.get_queryset(), name)(*args, **kwargs)
manager_method.__name__ = method.__name__
manager_method.__doc__ = method.__doc__
return manager_method
new_methods = {}
for name, method in inspect.getmembers(queryset_class, predicate=inspect.isfunction):
# Only copy missing methods.
if hasattr(cls, name):
continue
# Only copy public methods or methods with the attribute `queryset_only=False`.
queryset_only = getattr(method, 'queryset_only', None)
if queryset_only or (queryset_only is None and name.startswith('_')):
continue
# Copy the method onto the manager.
new_methods[name] = create_method(name, method)
return new_methods
@classmethod
def from_queryset(cls, queryset_class, class_name=None):
if class_name is None:
class_name = '%sFrom%s' % (cls.__name__, queryset_class.__name__)
return type(class_name, (cls,), {
'_queryset_class': queryset_class,
**cls._get_queryset_methods(queryset_class),
})

透過上述的代碼追蹤,可以得知:

  • 創建Manager時,使用type(…)動態創建class的技巧,將QuerySet的公開方法複製到Manager中,方便使用者直接透過Manager使用QuerySet所提供的方法。

Model Class Diagram

QuerySet

上述代碼追蹤到了objects,接著下面說明.all(…)

操作Manager的方法,其實就是操作QuerySet的方法,下面使用常見的all()方法追蹤QuerySet的流程。
調用all()方法時,QuerySet會連續調用內部方法到_clone(),_clone()頭部註釋已經說明,此方法會回傳當前QuerySet的副本。
故執行all()方法時,其實是拿到當前QuerySet的副本,沒有與真實的Database做任何交互。

@django/db/models/query.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class QuerySet:
# ...
def __init__(self, model=None, query=None, using=None, hints=None):
# ...
self.model = model
self._db = using
self.query = query or sql.Query(self.model)
self._iterable_class = ModelIterable
def all(self):
"""
Return a new QuerySet that is a copy of the current one. This allows a
QuerySet to proxy for a model manager in some cases.
"""
return self._chain()
def _chain(self, **kwargs):
"""
Return a copy of the current QuerySet that's ready for another
operation.
"""
obj = self._clone()
# ...
return obj
def _clone(self):
"""
Return a copy of the current QuerySet. A lightweight alternative
to deepcopy().
"""
c = self.__class__(model=self.model, query=self.query.chain(), using=self._db, hints=self._hints)
c._sticky_filter = self._sticky_filter
c._for_write = self._for_write
c._prefetch_related_lookups = self._prefetch_related_lookups[:]
c._known_related_objects = self._known_related_objects
c._iterable_class = self._iterable_class
c._fields = self._fields
return c
def __iter__(self):
"""
The queryset iterator protocol uses three nested iterators in the
default case:
1. sql.compiler.execute_sql()
- Returns 100 rows at time (constants.GET_ITERATOR_CHUNK_SIZE)
using cursor.fetchmany(). This part is responsible for
doing some column masking, and returning the rows in chunks.
2. sql.compiler.results_iter()
- Returns one row at time. At this point the rows are still just
tuples. In some cases the return values are converted to
Python values at this location.
3. self.iterator()
- Responsible for turning the rows into model objects.
"""
self._fetch_all()
return iter(self._result_cache)
def _fetch_all(self):
if self._result_cache is None:
self._result_cache = list(self._iterable_class(self))
if self._prefetch_related_lookups and not self._prefetch_done:
self._prefetch_related_objects()

在外部使用者透過for loop對all()方法的回傳值進行操作時,會調用QuerySet的__iter__(),QuerySet就會與Database進行交互。
__iter__()方法會調用內部方法_fetch_all(),_fetch_all()透過self._iterable_class(self)進行Database查詢。
self._iterable_class在QuerySet初始化時指定為ModelIterable。
接著使用list(…)會調用ModelIterable的__iter__()。

Query Class Diagram

ModelIterable Class

下述以ModelIterable為主軸進行追蹤

  1. compiler = queryset.query.get_compiler(using=db)
    • 使用queryset.query.get_compiler(using=db)設定compiler
      • queryset.query在QuerySet初始化時已經指定為sql.Query(self.model),其為Query class @django/db/models/sql/query.py
      • 故queryset.query.get_compiler(…)方法就是Query的get_compiler(…)
      • 在get_compiler(…)內部使用connections[using]設定connection變量
        • connections @django/db/__init__.py,真實為ConnectionHandler @django/db/utils.py
        • connections[using]會調用ConnectionHandler的__getitem__()
          • databases變量值為Django settings的DATABASES值,為Dictionary類型,alias變量值為’default’
          • backend變量則為'ENGINE'指定的backend module(其中定義了DatabaseWrapper),例如'django.db.backends.mysql'
          • conn變量值則為backend對應的DatabaseWrapper instance,例如DatabaseWrapper @django/db/backends/mysql/base.py
      • 逐層返回到Query的get_compiler(…)方法,執行connection.ops.compiler(…)(…)
        • connection即是DatabaseWrapper
        • connection.ops則是在DatabaseWrapper的父類BaseDatabaseWrapper @django/db/backends/base/base.py設值,來源為DatabaseOperations @django/db/backends/mysql/operations.py
        • connection.ops.compiler(…)為DatabaseOperations父類BaseDatabaseOperations @django/db/backends/base/operations.py的方法,其方法會回傳子類宣告compiler_module中的compiler_name class,此時compiler_module的值為"django.db.backends.mysql.compiler",compiler_name的值為'SQLCompiler',故回傳值為SQLCompiler @django/db/backends/mysql/compiler.py
      • connection.ops.compiler(…)(…)就是SQLCompiler(…),即是對SQLCompiler instance進行初始化
  2. compiler = queryset.query.get_compiler(using=db),根據上述追蹤,compiler即是SQLCompiler instance
  3. results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
    • 調用SQLCompiler父類SQLCompiler @django/db/models/sql/compiler.py 的execute_sql(…)
    • SQLCompiler的execute_sql(…),這邊關注cursor = self.connection.cursor()
      • self.connection為初始化時設定,即是在Query的get_compiler的connection傳入,當時的connection為DatabaseWrapper instance
      • 所以self.connection.cursor()則是調用DatabaseWrapper instance的cursor()方法
      • DatabaseWrapper的父類BaseDatabaseWrapper的cursor()方法透過一連串的自身與子類方法調用,最後會調用到自身的connect()方法
        • get_connection_params()是讀取Django Settings的Database配置信息
        • get_new_connection()則是使用第三方的MySQLdb lib與實體Database進行交互
        • 上述兩個方法會將self.connection創建好,此時的self.connection即是MySQLdb.connect(…)回傳的instance
      • DatabaseWrapper的create_cursor方法
        • 呼叫self.connection的cursor()方法,即是Database.connect(…).cursor()結果
    • 回到SQLCompiler的execute_sql(…),關注到result = cursor_iter(…)
      • 在cursor_iter中,會調用上述的cursor的fetchmany(…)方法,其會返回透過第三方MySQLdb lib到MySQL查詢獲取的結果
      • cursor_iter的第二個入參sentinel則是self.connection.features.empty_fetchmany_value,對應的是DatabaseWrapper instance的features.empty_fetchmany_value,則是DatabaseFeatures @django/db/backends/mysql/features.py的empty_fetchmany_value,其值為()
      • 最終將查詢結果返回給SQLCompiler的execute_sql(…)
  4. ModelIterable在execute_sql(…)得到真實Database回傳的數據後,會透過model_cls.from_db(…)將取回的數據,逐行包裝成Model instance,此時的model_cls為Model的子類(例如User),from_db(…)則是定義在Model @django/db/models/base.py的classmethod

上述即是User.objects.all()以及使用For loop的整體流程追蹤。
上述提到的相關代碼如下:

@django/db/models/query.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class BaseIterable:
def __init__(self, queryset, chunked_fetch=False, chunk_size=GET_ITERATOR_CHUNK_SIZE):
self.queryset = queryset
# ...
class ModelIterable(BaseIterable):
"""Iterable that yields a model instance for each row."""
def __iter__(self):
queryset = self.queryset
db = queryset.db
compiler = queryset.query.get_compiler(using=db)
# Execute the query. This will also fill compiler.select, klass_info,
# and annotations.
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
# ...
for row in compiler.results_iter(results):
obj = model_cls.from_db(db, init_list, row[model_fields_start:model_fields_end])
for rel_populator in related_populators:
rel_populator.populate(row, obj)
if annotation_col_map:
for attr_name, col_pos in annotation_col_map.items():
setattr(obj, attr_name, row[col_pos])
# Add the known related objects to the model.
for field, rel_objs, rel_getter in known_related_objects:
# Avoid overwriting objects loaded by, e.g., select_related().
if field.is_cached(obj):
continue
rel_obj_id = rel_getter(obj)
try:
rel_obj = rel_objs[rel_obj_id]
except KeyError:
pass # May happen in qs1 | qs2 scenarios.
else:
setattr(obj, field.name, rel_obj)
yield obj

@django/db/models/sql/query.py

1
2
3
4
5
6
7
8
9
10
class Query:
# ...
compiler = 'SQLCompiler'
def get_compiler(self, using=None, connection=None):
if using is None and connection is None:
raise ValueError("Need either using or connection")
if using:
connection = connections[using]
return connection.ops.compiler(self.compiler)(self, connection, using)

@django/db/__init__.py

1
connections = ConnectionHandler()

@django/db/utils.py

1
2
3
4
5
6
7
8
9
class ConnectionHandler:
# ...
def __getitem__(self, alias):
# ...
db = self.databases[alias]
backend = load_backend(db['ENGINE'])
conn = backend.DatabaseWrapper(db, alias)
setattr(self._connections, alias, conn)
return conn

@django/db/backends/mysql/base.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
try:
import MySQLdb as Database
except ImportError as err:
raise ImproperlyConfigured(
'Error loading MySQLdb module.\n'
'Did you install mysqlclient?'
) from err
class DatabaseWrapper(BaseDatabaseWrapper):
# ...
features_class = DatabaseFeatures
ops_class = DatabaseOperations
def get_connection_params(self):
# ...
settings_dict = self.settings_dict
# ...
return kwargs
def get_new_connection(self, conn_params):
return Database.connect(**conn_params)
def create_cursor(self, name=None):
cursor = self.connection.cursor()
return CursorWrapper(cursor)

@django/db/backends/base/base.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class BaseDatabaseWrapper:
# ...
def __init__(self, settings_dict, alias=DEFAULT_DB_ALIAS,
allow_thread_sharing=False):
# ...
self.features = self.features_class(self)
self.ops = self.ops_class(self)
def cursor(self):
"""Create a cursor, opening a connection if necessary."""
return self._cursor()
def _cursor(self, name=None):
self.ensure_connection()
with self.wrap_database_errors:
return self._prepare_cursor(self.create_cursor(name))
def ensure_connection(self):
"""Guarantee that a connection to the database is established."""
if self.connection is None:
with self.wrap_database_errors:
self.connect()
def connect(self):
# ...
conn_params = self.get_connection_params()
self.connection = self.get_new_connection(conn_params)
self.set_autocommit(self.settings_dict['AUTOCOMMIT'])
self.init_connection_state()
connection_created.send(sender=self.__class__, connection=self)

@django/db/backends/mysql/features.py

1
2
class DatabaseFeatures(BaseDatabaseFeatures):
empty_fetchmany_value = ()

@django/db/backends/mysql/operations.py

1
2
class DatabaseOperations(BaseDatabaseOperations):
compiler_module = "django.db.backends.mysql.compiler"

@django/db/backends/base/operations.py

1
2
3
4
5
6
7
8
9
10
class BaseDatabaseOperations:
def compiler(self, compiler_name):
"""
Return the SQLCompiler class corresponding to the given name,
in the namespace corresponding to the `compiler_module` attribute
on this backend.
"""
if self._cache is None:
self._cache = import_module(self.compiler_module)
return getattr(self._cache, compiler_name)

@django/db/backends/mysql/compiler.py

1
2
class SQLCompiler(compiler.SQLCompiler):
# pass

@django/db/models/sql/compiler.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class SQLCompiler:
def __init__(self, query, connection, using):
self.query = query
self.connection = connection
# ...
def execute_sql(self, result_type=MULTI, chunked_fetch=False, chunk_size=GET_ITERATOR_CHUNK_SIZE):
# ...
if chunked_fetch:
cursor = self.connection.chunked_cursor()
else:
cursor = self.connection.cursor()
try:
cursor.execute(sql, params)
except Exception:
# Might fail for server-side cursors (e.g. connection closed)
cursor.close()
raise
# ...
result = cursor_iter(
cursor, self.connection.features.empty_fetchmany_value,
self.col_count if self.has_extra_select else None,
chunk_size,
)
if not chunked_fetch or not self.connection.features.can_use_chunked_reads:
try:
# If we are using non-chunked reads, we return the same data
# structure as normally, but ensure it is all read into memory
# before going any further. Use chunked_fetch if requested,
# unless the database doesn't support it.
return list(result)
finally:
# done with the cursor
cursor.close()
return result
def cursor_iter(cursor, sentinel, col_count, itersize):
"""
Yield blocks of rows from a cursor and ensure the cursor is closed when
done.
"""
try:
for rows in iter((lambda: cursor.fetchmany(itersize)), sentinel):
yield rows if col_count is None else [r[:col_count] for r in rows]
finally:
cursor.close()