👨💻博客主页:i新木优子👀
🎉欢迎关注🔍点赞👍收藏⭐留言📝
🧚♂️寄语:成功的秘诀就是每天都比别人多努力一点👣
✨有任何疑问欢迎评论探讨
先声明一下:免费的代理稳定性都不高,即使经过层层筛选有些可能还是不能用,就像矮子里拔高的,即使已经是矮子里最高的,可是还是改变不了是矮子的本质
在做任何事情之前我们都需要先思考,要如何实现?需要用到什么?等等一系列的问题都要想清楚,要先将思路理清了,做起事来才能事半功倍
🎯下面是我做这个项目的思路,可能并不是很好,有更好的想法欢迎留言讨论
代理IP池:
自身:
- 能采集代理IP(用爬虫抓取网站即可)
采集到的IP我们的将它存储起来,这就有一个问题我们要将这些IP存储到哪里?
Mysql?MongoDB?还是Redis?
Mysql:它当然可以存储IP,可是它也有它的局限性,Mysql不能去重,因为有时我们采集到的IP可能一样,还有一个问题就是Mysql查询效率低
MongoDB:也可以存储IP,但它也不能去重
Redis:最合适,首先它的查询效率最高,还有良好的去重的集合(zset)
为什么要用zset呢?
zset有一个特性,他有一个分值(score),我们可以通过控制分值的高低就可以将稳定性高的IP取出来,从而提高免费IP的可用性
不了解Redis基本用法的小伙伴可以去看一下我的上一篇博客哦 - 能验证IP的有效性
先将每个IP定一个初始分值(50),然后对每个IP都进行校验,如果这个IP可用那么就将这个IP的分值拉满(100),如果不可用就进行扣分(10),直到IP变成0分,就将这个IP删除
对外:
- 提供免费的可用的代理IP
思路理清了,接下来就是如何写程序了
采集:写爬虫抓取IP,将IP存储到Redis
校验:从Redis中取出IP,用IP简单发送一个请求,如果可以正常返回,证明该IP可用
提供:写api接口,将可用的IP提供给用户
如果我们按照单线程去完成上面的步骤,就有局限性,只有每次将IP提供给用户,才可以继续采集IP,而我们希望的是这三个步骤互不影响,不管采集、校验还是给用户提供IP,都应该是一直进行,在提供IP的时候也可以继续采集、校验
三个独立的程序,我们就可以用多进程
下图就是IP代理池的模型:

仔细观察上图,三个操作都用到了Redis,所以就先写Redis涉及到的各种操作,再写其他三个功能就可以游刃有余了
1️⃣Redis的各种操作
- 连接Redis
- zset存储
判断IP存不存在,不存在就新增 - 查询所有IP(校验IP时要用到)
- 将分值拉满(IP可用)
- 将分值降低(IP不可用)
- 查询可用的IP
先给满分的,没有满分的给51-99分的
# redis的各种操作from redis import Redis
from settings import *class ProxyRedis:# 连接redisdef __init__(self):self.red = Redis(host=REDIS_HOST,port=REDIS_PORT,db=REDIS_DB,password=REDIS_PASSWORD,decode_responses=True)# 存储ipdef add_proxy_ip(self, ip):# 判断是否有ipif not self.red.zscore(REDIS_KEY, ip):self.red.zadd(REDIS_KEY, {ip: DEFAULT_SCORE})print("采集到了IP地址了", ip)else:print("采集到了IP地址了", ip, "但是已经存在")# 查询所有ipdef get_all_proxy(self):return self.red.zrange(REDIS_KEY, 0, -1)# 将分值拉满def set_max_score(self, ip):self.red.zadd(REDIS_KEY, {ip: MAX_SCORE})# 降低分值def reduce_score(self, ip):# 查询分值score = self.red.zscore(REDIS_KEY, ip)# 如果有分值,扣分if score > 0:self.red.zincrby(REDIS_KEY, -10, ip)else: # 分值没有则删除self.red.zrem(REDIS_KEY, ip)# 查询可用ipdef get_avail_proxy(self):lis = []ips = self.red.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE, 0, -1)if ips:lis.append(ips)return liselse:ips = self.red.zrangebyscore(REDIS_KEY, DEFAULT_SCORE + 1, MAX_SCORE - 1, 0, -1)if ips:lis.append(ips)return liselse:print("没有可用ip")return None
2️⃣采集IP
这里我爬取了三个网站,当然感觉不够用的自己还可以加
快代理:https://www.kuaidaili.com/free/intr/1/
高可用全球免费代理IP库:https://ip.jiangxianli.com/?page=1
66免费代理网:http://www.66ip.cn/areaindex_1/1.html
爬取这些网站很简单,基本都没有什么反爬,页面也都差不多,直接用xpath解析就可以得到想要的IP
# 代理IP的采集
from proxy_redis import ProxyRedis
import requests
from lxml import etree
import timeheaders = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"
}# 采集快代理
def get_kuai_ip(red):url = "https://www.kuaidaili.com/free/intr/1/"resp = requests.get(url, headers=headers)tree = etree.HTML(resp.text)trs = tree.xpath("//table/tbody/tr")for tr in trs:ip = tr.xpath("./td[1]/text()") # ip地址port = tr.xpath("./td[2]/text()") # 端口if not ip:continueip = ip[0]port = port[0]proxy_ip = ip + ":" + portred.add_proxy_ip(proxy_ip) # 增加ip地址# 采集66免费代理网
def get_66_ip(red):url = "http://www.66ip.cn/areaindex_1/1.html"resp = requests.get(url, headers=headers)tree = etree.HTML(resp.text)trs = tree.xpath("//table//tr")[1:]for tr in trs:ip = tr.xpath("./td[1]/text()") # ip地址port = tr.xpath("./td[2]/text()") # 端口if not ip:continueip = ip[0]port = port[0]proxy_ip = ip + ":" + portred.add_proxy_ip(proxy_ip) # 增加ip地址# 采集高可用全球免费代理IP库
def get_quan_ip(red):url = "https://ip.jiangxianli.com/?page=1"resp = requests.get(url, headers=headers)tree = etree.HTML(resp.text)trs = tree.xpath("//table//tr")for tr in trs:ip = tr.xpath("./td[1]/text()") # ip地址port = tr.xpath("./td[2]/text()") # 端口if not ip:continueip = ip[0]port = port[0]proxy_ip = ip + ":" + portred.add_proxy_ip(proxy_ip) # 增加ip地址def run():red = ProxyRedis() # 创建redis存储while True:try:get_kuai_ip(red) # 采集快代理get_66_ip(red) # 采集66免费代理get_quan_ip(red) # 采集全球免费ip代理库except:print("出错了")time.sleep(60) # 每分钟跑一次if __name__ == '__main__':run()
3️⃣校验IP可用性
- 查询所有的IP
- 每一个IP都发送一个请求,可用分值拉满,不用可扣分
这里如果我们采集的IP比较多的话,用单线程就比较慢了,所以为了提高效率,这里我采用协程
# 代理IP的验证
from proxy_redis import ProxyRedis
from settings import *
import asyncio
import aiohttp
import timeasync def verify_one(ip, sem, red):print(f"开始检测{ip}")timeout = aiohttp.ClientTimeout(total=10) # 设置超时时间,超过10秒就报错try:async with sem:async with aiohttp.ClientSession() as session:async with session.get("http://www.baidu.com/", proxy="http://" + ip, timeout=timeout) as resp: # 简单发送一个请求page_source = await resp.text()if resp.status in [200, 302]: # 验证状态码# 将分值拉满red.set_max_score(ip)print(f"检测到{ip}是可用的")else:red.reduce_score(ip)print(f"检测到{ip}是不可用的, 扣10分")except Exception as E:print("ip检验时出错了", E)red.reduce_score(ip)print(f"检测到{ip}是不可用的, 扣10分")async def main(red):# 查询全部ipall_proxy = red.get_all_proxy()sem = asyncio.Semaphore(SEM_COUNT) # 控制并发量tasks = []for ip in all_proxy:tasks.append(asyncio.create_task(verify_one(ip, sem, red)))if tasks:await asyncio.wait(tasks)def run():red = ProxyRedis()time.sleep(10)while True:try:asyncio.run(main(red))time.sleep(100)except Exception as e:print("校验时报错了", e)time.sleep(100)if __name__ == '__main__':run()
4️⃣提供api
- 给用户提供一个http接口,用户通过访问
http://xxx.xxx.xxx.xxx:xxxx/get_proxy就可获取到IP
安装提供api接口的模块
pip install sanic
pip install sanic_cors # 防止出现跨域的模块
# 代理的IP的api接口
from proxy_redis import ProxyRedis
from sanic import Sanic, json
from sanic_cors import CORS# 1. 创建app
app = Sanic("ip")
# 2. 解决跨域
CORS(app)red = ProxyRedis()# 3. 准备处理http请求的函数
@app.route("/get_proxy") # 路由配置
def dispose(rep):ip_list = red.get_avail_proxy()return json({"ip": ip_list}) # 返回给客户端def run():app.run(host="127.0.0.1", port=5800)if __name__ == '__main__':run()
5️⃣启动采集IP、校验IP、提供api
将三个功能串在一起,每一个功能开一个进程
from ip_api import run as api_run
from ip_collection import run as col_run
from ip_verify import run as ver_run
from multiprocessing import Processdef run():# 启动三个进程p1 = Process(target=api_run)p2 = Process(target=col_run)p3 = Process(target=ver_run)p1.start()p2.start()p3.start()if __name__ == '__main__':run()
下面代码是代理IP池的配置文件,想要修改参数的直接修改配置文件中的就行
# 配置文件# proxy_redis
# redis主机ip地址
REDIS_HOST = "127.0.0.1"
# redis端口号
REDIS_PORT = 6379
# redis数据库编号
REDIS_DB = 2
# redis的密码
REDIS_PASSWORD = "123456"# redis的key
REDIS_KEY = "proxy_ip"# 默认的ip分值
DEFAULT_SCORE = 50
# 满分
MAX_SCORE = 100# ip_verify
# 一次检测ip的数量
SEM_COUNT = 30
6️⃣到这里我们的IP代理池就已经完成了



我们可以看到程序可以正常执行
然后去看一下我们的Redis中是否有IP

我们访问http://127.0.0.1:5800/get_proxy检测用户是否可以拿到IP

7️⃣检验IP代理池中的IP是否可用
免费IP代理池已经搭建好了,接下来就从IP代理池中取出来IP,检测IP是否可以使用
我们的IP有很多,使用这些IP最好的方法是将存放IP的列表进行循环,每拿一个IP访问一次或多次就换一个IP在访问,所以就需要写一个生成器
import requestsheaders = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"
}def get_proxy():url = "http://127.0.0.1:5800/get_proxy"resp = requests.get(url, headers=headers)ips = resp.json()for ip in ips["ip"][0]:yield ip # 生成器def spider():url = "http://www.baidu.com/"while True:try:proxy_ip = next(gen)proxy = {"http:": "http:" + proxy_ip,"https:": "http:" + proxy_ip}resp = requests.get(url, proxies=proxy, headers=headers)resp.encoding = "utf-8"return resp.textexcept:print("代理失效了")if __name__ == '__main__':gen = get_proxy()page_source = spider()print(page_source)
可以拿到页面源代码表示我们的代理IP可用







![mysql一张表复制到另外一张表,报错[23000][1062] Duplicate entry ‘1‘ for key ‘big_2.PRIMARY‘](https://img-blog.csdnimg.cn/img_convert/0e88983003b16ef11aae3a22819d5821.png)


![[23000][1452] Cannot add or update a child row: a foreign key constraint fails (`test2`.`#sql-1238_5](https://img-blog.csdnimg.cn/8290f2862edd4b0384d8e3811d2b6f8c.gif)

![[23000][1062] Duplicate entry ‘6‘ for key ‘PRIMARY‘](https://img-blog.csdnimg.cn/dd6f11dc96204dccaaaac85d7085be65.gif)
![SQLSTATE[23000]: Integrity constraint violation:1062 Duplicate entry1664187678631531497821000‘ 解决办法](https://img-blog.csdnimg.cn/79832d39af8c43f2bf67e08f5da5ffd2.png)


