上一篇完成了成绩管理的编写,这一篇开始编写统计分析功能。按照总体概述里的功能设计图,统计分析需要有增删改查、分析、导出功能。
增
请求方式:POST,请求接口:api/statistics,表单请求参数:
字段 | 注释 | 是否必传 |
---|---|---|
name | 成绩名称 | 是 |
class_name | 班级名称 | 是 |
编写app/api/statistics.py
:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Yujichang
from flask import jsonify, request, send_file
from . import api
from .. import db
from app.util import insert_statistics, reanalysis_results, select_statistics
from app.models import Class, Statistics
from datetime import datetime
@api.route('/statistics', methods=['POST'])
def add_statistics():
"""
分析增加接口,post表单需上传
成绩名称: name
班级名称: class_name
"""
r_name = request.form.get('name')
c_name = request.form.get('class_name')
# 判断数据库是否存在对应的班级和考试,必须有班级和考试名称才能添加分析
_class = Class.query.filter_by(name=c_name).first()
if not _class:
res = {'code': 0, 'msg': 'Class do not exist and cannot be analyse'}
return jsonify(res)
is_result = _class.results.filter_by(name=r_name).first()
if not is_result:
res = {'code': 0, 'msg': 'Result do not exist and cannot be analyse'}
return jsonify(res)
# 有班级和考试名称情况下,判断是否已经进行过分析
is_statistics = _class.statistics.filter_by(name=r_name).first()
if is_statistics:
res = {'code': 0, 'msg': 'The record already analyse'}
return jsonify(res)
# 添加记录
c_id = _class.id
try:
res = insert_statistics(r_name, c_id)
except Exception as e:
res = {'code': 0, 'msg': str(e)}
return jsonify(res), 201
在增加接口中调用了util的insert_statistics函数,编写app/util.py
,添加以下代码:
def analyse_results(r_name, class_id):
"""
成绩统计分析
:param r_name: 成绩名称
:param class_id: 班级id
"""
sql = "select s.name as student_name, s.is_exam, r.name as result_name, r.score, r.class_id from students s " \
"join results r on r.student_id = s.id " \
"where r.name = '{0}' and r.class_id = {1}".format(r_name, class_id)
df = pd.read_sql(sql=sql, con=db.engine)
# 删除免考和缺考
df2 = df.dropna(subset=['score'], axis=0)
q = []
for i in range(len(df2)):
if df2.iloc[i, 1] == 1:
q.append(i)
df2 = df2.drop(q)
# 统计总人数和考试人数
total_num = len(df)
exam_num = len(df2)
# 统计总分和平均分
total_score = df2['score'].sum()
mean_score = df2['score'].mean().round(2)
# 统计合格人数和合格率
pass_num = len(df2[df2['score'] >= 60])
pass_per = '{:.2%}'.format(pass_num / exam_num)
# 判断等级
bins = [0, 60, 75, 85, 101]
group_names = ['D', 'C', 'B', 'A']
df2['level'] = pd.cut(df2['score'], bins, right=False, labels=group_names)
# 统计等级人数和占比率
a_num = len(df2[df2['level'] == 'A'])
a_per = '{:.2%}'.format(a_num / exam_num)
b_num = len(df2[df2['level'] == 'B'])
b_per = '{:.2%}'.format(b_num / exam_num)
c_num = len(df2[df2['level'] == 'C'])
c_per = '{:.2%}'.format(c_num / exam_num)
d_num = len(df2[df2['level'] == 'D'])
d_per = '{:.2%}'.format(d_num / exam_num)
data = collections.OrderedDict(name=r_name, total_score=total_score, average_score=mean_score,
total_number=total_num, exam_number=exam_num, pass_number=pass_num,
pass_percent=pass_per, a_number=a_num,a_percent=a_per,b_number=b_num, b_percent=b_per,
c_number=c_num, c_percent=c_per, d_number=d_num, d_percent=d_per,
insert_time=datetime.now(), class_id=class_id)
df3 = pd.DataFrame(data, index=[0])
return df3
def insert_statistics(r_name, class_id):
"""
增加成绩统计分析
"""
df = analyse_results(r_name, class_id)
df.to_sql('statistics', con=db.engine, if_exists='append', index=False)
return {'code': 1, 'msg': 'Add statistics were success'}
删
请求方式:DELETE,请求接口:api/statistics/id
在app/api/statistics.py
添加以下代码:
@api.route('/statistics/<int:sid>', methods=['DELETE'])
def delete_statistics(sid):
"""
分析删除接口,需要在 URL 里提供分析记录的 id
"""
# 判断是否存在记录
_statistics = Statistics.query.get(sid)
if not _statistics:
res = {'code': 0, 'msg': 'Statistics do not exist'}
return jsonify(res), 404
# 删除记录
db.session.delete(_statistics)
db.session.commit()
res = {'code': 1, 'msg': 'Statistics delete success'}
return jsonify(res)
改
请求方式:PUT,请求接口:api/statistics/id:
在app/api/statistics.py
添加以下代码:
@api.route('/statistics/<int:sid>', methods=['PUT'])
def update_statistics(sid):
"""
分析更新接口,需要在 URL 里提供分析记录的 id
"""
# 判断是否存在记录
_statistics = Statistics.query.get(sid)
if not _statistics:
res = {'code': 0, 'msg': 'Statistics do not exist'}
return jsonify(res), 404
r_name = _statistics.name
c_id = _statistics.class_id
# 先删除、在增加
db.session.delete(_statistics)
db.session.commit()
insert_statistics(r_name, c_id)
res = {'code': 1, 'msg': 'Statistics update success'}
return jsonify(res)
查
请求方式:GET,请求接口:api/statistics,URL请求参数:
字段 | 注释 | 是否必传 |
---|---|---|
name | 成绩名称 | 否 |
class_name | 班级名称 | 否 |
page | 页码数 | 否 |
limit | 每页数量 | 否 |
在app/api/statistics.py
添加以下代码:
@api.route('/statistics', methods=['GET'])
def show_statistics():
"""
分析查询接口,不传参则返回全部记录,可传成绩名称和班级名称参数作为条件查询
成绩名称: name
班级名称: class_name
"""
r_name = request.args.get('name')
c_name = request.args.get('class_name')
page = request.args.get('page')
limit = request.args.get('limit')
statistics_json, total = select_statistics(r_name, c_name, page, limit)
if not statistics_json:
msg = 'Class or Result do not exist'
else:
msg = 'Statistics show success'
res = {'code': 1, 'msg': msg, 'data': statistics_json, 'total': total}
return jsonify(res)
在查询接口中调用了util的select_statistics函数,编写app/util.py
,添加以下代码:
def select_statistics(r_name=None, c_name=None, page=None, limit=None):
"""
查询成绩统计分析
:param r_name: 成绩名称
:param c_name: 班级名称
:param page: 页数
:param limit: 每页显示数量
"""
# 先判断班级是否存在
_class = Class.query.filter_by(name=c_name).first()
if _class:
c_id = _class.id
else:
c_id = 0
if r_name and c_id:
_statistics = Statistics.query.filter_by(name=r_name, class_id=c_id)
df = pd.read_sql(sql=str(_statistics), con=db.engine, params=(r_name, c_id))
elif r_name:
_statistics = Statistics.query.filter_by(name=r_name)
df = pd.read_sql(sql=str(_statistics), con=db.engine, params={r_name})
elif c_id:
_statistics = Statistics.query.filter_by(class_id=c_id)
df = pd.read_sql(sql=str(_statistics), con=db.engine, params={c_id})
else:
_statistics = Statistics.query
df = pd.read_sql(sql=str(_statistics), con=db.engine)
# 因为班级返回的是 id 而不是名称,所以需要查询 id 对应的 name
_class = Class.query.with_entities(Class.id, Class.name).all()
df2 = pd.DataFrame(_class)
df2.columns = ['id', 'statistics_class_name']
# 左连接
df3 = pd.merge(df, df2, how='left', left_on='statistics_class_id', right_on='id')
total = len(df3)
if page and limit:
df3 = df3[(int(page) - 1) * int(limit): (int(page) * int(limit))]
if df3.empty:
statistics_json = '[]'
else:
df3.drop(labels=['id', 'statistics_class_id', 'statistics_insert_time'], axis=1, inplace=True)
statistics_json = df3.to_json(orient='records', force_ascii=False)
return json.loads(statistics_json), total
分析
分析是对所有成绩重新进行分析,请求方式:GET,请求接口:api/statistics/analyse:
在app/api/statistics.py
添加以下代码:
@api.route('/statistics/analyse', methods=['GET'])
def analyse():
"""
重新分析接口,对所有成绩重新进行分析
"""
try:
res = reanalysis_results()
except Exception as e:
res = {'code': 0, 'msg': str(e)}
return jsonify(res)
调用了util的reanalysis_results函数,编写app/util.py
,添加以下代码:
def reanalysis_results():
"""
重新对所有成绩进行统计分析,重建分析表
"""
# 循环所有班级考试,统计分析
r_name_id = Result.query.group_by(Result.name, Result.class_id).all()
df = pd.DataFrame(columns=['name', 'total_score', 'average_score', 'total_number', 'exam_number',
'pass_number', 'pass_percent', 'a_number', 'a_percent', 'b_number', 'b_percent',
'c_number', 'c_percent', 'd_number', 'd_percent', 'insert_time', 'class_id'])
for i in range(len(r_name_id)):
r_name = r_name_id[i].name
r_class_id = r_name_id[i].class_id
df_temp = analyse_results(r_name, r_class_id)
df = pd.concat([df, df_temp], axis=0, ignore_index=True, sort=False)
# 插入 id 列
df['id'] = df.index + 1
# 清空数据表,再插入新数据
Statistics.query.delete()
db.session.commit()
df.to_sql('statistics', con=db.engine, if_exists='append', index=False)
return {'code': 1, 'msg': 'The reanalysis results were success'}
导出
导出是将统计分析导入到excel,请求方式:GET,请求接口:api/statistics/export,URL请求参数:
字段 | 注释 | 是否必传 |
---|---|---|
name | 成绩名称 | 否 |
class_name | 班级名称 | 否 |
在app/api/statistics.py
添加以下代码:
@api.route('/statistics/export', methods=['GET'])
def export_statistics():
"""
导出分析excel,不传参则返回全部记录,可传成绩名称和班级名称参数作为条件查询
成绩名称: name
班级名称: class_name
"""
from io import BytesIO
import pandas as pd
r_name = request.args.get('name')
c_name = request.args.get('class_name')
statistics_json, total = select_statistics(r_name, c_name)
column_list = ['考试名称', '班级', '总分', '平均分', '班级人数', '考试人数', '合格人数', '合格率', 'A',
'A占比', 'B', 'B占比', 'C', 'C占比', 'D', 'D占比']
if statistics_json:
df = pd.DataFrame(statistics_json)
df.drop(labels=['statistics_id'], axis=1, inplace=True)
df.rename(columns={'statistics_name': '考试名称', 'statistics_total_score': '总分',
'statistics_average_score': '平均分', 'statistics_total_number': '班级人数',
'statistics_exam_number': '考试人数', 'statistics_pass_number': '合格人数',
'statistics_pass_percent': '合格率', 'statistics_a_number': 'A',
'statistics_a_percent': 'A占比', 'statistics_b_number': 'B',
'statistics_b_percent': 'B占比', 'statistics_c_number': 'C',
'statistics_c_percent': 'C占比', 'statistics_d_number': 'D',
'statistics_d_percent': 'D占比', 'statistics_class_name': '班级'}, inplace=True)
df = df[column_list]
else:
df = pd.DataFrame(columns=column_list)
output = BytesIO()
writer = pd.ExcelWriter(output)
df.to_excel(writer, index=False, sheet_name='sheet1')
writer.close()
output.seek(0)
filename = 'statistics_' + datetime.now().strftime('%Y%m%d%H%M%S') + '.xlsx'
return send_file(output, attachment_filename=filename, as_attachment=True)
本文完成统计分析的编写,至此整个成绩管理系统后端api部分已经完成,后续将研究前端部分。