Python 网络请求

安装 Python 库

python 网络请求需要两个库 requests 获取网站信息,bs4 解析信息

pip install requests beautifulsoup4

代码实现示例

访问

网络请求需要 4 个参数:

  • URL:访问的网址

  • headers:浏览器标识(可选)

  • cookies:登录信息(可选)

  • post:提交的表单数据(可选)

headers,cookies 和 post 可在 右键-检查-网络 中查看

import requests  # required: the snippet below uses requests.post

URL = ""       # address of the page to request
headers = {}   # browser identification, copied from DevTools (optional)
cookies = {}   # login session cookies (optional)
post = {}      # form data to submit; use requests.get(...) when there is none

# send the request and receive the server's response
response = requests.post(
    URL, cookies=cookies, headers=headers, data=post
)
print(response.text)
  • response:自定义变量 接收返回的网站信息

  • response.status_code:状态码,200 为成功

  • response.ok:bool 变量,访问成功为 True

  • response.text:网站内容

解析

根据需要获取网站的内容

content = response.text # raw HTML of the fetched page
soup = bs4.BeautifulSoup(content, "html.parser") # parse the HTML into a navigable tree
all_products = soup.find_all(class_="product_pod") # every element whose class is "product_pod"

for product in track(all_products, description="正在爬取..."):
    name = product.h3.a["title"] # book title from the <a> tag's title attribute
    price = product.find(class_="price_color").text[2:] # price text with the leading currency marker sliced off

完整代码 + 改良

from requests import get
from bs4 import BeautifulSoup
from rich.progress import Progress
from rich.panel import Panel
from os import path, remove
from sys import argv
from threading import Thread
from queue import Queue

class FrameProgress(Progress):
    """Progress subclass that renders its task table inside a bordered panel."""

    def get_renderables(self):
        # wrap the standard task table in a Panel to draw the frame
        table = self.make_tasks_table(self.tasks)
        yield Panel(table, expand=False)

def get_price(i, q):  # worker: scrape one catalogue page
    """Scrape page *i* of books.toscrape.com and write name/price lines to price{i}.txt.

    Always puts a status dict {"status": ..., "page": i} on *q*, even when the
    request fails or raises — the main thread performs exactly one q.get() per
    worker, so a worker that dies without reporting would deadlock it.
    """
    price_file = path.join(path.dirname(argv[0]), f"price{i}.txt")
    try:
        # timeout so a stalled connection cannot hang this thread forever
        response = get(
            f"https://books.toscrape.com/catalogue/page-{i}.html", timeout=10
        )
        if response.ok:
            soup = BeautifulSoup(response.text, "html.parser")
            all_products = soup.find_all(class_="product_pod")

            with open(price_file, "w", encoding="utf-8") as f:
                for product in all_products:
                    name = product.h3.a["title"]
                    # price text carries a 2-char currency prefix; slice it off
                    price = product.find(class_="price_color").text[2:]
                    f.write(f"书名:{name}\t价格:{price}\n")
            status = "成功!"
        else:
            status = "失败!!!"
    except Exception:  # thread boundary: record failure instead of dying silently
        status = "失败!!!"
    q.put({"status": status, "page": i})

def all_price():  # merge the per-page price files into one
    """Concatenate price1.txt..price10.txt into all_price.txt, removing each part.

    Pages whose file is missing (failed downloads) are skipped silently.
    """
    base_dir = path.dirname(argv[0])  # hoisted: invariant across the loop
    merged_path = path.join(base_dir, "all_price.txt")
    with open(merged_path, "w", encoding="utf-8") as merged:
        for page in range(1, 11):
            price_file = path.join(base_dir, f"price{page}.txt")
            if path.exists(price_file):
                with open(price_file, encoding="utf-8") as part:
                    merged.write(part.read())
                remove(price_file)  # clean up the partial file once merged
    print("爬取完成!")

if __name__ == "__main__":
    results = Queue()
    # one worker thread per catalogue page (pages 1..10)
    for page in range(1, 11):
        Thread(name=f"第{page}页", target=get_price, args=(page, results)).start()

    with FrameProgress() as progress:  # bordered progress display
        task = progress.add_task("爬取进度:", total=10)
        # drain exactly one status report per worker
        for _ in range(10):
            item = results.get()
            progress.console.print(f"第{item['page']}页爬取{item['status']}")
            progress.update(task, advance=1)
            results.task_done()
    all_price()  # merge the per-page files