django企业开发实战-爱代码爱编程
写在前面
初次阅读此书是三年前,当时没经历过完整的项目 觉得这书就是扯淡 后来经历过项目加班与毒打 今天再翻开此书 觉得实乃不可多得之物 花些时间啃下来吧
django版本 3.2
本博客开源项目地址 kimsmith/django企业实战 (gitee.com)
有的代码因为版本混乱报错 自己调调
需求
需求文档
写文档,列举需要实现的功能,详细列举,不考虑技术实现细节
需求评审与分析
主要是将需求文档落实到技术细节,评审需求需要的技术栈,然后评审需求是否可实现,预估每个子需求的工作量等
此处可以考虑后续的衍生需求,考虑技术实现是否可行是否困难,技术需要的工作量等
功能分析
技术人员对需求评审的结果进行技术实现分析,模块划分等
模块划分可以基于数据实体制作ER图,或者建立UML图
模块划分
作用是将一个大项目分成几个小模块,让手下的人去按模块开发
框架基础和技术选型
需要选择 语言 框架 数据库 然后考虑团队开发与实现能力等
wsgi
wsgi,全称Web Server Gateway Interface,Web服务器网关接口,是用来规定web server应如何和程序交互的网关协议。可以理解为一个web应用的容器,适配了程序和操作系统之间功能,将操作系统一些功能抽象为接口提供给程序使用
可以使用wsgi,目的是使用实现统一协议的web server,不然换着乱
简单的web server
一个基本的socket监听程序
# coding:utf-8
import socket
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
body = 'hello, world<h1> from tjh </h1>'
resp_params = [
'HTTP/1.0 200 OK',
'Date: Sun, 31 jul 2024 09:35:33 GMT',
'Content-Type: text/html; charset=utf-8',
'Content-Length: ()\r\n'.format(len(body.encode())),
body
]
resp = '\r\n'.join(resp_params)
def handle_connection(conn, addr):
req = b''
while EOL1 not in req and EOL2 not in req:
req += conn.recv(1024)
print(req)
conn.send(resp.encode())
conn.close()
def main():
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ss.bind(('127.0.0.1', 8000))
ss.listen(5) # conn max queue number
print('http://127.0.0.1:8000')
try:
while True:
conn, addr = ss.accept()
handle_connection(conn, addr)
finally:
ss.close()
if __name__ == '__main__':
main()
效果
注意 Content-Type为text/plain 还是text/html
多线程版web server
还是阻塞模式,非阻塞会报错 还没处理
# coding:utf-8
import socket
import errno
import threading
import time
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
body = 'hello, world<h1> from tjh </h1>-from {thread_name}'
resp_params = [
'HTTP/1.0 200 OK',
'Date: Sun, 31 jul 2024 09:35:33 GMT',
'Content-Type: text/html; charset=utf-8',
'Content-Length: {length}\r\n',
body
]
resp = '\r\n'.join(resp_params)
def handle_connection(conn, addr):
req = b''
while EOL1 not in req and EOL2 not in req:
req += conn.recv(1024)
print(req)
current_thread = threading.currentThread()
content_length = len(body.format(thread_name=current_thread.name).encode())
print(current_thread.name)
conn.send(resp.format(thread_name=current_thread.name, length=content_length).encode())
conn.close()
def main():
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# ss.setblocking(0) # set socket mode as non block
# ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ss.bind(('127.0.0.1', 8000))
ss.listen(10) # conn max queue number
print('http://127.0.0.1:8000')
# ss.setblocking(0) # set socket mode as non block
try:
i = 0
while True:
try:
conn, addr = ss.accept()
except socket.error as e:
if e.args[0] != errno.EAGAIN:
raise
continue
i += 1
print(i)
t = threading.Thread(target=handle_connection, args=(conn, addr), name='thread-%s' % i)
t.start()
finally:
ss.close()
if __name__ == '__main__':
main()
效果
简单wsgi application
wsgi协议分为两部分,一个是web server,一个是web application。接受请求时,会通过wsgi协议将数据发给web application,application处理完后,设置对应状体和header,之后返回body给web server,web server拿到数据后,进行http协议封装,返回完整http response
# coding: utf-8
import os
import sys
from app import simple_app
def wsgi_to_bytes(s):
return s.encode()
def run_with_cgi(app):
environ = dict(os.environ.items())
environ['wsgi.input'] = sys.stdin.buffer
environ['wsgi.errors'] = sys.stderr
environ['wsgi.version'] = (1, 0)
environ['wsgi.multithread'] = False
environ['wsgi.multiprocess'] = True
environ['wsgi.run_once'] = True
if environ.get('HTTPS', 'off') in ('on', '1'):
environ['wsgi.url_scheme'] = 'https'
else:
environ['wsgi.url_scheme'] = 'http'
headers_set = []
headers_sent = []
def write(data):
out = sys.stdout.buffer
if not headers_set:
raise AssertionError('write() before start_response()')
elif not headers_sent:
status, resp_headers = headers_sent[:] = headers_set
out.write(wsgi_to_bytes('Status: %s\r\n' % status))
for header in resp_headers:
out.write(wsgi_to_bytes('%s: %s\r\n' % header))
out.write(wsgi_to_bytes('\r\n'))
out.write(data)
out.flush()
def start_response(status, resp_headers, exc_info=None):
if exc_info:
try:
if headers_sent:
raise (exc_info[0], exc_info[1], exc_info[2])
finally:
exc_info = None
elif headers_set:
raise AssertionError('headers already set')
headers_set[:] = [status, resp_headers]
return write
result = app(environ, start_response)
try:
for data in result:
if data:
write(data)
if not headers_sent:
write('')
finally:
if hasattr(result, 'close'):
result.close()
if __name__ == '__main__':
run_with_cgi(simple_app)
理解wsgi
wsgi规定,application必须是一个可调用对象,则这个可调用对象可以是函数或实现了__call__方法的实例
wsgi中间件和werkzeug
flask
截至目前 有了两种方法提供web服务:直接通过socket处理请求,或者通过实现wsgi application部分协议
入门推荐
py微型框架有比如web.py, bottle,flask等 flask是一个不错的微型框架
tornado
高性能。tornado不是基于wsgi协议的框架,但提供了wsgi的支持,特性是异步和非阻塞。可以使用自带的http server进行部署而不是wsgi,因为wsgi是一个同步接口
和flask相比,tornado更侧重性能,整体并不比flask丰富,flask更多的支持对业务的满足
django
和微型框架不同,django框架不是仅需要两三个py文件就能跑起来的web框架,django功能更全也更大
django起步
管理系统后台开发
先安装django,再django-admin startproject project_name创建初始项目
cd到project下,创建一个app
models.py
from django.db import models
# Create your models here.
class Student(models.Model):
SEX_ITEMS = [(1, '男'), (2, '女'), (0, '未知')]
STATUS_ITEMS = [(0, '申请'), (1, '通过'), (2, '拒绝')]
name = models.CharField(max_length=128, verbose_name='姓名')
sex = models.IntegerField(choices=SEX_ITEMS, verbose_name='性别')
profession = models.CharField(max_length=128, verbose_name='职业')
email = models.EmailField(verbose_name='Email')
qq = models.CharField(max_length=128, verbose_name='QQ')
phone = models.CharField(max_length=128, verbose_name='电话')
status = models.IntegerField(choices=STATUS_ITEMS, default=0, verbose_name='审核状态')
created_time = models.DateTimeField(auto_now_add=True, editable=False, verbose_name='创建时间')
def __str__(self):
return '<Student: {}>'.format(self.name)
class Meta:
verbose_name = verbose_name_plural = '学员信息'
admin.py
from django.contrib import admin
# Register your models here.
from .models import Student
admin.site.register(Student)
将创建的app在setting中加到installed_apps下
创建数据库迁移文件 python manage.py makemigrations
创建表 python manage.py migrate
创建超级用户 python manage.py createsuperuser
运行测试服务器,访问127.0.0.1:8000 发现是默认页面
访问127.0.0.1:8000/admin会跳转到刚开发的管理员页面,然后用刚创的管理员用户名和密码登录
管理员页面语言是英文,时区也是UTC,可在setting进行配置
重新启动测试服务器,再次登入管理员页面,发现语言变成中文
管理系统前台开发
创一个默认视图函数index
from django.shortcuts import render
# Create your views here.
def index(request):
words = 'hello, world!'
return render(request, 'index.html', context={'words': words})
视图函数使用了模板文件index.html,在app的templates目录下创建index.html,render渲染时会自动去寻找模板文件。django渲染时会去每个app下查找模板,寻找的目标app是在setting中注册的app,并且是按顺序寻找(从上到下)。如果两个不同app下有同名模板,则第一个app的模板会覆盖第二个app的模板
先在app下创建目录templates再在templates中创建index.html
<!DOCTYPE html>
<html>
<head>
<title>学员管理系统</title>
</head>
<body>
{{ words }}
</body>
</html>
模板用到了django模板语法 {{ words }},这个变量是从视图函数的context传过来的上下文
在urls.py中配置url
from django.contrib import admin
from django.urls import path, re_path
from admin_backend.views import index
urlpatterns = [
re_path(r'^$', index, name='index'),
path('admin/', admin.site.urls),
]
重新运行测试服务器,访问127.0.0.1:8000查看效果
在admin后台站点尝试手动创建几个学生数据用于练手,需要修改views.py
def index(request):
students = Student.objects.all()
return render(request, 'index.html', context={'students': students})
<body>
<ul>
{% for student in students %}
<li>{{ student.name }} - {{student.get_status_display }}</li>
{% endfor %}
</ul>
</body>
注意到student的方法get_status_display,实际模型只有status字段没有这个方法,方法是django自己拼出来的,方法作用是展示这个字段实际打算展示的值。
还有一个,模板里的方法可以不用写括号,django会自己帮忙调用
开发提交数据的功能,使用模型表单
from django import forms
from .models import Student
class StudentForm(forms.Form):
# name = forms.CharField(label='姓名', max_length=128)
# sex = forms.ChoiceField(label='性别', choices=Student.SEX_ITEMS)
# profession = forms.CharField(label='职业', max_length=128)
# email = forms.EmailField(label='邮箱', max_length=128)
# qq = forms.CharField(label='QQ', max_length=128)
# phone = forms.CharField(label='手机', max_length=128)
class Meta:
model = Student
fields = ('name', 'sex', 'profession', 'email', 'qq', 'phone')
模型表单可以每一条字段一行,也可以直接复用models.py中对应的模型字段
当需要form和model不完全一致时,可以在模型表单里单独修改,比如增加qq号必须是纯数字校验
class StudentForm(forms.Form):
def clean_qq(self):
cleaned_data = self.cleaned_data['qq']
if not cleaned_data.isdigit():
raise forms.ValidationError('必须是数字')
class Meta:
model = Student
fields = ('name', 'sex', 'profession', 'email', 'qq', 'phone')
clean_qq是模型表单会自动调用处理每个字段的方法
有了表单,修改views.py逻辑,开始使用模型表单
from django.shortcuts import render
from django.http import HttpResponseRedirect
from django.urls import reverse
from .models import Student
from .forms import StudentForm
# Create your views here.
def index(request):
students = Student.objects.all()a
if request.method == 'POST':
form = StudentForm(request.POST)
if form.is_valid():
cleaned_data = form.cleaned_data
student = Student()
student.name = cleaned_data['name']
student.sex = cleaned_data['sex']
student.email = cleaned_data['email']
student.profession = cleaned_data['profession']
student.qq = cleaned_data['qq']
student.phone = cleaned_data['phone']
student.save()
return HttpResponseRedirect(reverse('index'))
else:
form = StudentForm()
context = {
'students': students,
'form': form,
}
return render(request, 'index.html', context=context)
form.cleaned_data是用户在页面输入完数据提交后,django根据字段类型转换后的字段
reverse方法避免了将url地址硬编码到视图函数中,如果要修改url地址,仅需在urls.py中改即可,无需修改views.py
表单模型提供了表单验证能力,减少了前端进行表单验证代码量
上述代码中,手动将字段值从form实例更新到模型实例可以省略,直接保存表单模型实例就可以
# Create your views here.
def index(request):
students = Student.objects.all()
if request.method == 'POST':
form = StudentForm(request.POST)
if form.is_valid():
form.save()
return HttpResponseRedirect(reverse('index'))
else:
form = StudentForm()
context = {
'students': students,
'form': form,
}
return render(request, 'index.html', context=context)
当用户get的时候,页面是填写表单,提交时,保存表单
index.html
<!DOCTYPE html>
<html>
<head>
<title>学员管理系统</title>
</head>
<body>
<h3><a href="/admin/">Admin</a></h3>
<ul>
{% for student in students %}
<li>{{ student.name }} - {{ student.get_status_display }}</li>
{% endfor %}
</ul>
<hr />
<form action="/" method="post">
{% csrf_token %}
{{ form }}
<input type="submit" value="Submit"/>
</form>
</body>
</html>
csrf_token如果没有这个,提交的数据是无效的,这可以用来防止跨站伪造请求攻击
访问127.0.0.1:8000看下效果
发现提交后没有报错 也没有显示新提交的信息,原因是字段非法,视图函数也没报错
注意到视图函数中包含了查询数据的操作,当需要过滤查询条件,或者需要缓存数据,都需要修改视图函数代码,此时可将数据获取逻辑封装到model中
模型层代码
class Student(models.Model):
@classmethod
def get_all(cls):
return cls.objects.all()
# 省略其他代码
视图函数代码
def index(request):
students = Student.all()
管理系统进阶开发
class-based view
当有多个类似的view函数,可以考虑使用class-based抽象
# urls.py
urlpatterns = [
re_path(r'^$', index, name='index'),
# re_path(r'^$', IndexView.as_view(), name='index'),
path('admin/', admin.site.urls),
]
# views.py
class IndexView(View):
template_name = 'index.html'
def get_context(self):
students = Student.get_all()
context = {'students': students}
return context
def get(self, request):
context = self.get_context()
form = StudentForm()
context.update({'form': form})
return render(request, self.template_name, context=context)
def post(self, request):
form = StudentForm(request.POST)
if form.is_valid():
form.save()
return HttpResponseRedirect(reverse('index'))
context = self.get_context()
context.update({'form': form})
return render(request, self.template_name, context=context)
创建一个middleware练手,用来统计django接受请求到返回请求的时间
先创建文件middlewares.py,与views.py同级别
# middlewares.py
import time
from django.urls import reverse
from django.utils.deprecation import MiddlewareMixin
class TimeItMiddleware(MiddlewareMixin):
def process_request(self, request):
self.start_time = time.time()
return
def process_view(self, request, func, *args, **kwargs):
if request.path != reverse('index'):
return None
start = time.time()
response = func(request)
costed = time.time() - start
print('process view: {:.2f}s'.format(costed))
return response
def process_exception(self, request, exception):
pass
def process_tempmlate_response(self, request, response):
return response
def process_response(self, request, response):
costed = time.time() - self.start_time
print('request to response cost: {:.2f}s'.format(costed))
return response
请求来和返回相应一般会按顺序调用这些函数:process_request, process_view, process_exception, process_template_response, process_response
process_request是请求来第一个处理函数,此函数可放些请求头校验等任务
process_view是process_request之后执行,用来执行view函数,func参数就是视图函数。可以在此处进行视图函数执行耗时统计
process_template_response 当拿到视图函数的返回函数,且返回函数为模板渲染的函数,则会执行该函数
process_response 当处理完模板渲染的视图函数后,会执行该方法
process_exception 当在视图函数处理过程或返回模板渲染时发生异常,才会执行proces_exception函数。但如果在process_view中手动调用func,就不会触发process_exception
middleware写完后,在settings中的MIDDLEWARE加上middleware
运行web,查看控制台打印访问耗时
写单元测试
运行单元测试时,django会创建一个基于内存的测试数据库,也就是说,这对生产部署的环境没有影响
但对于mysql数据库,django会直接用配置的数据库用户名和密码创建一个名为test_modelname_db的数据库用于测试,此时需要保证有见表和建数据库的权限
也可以在settings指定测试创建的数据库名
对于测试用例,django提供了一个名为TestCase的基类,可以继承这个类实现自己的测试用例。基类的方法和python的单元测试方法差不多
# models.py
@property
def sex_show(self):
return dict(self.SEX_ITEMS)[self.sex]
# tests.py
from django.test import TestCase, client
from .models import Student
# Create your tests here.
class StudentTestCase(TestCase):
def setUp(self):
Student.objects.create(
name='tjh',
sex=1,
email='test@qq.com',
profession='coder',
qq='2222',
phone='3333',
)
def test_create_and_sex_show(self):
student = Student.objects.creat(
name='tjh',
sex=1,
email='aaa@dd.com',
profession='coder',
qq='4444',
phone='5555'
)
self.assertEqual(student.sex_show, '男', "性别字段内容和展示不一致")
def test_filter(self):
student = Student.objects.creat(
name='tjh',
sex=1,
email='aaa@dd.com',
profession='coder',
qq='4444',
phone='5555'
)
name = 'tjhaa'
students = Student.objects.filter(name=name)
self.assertEqual(students.count(), 1, 'student count should be 1')
view层测试
# tests.py
def test_get_index(self):
client = Client()
resp = client.get('/')
self.assertEqual(resp.status_code, 200, 'status code should be 200')
def test_post_student(self):
client = Client()
data = dict(
name='test_for_post',
sex=1,
email='123@qq.com',
profession='coder',
qq='1111',
phone='9999'
)
resp = client.post('/', data)
self.assertEqual(resp.status_code, 302, 'status code should be 302')
resp = client.get('/')
self.assertTrue(b'test_for_post' in resp.context, 'resp content should include test_for_post')