线程

threading

概念:

区分线程和进程

  • 线程:==执行单位==。 它被包含在进程之中,是进程中的实际运作单位。

    线程是操作系统能够进行运算调度的最小单位。

    一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

  • 进程:==资源单位==。进程是程序的实体,每个进程都要占用一定内存。

    计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

进程里必定有至少一个线程。进程如同项目,线程如同员工,项目的运作至少需要一个员工。

多进程应用:

  1. 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from threading import Thread
    def func():
    for i in range(1,1000):
    print(f"Child :{i}")

    t = Thread(target= func) # t 为线程对象,Thread里的target参数设置工作内容
    t.start() #设置线程状态为 开始 (只是状态,实际执行时间由CPU决定)
    if __name__ = '__main__':
    for i in range(1,1000):
    print(f"Base :{i}")
  2. 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    from threading import Thread

    class Mythread(Thread): #继承 Thread 类
    def run(self): #固定格式,线程状态开启,自动运行
    for i in range(1, 1000):
    print(f"Child :{i}")

    if __name__ == '__main__':
    t = Mythread()
    #t.run #直接调用,为当前线程的单线程执行
    t.start() #开启线程
    for i in range(1,1000):
    print(f"Base :{i}")

遇到参数

  1. 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    from threading import Thread
    def func(dj):
    for i in range(1,1000):
    print(dj,i)
    #args 参数必须以元组形式发出
    t1 = Thread(target= func,args=("Jay_Z", ))
    t1.start()
    t2 = Thread(target= func,args=("kan_ye", ))
    t2.start()
  2. 1
    2
    3
    4
    5
    6
    7
    8
    9
    from threading import Thread
    def func(dj):
    for i in range(1,1000):
    print(dj,i)
    #args 参数必须以元组形式发出
    t1 = Thread(target= func,args=("Jay_Z", ))
    t1.start()
    t2 = Thread(target= func,args=("kan_ye", ))
    t2.start()

多进程

多线程应用的API和多进程一样,只是使用库为 multiprocessing . Process

1
from multiprocessing import Process

(这只是因为Python方便用户设置的,实际两者底层原理完全不同)

线程池

线程池:一次性开辟一些线程,用户直接给线程池子提交任务,线程任务的调度交给线程池来完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from concurrent.futures import ThreadPoolExecutor ,
#打个比喻,这里50个线程可以理解成50个人,for...range(100)相当于有100个任务,
#每个任务是从1数到1000,只要一个人(线程)数完了(执行完1-1000),就可以继续数下一个任务

def fn(name):
for i in range(1000):
print(name, i)

if __name__ =='__main__':
#创建线程池 (该例中有50个线程)
with ThreadPoolExecutor(50) as t:
for i in range(100):
t.submit(fn, name=f"线程{i}")
#线程池任务全部执行完毕后,才继续执行(守护)
print("over")

进程池: 可以提供指定数量的进程给用户使用,

即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;

反之,如果池中进程数为最大值,则请求等待;池中有进程空闲,则请求得到执行。

线程池示例:新发地菜市场价格爬取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 1. 如何提取单个页面的数据
# 2. 上线程池,多个页面同时抓取
import requests
from lxml import etree
import csv
from concurrent.futures import ThreadPoolExecutor

f = open("data.csv", mode="w", encoding="utf-8")
csvwriter = csv.writer(f)


def download_one_page(url):
# 拿到页面源代码
resp = requests.get(url)
html = etree.HTML(resp.text)
table = html.xpath("/html/body/div[2]/div[4]/div[1]/table")[0]
trs = table.xpath("./tr[position()>1]")
# 拿到每个tr
for tr in trs:
txt = tr.xpath("./td/text()")
# 对数据做简单的处理: “\\”, “/”去掉
txt = (item.replace("\\", "").replace("/", "") for item in txt)
# 把数据存放在文件中
csvwriter.writerow(txt)
print(url, "提取完毕!")


if __name__ == '__main__':
# 创建线程池
with ThreadPoolExecutor(50) as t:
for i in range(1, 200): # 199 * 20 = 3980
# 把下载任务提交给线程池
t.submit(download_one_page, f"http://www.xinfadi.com.cn/marketanalysis/0/list/{i}.shtml")

print("全部下载完毕!")

协程

概念:当程序遇见了IO操作的时候,可以选择性切换到其他任务上。

这种切换:

  • 微观上是一个任务切换到另一个任务

  • 宏观上则是多任务异步操作:多个任务同时执行(因为切换速度太快)

切换的操作:input,request.get()网络请求返回数据,sleep等。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time,asyncio

async def func1():
print("任务一 1")
#time.sleep(3) # time.sleep为同步操作,此时异步操作会中断
await asyncio.sleep(3)
print("任务一 2")

async def func2():
print("任务二 1")
await asyncio.sleep(4)
print("任务二 2")

async def func3():
print("任务三 1")
await asyncio.sleep(5)
print("任务三 2")

if __name__ == '__main__':
# tasks = [func1(),func2(),func3()] #这种写法3.8以后会有警告
tasks = [
asyncio.create_task(func1)
asyncio.create_task(func2)
asyncio.create_task(func2)
]
t1 = time.time()
asyncio.run(asyncio.wait(tasks))
t2 = time.time()
print(t2-t1) # 单进程同步操作:12.028307676315308
# 多进程异步操作:5.009987115859985
  1. create_task问题:这个问题对应警告(DeprecationWarning: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8, and scheduled for removal in Python 3.11)

    把协程对象创建成一个任务,即task对象,将多个task对象包装在一起就没问题了。

    (原来是包装好准备协程对象后同时执行,现在写法是逐个执行)

2.await 挂起操作 :必须在 async 函数里,且要放在协程对象前面

异步初涉:异步http请求aiohttp模块讲解

请求连接操作上,request 是同步代码 ,异步代码你得用 aiohttp 模块中的 aiohttp.ClientSession() (也许你会想到 cookie 中的会话)

示例(爬取唯美图片)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import aiohttp
import asyncio

urls = [
"http://kr.shanghai-jiuxin.com/file/2022/0607/0d0208537886e225a5902e1001b2c702.jpg",

"http://kr.shanghai-jiuxin.com/file/2022/0607/b690bb5677be51cf12fe5b7638faf6b9.jpg",

"http://kr.shanghai-jiuxin.com/file/2022/0607/b3238e128301a37d24f65157d25fe3a9.jpg"
]

async def download(url):
name = url.rsplit("/",1)[1] #懒得起名字
async with aiohttp.ClientSession() as S: #1. 发送请求
async with S.get(url) as resp: #2. 得到内容
with open(name, mode="wb") as f: #3. 下载图片
f.write(await resp.content.read()) #读取内容操作是异步的,要挂起 await
print(name,"文件已下载")
#S.get() / S.post() <==> request.get() / .post()
#S.text()/json() <==> resp.text() /.post()

async def main():
tasks = []
for url in urls:
tasks.append(asyncio.create_task(download(url)))
#包装链接(任务网址)为一个列表
await asyncio.wait(tasks)
if __name__ == "__main__":
asyncio.run(main())

简单的异步爬虫

爬取百度小说内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
'''
1.同步操作:连接目录网址,拿到cid和book_id #def getId
2.异步操作: 根据cid和book_id 连接内容网址,拿到小说内容
3.下载 #def getContend
'''
import json
import aiofiles
import aiohttp
import requests
import asyncio

url1 = 'http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4306063500"}'
url2 = 'http://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4306063500","cid":"4306063500|1569782244","need_bookinfo":1}'

async def getContend(book_id,cid,title):
data = {
"book_id":book_id ,
"cid":f"{book_id}|{cid}" ,
"need_bookinfo":1
}
data = json.dumps(data)
url = f"http://dushu.baidu.com/api/pc/getChapterContent?data={data}"

async with aiohttp.ClientSession() as s:
async with s.get(url) as resp:
dic = await resp.json() # 协程对象
async with aiofiles.open(title+".txt", mode="w",newline="" ,encoding="utf8") as f:
await f.write(dic["data"]["novel"]["content"])

async def getId():
tasks = []
resp = requests.get(url1)
dic = resp.json()
book_id = dic["data"]["novel"]["book_id"]
print(book_id)
for it in dic["data"]["novel"]["items"][0:5]: #切个片
title = it["title"]
cid = it["cid"]
tasks.append(asyncio.create_task(getContend(book_id,cid,title)))
await asyncio.wait(tasks)

if __name__ == "__main__":
asyncio.run(getId())

一般视频爬虫思路

网站服务器把整个源视频发送给客户端是很少见的:

  1. 传输整个视频文件太大,传输速度很慢 ,花费流量高
  2. 用户会拉进度条,存在资源浪费

一般视频网站存储思路:

用户上传 —— 转码(不同清晰度) —— 切片(将大文件视频切成连续的一小段)

切片意味着需要一个文件来保证片段的有序性,也就是记录 视频播放顺序视频存放路径。当然这个文件还可以附加视频码率等其他信息。

通常该文件格式为 ==M3U8== 。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:4 #每个视频切片最大时长
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-KEY:METHOD=AES-128,URI="*/key.key"#切片文件加密方式和秘钥地址(如果有加密,则要先解密才能播放)

#EXTINF:2, #持续时间
https://hey07.cjkypo.com/20220505/9BS0wTIh/1100kb/hls/dsZ27s7p.ts #ts文件的地址
#EXTINF:2,
https://hey07.cjkypo.com/20220505/9BS0wTIh/1100kb/hls/Gw8DtNn5.ts
#EXTINF:2,
https://hey07.cjkypo.com/20220505/9BS0wTIh/1100kb/hls/EZ0UlXoC.ts
#EXTINF:2,
https://hey07.cjkypo.com/20220505/9BS0wTIh/1100kb/hls/MNb9yXCB.ts
#EXTINF:2,
https://hey07.cjkypo.com/20220505/9BS0wTIh/1100kb/hls/JisT83Nx.ts

反爬(示例网站:91视频):

m3u8文件的响应头 url 和视频网址源代码中的url有所差别:

1
2
3
4
response header:
"https://m3api.awenhao.com/index.php?note=kkRphqrd5sykfam43wzx1&raw=1&n.m3u8"
source code:
"https://m3api.awenhao.com/index.php?note=kkRtxaw975rd6qcbnz3jf&raw=1&n.m3u8"

note 中间部分会有部分字符串不同的情况(动态)。这是反爬的一种机制(仅就示例而言)。

播放视频有两步:

  1. 客户端访问html请求到服务器 —— 服务器返回html页面源代码
  2. 客户端请求得到m3u8文件播放视频 —— 服务器返回m3u8文件

在第一步进行时,服务器会生成一个动态字符串,这个字符串会赋值到接下来的m3u8文件,两者互相对应。

这样能保证先进行html页面请求,再请求m3u8的正常流程。直接请求m3u8的爬虫机制会因为动态字符串不匹配被驳回。

(动态字符串有时效性,如同5分钟验证码这种。页面长时间没有请求m3u8文件操作,也会造成m3u8响应失效,需要刷新网址重新请求)

抓取视频思路:

  1. 找到 m3u8 (各种手段)
  2. 通过 m3u8 下载 ts 文件
  3. 通过各种手段把 ts 文件合并到一起

示例代码(91看剧爬取ts):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#resp 的 text / content ,字节流和字符串的格式变换太难受了,这段代码仅供参考,实操未成功
import requests
import re
url = "http://91kanju2.com/vod-play/47487-1-1.html"

resp = requests.get(url)
obj = re.compile(r"url: '(?P<url>.*?)',",re.S)
m3u8_url = obj.search(resp.text).group("url")
resp.close()

resp2 = requests.get(m3u8_url)
#这里相当坎坷,不具备普适性。但探究精神值得借鉴。
resp3 = requests.get("https://v4.cdtlas.com/"+ resp2.text.split("/",1)[1].rstrip())

s = resp3.content.decode("utf-8")
n = 0
for line in s
line = line.strip()
if line.startswith("#"):
continue
print(line)
resp4 = requests.get(line)
with open(f"{n}.ts",mode="wb") as f:
f.write(resp4.text)
resp4.close()
n += 1

resp3.close()
resp2.close()

两个大作业

普通爬虫爬取网易云音乐评论

91视频

selenium