爬虫入门实战:斗鱼弹幕数据抓取,附送11节入门笔记

爬虫入门实战:斗鱼弹幕数据抓取,附送11节入门笔记

爬虫学习详细内容详细笔记第一课代理池概述以及开发环境https://blog.csdn.net/itcast_cn/article/details/123678415第二课代理池的设计https://blog.csdn.net/itcast_cn/article/details/123728552第三课实现代理池思路https://blog.csdn.net/itcast_cn/article/details/123729170第四课定义代理IP的数据模型类https://blog.csdn.net/itcast_cn/article/details/123789171第五课实现代理池工具模块https://blog.csdn.net/itcast_cn/article/details/123678415第六课实现代理池的校验模块https://blog.csdn.net/itcast_cn/article/details/123817860第七课实现代理池的数据库模块https://blog.csdn.net/itcast_cn/article/details/123817955第八课实现代理池的爬虫模块https://blog.csdn.net/itcast_cn/article/details/123852861第九课实现代理池的检测模块https://blog.csdn.net/itcast_cn/article/details/123880187第十课实现代理池的API模块https://blog.csdn.net/itcast_cn/article/details/123957650第十一课实现代理池的启动入口https://blog.csdn.net/itcast_cn/article/details/124015476

斗鱼弹幕

学习目标

掌握 asyncore 模块使用实现斗鱼弹幕数据抓取

预备知识

asyncore 模块

介绍

这个模块为异步socket的服务器客户端通信提供简单的接口。该模块提供了异步socket服务客户端和服务器的基础架构。

相比python原生的socket api,asyncore具备有很大的优势,asyncore对原生的socket进行封装,提供非常简洁优秀的接口,利用asyncore覆写相关需要处理的接口方法,就可以完成一个socket的网络编程,从而不需要处理复杂的socket网络状况以及多线程处理等等。

实现流程

搭建 Socket 服务器环境

nc -l 9000

客户端 Socket 开发基本使用

定义类继承自asyncore.dispatcher

class SocketClient(asyncore.dispatcher):

实现类中的回调代码

实现构造函数

调用父类方法

asyncore.dispatcher.__init__(self)

创建 Socket 对象

self.create_socket()

连接服务器

address = (host,port)

self.connect(address)

实现 handle_connect 回调函数

当 Socket 连接服务器成功时回调该函数

def handle_connect(self):

print("连接成功")

实现 writable 回调函数

描述是否有数据需要被发送到服务器。返回值为 True 表示可写,False 表示不可写,如果不实现默认返回为 True,当返回 True 时,回调函数 handle_write 将被触发

def writable(self):

return True

实现 handle_write 回调函数

当有数据需要发送时(writable 回调函数返回True时),该函数被触发,通常情况下在该函数中编写 send 方法发送数据

def handle_write(self):

# 内部实现对服务器发送数据的代码

# 调用 send 方法发送数据,参数是字节数据

self.send('hello world\n'.encode('utf-8'))

实现 readable 回调函数

描述是否有数据从服务端读取。返回 True 表示有数据需要读取,False 表示没有数据需要被读取,当不实现默认返回为 True,当返回 True 时,回调函数 handle_read 将被触发

def readable(self):

return True

实现 handle_read 回调函数

当有数据需要读取时触发(readable 回调函数返回 True 时),该函数被触发,通常情况下在该函数中编写 recv 方法接收数据

def handle_read(self):

# 主动接收数据,参数是需要接收数据的长度

# 返回的数据是字节数据

result = self.recv(1024)

print(result)

实现 handle_error 回调函数

当程序运行过程发生异常时回调

def handle_error(self):

# 编写处理错误方法

t,e,trace = sys.exc_info()

self.close()

实现 handle_close 回调函数

当连接被关闭时触发

def handle_close(self):

print("连接关闭")

self.close()

创建对象并且执行 asyncore.loop 进入运行循环

timeout 表示一次循环所需要的时长 client = SocketClient('127.0.0.1',9000)

# 开始启动运行循环

asyncore.loop(timeout=5)

斗鱼弹幕实战

文档资料

斗鱼弹幕服务器第三方接入协议v1.6.2.pdf 官方提供协议文档

弹幕客户端开发流程

连接初始化

使用TCP连接服务器

IP地址:openbarrage.douyutv.com端口:8601 客户端向弹幕服务器发送登录请求,登录弹幕服务器弹幕服务器收到客户端登录请求并完成登录后,返回登录成功消息给客户端客户端收到登录成功消息后发送进入弹幕分组请求给弹幕服务器弹幕服务器接受到客户端弹幕分组请求后将客户端添加到请求指定的弹幕分组中 服务过程

客户端每隔 45 秒发送心跳给弹幕服务器,弹幕服务器回复心跳信息给客户端弹幕服务器如有广播信息,则推送给客户端,服务器消息协议 断开连接

客户端发送登出消息客户端关闭 TCP 连接

数据发送和接收流程

数据包讲解

消息长度:4 字节小端整数,表示整条消息(包括自身)长度(字节数)。 消息长度出现两遍,二者相同。消息类型:2 字节小端整数,表示消息类型。取值如下:

689 客户端发送给弹幕服务器的文本格式数据690 弹幕服务器发送给客户端的文本格式数据。 加密字段:暂时未用,默认为 0。保留字段:暂时未用,默认为 0。数据部分:斗鱼独创序列化文本数据,结尾必须为‘\0’。详细序列化、反 序列化算法见下节。(所有协议内容均为 UTF-8 编码)

数据包封装

对数据包进行对象化封装,对数据的封装方便以后使用,实现对象和二进制数据之间的转换

通过参数构建数据包对象实现获取数据包长度的方法实现获取二进制数据的方法

实现发送数据包

构建发送数据包的容器

self.send_queue = Queue()

实现回调函数,判断容器中有数据就发送没有数据不发送

def writable(self):

return self.send_queue.qsize() > 0

def handle_write(self):

# 从发送数据包队列中获取数据包对象

dp = self.send_queue.get()

# 获取数据包的长度,并且发送给服务器

dp_length = dp.get_length()

dp_length_data = dp_length.to_bytes(4,byteorder='little',signed=False)

self.send(dp_length_data)

# 发送数据包二进制数据

self.send(dp.get_bytes())

self.send_queue.task_done()

pass

实现登录函数

构建登录数据包

content = "type@=loginreq/roomid@={}/".format(room_id)

login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content=content)

把数据包添加到发送数据包容器中

# 把数据包添加到发送数据包容器中

self.send_queue.put(login_dp)

实现接收数据

构建接收数据包队列

# 存放接收的数据包对象

self.recv_queue = Queue()

读取回调函数中读取数据

读取长度

# 读取长度,二进制数据

data_length_data = self.recv(4)

# 通过二进制获取length 具体数据

data_length = int.from_bytes(data_length_data,byteorder='little',signed=False)

读取内容

# 通过数据包的长度获取数据

data = self.recv(data_length)

构建数据包对象

数据包构造函数中解析二进制来构建数据包对象

self.type = int.from_bytes(data_bytes[4:6],byteorder='little',signed=False)

self.encrypt_flag = int.from_bytes(data_bytes[6:7],byteorder='little',signed=False)

self.preserve_flag = int.from_bytes(data_bytes[7:8],byteorder='little',signed=False)

# 构建数据部分

self.content = str(data_bytes[8:-1],encoding='utf-8')

通过二进制数据构建数据包对象

# 通过二进制数据构建数据包对象

dp = DataPacket(data_bytes=data)

把数据包放入接收数据包容器中

# 把数据包放入接收数据包容器中

self.recv_queue.put(dp)

构建处理线程专门处理接收数据包容器中数据

构建线程

# 构建一个专门处理接收数据包容器中的数据包的线程

self.callback_thread = threading.Thread(target=self.do_callback)

self.callback_thread.setDaemon(True)

self.callback_thread.start()

实现回调函数处理接收的数据包

def do_callback(self):

'''

专门负责处理接收数据包容器中的数据

:return:

'''

while True:

# 从接收数据包容器中获取数据包

dp = self.recv_queue.get()

# 对数据进行处理

print(dp.content)

pass

实现外部传入回调函数

通过外部指定回调函数实现自定义数据处理

添加参数 callback

构造函数中添加参数

def __init__(self,host,port,callback=None):

# 定义外部传入的自定义回调函数

self.callback = callback

外部传入自定义回调函数

def data_callback(dp):

'''

自定义回调函数

:param dp: 数据包对象

:return:

'''

print("data_callback:",dp.content)

pass

if __name__ == '__main__':

client = DouyuClient('openbarrage.douyutv.com',8601,callback=data_callback)

client.login_room_id(4494106)

asyncore.loop(timeout=10)

在处理接收数据包的线程中调用回调函数

def do_callback(self):

'''

专门负责处理接收数据包容器中的数据

:return:

'''

while True:

# 从接收数据包容器中获取数据包

dp = self.recv_queue.get()

# 对数据进行处理

if self.callback is not None:

self.callback(dp)

self.recv_queue.task_done()

数据内容序列化与反序列化

键 key 和值 value 直接采用‘@=’分割数组采用‘/’分割如果 key 或者 value 中含有字符‘/’,则使用‘@S’转义如果 key 或者 value 中含有字符‘@’,使用‘@A’转义

例子

多个键值对数据:key1@=value1/key2@=value2/key3@=value3/

数组数据:value1/value2/value3/

登录

参看斗鱼弹幕文档

加入弹幕分组

参看斗鱼弹幕文档,-9999 为海量弹幕

心跳机制

作用是让服务器解决假死连接问题,客户端必须每隔45秒发送一次请求,否则就会被主动断开。

实现发送心跳函数

构建心跳数据包把数据包添加到发送数据包容器队列中 构建心跳线程

构建心跳线程添加触发机制添加暂停机制

代码实现

#!/usr/bin/python3

# -*- coding: utf-8 -*-

import asyncore

import sys

import threading

import time

from queue import Queue

DATA_PACKET_TYPE_SEND = 689

DATA_PACKET_TYPE_RECV = 690

def encode_content(content):

'''

序列化函数

:param content: 需要序列化的内容

:return:

'''

if isinstance(content,str):

return content.replace(r'@',r'@A').replace(r'/',r'@S')

elif isinstance(content,dict):

return r'/'.join(["{}@={}".format(encode_content(k),encode_content(v)) for k,v in content.items()]) + r'/'

elif isinstance(content,list):

return r'/'.join([encode_content(data) for data in content]) + r'/'

return ""

def decode_to_str(content):

'''

反序列化字符串

:param content:字符串数据

:return:

'''

if isinstance(content,str):

return content.replace(r'@S',r'/').replace('@A',r'@')

return ""

def decode_to_dict(content):

'''

反序列化字典数据

:param content: 被序列化的字符串

:return:

'''

ret_dict = dict()

if isinstance(content,str):

item_strings = content.split(r'/')

for item_string in item_strings:

k_v_list = item_string.split(r'@=')

if k_v_list is not None and len(k_v_list) > 1:

k = k_v_list[0]

v = k_v_list[1]

ret_dict[decode_to_str(k)] = decode_to_str(v)

return ret_dict

def decode_to_list(content):

'''

反序列化列表数据

:param content: 被序列化的字符串

:return:

'''

ret_list = []

if isinstance(content,str):

items = content.split(r'/')

for idx,item in enumerate(items):

if idx < len(items) - 1:

ret_list.append(decode_to_str(item))

return ret_list

class DataPacket():

def __init__(self,type=DATA_PACKET_TYPE_SEND,content="",data_bytes=None):

if data_bytes is None:

# 数据包的类型

self.type = type

# 数据部分内容

self.content = content

self.encrypt_flag = 0

self.preserve_flag = 0

else:

self.type = int.from_bytes(data_bytes[4:6],byteorder='little',signed=False)

self.encrypt_flag = int.from_bytes(data_bytes[6:7],byteorder='little',signed=False)

self.preserve_flag = int.from_bytes(data_bytes[7:8],byteorder='little',signed=False)

# 构建数据部分

self.content = str(data_bytes[8:-1],encoding='utf-8')

def get_length(self):

'''

获取当前数据包的长度,为以后需要发送数据包做准备

:return:

'''

return 4 + 2 + 1 + 1 + len(self.content.encode('utf-8')) + 1

def get_bytes(self):

'''

通过数据包转换成 二进制数据类型

:return:

'''

data = bytes()

# 构建 4 个字节的消息长度数据

data_packet_length = self.get_length()

# to_bytes 把一个整型数据转换成二进制数据

# 第一个参数 表示需要转换的二进制数据占几个字节

# byteorder 第二个参数 描述字节序

# signed 设置是否有符号

# 处理消息长度

data += data_packet_length.to_bytes(4,byteorder='little',signed=False)

# 处理消息类型

data += self.type.to_bytes(2,byteorder='little',signed=False)

# 处理加密字段

data += self.encrypt_flag.to_bytes(1,byteorder='little',signed=False)

# 处理保留字段

data += self.preserve_flag.to_bytes(1,byteorder='little',signed=False)

# 处理数据内容

data += self.content.encode('utf-8')

# 添加 \0 数据

data += b'\0'

return data

class DouyuClient(asyncore.dispatcher):

def __init__(self,host,port,callback=None):

# 构建发送数据包的队列容器

# 存放了数据包对象

self.send_queue = Queue()

# 存放接收的数据包对象

self.recv_queue = Queue()

# 定义外部传入的自定义回调函数

self.callback = callback

asyncore.dispatcher.__init__(self)

self.create_socket()

address = (host,port)

self.connect(address)

# 构建一个专门处理接收数据包容器中的数据包的线程

self.callback_thread = threading.Thread(target=self.do_callback)

self.callback_thread.setDaemon(True)

self.callback_thread.start()

# 构建心跳线程

self.heart_thread = threading.Thread(target=self.do_ping)

self.heart_thread.setDaemon(True)

self.ping_runing = False

pass

def handle_connect(self):

print("连接成功")

self.start_ping()

def writable(self):

return self.send_queue.qsize() > 0

def handle_write(self):

# 从发送数据包队列中获取数据包对象

dp = self.send_queue.get()

# 获取数据包的长度,并且发送给服务器

dp_length = dp.get_length()

dp_length_data = dp_length.to_bytes(4,byteorder='little',signed=False)

self.send(dp_length_data)

# 发送数据包二进制数据

self.send(dp.get_bytes())

self.send_queue.task_done()

pass

def readable(self):

return True

def handle_read(self):

# 读取长度,二进制数据

data_length_data = self.recv(4)

# 通过二进制获取length 具体数据

data_length = int.from_bytes(data_length_data,byteorder='little',signed=False)

# 通过数据包的长度获取数据

data = self.recv(data_length)

# 通过二进制数据构建数据包对象

dp = DataPacket(data_bytes=data)

# 把数据包放入接收数据包容器中

self.recv_queue.put(dp)

def handle_error(self):

t, e, trace = sys.exc_info()

print(e)

self.close()

def handle_close(self):

self.stop_ping()

print("连接关闭")

self.close()

def login_room_id(self,room_id):

self.room_id = room_id

send_data = {

"type":"loginreq",

"roomid":str(room_id)

}

# 构建登录数据包

content = encode_content(send_data)

login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content=content)

# 把数据包添加到发送数据包容器中

self.send_queue.put(login_dp)

def join_room_group(self):

'''

加入弹幕分组

:return:

'''

send_data = {

"type":"joingroup",

"rid":str(self.room_id),

"gid":'-9999'

}

content = encode_content(send_data)

dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)

self.send_queue.put(dp)

pass

def send_heart_data_packet(self):

send_data = {

"type":"mrkl"

}

content = encode_content(send_data)

dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)

self.send_queue.put(dp)

def start_ping(self):

'''

开启心跳

:return:

'''

self.ping_runing = True

def stop_ping(self):

'''

结束心跳

:return:

'''

self.ping_runing = False

def do_ping(self):

'''

执行心跳

:return:

'''

while True:

if self.ping_runing:

self.send_heart_data_packet()

time.sleep(40)

def do_callback(self):

'''

专门负责处理接收数据包容器中的数据

:return:

'''

while True:

# 从接收数据包容器中获取数据包

dp = self.recv_queue.get()

# 对数据进行处理

if self.callback is not None:

self.callback(self,dp)

self.recv_queue.task_done()

pass

def data_callback(client,dp):

'''

自定义回调函数

:param dp: 数据包对象

:return:

'''

resp_data = decode_to_dict(dp.content)

if resp_data["type"] == "loginres":

#调用加入分组请求

print("登录成功:",resp_data)

client.join_room_group()

elif resp_data["type"] == "chatmsg":

print("{}:{}".format(resp_data["nn"],resp_data["txt"]))

elif resp_data["type"] == 'onlinegift':

print("暴击鱼丸")

elif resp_data["type"] == "uenter":

print("{} 进入了房间".format(resp_data["nn"]))

pass

if __name__ == '__main__':

client = DouyuClient('openbarrage.douyutv.com',8601,callback=data_callback)

client.login_room_id(4494106)

asyncore.loop(timeout=10)

相关数据

电机电容坏了的表现?电容坏了怎么修?
必发365手机版下载

电机电容坏了的表现?电容坏了怎么修?

⌛ 07-16 👁️ 6387
局域网打印机怎么连接 3个方法教会连接
下载365App

局域网打印机怎么连接 3个方法教会连接

⌛ 09-03 👁️ 794
竞品对比
下载365App

竞品对比

⌛ 10-25 👁️ 3531