| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373 |
- # coding=utf-8
- import base64
- import ipaddress
- import re
- import socket
- import urllib
- from urllib.parse import urlparse, urlunparse
- import requests
- import uuid_utils.compat as uuid
- from django.db.models import QuerySet
- from django.http import HttpResponse
- from django.utils.translation import gettext_lazy as _
- from rest_framework import serializers
- from application.models import Application
- from common.exception.app_exception import NotFound404, AppApiException
- from knowledge.models import File, FileSourceType
- from tools.serializers.tool import UploadedFileField
# Lookup table from a lower-case file extension (without the leading dot)
# to the MIME type sent as the Content-Type of a served file.
mime_types = {
    "html": "text/html",
    "htm": "text/html",
    "shtml": "text/html",
    "css": "text/css",
    "xml": "text/xml",
    "gif": "image/gif",
    "jpeg": "image/jpeg",
    "jpg": "image/jpeg",
    "js": "application/javascript",
    "atom": "application/atom+xml",
    "rss": "application/rss+xml",
    "mml": "text/mathml",
    "txt": "text/plain",
    "jad": "text/vnd.sun.j2me.app-descriptor",
    "wml": "text/vnd.wap.wml",
    "htc": "text/x-component",
    "avif": "image/avif",
    "png": "image/png",
    "svg": "image/svg+xml",
    "svgz": "image/svg+xml",
    "tif": "image/tiff",
    "tiff": "image/tiff",
    "wbmp": "image/vnd.wap.wbmp",
    "webp": "image/webp",
    "ico": "image/x-icon",
    "jng": "image/x-jng",
    "bmp": "image/x-ms-bmp",
    "woff": "font/woff",
    "woff2": "font/woff2",
    "jar": "application/java-archive",
    "war": "application/java-archive",
    "ear": "application/java-archive",
    "json": "application/json",
    "hqx": "application/mac-binhex40",
    "doc": "application/msword",
    "pdf": "application/pdf",
    "ps": "application/postscript",
    "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "eps": "application/postscript",
    "ai": "application/postscript",
    "rtf": "application/rtf",
    "m3u8": "application/vnd.apple.mpegurl",
    "kml": "application/vnd.google-earth.kml+xml",
    "kmz": "application/vnd.google-earth.kmz",
    "xls": "application/vnd.ms-excel",
    "eot": "application/vnd.ms-fontobject",
    "ppt": "application/vnd.ms-powerpoint",
    "odg": "application/vnd.oasis.opendocument.graphics",
    "odp": "application/vnd.oasis.opendocument.presentation",
    "ods": "application/vnd.oasis.opendocument.spreadsheet",
    "odt": "application/vnd.oasis.opendocument.text",
    "wmlc": "application/vnd.wap.wmlc",
    "wasm": "application/wasm",
    "7z": "application/x-7z-compressed",
    "cco": "application/x-cocoa",
    "jardiff": "application/x-java-archive-diff",
    "jnlp": "application/x-java-jnlp-file",
    "run": "application/x-makeself",
    "pl": "application/x-perl",
    "pm": "application/x-perl",
    "prc": "application/x-pilot",
    "pdb": "application/x-pilot",
    "rar": "application/x-rar-compressed",
    "rpm": "application/x-redhat-package-manager",
    "sea": "application/x-sea",
    "swf": "application/x-shockwave-flash",
    "sit": "application/x-stuffit",
    "tcl": "application/x-tcl",
    "tk": "application/x-tcl",
    "der": "application/x-x509-ca-cert",
    "pem": "application/x-x509-ca-cert",
    "crt": "application/x-x509-ca-cert",
    "xpi": "application/x-xpinstall",
    "xhtml": "application/xhtml+xml",
    "xspf": "application/xspf+xml",
    "zip": "application/zip",
    "bin": "application/octet-stream",
    "exe": "application/octet-stream",
    "dll": "application/octet-stream",
    "deb": "application/octet-stream",
    "dmg": "application/octet-stream",
    "iso": "application/octet-stream",
    "img": "application/octet-stream",
    "msi": "application/octet-stream",
    "msp": "application/octet-stream",
    "msm": "application/octet-stream",
    "mid": "audio/midi",
    "midi": "audio/midi",
    "kar": "audio/midi",
    "mp3": "audio/mp3",
    "ogg": "audio/ogg",
    "m4a": "audio/x-m4a",
    "ra": "audio/x-realaudio",
    "3gpp": "video/3gpp",
    "3gp": "video/3gpp",
    "ts": "video/mp2t",
    "mp4": "video/mp4",
    "mpeg": "video/mpeg",
    "mpg": "video/mpeg",
    "mov": "video/quicktime",
    "webm": "video/webm",
    "flv": "video/x-flv",
    "m4v": "video/x-m4v",
    "mng": "video/x-mng",
    "asx": "video/x-ms-asf",
    "asf": "video/x-ms-asf",
    "wmv": "video/x-ms-wmv",
    "avi": "video/x-msvideo",
    "wav": "audio/wav",
    "flac": "audio/flac",
    "aac": "audio/aac",
    "opus": "audio/opus",
    "csv": "text/csv",
    "tsv": "text/tab-separated-values",
    "ics": "text/calendar",
}
# Extensions treated as audio: when a file of one of these types is requested
# together with an HTTP Range header, partial content is served.
audio_types = [
    'mp3',
    'wav',
    'ogg',
    'flac',
    'aac',
    'opus',
    'm4a',
]
class FileSerializer(serializers.Serializer):
    """Serializer that validates an uploaded file and stores it as a ``File``."""

    file = UploadedFileField(required=True, label=_('file'))
    meta = serializers.JSONField(required=False, allow_null=True)
    source_id = serializers.CharField(
        required=False, allow_null=True, label=_('source id'), default=FileSourceType.TEMPORARY_120_MINUTE.value
    )
    source_type = serializers.ChoiceField(
        choices=FileSourceType.choices, required=False, allow_null=True, label=_('source type'),
        default=FileSourceType.TEMPORARY_120_MINUTE
    )

    def upload(self, with_valid=True):
        """Persist the uploaded file and return its relative download path.

        :param with_valid: when true, validate the serializer first and raise
            on invalid input.
        :return: a string of the form ``./oss/file/<file_id>``.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
        meta = self.data.get('meta', None)
        if not meta:
            # No metadata supplied: fall back to the debug marker.
            meta = {'debug': True}
        # Callers may pin the id through meta['file_id']; otherwise a new one is generated.
        file_id = meta.get('file_id', uuid.uuid7())
        file = File(
            id=file_id,
            file_name=self.data.get('file').name,
            meta=meta,
            source_id=self.data.get('source_id') or FileSourceType.TEMPORARY_120_MINUTE.value,
            source_type=self.data.get('source_type') or FileSourceType.TEMPORARY_120_MINUTE
        )
        file.save(self.data.get('file').read())
        return f'./oss/file/{file_id}'

    class Operate(serializers.Serializer):
        """Read and delete operations on a stored ``File`` addressed by id."""

        id = serializers.UUIDField(required=True)
        http_range = serializers.CharField(
            required=False, allow_blank=True, allow_null=True, label=_('HTTP Range'),
            help_text=_('HTTP Range header for partial content requests, e.g., "bytes=0-1023"')
        )

        def get(self, with_valid=True):
            """Build the HTTP response that serves the file.

            Audio files requested with a Range header are delegated to
            ``handle_audio``; everything else is returned in full with
            status 200.

            :param with_valid: when true, validate the serializer first.
            :return: an ``HttpResponse`` carrying the file bytes.
            :raises NotFound404: if no file with the given id exists.
            """
            if with_valid:
                self.is_valid(raise_exception=True)
            file_id = self.data.get('id')
            file = QuerySet(File).filter(id=file_id).first()
            if file is None:
                raise NotFound404(404, _('File not found'))
            # The text after the last dot selects the MIME type.
            file_type = file.file_name.split(".")[-1].lower()
            content_type = mime_types.get(file_type, 'application/octet-stream')
            encoded_filename = urllib.parse.quote(file.file_name)
            # Load the whole file content.
            file_bytes = file.get_bytes()
            file_size = len(file_bytes)
            if file_type in audio_types and self.data.get('http_range'):
                response = self.handle_audio(file_size, file_bytes, content_type, encoded_filename)
                if response is not None:
                    return response
            # Non-range requests, other file types, and ignored Range headers
            # all get the complete content.
            headers = {
                'Content-Type': content_type,
                'Content-Disposition': f'{"inline" if file_type == "pdf" else "attachment"}; filename={encoded_filename}'
            }
            return HttpResponse(
                file_bytes,
                status=200,
                headers=headers
            )

        def handle_audio(self, file_size, file_bytes, content_type, encoded_filename):
            """Serve a single byte range of an audio file.

            Understands ``bytes=first-``, ``bytes=first-last`` and the suffix
            form ``bytes=-length``. Only the first range of the header is used.

            :param file_size: total size of the file in bytes.
            :param file_bytes: complete file content.
            :param content_type: MIME type for the response.
            :param encoded_filename: URL-quoted file name for Content-Disposition.
            :return: a 206 response with the requested slice, a 416 response
                when the range lies outside the file, or ``None`` when the
                header is missing or invalid (the caller then serves the
                full content).
            """
            # `or ''` guards against an explicit null http_range value.
            range_match = re.match(r'bytes=(\d*)-(\d*)', self.data.get('http_range') or '')
            if range_match is None:
                return None
            first, last = range_match.groups()
            if first:
                start = int(first)
                if last and int(last) < start:
                    # last < first is an invalid range: ignore the header.
                    return None
                end = min(int(last), file_size - 1) if last else file_size - 1
                satisfiable = start < file_size
            elif last:
                # Suffix range: the final `suffix_length` bytes of the file.
                suffix_length = int(last)
                start = max(file_size - suffix_length, 0)
                end = file_size - 1
                satisfiable = suffix_length > 0 and file_size > 0
            else:
                # "bytes=-" carries no usable range.
                return None
            if not satisfiable:
                # Never emit a 206 with a negative length or malformed Content-Range.
                unsatisfiable = HttpResponse(status=416)
                unsatisfiable['Content-Range'] = f'bytes */{file_size}'
                unsatisfiable['Accept-Ranges'] = 'bytes'
                return unsatisfiable
            length = end - start + 1
            # Build the partial response.
            response = HttpResponse(
                file_bytes[start:start + length],
                status=206,
                content_type=content_type
            )
            # Partial-content response headers.
            response['Content-Range'] = f'bytes {start}-{end}/{file_size}'
            response['Accept-Ranges'] = 'bytes'
            response['Content-Length'] = str(length)
            response['Content-Disposition'] = f'inline; filename={encoded_filename}'
            return response

        def delete(self):
            """Delete the file with the validated id, if it exists.

            :return: ``True``, whether or not a matching file was found.
            """
            self.is_valid(raise_exception=True)
            file_id = self.data.get('id')
            file = QuerySet(File).filter(id=file_id).first()
            if file is not None:
                file.delete()
            return True
- from requests.adapters import HTTPAdapter
class SafeHTTPAdapter(HTTPAdapter):
    """
    HTTP adapter that checks where a host resolves before sending a request.

    Every address the request host resolves to must be outside the private,
    loopback, reserved, link-local and multicast ranges.

    NOTE(review): the check resolves the host here, and the connection opened
    by the parent ``send`` presumably resolves it again, so the check and the
    connection are not atomic. Confirm whether that window matters.
    """

    def send(self, request, **kwargs):
        """Validate the request host, then delegate to ``HTTPAdapter.send``."""
        target_host = urlparse(request.url).hostname
        if target_host:
            # Raises AppApiException when the host is unsafe or unresolvable.
            self._validate_host_ip(target_host)
        return super().send(request, **kwargs)

    def _validate_host_ip(self, host: str):
        """Raise ``AppApiException`` if *host* fails to resolve or resolves to an unsafe IP."""
        try:
            # Collect every resolved address, IPv4 and IPv6 alike.
            resolved = socket.getaddrinfo(host, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
            blocked = any(self._is_unsafe_ip(entry[4][0]) for entry in resolved)
        except Exception as e:
            raise AppApiException(500, _('Failed to resolve host: {error}').format(error=str(e)))
        if blocked:
            raise AppApiException(500, _('Access to internal IP addresses is blocked'))

    def _is_unsafe_ip(self, ip: str) -> bool:
        """Return True when *ip* is in a blocked range or cannot be parsed."""
        try:
            address = ipaddress.ip_address(ip)
        except Exception:
            # Anything that is not a parseable IP is treated as unsafe.
            return True
        range_flags = (
            address.is_private,
            address.is_loopback,
            address.is_reserved,
            address.is_link_local,
            address.is_multicast,
        )
        return any(range_flags)
def get_url_content(url, application_id: str):
    """Fetch *url* on behalf of an application and return its content.

    The URL is validated and normalized first, and the request goes through
    a session whose adapters check the resolved host addresses. Redirects
    are not followed.

    :param url: the URL to download.
    :param application_id: id of the application whose upload settings apply.
    :return: dict with ``status_code``, ``Content-Length``, ``Content-Type``
        and ``content``. ``content`` is text for text/JSON responses and
        Base64-encoded bytes otherwise.
    :raises AppApiException: if the application does not exist, file upload
        is disabled, or the content exceeds the size limit.
    :raises ValueError: if the URL is rejected by validation or the response
        host is internal.
    """
    application = Application.objects.filter(id=application_id).first()
    # These failures must be raised: returning the exception object would
    # hand callers an exception instance in place of a result.
    if application is None:
        raise AppApiException(500, _('Application does not exist'))
    if not application.file_upload_enable:
        raise AppApiException(500, _('File upload is not enabled'))
    # Default limit is 50 MiB; the application's 'fileLimit' setting is
    # multiplied by 1024 * 1024 below, i.e. it is given in MiB.
    file_limit = 50 * 1024 * 1024
    upload_setting = application.file_upload_setting
    if upload_setting and upload_setting.get('fileLimit'):
        file_limit = upload_setting.get('fileLimit') * 1024 * 1024
    # Request the sanitized URL, not the raw input, so that what was
    # validated is exactly what gets fetched.
    safe_url = validate_url(url)
    # Session with the safety-checking adapter on both schemes; the context
    # manager closes it even when the request raises.
    with requests.Session() as session:
        safe_adapter = SafeHTTPAdapter()
        session.mount('http://', safe_adapter)
        session.mount('https://', safe_adapter)
        response = session.get(
            safe_url,
            timeout=3,
            allow_redirects=False
        )
    final_host = urlparse(response.url).hostname
    if is_private_ip(final_host):
        raise ValueError("Blocked unsafe redirect to internal host")
    # Enforce the size limit on both the declared and the actual size: the
    # Content-Length header may be absent, malformed or simply wrong.
    # NOTE(review): the body has already been downloaded at this point;
    # capping the transfer itself would need a streamed request.
    try:
        declared_size = int(response.headers.get('Content-Length', 0))
    except (TypeError, ValueError):
        declared_size = 0
    body = response.content
    if declared_size > file_limit or len(body) > file_limit:
        raise AppApiException(500, _('File size exceeds limit'))
    content_type = response.headers.get('Content-Type', '')
    # Choose the encoding of the payload from the content type.
    if 'text' in content_type or 'json' in content_type:
        content = response.text
    else:
        # Binary content is Base64-encoded.
        content = base64.b64encode(body).decode('utf-8')
    return {
        'status_code': response.status_code,
        'Content-Length': response.headers.get('Content-Length', 0),
        'Content-Type': content_type,
        'content': content,
    }
def is_private_ip(host: str) -> bool:
    """Report whether *host* is, or resolves to, an unsafe address.

    An address is unsafe when it is private, loopback, reserved, link-local
    or multicast. IP literals (IPv4 and IPv6) are classified directly; host
    names are resolved and every returned address is checked.

    :param host: an IP literal or a host name.
    :return: True if any address is unsafe, or if *host* is empty, not a
        string, or cannot be resolved (fail closed); False otherwise.
    """
    if not isinstance(host, str) or not host:
        return True
    try:
        # Fast path for literals: no resolver involved, works for IPv6 too.
        addresses = [ipaddress.ip_address(host)]
    except ValueError:
        try:
            # getaddrinfo covers both address families, unlike the
            # IPv4-only gethostbyname.
            addr_infos = socket.getaddrinfo(host, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
            # Drop any "%scope" suffix on link-local IPv6 results before parsing.
            addresses = [
                ipaddress.ip_address(info[4][0].split('%', 1)[0])
                for info in addr_infos
            ]
        except Exception:
            # Resolution or parsing failed: treat the host as unsafe.
            return True
    if not addresses:
        return True
    return any(
        ip.is_private
        or ip.is_loopback
        or ip.is_reserved
        or ip.is_link_local
        or ip.is_multicast
        for ip in addresses
    )
# Characters and escape sequences that are rejected outright: backslash,
# any whitespace, and percent-encoded NUL / LF / CR (matched on the
# lower-cased URL).
_DANGEROUS_URL_CHARS = re.compile(r'\\|\s|%00|%0a|%0d')


def validate_and_normalize_url(url: str) -> str:
    r"""Strictly validate *url* and return a normalized copy.

    Guards against URL-parsing bypass tricks such as:

    - ``http://127.0.0.1:6666\@1.1.1.1/``  (backslash bypass)
    - ``http://127.0.0.1:6666@1.1.1.1/``   (userinfo confusion)
    - ``http://1.1.1.1#@127.0.0.1:6666/``  (fragment injection)

    The returned URL keeps scheme, host, port, path, params and query, and
    drops any userinfo and fragment.

    :param url: the URL to check.
    :return: the normalized URL.
    :raises ValueError: if the URL is empty, contains dangerous characters,
        uses a scheme other than http/https, carries suspicious userinfo,
        has no host or an invalid port, or points at an internal address.
    """
    if not url:
        raise ValueError("URL is required")
    # 1. Reject dangerous characters before any parsing happens.
    if _DANGEROUS_URL_CHARS.search(url.lower()):
        raise ValueError("URL contains dangerous characters")
    # 2. Parse the URL.
    parsed = urlparse(url)
    # 3. Only http / https are allowed.
    if parsed.scheme not in ("http", "https"):
        raise ValueError("Only http and https are allowed")
    # 4. Userinfo that looks like a host or port is a confusion attempt.
    netloc = parsed.netloc
    if '@' in netloc:
        auth_part = netloc.rsplit('@', 1)[0]
        if ':' in auth_part or '.' in auth_part:
            raise ValueError("Authentication part contains suspicious content")
    # 5. The parser's hostname already excludes userinfo, port and IPv6
    #    brackets, so no manual splitting of the netloc is needed.
    actual_host = parsed.hostname
    if not actual_host:
        raise ValueError("Invalid URL: missing hostname")
    # 6. Validate the port up front, before any DNS lookup is made.
    try:
        port = parsed.port
    except ValueError:
        raise ValueError("Invalid URL: invalid port") from None
    # 7. Block internal targets, whether given as a literal or a host name.
    if is_private_ip(actual_host):
        raise ValueError("Access to internal IP addresses is blocked")
    # 8. Rebuild a clean URL without userinfo; IPv6 literals must be
    #    re-bracketed to form a valid netloc.
    clean_netloc = f"[{actual_host}]" if ':' in actual_host else actual_host
    if port is not None:
        clean_netloc = f"{clean_netloc}:{port}"
    return urlunparse((
        parsed.scheme,
        clean_netloc,
        parsed.path,
        parsed.params,
        parsed.query,
        '',  # drop the fragment to prevent fragment injection
    ))
def validate_url(url: str):
    """Check that *url* is safe; kept as a backward-compatible alias.

    Delegates to ``validate_and_normalize_url`` and returns its result.
    """
    normalized_url = validate_and_normalize_url(url)
    return normalized_url
|