病虫害AI识别系统-微信小程序
本文最后更新于 2024-10-01,文章内容可能已经过时。
病虫害AI识别系统-微信小程序
开发工具及语言
开发工具:
微信小程序开发工具
HBuilder X
涉及语言:
vue3,uniapp,mysql,python,nodejs
注:电脑必须由这些环境
前端部署
首先确保您的电脑正确安装node如果没有安装请去以下途径安装
https://nodejs.org/zh-cn
解压程序,在根目录运行
安装完成使用以下命令更换国内镜像源
npm config set registry https://registry.npmmirror.com
设置完成后,在文件根目录调出cmd
输入
npm install -g @vue/cli
然后安装所需要的模块
npm update
使用HBuilder X
打开项目修改 src\dataurl.js
中的IP为服务端IP
const BASE_URL = 'http://127.0.0.1:3000';
export default BASE_URL;
然后以微信小程序模式运行
后端部署
1. 环境要求
- Python 3.x
- MySQL 数据库
- Flask
- Flask-CORS
- Flask-MySQL-Connector
- Werkzeug
- requests
- jwt
确保您的服务器上已经安装了上述依赖项,如果没有,可以使用以下命令安装:
pip install Flask Flask-CORS flask-mysql-connector werkzeug requests PyJWT
2. 数据库配置
首先,创建一个名为 my_vx_app
的数据库,并在数据库中创建以下表:
用户表:users
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
phone VARCHAR(20) NOT NULL UNIQUE,
password VARCHAR(255) NOT NULL,
nickname VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
识别记录表:processed_images
CREATE TABLE processed_images (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
image_path VARCHAR(255) NOT NULL,
processed_text TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
3. 配置server.py
和ai-api.py
server.py
# server.py
from flask import Flask, jsonify, request, send_from_directory, session
from flask_cors import CORS
from flask_mysql_connector import MySQL
from werkzeug.security import generate_password_hash, check_password_hash
import requests
import base64
import os
import uuid
import jwt
from datetime import datetime, timedelta
from functools import wraps
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key' # 请替换为您的密钥
# MySQL 数据库配置
app.config['MYSQL_HOST'] = 'localhost'
app.config['MYSQL_USER'] = 'root'
app.config['MYSQL_PASSWORD'] = '1234' # 替换为您的 MySQL 密码
app.config['MYSQL_DATABASE'] = 'my_vx_app'
mysql = MySQL(app)
# 允许跨域请求
CORS(app, supports_credentials=True)
# 设置图片上传目录
UPLOAD_FOLDER = 'uploads'
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
# 生成 Token
def generate_token(user_id):
payload = {
'exp': datetime.utcnow() + timedelta(hours=1), # Token 有效期1小时
'iat': datetime.utcnow(),
'sub': user_id
}
return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
# 验证 Token 的装饰器
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(" ")[1]
if not token:
return jsonify({"success": False, "message": "Token is missing"}), 401
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
current_user_id = data['sub']
except jwt.ExpiredSignatureError:
return jsonify({"success": False, "message": "Token has expired"}), 401
except jwt.InvalidTokenError:
return jsonify({"success": False, "message": "Invalid token"}), 401
return f(current_user_id, *args, **kwargs)
return decorated
# 注册用户
@app.route('/api/register', methods=['POST'])
def register():
data = request.json
phone = data.get('phone')
password = data.get('password')
nickname = data.get('nickname')
conn = mysql.connection
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE phone = %s", (phone,))
existing_user = cursor.fetchone()
if existing_user:
return jsonify({"success": False, "message": "该手机号已注册"}), 400
hashed_password = generate_password_hash(password)
cursor.execute("INSERT INTO users (phone, password, nickname) VALUES (%s, %s, %s)",
(phone, hashed_password, nickname))
conn.commit()
cursor.close()
return jsonify({"success": True, "message": "注册成功", "userInfo": {"phone": phone, "nickname": nickname}}), 201
# 用户登录
@app.route('/api/login', methods=['POST'])
def login():
data = request.json
phone = data.get('phone')
password = data.get('password')
conn = mysql.connection
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE phone = %s", (phone,))
user = cursor.fetchone()
cursor.close()
if not user or not check_password_hash(user['password'], password):
return jsonify({"success": False, "message": "账号或密码错误"}), 401
token = generate_token(user['id'])
return jsonify({"success": True, "message": "登录成功", "userInfo": {"phone": user['phone'], "nickname": user['nickname'], "token": token}}), 200
@app.route('/api/changePassword', methods=['POST'])
@token_required
def change_password(current_user_id):
data = request.json
old_password = data.get('oldPassword')
new_password = data.get('newPassword')
# 查询用户信息
conn = mysql.connection
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE id = %s", (current_user_id,))
user = cursor.fetchone()
if not user or not check_password_hash(user['password'], old_password):
return jsonify({"success": False, "message": "旧密码错误"}), 401
# 更新密码
hashed_new_password = generate_password_hash(new_password)
cursor.execute("UPDATE users SET password = %s WHERE id = %s", (hashed_new_password, current_user_id))
conn.commit()
cursor.close()
return jsonify({"success": True, "message": "密码修改成功"}), 200
# 用户登出
@app.route('/api/logout', methods=['POST'])
@token_required
def logout(current_user_id):
# 清除会话或 Token(这里根据实际需要实现)
return jsonify({"success": True, "message": "已退出登录"}), 200
# 获取识别记录
@app.route('/api/getRecords', methods=['GET'])
@token_required
def get_records(current_user_id):
conn = mysql.connection
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, image_path, processed_text, created_at FROM processed_images WHERE user_id = %s ORDER BY created_at DESC", (current_user_id,))
records = cursor.fetchall()
cursor.close()
for record in records:
record['image_path'] = f"{request.url_root}uploads/{os.path.basename(record['image_path'])}"
return jsonify({"success": True, "records": records}), 200
# 获取单条记录的详细信息
@app.route('/api/getRecordDetail', methods=['GET'])
@token_required
def get_record_detail(current_user_id):
record_id = request.args.get('id')
conn = mysql.connection
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, image_path, processed_text, created_at FROM processed_images WHERE id = %s AND user_id = %s", (record_id, current_user_id))
record = cursor.fetchone()
cursor.close()
if not record:
return jsonify({"success": False, "message": "记录不存在"}), 404
record['image_path'] = f"{request.url_root}uploads/{os.path.basename(record['image_path'])}"
return jsonify({"success": True, "record": record}), 200
# 提供静态文件服务
@app.route('/uploads/<filename>', methods=['GET'])
def get_uploaded_file(filename):
try:
return send_from_directory(UPLOAD_FOLDER, filename)
except Exception as e:
return jsonify({"success": False, "message": f"Error loading image: {str(e)}"}), 500
# 处理用户上传的图片
@app.route('/api/processImage', methods=['POST'])
@token_required
def process_image(current_user_id):
file = request.files.get('file')
if not file:
return jsonify({"success": False, "message": "没有上传文件"}), 400
image_id = str(uuid.uuid4())
filename = f"{image_id}.png"
file_path = os.path.join(UPLOAD_FOLDER, filename)
file.save(file_path)
with open(file_path, "rb") as img_file:
base64_image = base64.b64encode(img_file.read()).decode('utf-8')
external_service_url = 'http://127.0.0.1:5000/process'
try:
response = requests.post(external_service_url, json={"image": base64_image})
response_data = response.json()
if response.status_code == 200 and 'text' in response_data:
processed_text = response_data['text']
conn = mysql.connection
cursor = conn.cursor()
cursor.execute("INSERT INTO processed_images (user_id, image_path, processed_text) VALUES (%s, %s, %s)",
(current_user_id, file_path, processed_text))
conn.commit()
cursor.close()
return jsonify({"success": True, "imageUrl": f"http://127.0.0.1:3000/uploads/{filename}", "text": processed_text}), 200
else:
return jsonify({"success": False, "message": "外部服务处理失败"}), 500
except requests.RequestException as e:
print(f"请求外部服务接口失败: {e}")
return jsonify({"success": False, "message": "外部服务请求失败"}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=3000)
ai-api.py(只用于测试,实际对接您的图像api)
# ai-api.py
from flask import Flask, request, jsonify
import base64
import time
app = Flask(__name__)
# 模拟外部接口处理图像的接口
@app.route('/process', methods=['POST'])
def process_image():
data = request.json
image_base64 = data.get("image")
if not image_base64:
return jsonify({"success": False, "message": "未收到图像数据"}), 400
try:
# 模拟图像处理
image_data = base64.b64decode(image_base64)
# 在这里可以模拟保存或者处理图像
time.sleep(2) # 模拟处理时间
# 模拟处理结果文本
processed_text = "这是模拟的处理结果文本"
# 返回处理后的结果
return jsonify({"success": True, "text": processed_text}), 200
except Exception as e:
print(f"图像处理失败: {e}")
return jsonify({"success": False, "message": "图像处理失败"}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
- 确保将
server.py
中的以下配置项替换为您自己的值:app.config['SECRET_KEY']
- 替换为您的密钥app.config['MYSQL_HOST']
- 数据库主机app.config['MYSQL_USER']
- 数据库用户名app.config['MYSQL_PASSWORD']
- 数据库密码
4. 运行 Flask 应用程序
首先,确保在项目根目录中创建一个名为 uploads
的文件夹,用于存储用户上传的图片:
mkdir uploads
然后,启动 server.py
和 ai-api.py
:
python server.py
python ai-api.py
server.py
将在http://localhost:3000
上运行ai-api.py
将在http://localhost:5000
上运行
5. 访问 API
以下是一些可用的 API 端点:
- 注册:
POST /api/register
- 登录:
POST /api/login
- 修改密码:
POST /api/changePassword
- 登出:
POST /api/logout
- 上传图片并识别:
POST /api/processImage
- 获取识别记录:
GET /api/getRecords
- 获取单条记录详细信息:
GET /api/getRecordDetail
6. 注意事项
ai-api.py
模拟了一个外部图像处理服务,因此在生产环境中,您可能需要替换为实际的图像处理服务。- 在实际生产环境中,请确保使用更强的加密密钥,并启用 HTTPS 以提高安全性。
按照以上步骤,您应该能够顺利部署和运行您的应用程序。
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,完整转载请注明来自 枫の屋
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果