| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
import re
def is_number(character: str) -> bool:
    """Return True if *character* is a plain decimal number literal.

    Accepted form: an optional leading ``+`` or ``-``, one or more digits,
    and an optional fractional part (``"42"``, ``"-3"``, ``"+1.50"``).
    Empty (or otherwise falsy) input yields ``False``.
    """
    if not character:
        return False
    # fullmatch rather than match with ^...$: `$` also matches just before a
    # trailing newline, which made inputs such as "12\n" count as numbers.
    return re.fullmatch(r'[-+]?\d+(\.\d+)?', character) is not None
def check_new_parameter(check_v, key, value):
    """Validate a single request parameter before the request is processed.

    Args:
        check_v: Rule list of the form ``[type]`` or
            ``[type, limit_condition]``. ``type`` is the type ``value`` must
            be an instance of. ``limit_condition`` is optional and is only
            evaluated for ``str``, ``int`` and ``list`` rules:

            * a ``list`` means membership: the value (or, for a ``list``
              rule, every element of the value) must be in it;
            * a ``tuple`` ``(low, high)`` is an inclusive range, applied to
              the length of a ``str`` value or to an ``int`` value itself.
        key: Name of the parameter being checked; used only in the error
            message.
        value: The value to check. For an ``int`` rule, values whose string
            form looks numeric are converted with ``int()`` for the checks;
            the caller's object is not modified.

    Returns:
        ``None`` when the value is valid, otherwise an error message string.

    Examples:
        ``[str, (1, 64)]``  -- a string whose length is between 1 and 64.
        ``[list, [0, 1]]``  -- a list whose elements are all in ``[0, 1]``.
        ``[int, (1, 200)]`` -- an integer between 1 and 200.
    """
    expected_type = check_v[0]

    if expected_type == int and is_number(str(value)):
        try:
            value = int(value)
        except (TypeError, ValueError):
            # A decimal string such as "1.5" looks numeric but is rejected by
            # int(); leave it untouched so the type check below reports it
            # instead of letting the exception escape.
            pass

    if not isinstance(value, expected_type):
        return "type error, %s should be %s, but now is %s" % (key, str(expected_type), value)

    # Without a limit condition the type check is the whole validation.
    if len(check_v) != 2:
        return None
    limit = check_v[1]

    if expected_type == str:
        if isinstance(limit, list) and value not in limit:
            return "Invalid param, %s is %s now, not in %s" % (key, value, str(limit))
        if isinstance(limit, tuple):
            if len(value) < limit[0] or len(value) > limit[1]:
                # Report the actual bounds, not the rule's type and tuple.
                return "Invalid param, %s is %s now, length is %s, range from %s to %s" % (
                    key, value, len(value), limit[0], limit[1])
    elif expected_type == int:
        if isinstance(limit, list) and value not in limit:
            return "Invalid param, %s is %s now, not in %s" % (key, value, str(limit))
        if isinstance(limit, tuple):
            if value < limit[0] or value > limit[1]:
                return "Invalid param, %s is %s now, range from %s to %s" % (
                    key, value, limit[0], limit[1])
    elif expected_type == list:
        if isinstance(limit, list):
            for item in value:
                if item not in limit:
                    return "Invalid param, %s is %s now, not in %s" % (key, item, str(limit))
    return None
class CheckParams:
    """Validate a request body against a table of parameter rules.

    ``params_dict`` maps each parameter name to a rule list whose first item
    is the "required" flag; the remaining items (``[type]`` or
    ``[type, limit_condition]``) are passed to ``check_new_parameter``.
    """

    def __init__(self, request_body, params_dict, logger, logger_name=None):
        self.request_body = request_body
        self.params_dict = params_dict
        # Label interpolated into every error message.
        self.logger_name = logger_name
        # Response skeleton; "code" and "message" are overwritten on failure.
        self.res = {
            "code": 0,
            "message": "ok",
            "data": {"operate_id": ""}
        }
        self.logger = logger
        # Parameters that passed validation, filled in by start().
        self.body = dict()

    def start(self):
        """Run the validation.

        Returns:
            ``(True, body)`` when every parameter passes, where ``body`` holds
            the validated parameters that were present in the request;
            ``(False, res)`` on the first failure, where ``res`` is the
            response dict with ``code`` set to 400 and ``message`` set to the
            error text.
        """
        for name, rule in self.params_dict.items():
            if name not in self.request_body:
                # Only a flag that is literally True marks a parameter as
                # required; absent optional parameters are skipped.
                if rule[0] is True:
                    # Pure f-string: applying % to an already-interpolated
                    # string breaks when logger_name contains a '%'.
                    return False, self._reject(
                        f"Invalid Access, {self.logger_name} {name} is not Found in request")
                continue

            error = check_new_parameter(rule[1:], name, self.request_body[name])
            if error is not None:
                return False, self._reject(f"Invalid Access, {self.logger_name} " + error)
            self.body[name] = self.request_body[name]
        return True, self.body

    def _reject(self, message):
        """Log *message*, store it in the response with code 400, and return the response."""
        self.logger.error(message)
        self.res["message"] = message
        self.res["code"] = 400
        return self.res
|