上一篇完成了成绩管理的编写,这一篇开始编写统计分析功能。按照总体概述里的功能设计图,统计分析需要有增删改查、分析、导出功能。

请求方式:POST,请求接口:api/statistics,表单请求参数:

字段 注释 是否必传
name 成绩名称
class_name 班级名称

编写app/api/statistics.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Yujichang

from flask import jsonify, request, send_file
from . import api
from .. import db
from app.util import insert_statistics, reanalysis_results, select_statistics
from app.models import Class, Statistics
from datetime import datetime


@api.route('/statistics', methods=['POST'])
def add_statistics():
    """
    分析增加接口,post表单需上传
    成绩名称: name
    班级名称: class_name
    """
    r_name = request.form.get('name')
    c_name = request.form.get('class_name')

    # 判断数据库是否存在对应的班级和考试,必须有班级和考试名称才能添加分析
    _class = Class.query.filter_by(name=c_name).first()
    if not _class:
        res = {'code': 0, 'msg': 'Class do not exist and cannot be analyse'}
        return jsonify(res)

    is_result = _class.results.filter_by(name=r_name).first()
    if not is_result:
        res = {'code': 0, 'msg': 'Result do not exist and cannot be analyse'}
        return jsonify(res)

    # 有班级和考试名称情况下,判断是否已经进行过分析
    is_statistics = _class.statistics.filter_by(name=r_name).first()
    if is_statistics:
        res = {'code': 0, 'msg': 'The record already analyse'}
        return jsonify(res)

    # 添加记录
    c_id = _class.id
    try:
        res = insert_statistics(r_name, c_id)
    except Exception as e:
        res = {'code': 0, 'msg': str(e)}

    return jsonify(res), 201

在增加接口中调用了util的insert_statistics函数,编写app/util.py,添加以下代码:

def analyse_results(r_name, class_id):
    """
    成绩统计分析
    :param r_name: 成绩名称
    :param class_id: 班级id
    """
    sql = "select s.name as student_name, s.is_exam, r.name as result_name, r.score, r.class_id from students s " \
          "join results r on r.student_id = s.id " \
          "where r.name = '{0}' and r.class_id = {1}".format(r_name, class_id)
    df = pd.read_sql(sql=sql, con=db.engine)

    # 删除免考和缺考
    df2 = df.dropna(subset=['score'], axis=0)
    q = []
    for i in range(len(df2)):
        if df2.iloc[i, 1] == 1:
            q.append(i)
    df2 = df2.drop(q)

    # 统计总人数和考试人数
    total_num = len(df)
    exam_num = len(df2)

    # 统计总分和平均分
    total_score = df2['score'].sum()
    mean_score = df2['score'].mean().round(2)

    # 统计合格人数和合格率
    pass_num = len(df2[df2['score'] >= 60])
    pass_per = '{:.2%}'.format(pass_num / exam_num)

    # 判断等级
    bins = [0, 60, 75, 85, 101]
    group_names = ['D', 'C', 'B', 'A']
    df2['level'] = pd.cut(df2['score'], bins, right=False, labels=group_names)

    # 统计等级人数和占比率
    a_num = len(df2[df2['level'] == 'A'])
    a_per = '{:.2%}'.format(a_num / exam_num)

    b_num = len(df2[df2['level'] == 'B'])
    b_per = '{:.2%}'.format(b_num / exam_num)

    c_num = len(df2[df2['level'] == 'C'])
    c_per = '{:.2%}'.format(c_num / exam_num)

    d_num = len(df2[df2['level'] == 'D'])
    d_per = '{:.2%}'.format(d_num / exam_num)

    data = collections.OrderedDict(name=r_name, total_score=total_score, average_score=mean_score,
                                   total_number=total_num, exam_number=exam_num, pass_number=pass_num,
                                   pass_percent=pass_per, a_number=a_num,a_percent=a_per,b_number=b_num, b_percent=b_per,
                                   c_number=c_num, c_percent=c_per, d_number=d_num, d_percent=d_per,
                                   insert_time=datetime.now(), class_id=class_id)

    df3 = pd.DataFrame(data, index=[0])

    return df3


def insert_statistics(r_name, class_id):
    """
    增加成绩统计分析
    """
    df = analyse_results(r_name, class_id)
    df.to_sql('statistics', con=db.engine, if_exists='append', index=False)

    return {'code': 1, 'msg': 'Add statistics were success'}

请求方式:DELETE,请求接口:api/statistics/id
app/api/statistics.py添加以下代码:

@api.route('/statistics/<int:sid>', methods=['DELETE'])
def delete_statistics(sid):
    """
    分析删除接口,需要在 URL 里提供分析记录的 id
    """
    # 判断是否存在记录
    _statistics = Statistics.query.get(sid)
    if not _statistics:
        res = {'code': 0, 'msg': 'Statistics do not exist'}
        return jsonify(res), 404

    # 删除记录
    db.session.delete(_statistics)
    db.session.commit()

    res = {'code': 1, 'msg': 'Statistics delete success'}
    return jsonify(res)

请求方式:PUT,请求接口:api/statistics/id:
app/api/statistics.py添加以下代码:

@api.route('/statistics/<int:sid>', methods=['PUT'])
def update_statistics(sid):
    """
    分析更新接口,需要在 URL 里提供分析记录的 id
    """
    # 判断是否存在记录
    _statistics = Statistics.query.get(sid)
    if not _statistics:
        res = {'code': 0, 'msg': 'Statistics do not exist'}
        return jsonify(res), 404

    r_name = _statistics.name
    c_id = _statistics.class_id

    # 先删除、在增加
    db.session.delete(_statistics)
    db.session.commit()
    insert_statistics(r_name, c_id)

    res = {'code': 1, 'msg': 'Statistics update success'}
    return jsonify(res)

请求方式:GET,请求接口:api/statistics,URL请求参数:

字段 注释 是否必传
name 成绩名称
class_name 班级名称
page 页码数
limit 每页数量

app/api/statistics.py添加以下代码:

@api.route('/statistics', methods=['GET'])
def show_statistics():
    """
    分析查询接口,不传参则返回全部记录,可传成绩名称和班级名称参数作为条件查询
    成绩名称: name
    班级名称: class_name
    """
    r_name = request.args.get('name')
    c_name = request.args.get('class_name')
    page = request.args.get('page')
    limit = request.args.get('limit')

    statistics_json, total = select_statistics(r_name, c_name, page, limit)
    if not statistics_json:
        msg = 'Class or Result do not exist'
    else:
        msg = 'Statistics show success'

    res = {'code': 1, 'msg': msg, 'data': statistics_json, 'total': total}
    return jsonify(res)

在查询接口中调用了util的select_statistics函数,编写app/util.py,添加以下代码:

def select_statistics(r_name=None, c_name=None, page=None, limit=None):
    """
    查询成绩统计分析
    :param r_name: 成绩名称
    :param c_name:  班级名称
    :param page: 页数
    :param limit: 每页显示数量
    """
    # 先判断班级是否存在
    _class = Class.query.filter_by(name=c_name).first()
    if _class:
        c_id = _class.id
    else:
        c_id = 0

    if r_name and c_id:
        _statistics = Statistics.query.filter_by(name=r_name, class_id=c_id)
        df = pd.read_sql(sql=str(_statistics), con=db.engine, params=(r_name, c_id))
    elif r_name:
        _statistics = Statistics.query.filter_by(name=r_name)
        df = pd.read_sql(sql=str(_statistics), con=db.engine, params={r_name})
    elif c_id:
        _statistics = Statistics.query.filter_by(class_id=c_id)
        df = pd.read_sql(sql=str(_statistics), con=db.engine, params={c_id})
    else:
        _statistics = Statistics.query
        df = pd.read_sql(sql=str(_statistics), con=db.engine)

    # 因为班级返回的是 id 而不是名称,所以需要查询 id 对应的 name
    _class = Class.query.with_entities(Class.id, Class.name).all()
    df2 = pd.DataFrame(_class)
    df2.columns = ['id', 'statistics_class_name']

    # 左连接
    df3 = pd.merge(df, df2, how='left', left_on='statistics_class_id', right_on='id')

    total = len(df3)
    if page and limit:
        df3 = df3[(int(page) - 1) * int(limit): (int(page) * int(limit))]

    if df3.empty:
        statistics_json = '[]'
    else:
        df3.drop(labels=['id', 'statistics_class_id', 'statistics_insert_time'], axis=1, inplace=True)
        statistics_json = df3.to_json(orient='records', force_ascii=False)

    return json.loads(statistics_json), total

分析

分析是对所有成绩重新进行分析,请求方式:GET,请求接口:api/statistics/analyse:
app/api/statistics.py添加以下代码:

@api.route('/statistics/analyse', methods=['GET'])
def analyse():
    """
    重新分析接口,对所有成绩重新进行分析
    """
    try:
        res = reanalysis_results()
    except Exception as e:
        res = {'code': 0, 'msg': str(e)}

    return jsonify(res)

调用了util的reanalysis_results函数,编写app/util.py,添加以下代码:

def reanalysis_results():
    """
    重新对所有成绩进行统计分析,重建分析表
    """
    # 循环所有班级考试,统计分析
    r_name_id = Result.query.group_by(Result.name, Result.class_id).all()
    df = pd.DataFrame(columns=['name', 'total_score', 'average_score', 'total_number', 'exam_number',
                               'pass_number', 'pass_percent', 'a_number', 'a_percent', 'b_number', 'b_percent',
                               'c_number', 'c_percent', 'd_number', 'd_percent', 'insert_time', 'class_id'])
    for i in range(len(r_name_id)):
        r_name = r_name_id[i].name
        r_class_id = r_name_id[i].class_id
        df_temp = analyse_results(r_name, r_class_id)
        df = pd.concat([df, df_temp], axis=0, ignore_index=True, sort=False)

    # 插入 id 列
    df['id'] = df.index + 1

    # 清空数据表,再插入新数据
    Statistics.query.delete()
    db.session.commit()
    df.to_sql('statistics', con=db.engine, if_exists='append', index=False)

    return {'code': 1, 'msg': 'The reanalysis results were success'}

导出

导出是将统计分析导入到excel,请求方式:GET,请求接口:api/statistics/export,URL请求参数:

字段 注释 是否必传
name 成绩名称
class_name 班级名称

app/api/statistics.py添加以下代码:

@api.route('/statistics/export', methods=['GET'])
def export_statistics():
    """
    导出分析excel,不传参则返回全部记录,可传成绩名称和班级名称参数作为条件查询
    成绩名称: name
    班级名称: class_name
    """
    from io import BytesIO
    import pandas as pd

    r_name = request.args.get('name')
    c_name = request.args.get('class_name')

    statistics_json, total = select_statistics(r_name, c_name)
    column_list = ['考试名称', '班级', '总分', '平均分', '班级人数', '考试人数', '合格人数', '合格率', 'A',
                   'A占比', 'B', 'B占比', 'C', 'C占比', 'D', 'D占比']
    if statistics_json:
        df = pd.DataFrame(statistics_json)
        df.drop(labels=['statistics_id'], axis=1, inplace=True)
        df.rename(columns={'statistics_name': '考试名称', 'statistics_total_score': '总分',
                           'statistics_average_score': '平均分', 'statistics_total_number': '班级人数',
                           'statistics_exam_number': '考试人数', 'statistics_pass_number': '合格人数',
                           'statistics_pass_percent': '合格率', 'statistics_a_number': 'A',
                           'statistics_a_percent': 'A占比', 'statistics_b_number': 'B',
                           'statistics_b_percent': 'B占比', 'statistics_c_number': 'C',
                           'statistics_c_percent': 'C占比', 'statistics_d_number': 'D',
                           'statistics_d_percent': 'D占比', 'statistics_class_name': '班级'}, inplace=True)
        df = df[column_list]
    else:
        df = pd.DataFrame(columns=column_list)

    output = BytesIO()
    writer = pd.ExcelWriter(output)
    df.to_excel(writer, index=False, sheet_name='sheet1')
    writer.close()
    output.seek(0)

    filename = 'statistics_' + datetime.now().strftime('%Y%m%d%H%M%S') + '.xlsx'

    return send_file(output, attachment_filename=filename, as_attachment=True)

本文完成统计分析的编写,至此整个成绩管理系统后端api部分已经完成,后续将研究前端部分。