utils.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import io
  2. import re
  3. import os
  4. from typing import List, Dict
  5. from urllib.parse import urlparse
  6. from PIL import Image
  7. from common.log import logger
  8. def fsize(file):
  9. if isinstance(file, io.BytesIO):
  10. return file.getbuffer().nbytes
  11. elif isinstance(file, str):
  12. return os.path.getsize(file)
  13. elif hasattr(file, "seek") and hasattr(file, "tell"):
  14. pos = file.tell()
  15. file.seek(0, os.SEEK_END)
  16. size = file.tell()
  17. file.seek(pos)
  18. return size
  19. else:
  20. raise TypeError("Unsupported type")
  21. def compress_imgfile(file, max_size):
  22. if fsize(file) <= max_size:
  23. return file
  24. file.seek(0)
  25. img = Image.open(file)
  26. rgb_image = img.convert("RGB")
  27. quality = 95
  28. while True:
  29. out_buf = io.BytesIO()
  30. rgb_image.save(out_buf, "JPEG", quality=quality)
  31. if fsize(out_buf) <= max_size:
  32. return out_buf
  33. quality -= 5
  34. def split_string_by_utf8_length(string, max_length, max_split=0):
  35. encoded = string.encode("utf-8")
  36. start, end = 0, 0
  37. result = []
  38. while end < len(encoded):
  39. if max_split > 0 and len(result) >= max_split:
  40. result.append(encoded[start:].decode("utf-8"))
  41. break
  42. end = min(start + max_length, len(encoded))
  43. # 如果当前字节不是 UTF-8 编码的开始字节,则向前查找直到找到开始字节为止
  44. while end < len(encoded) and (encoded[end] & 0b11000000) == 0b10000000:
  45. end -= 1
  46. result.append(encoded[start:end].decode("utf-8"))
  47. start = end
  48. return result
  49. def get_path_suffix(path):
  50. path = urlparse(path).path
  51. return os.path.splitext(path)[-1].lstrip('.')
  52. def convert_webp_to_png(webp_image):
  53. from PIL import Image
  54. try:
  55. webp_image.seek(0)
  56. img = Image.open(webp_image).convert("RGBA")
  57. png_image = io.BytesIO()
  58. img.save(png_image, format="PNG")
  59. png_image.seek(0)
  60. return png_image
  61. except Exception as e:
  62. logger.error(f"Failed to convert WEBP to PNG: {e}")
  63. raise
  64. def parse_markdown_text(text: str) -> List[Dict]:
  65. """
  66. 解析包含图片和文件链接的混合内容文本。code by sonnet3.5
  67. 参数:
  68. text (str): Markdown格式文本,包含图片和文件链接
  69. 返回:
  70. list: 包含不同类型内容(文本、图片、文件)的字典列表,每个字典包含类型和内容键值对
  71. example:
  72. text = "这是一篇图片与文件混合的文章\n这是图片1 ![Image1](/file/path/1.jpg)\n这是文件1 [file1](https://example.com/file.pdf)\n这是剩余的部分\n文件2 [file2](/file/path/2.docx)\n这是图片2 ![Image2](https://example.com/image2.png) 末尾文本")
  73. result = [
  74. {
  75. "type": "text",
  76. "content": "这是一篇图片与文件混合的文章\n 这是图片1"
  77. },
  78. {
  79. "type": "image",
  80. "content": "/file/path/1.jpg"
  81. },
  82. {
  83. "type": "text",
  84. "content": "这是文件1"
  85. },
  86. {
  87. "type": "file",
  88. "content": "https://example.com/file.pdf"
  89. },
  90. {
  91. "type": "text",
  92. "content": "这是剩余的部分\n 文件2"
  93. },
  94. {
  95. "type": "file",
  96. "content": "/file/path/2.docx"
  97. },
  98. {
  99. "type": "text",
  100. "content": "这是图片2"
  101. },
  102. {
  103. "type": "image",
  104. "content": "https://example.com/image2.png"
  105. },
  106. {
  107. "type": "text",
  108. "content": "末尾文本"
  109. }
  110. ]
  111. """
  112. # 定义正则表达式模式,匹配图片和文件链接的Markdown语法
  113. # (!\[.*?\]\((.*?)\)) 匹配图片: ![alt text](url)
  114. # (\[.*?\]\((.*?)\)) 匹配文件链接: [text](url)
  115. pattern = r'(!\[.*?\]\((.*?)\)|\[.*?\]\((.*?)\))'
  116. # 使用正则表达式分割文本
  117. # 这将产生一个列表,其中包含文本、完整匹配、图片URL和文件URL
  118. parts = re.split(pattern, text)
  119. # 初始化结果列表和当前文本变量
  120. result = []
  121. current_text = ""
  122. # 遍历分割后的部分,每次跳过4个元素
  123. # 因为每个匹配项产生4个部分:文本、完整匹配、图片URL(如果有)、文件URL(如果有)
  124. for i in range(0, len(parts), 4):
  125. # 如果存在文本部分,添加到当前文本
  126. if parts[i].strip():
  127. current_text += parts[i].strip()
  128. # 检查是否存在匹配项(图片或文件)
  129. if i + 1 < len(parts) and parts[i + 1]:
  130. # 如果有累积的文本,添加到结果列表
  131. if current_text:
  132. result.append({"type": "text", "content": current_text})
  133. current_text = "" # 重置当前文本
  134. # 检查是否为图片
  135. if parts[i + 2]:
  136. result.append({"type": "image", "content": parts[i + 2]})
  137. # 如果不是图片,则为文件
  138. elif parts[i + 3]:
  139. result.append({"type": "file", "content": parts[i + 3]})
  140. # 处理最后可能剩余的文本
  141. if current_text:
  142. result.append({"type": "text", "content": current_text})
  143. return result