2024-简单点-MediaCrawler解构

2024-05-16 06:44

本文主要是介绍2024-简单点-MediaCrawler解构,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

MediaCrawler

  • var.py
  • recv_sms.py
  • async_db.py
  • db.py
  • main.py
  • base/base_crawler.py
  • config/baseconfig.py
  • config/db_config.py
  • 有待更新

在这里插入图片描述

var.py

from asyncio.tasks import Task
from contextvars import ContextVar
from typing import Listimport aiomysqlfrom async_db import AsyncMysqlDBrequest_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="")
crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="")
comment_tasks_var: ContextVar[List[Task]] = ContextVar("comment_tasks", default=[])
media_crawler_db_var: ContextVar[AsyncMysqlDB] = ContextVar("media_crawler_db_var")
db_conn_pool_var: ContextVar[aiomysql.Pool] = ContextVar("db_conn_pool_var")

在这里插入图片描述

recv_sms.py

import re
from typing import Listimport redis
import uvicorn
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModelimport config
from tools import utilsapp = FastAPI()redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)class SmsNotification(BaseModel):platform: strcurrent_number: strfrom_number: strsms_content: strtimestamp: strdef extract_verification_code(message: str) -> str:"""Extract verification code of 6 digits from the SMS."""pattern = re.compile(r'\b[0-9]{6}\b')codes: List[str] = pattern.findall(message)return codes[0] if codes else ""@app.post("/")
def receive_sms_notification(sms: SmsNotification):"""Receive SMS notification and send it to Redis.Args:sms:{"platform": "xhs","from_number": "1069421xxx134","sms_content": "【小红书】您的验证码是: 171959, 3分钟内有效。请勿向他人泄漏。如非本人操作,可忽略本消息。","timestamp": "1686720601614","current_number": "13152442222"}Returns:"""utils.logger.info(f"Received SMS notification: {sms.platform}, {sms.current_number}")sms_code = extract_verification_code(sms.sms_content)if sms_code:# Save the verification code in Redis and set the expiration time to 3 minutes.key = f"{sms.platform}_{sms.current_number}"redis_client.set(key, sms_code, ex=60 * 3)return {"status": "ok"}@app.get("/", status_code=status.HTTP_404_NOT_FOUND)
async def not_found():raise HTTPException(status_code=404, detail="Not Found")if __name__ == '__main__':uvicorn.run(app, port=8000, host='0.0.0.0')

在这里插入图片描述

async_db.py

from typing import Any, Dict, List, Unionimport aiomysqlclass AsyncMysqlDB:def __init__(self, pool: aiomysql.Pool) -> None:self.__pool = poolasync def query(self, sql: str, *args: Union[str, int]) -> List[Dict[str, Any]]:"""从给定的 SQL 中查询记录,返回的是一个列表:param sql: 查询的sql:param args: sql中传递动态参数列表:return:"""async with self.__pool.acquire() as conn:async with conn.cursor(aiomysql.DictCursor) as cur:await cur.execute(sql, args)data = await cur.fetchall()return data or []async def get_first(self, sql: str, *args: Union[str, int]) -> Union[Dict[str, Any], None]:"""从给定的 SQL 中查询记录,返回的是符合条件的第一个结果:param sql: 查询的sql:param args:sql中传递动态参数列表:return:"""async with self.__pool.acquire() as conn:async with conn.cursor(aiomysql.DictCursor) as cur:await cur.execute(sql, args)data = await cur.fetchone()return dataasync def item_to_table(self, table_name: str, item: Dict[str, Any]) -> int:"""表中插入数据:param table_name: 表名:param item: 一条记录的字典信息:return:"""fields = list(item.keys())values = list(item.values())fields = [f'`{field}`' for field in fields]fieldstr = ','.join(fields)valstr = ','.join(['%s'] * len(item))sql = "INSERT INTO %s (%s) VALUES(%s)" % (table_name, fieldstr, valstr)async with self.__pool.acquire() as conn:async with conn.cursor(aiomysql.DictCursor) as cur:await cur.execute(sql, values)lastrowid = cur.lastrowidreturn lastrowidasync def update_table(self, table_name: str, updates: Dict[str, Any], field_where: str,value_where: Union[str, int, float]) -> int:"""更新指定表的记录:param table_name: 表名:param updates: 需要更新的字段和值的 key - value 映射:param field_where: update 语句 where 条件中的字段名:param value_where: update 语句 where 条件中的字段值:return:"""upsets = []values = []for k, v in updates.items():s = '`%s`=%%s' % kupsets.append(s)values.append(v)upsets = ','.join(upsets)sql = 'UPDATE %s SET %s WHERE %s="%s"' % (table_name,upsets,field_where, value_where,)async with self.__pool.acquire() as conn:async with conn.cursor() as cur:rows = await cur.execute(sql, values)return rowsasync def execute(self, sql: str, *args: Union[str, int]) -> int:"""需要更新、写入等操作的 excute 执行语句:param sql::param args::return:"""async with self.__pool.acquire() as conn:async with conn.cursor() as cur:rows = await cur.execute(sql, args)return rows

在这里插入图片描述

db.py

import asyncio
from typing import Dict
from urllib.parse import urlparseimport aiofiles
import aiomysqlimport config
from async_db import AsyncMysqlDB
from tools import utils
from var import db_conn_pool_var, media_crawler_db_vardef parse_mysql_url(mysql_url) -> Dict:"""从配置文件中解析db链接url,给到aiomysql用,因为aiomysql不支持直接以URL的方式传递链接信息。Args:mysql_url: mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawlerReturns:"""parsed_url = urlparse(mysql_url)db_params = {'host': parsed_url.hostname,'port': parsed_url.port or 3306,'user': parsed_url.username,'password': parsed_url.password,'db': parsed_url.path.lstrip('/')}return db_paramsasync def init_mediacrawler_db():"""初始化数据库链接池对象,并将该对象塞给media_crawler_db_var上下文变量Returns:"""db_conn_params = parse_mysql_url(config.RELATION_DB_URL)pool = await aiomysql.create_pool(autocommit=True,**db_conn_params)async_db_obj = AsyncMysqlDB(pool)# 将连接池对象和封装的CRUD sql接口对象放到上下文变量中db_conn_pool_var.set(pool)media_crawler_db_var.set(async_db_obj)async def init_db():"""初始化db连接池Returns:"""utils.logger.info("[init_db] start init mediacrawler db connect object")await init_mediacrawler_db()utils.logger.info("[init_db] end init mediacrawler db connect object")async def close():"""关闭连接池Returns:"""utils.logger.info("[close] close mediacrawler db pool")db_pool: aiomysql.Pool = db_conn_pool_var.get()if db_pool is not None:db_pool.close()async def init_table_schema():"""用来初始化数据库表结构,请在第一次需要创建表结构的时候使用,多次执行该函数会将已有的表以及数据全部删除Returns:"""utils.logger.info("[init_table_schema] begin init mysql table schema ...")await init_mediacrawler_db()async_db_obj: AsyncMysqlDB = media_crawler_db_var.get()async with aiofiles.open("schema/tables.sql", mode="r") as f:schema_sql = await f.read()await async_db_obj.execute(schema_sql)utils.logger.info("[init_table_schema] mediacrawler table schema init successful")await close()if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(init_table_schema())

在这里插入图片描述
tips:
在这里插入图片描述

main.py

import argparse
import asyncio
import sysimport config
import db
from base.base_crawler import AbstractCrawler
from media_platform.bilibili import BilibiliCrawler
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.weibo import WeiboCrawler
from media_platform.xhs import XiaoHongShuCrawlerclass CrawlerFactory:CRAWLERS = {"xhs": XiaoHongShuCrawler,"dy": DouYinCrawler,"ks": KuaishouCrawler,"bili": BilibiliCrawler,"wb": WeiboCrawler}@staticmethoddef create_crawler(platform: str) -> AbstractCrawler:crawler_class = CrawlerFactory.CRAWLERS.get(platform)if not crawler_class:raise ValueError("Invalid Media Platform Currently only supported xhs or dy or ks or bili ...")return crawler_class()async def main():# define command line params ...parser = argparse.ArgumentParser(description='Media crawler program.')parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili | wb)',choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM)parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)parser.add_argument('--type', type=str, help='crawler type (search | detail | creator)',choices=["search", "detail", "creator"], default=config.CRAWLER_TYPE)parser.add_argument('--start', type=int, help='crawler type (number of start page)',default=config.START_PAGE)parser.add_argument('--keywords', type=str, help='crawler type (please input keywords)',default=config.KEYWORDS)# init dbif config.SAVE_DATA_OPTION == "db":await db.init_db()args = parser.parse_args()crawler = CrawlerFactory.create_crawler(platform=args.platform)crawler.init_config(platform=args.platform,login_type=args.lt,crawler_type=args.type,start_page=args.start,keyword=args.keywords)await crawler.start()if config.SAVE_DATA_OPTION == "db":await db.close()if __name__ == '__main__':try:# asyncio.run(main())asyncio.get_event_loop().run_until_complete(main())except KeyboardInterrupt:sys.exit()

在这里插入图片描述

base/base_crawler.py

from abc import ABC, abstractmethod
from typing import Dict, Optionalfrom playwright.async_api import BrowserContext, BrowserTypeclass AbstractCrawler(ABC):@abstractmethoddef init_config(self, platform: str, login_type: str, crawler_type: str, start_page: int, keyword: str):pass@abstractmethodasync def start(self):pass@abstractmethodasync def search(self):pass@abstractmethodasync def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict], user_agent: Optional[str],headless: bool = True) -> BrowserContext:passclass AbstractLogin(ABC):@abstractmethodasync def begin(self):pass@abstractmethodasync def login_by_qrcode(self):pass@abstractmethodasync def login_by_mobile(self):pass@abstractmethodasync def login_by_cookies(self):passclass AbstractStore(ABC):@abstractmethodasync def store_content(self, content_item: Dict):pass@abstractmethodasync def store_comment(self, comment_item: Dict):pass# TODO support all platform# only xhs is supported, so @abstractmethod is commented# @abstractmethodasync def store_creator(self, creator: Dict):passclass AbstractStoreImage(ABC):#TODO: support all platform# only weibo is supported# @abstractmethodasync def store_image(self, image_content_item: Dict):passclass AbstractApiClient(ABC):@abstractmethodasync def request(self, method, url, **kwargs):pass@abstractmethodasync def update_cookies(self, browser_context: BrowserContext):pass@abstractmethodasync def pong(self):pass

在这里插入图片描述

config/baseconfig.py

# 基础配置
PLATFORM = "xhs"
KEYWORDS = "python,golang"
LOGIN_TYPE = "qrcode"  # qrcode or phone or cookie
COOKIES = ""
SORT_TYPE = "popularity_descending"  # 具体值参见media_platform.xxx.field下的枚举值,展示只支持小红书
CRAWLER_TYPE = "search"  # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据)# 是否开启 IP 代理
ENABLE_IP_PROXY = False# 代理IP池数量
IP_PROXY_POOL_COUNT = 2# 代理IP提供商名称
IP_PROXY_PROVIDER_NAME = "kuaidaili"# 设置为True不会打开浏览器(无头浏览器)
# 设置False会打开一个浏览器
# 小红书如果一直扫码登录不通过,打开浏览器手动过一下滑动验证码
# 抖音如果一直提示失败,打开浏览器看下是否扫码登录之后出现了手机号验证,如果出现了手动过一下再试。
HEADLESS = True# 是否保存登录状态
SAVE_LOGIN_STATE = True# 数据保存类型选项配置,支持三种类型:csv、db、json
SAVE_DATA_OPTION = "json"  # csv or db or json# 用户浏览器缓存的浏览器文件配置
USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name# 爬取开始页数 默认从第一页开始
START_PAGE = 1# 爬取视频/帖子的数量控制
CRAWLER_MAX_NOTES_COUNT = 20# 并发爬虫数量控制
MAX_CONCURRENCY_NUM = 4# 是否开启爬图片模式, 默认不开启爬图片
ENABLE_GET_IMAGES = False# 是否开启爬评论模式, 默认不开启爬评论
ENABLE_GET_COMMENTS = False# 是否开启爬二级评论模式, 默认不开启爬二级评论, 目前仅支持 xhs
# 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段
ENABLE_GET_SUB_COMMENTS = False# 指定小红书需要爬虫的笔记ID列表
XHS_SPECIFIED_ID_LIST = ["6422c2750000000027000d88","64ca1b73000000000b028dd2","630d5b85000000001203ab41",# ........................
]# 指定抖音需要爬取的ID列表
DY_SPECIFIED_ID_LIST = ["7280854932641664319","7202432992642387233"# ........................
]# 指定快手平台需要爬取的ID列表
KS_SPECIFIED_ID_LIST = ["3xf8enb8dbj6uig","3x6zz972bchmvqe"
]# 指定B站平台需要爬取的视频bvid列表
BILI_SPECIFIED_ID_LIST = ["BV1d54y1g7db","BV1Sz4y1U77N","BV14Q4y1n7jz",# ........................
]# 指定微博平台需要爬取的帖子列表
WEIBO_SPECIFIED_ID_LIST = ["4982041758140155",# ........................
]# 指定小红书创作者ID列表
XHS_CREATOR_ID_LIST = ["63e36c9a000000002703502b",# ........................
]

config/db_config.py

import os# redis config
REDIS_DB_HOST = "127.0.0.1"  # your redis host
REDIS_DB_PWD = os.getenv("REDIS_DB_PWD", "123456")  # your redis password# mysql config
RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456")  # your relation db password
RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"# sqlite3 config
# RELATION_DB_URL = f"sqlite://data/media_crawler.sqlite"

有待更新

这篇关于2024-简单点-MediaCrawler解构的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/994160

相关文章

利用Python编写一个简单的聊天机器人

《利用Python编写一个简单的聊天机器人》这篇文章主要为大家详细介绍了如何利用Python编写一个简单的聊天机器人,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 使用 python 编写一个简单的聊天机器人可以从最基础的逻辑开始,然后逐步加入更复杂的功能。这里我们将先实现一个简单的

使用IntelliJ IDEA创建简单的Java Web项目完整步骤

《使用IntelliJIDEA创建简单的JavaWeb项目完整步骤》:本文主要介绍如何使用IntelliJIDEA创建一个简单的JavaWeb项目,实现登录、注册和查看用户列表功能,使用Se... 目录前置准备项目功能实现步骤1. 创建项目2. 配置 Tomcat3. 项目文件结构4. 创建数据库和表5.

使用PyQt5编写一个简单的取色器

《使用PyQt5编写一个简单的取色器》:本文主要介绍PyQt5搭建的一个取色器,一共写了两款应用,一款使用快捷键捕获鼠标附近图像的RGB和16进制颜色编码,一款跟随鼠标刷新图像的RGB和16... 目录取色器1取色器2PyQt5搭建的一个取色器,一共写了两款应用,一款使用快捷键捕获鼠标附近图像的RGB和16

四种简单方法 轻松进入电脑主板 BIOS 或 UEFI 固件设置

《四种简单方法轻松进入电脑主板BIOS或UEFI固件设置》设置BIOS/UEFI是计算机维护和管理中的一项重要任务,它允许用户配置计算机的启动选项、硬件设置和其他关键参数,该怎么进入呢?下面... 随着计算机技术的发展,大多数主流 PC 和笔记本已经从传统 BIOS 转向了 UEFI 固件。很多时候,我们也

基于Qt开发一个简单的OFD阅读器

《基于Qt开发一个简单的OFD阅读器》这篇文章主要为大家详细介绍了如何使用Qt框架开发一个功能强大且性能优异的OFD阅读器,文中的示例代码讲解详细,有需要的小伙伴可以参考一下... 目录摘要引言一、OFD文件格式解析二、文档结构解析三、页面渲染四、用户交互五、性能优化六、示例代码七、未来发展方向八、结论摘要

Python基于火山引擎豆包大模型搭建QQ机器人详细教程(2024年最新)

《Python基于火山引擎豆包大模型搭建QQ机器人详细教程(2024年最新)》:本文主要介绍Python基于火山引擎豆包大模型搭建QQ机器人详细的相关资料,包括开通模型、配置APIKEY鉴权和SD... 目录豆包大模型概述开通模型付费安装 SDK 环境配置 API KEY 鉴权Ark 模型接口Prompt

MyBatis框架实现一个简单的数据查询操作

《MyBatis框架实现一个简单的数据查询操作》本文介绍了MyBatis框架下进行数据查询操作的详细步骤,括创建实体类、编写SQL标签、配置Mapper、开启驼峰命名映射以及执行SQL语句等,感兴趣的... 基于在前面几章我们已经学习了对MyBATis进行环境配置,并利用SqlSessionFactory核

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu2289(简单二分)

虽说是简单二分,但是我还是wa死了  题意:已知圆台的体积,求高度 首先要知道圆台体积怎么求:设上下底的半径分别为r1,r2,高为h,V = PI*(r1*r1+r1*r2+r2*r2)*h/3 然后以h进行二分 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#includ

2024年流动式起重机司机证模拟考试题库及流动式起重机司机理论考试试题

题库来源:安全生产模拟考试一点通公众号小程序 2024年流动式起重机司机证模拟考试题库及流动式起重机司机理论考试试题是由安全生产模拟考试一点通提供,流动式起重机司机证模拟考试题库是根据流动式起重机司机最新版教材,流动式起重机司机大纲整理而成(含2024年流动式起重机司机证模拟考试题库及流动式起重机司机理论考试试题参考答案和部分工种参考解析),掌握本资料和学校方法,考试容易。流动式起重机司机考试技