Basic Libraries for Web Scraping

I recently bought a copy of 《Python 3网络爬虫开发实战》. I had experimented with Scrapy crawlers before, but never studied the topic systematically from a book, so I'm quite interested in giving it a try.

The urllib library

urlopen

import urllib.request

'''
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            *, cafile=None, capath=None, cadefault=False, context=None)
'''
# Most basic usage
response = urllib.request.urlopen('https://www.baidu.com')
print(response.read().decode('utf-8'))
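The signature above also takes data and timeout parameters. A minimal sketch of using them, assuming www.httpbin.org as a test endpoint:

import urllib.parse
import urllib.request

# urlencode the form fields and convert them to bytes; passing data makes the request a POST
data = bytes(urllib.parse.urlencode({'name': 'germey'}), encoding='utf-8')
# timeout is in seconds; an error is raised if the server does not respond in time
response = urllib.request.urlopen('https://www.httpbin.org/post', data=data, timeout=10)
print(response.read().decode('utf-8'))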

Request

import urllib.request
import urllib.parse

# Attach parameters to the request
data = bytes(urllib.parse.urlencode({"username": "1"}), encoding='utf-8')
request = urllib.request.Request(url='https://www.baidu.com', data=data, headers={"head": "1"}, method='POST')
urllib.request.urlopen(request)

error

from urllib import request, error

try:
    response = request.urlopen('https://cuiqingcai.com/404')
except error.URLError as e:
    print(e.reason)

urlparse/urlunparse, urlsplit/urlunsplit

from urllib.parse import urlparse, urlunparse

# Parse a URL into its components
result = urlparse('https://www.baidu.com/index.html;user?id=5#comment', allow_fragments=True)
# ParseResult(scheme='https', netloc='www.baidu.com', path='/index.html', params='user', query='id=5', fragment='comment')
print(result)

# Build a URL from its components
data = ['https', 'www.baidu.com', '/index.html', 'user', 'id=5', 'comment']
url = urlunparse(data)
# https://www.baidu.com/index.html;user?id=5#comment
print(url)

# The urlsplit/urlunsplit variants differ only in that params is merged into path; everything else is the same
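A quick sketch of the five-component urlsplit/urlunsplit counterparts, using the same example URL as above:

from urllib.parse import urlsplit, urlunsplit

result = urlsplit('https://www.baidu.com/index.html;user?id=5#comment')
# SplitResult(scheme='https', netloc='www.baidu.com', path='/index.html;user', query='id=5', fragment='comment')
print(result)
# https://www.baidu.com/index.html;user?id=5#comment
print(urlunsplit(['https', 'www.baidu.com', '/index.html;user', 'id=5', 'comment']))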

urljoin

from urllib.parse import urljoin

# base_url supplies scheme (http/https), netloc (www.baidu.com) and path (/index.html)
# If the new link is missing any of these three parts they are filled in from the base; if present, the new link's own values win
new_url = urljoin(base_url, target_url)
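Two concrete examples of that resolution rule, with illustrative URLs:

# the relative link borrows scheme and netloc from the base
print(urljoin('https://www.baidu.com', 'FAQ.html'))
# https://www.baidu.com/FAQ.html

# a fully qualified link overrides the base entirely
print(urljoin('https://www.baidu.com/about.html', 'https://cuiqingcai.com/FAQ.html'))
# https://cuiqingcai.com/FAQ.html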

urlencode/parse_qs/parse_qsl

from urllib.parse import urlencode, parse_qs, parse_qsl

# Serialize a dict into a query string
params = {
    'name': 'aa',
    'age': 19
}
# query is name=aa&age=19
query = urlencode(params)
# Deserialize back into a dict
# {'name': ['aa'], 'age': ['19']}
print(parse_qs(query))
# ... or into a list of tuples
# [('name', 'aa'), ('age', '19')]
print(parse_qsl(query))

quote/unquote

Convert Chinese (non-ASCII) parameters in a URL to percent-encoding, and decode them back.
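A minimal sketch, using 壁纸 ("wallpaper") as an assumed search keyword:

from urllib.parse import quote, unquote

keyword = '壁纸'
url = 'https://www.baidu.com/s?wd=' + quote(keyword)
# https://www.baidu.com/s?wd=%E5%A3%81%E7%BA%B8
print(url)
# unquote restores the original Chinese text
print(unquote(url))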

The requests library

GET requests

import requests

# Approach 1: build the query string by hand
r = requests.get('https://www.httpbin.org/get?name=germey&age=25')
# Approach 2: pass a dict via params
data = {
    'name': 'germey',
    'age': 25
}
r = requests.get('https://httpbin.org/get', params=data)
# The response can be decoded straight into a JSON object
# Note: if the body is HTML rather than JSON, calling json() will raise an error
print(r.json())

# Saving binary content (audio, video, images, ...)
r = requests.get('https://github.com/favicon.ico')
# In open(), the first argument is the file name and the second is the mode;
# 'wb' opens the file for binary writing so raw bytes can be written to it
'''
'r' open for reading (default)
'w' open for writing, truncating the file first
'x' create a new file and open it for writing
'a' open for writing, appending to the end of the file if it exists
'b' binary mode
't' text mode (default)
'+' open a disk file for updating (reading and writing)
'''
with open('favicon.ico', 'wb') as f:
    f.write(r.content)

# Adding request headers
headers = {
    'User-Agent': 'XXX'  # placeholder UA string
}
requests.get('https://www.httpbin.org/get', headers=headers)

POST requests

data = {'name': 'germey', 'age': '25'}
r = requests.post("https://www.httpbin.org/post", data=data)

Getting cookies

r = requests.get('https://www.baidu.com')
print(r.cookies)
for key, value in r.cookies.items():
    print(key + '=' + value)

Maintaining a session

# The two requests below are not in the same session, so the second one cannot see the cookies set by the first
requests.get('https://www.httpbin.org/cookies/set/number/123456789')
requests.get('https://www.httpbin.org/cookies')
# To keep requests in one session, use a Session object; the two requests below share the same session
# This is typically used after a simulated login, for the follow-up requests
s = requests.Session()
s.get('https://www.httpbin.org/cookies/set/number/123456789')
r = s.get('https://www.httpbin.org/cookies')

SSL certificate verification

When the browser would show "Your connection is not private", you can bypass certificate verification by setting verify to False.

import urllib3
import requests

# Suppress the warning; without this, a warning recommending that you specify a certificate is printed
urllib3.disable_warnings()
response = requests.get('https://ssr2.scrape.center/', verify=False)

Timeout settings

# timeout is in seconds
r = requests.get('https://www.httpbin.org/get', timeout=1)

Proxy settings

# Use a proxy to avoid getting a single IP banned
# A local proxy is used here; the mixed-proxy port (user-configured) can be found in ClashX Pro
proxies = {
    'http': 'http://127.0.0.1:7890',
    'https': 'http://127.0.0.1:7890',
}
r = requests.get('https://www.google.com', proxies=proxies)

The re library

import re

content = 'Hello 1234567 World_This is a Regex Demo'
# ------re.match only matches from the beginning of the string------
result = re.match(r'^Hello\s(\d+)\sWorld', content)
# group() returns the matched text
# prints: Hello 1234567 World
print(result.group())
# When a number is passed to group(), the pattern must contain a corresponding capture group:
# above, if \d+ were not wrapped in parentheses this would raise an error;
# as many values can be extracted as there are groups
# prints: 1234567
print(result.group(1))
# span() returns the range of the match
print(result.span())
result = re.match(r'\d+', content)
# None, because the string does not start with digits
print(result)

# ------Greedy matching------
result = re.match(r'^He.*(\d+).*Demo$', content)
# prints 7, because the greedy .* also swallows 123456
print(result.group(1))

# ------Non-greedy matching------
result = re.match(r'^He.*?(\d+).*Demo$', content)
# prints 1234567
print(result.group(1))

# ------re.search scans the whole string for a match------
result = re.search(r'\d+', content)
# prints 1234567
print(result.group())
# Since most HTML contains newlines, add the re.S modifier so that . also matches newlines
result = re.match(r'^He.*?(\d+).*Demo$', content, re.S)

# ------re.findall returns all matches in the string------
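# A minimal findall sketch on an illustrative string: every non-overlapping match
# is returned as a list instead of stopping at the first one
print(re.findall(r'\d+', 'Hello 1234567 World 7654321'))  # ['1234567', '7654321']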

# ------re.sub removes (replaces) every match------
content = '54aK54yr5oiR54ix5L2g'
content = re.sub(r'\d+', '', content)
# aKyroiRixLg -- all digits removed
print(content)

# ------re.compile compiles a pattern string into a pattern object for reuse------
content = '2019-12-22 13:21'
pattern = re.compile(r'\d{2}:\d{2}')
result = re.sub(pattern, '', content)

The httpx library

# The requests and urllib libraries above only support HTTP/1.1;
# for sites served over HTTP/2.0, use the httpx library instead
# ------installation (shell)------
# pip install httpx
# pip install 'httpx[http2]'
# ------installation ends------
import httpx

# httpx still defaults to HTTP/1.1, so HTTP/2 has to be enabled explicitly here
client = httpx.Client(http2=True)
response = client.get('https://spa16.scrape.center/')
print(response.text)
# httpx.Client() is the counterpart of requests.Session()
# Client() accepts many configuration options, e.g. headers, cookies, timeout
with httpx.Client() as client:
    response = client.get('https://httpbin.org/get')
    print(response)

The multiprocessing library

import multiprocessing

def main(page):
    print(page)

if __name__ == '__main__':
    # Create a process pool -- note this is a pool of processes, not threads
    pool = multiprocessing.Pool()
    # pages = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    pages = range(1, 11)
    # map passes the numbers 1-10 to main one by one as 10 tasks;
    # the number of worker processes running in parallel is determined by the CPU core count
    pool.map(main, pages)
    pool.close()
    # wait for all worker processes to finish
    pool.join()

The json library

import json

data = [{
    "name": "Bob",
    "gender": "male",
    "birthday": "1992-10-18"
}]
# dumps converts a JSON object to a string
with open('data.json', 'w', encoding='utf-8') as file:
    # indent sets the number of indentation spaces (optional)
    # set ensure_ascii=False if the output should keep non-ASCII (e.g. Chinese) characters readable
    file.write(json.dumps(data, indent=2, ensure_ascii=False))
# dump writes directly to a file object
json.dump(data, open('data.json', 'w', encoding='utf-8'), indent=2, ensure_ascii=False)

json_str = '''
[{
    "name": "Bob",
    "gender": "male",
    "birthday": "1992-10-18"
}, {
    "name": "Selina",
    "gender": "female",
    "birthday": "1995-10-18"
}]
'''
# loads converts a string to a JSON object
data = json.loads(json_str)
# load reads a JSON object from a file
data = json.load(open('data.json', encoding='utf-8'))

The csv library

import csv

# Writing (newline='' prevents extra blank lines on Windows)
with open('data.csv', 'w', newline='') as csvfile:
    # The default delimiter is a comma; it can be changed via the delimiter parameter
    writer = csv.writer(csvfile, delimiter=' ')
    rows = ['aa', 12], ['bb', 33], ['cc', 44], ['gg', 20]
    writer.writerows(rows)
# Writing rows as dicts
with open('data.csv', 'w', newline='') as csvfile:
    fieldnames = ['id', 'name', 'age']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({'id': '10001', 'name': 'Mike', 'age': 20})
    writer.writerow({'id': '10002', 'name': 'Bob', 'age': 22})
# Reading
with open('data.csv', 'r', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        print(row)

The asyncio library (coroutines)

# The coroutine library
import asyncio

# A method defined with async is not executed immediately when called; it returns a coroutine object
async def execute(x):
    print('Number:', x)
    return x

# Calling execute returns a coroutine object
coroutine = execute(1)
# get_event_loop creates an event loop
loop = asyncio.get_event_loop()
# create_task wraps the coroutine in a task and registers it with the event loop
task = loop.create_task(coroutine)
# Task: <Task pending name='Task-1' coro=<execute() running at demo3.py:3>>
print('Task:', task)
# execute is only actually called once run_until_complete runs the task
loop.run_until_complete(task)
# Task: <Task finished name='Task-1' coro=<execute() done, defined at demo3.py:3> result=1>
print('Task:', task)

The aiohttp library (asynchronous requests)

# aiohttp is an asynchronous HTTP module built on top of asyncio
# Here it replaces the requests library used earlier
import asyncio
import aiohttp

# Optional: cap the number of concurrent requests so the target site is not overwhelmed
semaphore = asyncio.Semaphore(5)

# When a method defined with async hits a time-consuming operation, it is suspended and yields control
async def get(url, params):
    # Optional: set a timeout in seconds (it must be longer than the 5 s delay simulated below)
    timeout = aiohttp.ClientTimeout(total=10)
    # Using with-as closes the session automatically; otherwise session.close() must be called
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # POST form submit: session.post('', data=data)
        # POST JSON submit: session.post('', json=data)
        response = await session.get(url, params=params)
        # Read the body while the session is still open so it is cached on the response
        await response.text()
        return response

async def request():
    async with semaphore:
        # This endpoint simulates a response time of 5 s
        url = 'https://httpbin.org/delay/5'
        print('Waiting for', url)
        params = {'name': 'germey', 'age': 25}
        response = await get(url, params)
        # Rule of thumb for await: if the call returns a coroutine (e.g. an async-defined method), await it
        print('Get response from', url, 'response', await response.text(), response.status)

# Create 100 coroutines that all call request
tasks = [asyncio.ensure_future(request()) for _ in range(100)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))