You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

162 lines
6.6 KiB

9 months ago
9 months ago
10 months ago
8 months ago
9 months ago
10 months ago
9 months ago
10 months ago
8 months ago
9 months ago
9 months ago
8 months ago
9 months ago
10 months ago
10 months ago
9 months ago
10 months ago
9 months ago
8 months ago
10 months ago
9 months ago
9 months ago
9 months ago
9 months ago
8 months ago
9 months ago
8 months ago
9 months ago
9 months ago
10 months ago
10 months ago
9 months ago
10 months ago
10 months ago
9 months ago
10 months ago
8 months ago
10 months ago
8 months ago
9 months ago
8 months ago
9 months ago
8 months ago
9 months ago
8 months ago
10 months ago
8 months ago
8 months ago
8 months ago
10 months ago
10 months ago
9 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
9 months ago
10 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
8 months ago
8 months ago
9 months ago
8 months ago
9 months ago
  1. from fastapi import Depends, APIRouter, Query, Path
  2. from internal.models import *
  3. from internal.database import (
  4. fetch_one,
  5. fetch_all,
  6. execute_query,
  7. response_success,
  8. raise_if_exists,
  9. raise_if_not_found,
  10. )
  11. from dependencies import get_current_active_user
  12. import json
  13. router = APIRouter(prefix="/blogs", tags=["博客管理"])
  14. # 获取列表
  15. @router.get("/list")
  16. async def blog_list():
  17. # 列表参数:博客名称、博客内容、创建时间、博客图片、博客查看时间、博客阅读次数、博客字数、类型名称、标签名列表
  18. select_query = """
  19. SELECT blogs.id,blogs.blogtitle, blogs.blogcontent, blogs.create_at, blogs.imglink,
  20. blogs.wordcount, types.typename,JSON_ARRAYAGG(labels.labelname) AS labelnames
  21. FROM blogs
  22. LEFT JOIN `types` ON blogs.typeid = types.id
  23. LEFT JOIN blog_label ON blog_label.blogid = blogs.id
  24. LEFT JOIN labels ON blog_label.labelid = labels.id
  25. GROUP BY blogs.id, blogs.blogtitle, blogs.blogcontent, blogs.create_at, blogs.imglink,
  26. blogs.wordcount, types.typename ORDER BY create_at DESC;
  27. """
  28. blog_list = fetch_all(select_query)
  29. return response_success(blog_list, "blog get list success")
  30. # 博客新增
  31. @router.post("/add")
  32. async def blog_add(blog: Blog, labels: list[Label], _: User = Depends(get_current_active_user)):
  33. select_query = "SELECT * FROM blogs WHERE blogtitle = %s"
  34. existing_blog = fetch_one(select_query, (blog.blogtitle,))
  35. raise_if_exists(existing_blog, "Blog already exists")
  36. insert_query = (
  37. "INSERT INTO blogs (blogtitle, blogcontent,imglink, typeid, descr) VALUES (%s, %s, %s, %s,%s)"
  38. )
  39. insert_data = (blog.blogtitle, blog.blogcontent,
  40. blog.imglink, blog.typeid, blog.descr)
  41. blog_id = execute_query(insert_query, insert_data, lastrowid=True)
  42. for label in labels:
  43. insert_label_query = "INSERT INTO blog_label (blogid, labelid) VALUES (%s, %s)"
  44. execute_query(insert_label_query, (blog_id, label.id))
  45. return {"message": "Blog created successfully", "blog_id": blog_id}
  46. # 博客删除
  47. @router.delete("/delete/{id}")
  48. async def blog_delete(id: str = Path(description="博客id")):
  49. select_query = "SELECT * FROM blogs WHERE id = %s"
  50. existing_blog = fetch_one(select_query, (id,))
  51. raise_if_not_found(existing_blog, "blog not found")
  52. insert_query = "DELETE FROM blogs WHERE id = %s"
  53. execute_query(insert_query, (id,))
  54. return response_success(message="blog delete success")
  55. # 博客修改
  56. # @router.put("/update/{id}")
  57. # async def blog_put(blog: Blog, id: str = Path(description="博客id")):
  58. # select_query = "SELECT * FROM blogs WHERE id=%s"
  59. # existing_blog = fetch_one(select_query, (id,))
  60. # raise_if_not_found(existing_blog, "blog not found")
  61. # update_query = (
  62. # "UPDATE blogs SET blogtitle=%s,blogcontent=%s,typeid=%s,descr=%s WHERE id=%s;"
  63. # )
  64. # update_data = (blog.blogtitle, blog.blogcontent,
  65. # blog.typeid, blog.descr, id)
  66. # execute_query(update_query, update_data)
  67. # return response_success("blog update sucess")
  68. @router.put("/update/{id}")
  69. async def blog_update(id: int, blog: Blog, labels: list[Label], _: User = Depends(get_current_active_user)):
  70. # 检查要编辑的博客是否存在
  71. select_query = "SELECT * FROM blogs WHERE id = %s"
  72. existing_blog = fetch_one(select_query, (id,))
  73. raise_if_not_found(existing_blog, "blog not found")
  74. # 更新博客信息
  75. update_query = (
  76. "UPDATE blogs SET blogtitle = %s, blogcontent = %s, imglink = %s, typeid = %s, descr = %s WHERE id = %s"
  77. )
  78. update_data = (blog.blogtitle, blog.blogcontent,
  79. blog.imglink, blog.typeid, blog.descr, id)
  80. execute_query(update_query, update_data)
  81. # 首先删除原有的关联标签
  82. delete_query = "DELETE FROM blog_label WHERE blogid = %s"
  83. execute_query(delete_query, (id,))
  84. # 然后插入新的关联标签
  85. for label in labels:
  86. insert_label_query = "INSERT INTO blog_label (blogid, labelid) VALUES (%s, %s)"
  87. execute_query(insert_label_query, (id, label.id))
  88. return response_success("blog update sucess")
  89. # 博客模糊查询
  90. @router.get("/list/search")
  91. async def blog_list_search(
  92. blogtitle: str = Query(None, description="博客标题"),
  93. typename: str = Query(None, description="博客类型"),
  94. start_date: str = Query(None, description="开始时间"),
  95. end_date: str = Query(None, description="结束时间"),
  96. ):
  97. select_query = """
  98. SELECT blogs.id, blogtitle, blogcontent,wordcount, typename, create_at, update_at, blogs.descr,JSON_ARRAYAGG(labels.labelname) AS labelnames
  99. FROM blogs
  100. LEFT JOIN `types` ON blogs.typeid = types.id
  101. LEFT JOIN blog_label ON blogs.id = blog_label.blogid
  102. LEFT JOIN labels ON blog_label.labelid = labels.id
  103. WHERE 1=1
  104. """
  105. params = []
  106. if blogtitle:
  107. select_query += " AND blogtitle LIKE %s"
  108. params.append(f"%{blogtitle}%")
  109. if typename:
  110. select_query += " AND typename LIKE %s"
  111. params.append(f"%{typename}%")
  112. if start_date:
  113. select_query += " AND create_at >= %s"
  114. params.append(start_date)
  115. if end_date:
  116. select_query += " AND create_at <= %s"
  117. params.append(end_date)
  118. select_query += "GROUP BY blogs.id, blogs.blogtitle, blogs.blogcontent,blogs.wordcount, types.typename, blogs.create_at, blogs.update_at, blogs.descr ORDER BY create_at DESC"
  119. blog_list = fetch_all(select_query, params=params, fetchall=True)
  120. return response_success(data=blog_list, message="blog serach succuessfully!")
  121. # 根据id查询博客
  122. @router.get("/list/search/{id}")
  123. async def get_id_blog(id: str = Path(description="博客id")):
  124. select_query = """SELECT blogs.id, blogtitle, blogcontent,wordcount, blogs.typeid, blogs.descr,JSON_ARRAYAGG(labels.id) AS labelnames,imglink FROM blogs
  125. LEFT JOIN `types` ON blogs.typeid = types.id
  126. LEFT JOIN blog_label ON blogs.id = blog_label.blogid
  127. LEFT JOIN labels ON blog_label.labelid = labels.id
  128. WHERE blogs.id = %s
  129. GROUP BY blogs.id;
  130. """
  131. blog_list = execute_query(select_query, (id,))
  132. if blog_list and isinstance(blog_list, dict):
  133. if 'labelnames' in blog_list and isinstance(blog_list['labelnames'], str):
  134. try:
  135. blog_list['labelnames'] = json.loads(blog_list['labelnames'])
  136. except json.JSONDecodeError:
  137. blog_list['labelnames'] = []
  138. return response_success(data=blog_list, message="blog search success")
  139. # 我就测试一下