Python 网络请求
Python 网络请求
魔力刘易斯安装 Python 库
python 网络请求需要两个库 requests 获取网站信息,bs4 解析信息
pip install requests bs4
代码实现示例
访问
网络请求需要 4 个参数:
URL
:访问的网址headers
:浏览器标识(可选)cookies
:登录信息(可选)post
:网络请求(可选)
headers,cookies 和 post 可在 右键-检查-网络 中查看
URL = ""
headers = {}
cookies = {}
post = {}
response = requests.post(
URL, cookies=cookies, headers=headers, data=post
)
print(response.text)
response
:自定义变量 接收返回的网站信息response.status_code
:状态码,200 为成功response.ok
:bool 变量,访问成功为 Trueresponse.txt
:网站内容
解析
根据需要获取网站的内容
content = response.text # 返回网页内容
soup = bs4.BeautifulSoup(content, "html.parser") # 解析网页内容
all_products = soup.find_all(class_="product_pod") # 找到所有class为product_pod的标签
for product in track(all_products, description="正在爬取..."):
name = product.h3.a["title"] # 获取名字
price = product.find(class_="price_color").text[2:] # 获取价格
完整代码 + 改良
from requests import get
from bs4 import BeautifulSoup
from rich.progress import Progress
from rich.panel import Panel
from os import path, remove
from sys import argv
from threading import Thread
from queue import Queue
class FrameProgress(Progress): # 创建边框进度条
def get_renderables(self):
yield Panel(self.make_tasks_table(self.tasks), expand=False)
def get_price(i, q): # 爬取价格
response = get(f"https://books.toscrape.com/catalogue/page-{i}.html")
price_file = f"{path.join(path.dirname(argv[0]), f'price{i}.txt')}"
if response.ok:
content = response.text
soup = BeautifulSoup(content, "html.parser")
all_products = soup.find_all(class_="product_pod")
with open(price_file, "w", encoding="utf-8") as f:
for product in all_products:
name = product.h3.a["title"]
price = product.find(class_="price_color").text[2:]
f.write(f"书名:{name}\t价格:{price}\n")
status = "成功!"
else:
status = "失败!!!"
log = {"status": status, "page": i}
q.put(log)
def all_price(): # 合并价格
all_price_file = f"{path.join(path.dirname(argv[0]), 'all_price.txt')}"
with open(all_price_file, "w", encoding="utf-8") as all_price:
for i in range(10):
price_file = f"{path.join(path.dirname(argv[0]), f'price{i+1}.txt')}"
if path.exists(price_file):
with open(price_file, encoding="utf-8") as price:
all_price.write(price.read())
remove(price_file)
print("爬取完成!")
if __name__ == "__main__":
q = Queue()
for i in range(10):
Thread(name=f"第{i+1}页", target=get_price, args=(i + 1, q)).start() # 创建线程
with FrameProgress() as progress: # 创建进度条
task = progress.add_task("爬取进度:", total=10) # 添加任务
for i in range(10): # 监听队列
item = q.get()
progress.console.print(f"第{item['page']}页爬取{item['status']}")
progress.update(task, advance=1)
q.task_done()
all_price() # 合并价格