吾乃弘客联盟之士,昼夜不懈,以思破敌之策,防御外敌黑客之入侵。戒备之心常在,警惕之念不辍。犹铁壁坚墙,阻外敌之锐利,护卫我邦之安全。如此昼夜攻防,力求万无一失。
from flask import Flask, request, render_template, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timedelta
import ipaddress
import logging
from werkzeug.middleware.proxy_fix import ProxyFix
app = Flask(__name__)
app.config[‘SQLALCHEMY_DATABASE_URI’] = ‘sqlite:///ip_blocker.db’
app.config[‘SQLALCHEMY_TRACK_MODIFICATIONS’] = False
app.config[‘BLOCK_THRESHOLD’] = 5 # 5次失败尝试后封锁
app.config[‘BLOCK_DURATION’] = timedelta(hours=24) # 封锁24小时
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1) # 正确处理代理后的IP
db = SQLAlchemy(app)
logging.basicConfig(level=logging.INFO)
# 数据库模型
class IPAddress(db.Model):
id = db.Column(db.Integer, primary_key=True)
ip = db.Column(db.String(46), unique=True, nullable=False) # 支持IPv6
failed_attempts = db.Column(db.Integer, default=0)
last_attempt = db.Column(db.DateTime)
blocked_until = db.Column(db.DateTime)
is_permanent = db.Column(db.Boolean, default=False)
def is_blocked(self):
if not self.blocked_until:
return False
if self.is_permanent:
return True
return datetime.utcnow() < self.blocked_until
# 创建数据库
@app.before_first_request
def create_tables():
db.create_all()
# 获取客户端真实IP(考虑代理情况)
def get_client_ip():
# 检查常见代理头
headers = [
‘X-Real-IP’,
‘X-Forwarded-For’,
‘CF-Connecting-IP’, # Cloudflare
‘True-Client-IP’ # Akamai
]
for header in headers:
ip = request.headers.get(header)
if ip:
# 可能有多个IP(逗号分隔)
ips = [i.strip() for i in ip.split(‘,’)]
# 返回第一个有效IP
for candidate in ips:
try:
ipaddress.ip_address(candidate)
return candidate
except ValueError:
continue
# 没有代理头时使用远程地址
return request.remote_addr
# 记录失败尝试并检查是否封锁
def record_failed_attempt():
client_ip = get_client_ip()
logging.warning(f”可疑活动来自 IP: {client_ip}”)
# 查找或创建IP记录
ip_record = IPAddress.query.filter_by(ip=client_ip).first()
if not ip_record:
ip_record = IPAddress(ip=client_ip)
db.session.add(ip_record)
# 更新失败尝试
ip_record.failed_attempts += 1
ip_record.last_attempt = datetime.utcnow()
# 检查是否达到封锁阈值
if ip_record.failed_attempts >= app.config[‘BLOCK_THRESHOLD’]:
# 如果之前没有封锁或封锁已过期,则设置新的封锁时间
if not ip_record.blocked_until or ip_record.blocked_until < datetime.utcnow():
ip_record.blocked_until = datetime.utcnow() + app.config[‘BLOCK_DURATION’]
logging.warning(f”IP {client_ip} 已被封锁至 {ip_record.blocked_until}”)
db.session.commit()
return ip_record.is_blocked()
# 检查IP是否被封锁
def is_ip_blocked():
client_ip = get_client_ip()
ip_record = IPAddress.query.filter_by(ip=client_ip).first()
if ip_record and ip_record.is_blocked():
logging.warning(f”已封锁的IP尝试访问: {client_ip}”)
return True
return False
# 中间件:在请求处理前检查IP封锁
@app.before_request
def check_ip_block():
if is_ip_blocked():
return jsonify({
“error”: “访问被拒绝”,
“message”: “您的IP地址因可疑活动已被暂时封锁”,
“status”: 403
}), 403
# 示例登录端点(模拟攻击场景)
@app.route(‘/login’, methods=[‘POST’])
def login():
# 模拟登录失败
record_failed_attempt()
return jsonify({
“error”: “登录失败”,
“message”: “用户名或密码不正确”
}), 401
# 管理界面 – 查看封锁的IP
@app.route(‘/admin/blocked-ips’)
def blocked_ips():
blocked = IPAddress.query.filter(
IPAddress.blocked_until > datetime.utcnow()
).all()
permanent = IPAddress.query.filter_by(is_permanent=True).all()
return render_template(‘blocked_ips.html’,
blocked=blocked,
permanent=permanent,
now=datetime.utcnow())
# 管理界面 – 手动封锁IP
@app.route(‘/admin/block-ip’, methods=[‘POST’])
def block_ip():
ip = request.form.get(‘ip’)
permanent = ‘permanent’ in request.form
try:
# 验证IP地址格式
ipaddress.ip_address(ip)
except ValueError:
return jsonify({“error”: “无效的IP地址”}), 400
ip_record = IPAddress.query.filter_by(ip=ip).first()
if not ip_record:
ip_record = IPAddress(ip=ip)
db.session.add(ip_record)
if permanent:
ip_record.is_permanent = True
ip_record.blocked_until = None
else:
ip_record.blocked_until = datetime.utcnow() + timedelta(days=30)
db.session.commit()
return jsonify({“message”: f”IP {ip} 已成功封锁”})
# 管理界面 – 解除封锁
@app.route(‘/admin/unblock-ip/<int:ip_id>’, methods=[‘POST’])
def unblock_ip(ip_id):
ip_record = IPAddress.query.get(ip_id)
if not ip_record:
return jsonify({“error”: “找不到IP记录”}), 404
ip_record.blocked_until = None
ip_record.is_permanent = False
ip_record.failed_attempts = 0
db.session.commit()
return jsonify({“message”: f”IP {ip_record.ip} 已解除封锁”})
# 首页
@app.route(‘/’)
def home():
return render_template(‘index.html’)
if __name__ == ‘__main__’:
app.run(debug=True)
本网站内容及资料来源于网络,并不代表本站观点和对其真实性负责,也不构成任何其他建议;部分内容是由网友自主投稿和发布、编辑整理上传,对此类内容本站仅提供交流平台,不为其版权负责;所提供的信息,只供参考之用,不保证信息的准确性、有效性、及时性和完整性;如果您发现网站上有侵犯您的知识产权的内容,请与我们取得联系,我们会及时修改或删除。文章版权归作者所有,原创作品未经允许请勿转载。投诉请联系:admin@chnhonker.com
暂无评论内容