This post is a quick deconstruction of MediaCrawler (2024 edition): a file-by-file walk through the project's core modules, with the source of each one and a short note on how it fits together.
MediaCrawler
- var.py
- recv_sms.py
- async_db.py
- db.py
- main.py
- base/base_crawler.py
- config/baseconfig.py
- config/db_config.py
- To be updated
var.py
from asyncio.tasks import Task
from contextvars import ContextVar
from typing import List

import aiomysql

from async_db import AsyncMysqlDB

request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="")
crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="")
comment_tasks_var: ContextVar[List[Task]] = ContextVar("comment_tasks", default=[])
media_crawler_db_var: ContextVar[AsyncMysqlDB] = ContextVar("media_crawler_db_var")
db_conn_pool_var: ContextVar[aiomysql.Pool] = ContextVar("db_conn_pool_var")
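These ContextVars carry per-run state (the current keyword, the crawler type, the DB handles) so that deeply nested coroutines can read them without threading extra parameters through every call. A minimal sketch of the set/get pattern, using only the variables defined above:

import asyncio

from var import crawler_type_var, request_keyword_var


async def worker():
    # Any coroutine awaited from the same context sees the values without
    # having them passed down the call chain.
    print(crawler_type_var.get(), request_keyword_var.get())


async def run():
    # Set the per-run state once, near the entry point.
    crawler_type_var.set("search")
    request_keyword_var.set("python")
    await worker()


asyncio.run(run())

db.py below uses the same mechanism to hand the connection pool and the AsyncMysqlDB wrapper around instead of passing them through function arguments.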
recv_sms.py
import re
from typing import List

import redis
import uvicorn
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

import config
from tools import utils

app = FastAPI()

redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)


class SmsNotification(BaseModel):
    platform: str
    current_number: str
    from_number: str
    sms_content: str
    timestamp: str


def extract_verification_code(message: str) -> str:
    """Extract verification code of 6 digits from the SMS."""
    pattern = re.compile(r'\b[0-9]{6}\b')
    codes: List[str] = pattern.findall(message)
    return codes[0] if codes else ""


@app.post("/")
def receive_sms_notification(sms: SmsNotification):
    """Receive SMS notification and send it to Redis.
    Args:
        sms:
            {
                "platform": "xhs",
                "from_number": "1069421xxx134",
                "sms_content": "【小红书】您的验证码是: 171959, 3分钟内有效。请勿向他人泄漏。如非本人操作,可忽略本消息。",
                "timestamp": "1686720601614",
                "current_number": "13152442222"
            }
    Returns:

    """
    utils.logger.info(f"Received SMS notification: {sms.platform}, {sms.current_number}")
    sms_code = extract_verification_code(sms.sms_content)
    if sms_code:
        # Save the verification code in Redis and set the expiration time to 3 minutes.
        key = f"{sms.platform}_{sms.current_number}"
        redis_client.set(key, sms_code, ex=60 * 3)

    return {"status": "ok"}


@app.get("/", status_code=status.HTTP_404_NOT_FOUND)
async def not_found():
    raise HTTPException(status_code=404, detail="Not Found")


if __name__ == '__main__':
    uvicorn.run(app, port=8000, host='0.0.0.0')
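This service is designed to receive forwarded verification-code SMS messages for the phone login flow; anything that can POST a JSON body matching SmsNotification will do. A hedged sketch of such a request with the third-party requests library (all field values are placeholders copied from the docstring above):

import requests

# Payload shape follows the SmsNotification model above; all values are placeholders.
payload = {
    "platform": "xhs",
    "current_number": "13152442222",
    "from_number": "1069421xxx134",
    "sms_content": "【小红书】您的验证码是: 171959, 3分钟内有效。请勿向他人泄漏。如非本人操作,可忽略本消息。",
    "timestamp": "1686720601614",
}

# recv_sms.py listens on 0.0.0.0:8000; adjust the host if it runs elsewhere.
resp = requests.post("http://127.0.0.1:8000/", json=payload, timeout=5)
print(resp.json())  # {"status": "ok"} once the code is parsed and cached in Redis

The phone-login flow can then read the cached code back from Redis under the {platform}_{current_number} key within its 3-minute TTL.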
async_db.py
from typing import Any, Dict, List, Union

import aiomysql


class AsyncMysqlDB:
    def __init__(self, pool: aiomysql.Pool) -> None:
        self.__pool = pool

    async def query(self, sql: str, *args: Union[str, int]) -> List[Dict[str, Any]]:
        """
        Run the given SQL query and return the matching records as a list.
        :param sql: the SQL to query
        :param args: dynamic parameters passed to the SQL
        :return:
        """
        async with self.__pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql, args)
                data = await cur.fetchall()
                return data or []

    async def get_first(self, sql: str, *args: Union[str, int]) -> Union[Dict[str, Any], None]:
        """
        Run the given SQL query and return the first matching record.
        :param sql: the SQL to query
        :param args: dynamic parameters passed to the SQL
        :return:
        """
        async with self.__pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql, args)
                data = await cur.fetchone()
                return data

    async def item_to_table(self, table_name: str, item: Dict[str, Any]) -> int:
        """
        Insert a record into the table.
        :param table_name: table name
        :param item: dict describing one record
        :return:
        """
        fields = list(item.keys())
        values = list(item.values())
        fields = [f'`{field}`' for field in fields]
        fieldstr = ','.join(fields)
        valstr = ','.join(['%s'] * len(item))
        sql = "INSERT INTO %s (%s) VALUES(%s)" % (table_name, fieldstr, valstr)
        async with self.__pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql, values)
                lastrowid = cur.lastrowid
                return lastrowid

    async def update_table(self, table_name: str, updates: Dict[str, Any], field_where: str,
                           value_where: Union[str, int, float]) -> int:
        """
        Update records in the given table.
        :param table_name: table name
        :param updates: key-value mapping of fields to update
        :param field_where: field name in the WHERE clause of the UPDATE statement
        :param value_where: field value in the WHERE clause of the UPDATE statement
        :return:
        """
        upsets = []
        values = []
        for k, v in updates.items():
            s = '`%s`=%%s' % k
            upsets.append(s)
            values.append(v)
        upsets = ','.join(upsets)
        sql = 'UPDATE %s SET %s WHERE %s="%s"' % (
            table_name,
            upsets,
            field_where, value_where,
        )
        async with self.__pool.acquire() as conn:
            async with conn.cursor() as cur:
                rows = await cur.execute(sql, values)
                return rows

    async def execute(self, sql: str, *args: Union[str, int]) -> int:
        """
        Execute statements that update or write data.
        :param sql:
        :param args:
        :return:
        """
        async with self.__pool.acquire() as conn:
            async with conn.cursor() as cur:
                rows = await cur.execute(sql, args)
                return rows
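AsyncMysqlDB is a thin async CRUD wrapper: every method borrows a connection from the aiomysql pool, runs a single statement, and returns plain Python values (a list of dicts, a row count, or the last insert id). A minimal usage sketch, assuming a reachable MySQL instance and a hypothetical users table:

import asyncio

import aiomysql

from async_db import AsyncMysqlDB


async def demo():
    # In the project the pool is built by db.init_mediacrawler_db(); here it is built directly.
    pool = await aiomysql.create_pool(host="127.0.0.1", port=3306, user="root",
                                      password="123456", db="media_crawler", autocommit=True)
    db = AsyncMysqlDB(pool)

    # INSERT: dict keys become column names, dict values become parameters (users table is hypothetical).
    new_id = await db.item_to_table("users", {"name": "alice", "age": 30})

    # UPDATE users SET age=%s WHERE id="<new_id>"
    await db.update_table("users", {"age": 31}, "id", new_id)

    # SELECT with positional parameters; rows come back as a list of dicts.
    rows = await db.query("SELECT * FROM users WHERE age > %s", 18)
    print(new_id, rows)

    pool.close()
    await pool.wait_closed()


asyncio.run(demo())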
db.py
import asyncio
from typing import Dict
from urllib.parse import urlparse

import aiofiles
import aiomysql

import config
from async_db import AsyncMysqlDB
from tools import utils
from var import db_conn_pool_var, media_crawler_db_var


def parse_mysql_url(mysql_url) -> Dict:
    """
    Parse the DB connection URL from the config into kwargs for aiomysql,
    because aiomysql does not accept connection info as a URL directly.
    Args:
        mysql_url: mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler
    Returns:

    """
    parsed_url = urlparse(mysql_url)
    db_params = {
        'host': parsed_url.hostname,
        'port': parsed_url.port or 3306,
        'user': parsed_url.username,
        'password': parsed_url.password,
        'db': parsed_url.path.lstrip('/')
    }
    return db_params


async def init_mediacrawler_db():
    """
    Initialize the database connection pool and store it in the media_crawler_db_var context variable.
    Returns:

    """
    db_conn_params = parse_mysql_url(config.RELATION_DB_URL)
    pool = await aiomysql.create_pool(
        autocommit=True,
        **db_conn_params
    )
    async_db_obj = AsyncMysqlDB(pool)

    # Put the connection pool and the wrapped CRUD helper into context variables.
    db_conn_pool_var.set(pool)
    media_crawler_db_var.set(async_db_obj)


async def init_db():
    """
    Initialize the DB connection pool.
    Returns:

    """
    utils.logger.info("[init_db] start init mediacrawler db connect object")
    await init_mediacrawler_db()
    utils.logger.info("[init_db] end init mediacrawler db connect object")


async def close():
    """
    Close the connection pool.
    Returns:

    """
    utils.logger.info("[close] close mediacrawler db pool")
    db_pool: aiomysql.Pool = db_conn_pool_var.get()
    if db_pool is not None:
        db_pool.close()


async def init_table_schema():
    """
    Initialize the database table schema. Only use this the first time the tables
    need to be created; running it again drops all existing tables and data.
    Returns:

    """
    utils.logger.info("[init_table_schema] begin init mysql table schema ...")
    await init_mediacrawler_db()
    async_db_obj: AsyncMysqlDB = media_crawler_db_var.get()
    async with aiofiles.open("schema/tables.sql", mode="r") as f:
        schema_sql = await f.read()
        await async_db_obj.execute(schema_sql)
        utils.logger.info("[init_table_schema] mediacrawler table schema init successful")
        await close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(init_table_schema())
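Once init_db() has run, callers never touch aiomysql directly; they fetch the shared AsyncMysqlDB handle from the media_crawler_db_var context variable. A minimal sketch of that pattern, assuming MySQL is reachable at config.RELATION_DB_URL:

import asyncio

import db
from async_db import AsyncMysqlDB
from var import media_crawler_db_var


async def demo():
    await db.init_db()  # builds the aiomysql pool and fills the context variables

    # Anywhere downstream, grab the shared CRUD helper from the context variable.
    async_db: AsyncMysqlDB = media_crawler_db_var.get()
    rows = await async_db.query("SELECT 1 AS ping")
    print(rows)  # [{'ping': 1}]

    await db.close()


asyncio.run(demo())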
main.py
import argparse
import asyncio
import sys

import config
import db
from base.base_crawler import AbstractCrawler
from media_platform.bilibili import BilibiliCrawler
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.weibo import WeiboCrawler
from media_platform.xhs import XiaoHongShuCrawler


class CrawlerFactory:
    CRAWLERS = {
        "xhs": XiaoHongShuCrawler,
        "dy": DouYinCrawler,
        "ks": KuaishouCrawler,
        "bili": BilibiliCrawler,
        "wb": WeiboCrawler
    }

    @staticmethod
    def create_crawler(platform: str) -> AbstractCrawler:
        crawler_class = CrawlerFactory.CRAWLERS.get(platform)
        if not crawler_class:
            raise ValueError("Invalid Media Platform Currently only supported xhs or dy or ks or bili ...")
        return crawler_class()


async def main():
    # define command line params ...
    parser = argparse.ArgumentParser(description='Media crawler program.')
    parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili | wb)',
                        choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM)
    parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
                        choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
    parser.add_argument('--type', type=str, help='crawler type (search | detail | creator)',
                        choices=["search", "detail", "creator"], default=config.CRAWLER_TYPE)
    parser.add_argument('--start', type=int, help='crawler type (number of start page)',
                        default=config.START_PAGE)
    parser.add_argument('--keywords', type=str, help='crawler type (please input keywords)',
                        default=config.KEYWORDS)

    # init db
    if config.SAVE_DATA_OPTION == "db":
        await db.init_db()

    args = parser.parse_args()
    crawler = CrawlerFactory.create_crawler(platform=args.platform)
    crawler.init_config(
        platform=args.platform,
        login_type=args.lt,
        crawler_type=args.type,
        start_page=args.start,
        keyword=args.keywords
    )
    await crawler.start()

    if config.SAVE_DATA_OPTION == "db":
        await db.close()


if __name__ == '__main__':
    try:
        # asyncio.run(main())
        asyncio.get_event_loop().run_until_complete(main())
    except KeyboardInterrupt:
        sys.exit()
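Because every flag defaults to a value from config, python main.py works with no arguments at all, and flags only override the config for a single run, e.g. python main.py --platform xhs --lt qrcode --type search --keywords "python,golang". The factory itself is a plain dict lookup, so supporting another platform is one dict entry plus an AbstractCrawler subclass; a hypothetical sketch (TiebaCrawler does not exist in the project):

from typing import Dict, Optional

from playwright.async_api import BrowserContext, BrowserType

from base.base_crawler import AbstractCrawler
from main import CrawlerFactory


class TiebaCrawler(AbstractCrawler):
    """Hypothetical platform, used only to show the registration step."""

    def init_config(self, platform: str, login_type: str, crawler_type: str, start_page: int, keyword: str):
        self.keyword = keyword

    async def start(self):
        print("would crawl here")

    async def search(self):
        pass

    async def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict],
                             user_agent: Optional[str], headless: bool = True) -> BrowserContext:
        raise NotImplementedError


# One dict entry is all the factory needs to know about the new platform.
CrawlerFactory.CRAWLERS["tieba"] = TiebaCrawler
crawler = CrawlerFactory.create_crawler(platform="tieba")

The --platform choices list in main() would also need the new key before the CLI accepts it.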
base/base_crawler.py
from abc import ABC, abstractmethod
from typing import Dict, Optional

from playwright.async_api import BrowserContext, BrowserType


class AbstractCrawler(ABC):
    @abstractmethod
    def init_config(self, platform: str, login_type: str, crawler_type: str, start_page: int, keyword: str):
        pass

    @abstractmethod
    async def start(self):
        pass

    @abstractmethod
    async def search(self):
        pass

    @abstractmethod
    async def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict], user_agent: Optional[str],
                             headless: bool = True) -> BrowserContext:
        pass


class AbstractLogin(ABC):
    @abstractmethod
    async def begin(self):
        pass

    @abstractmethod
    async def login_by_qrcode(self):
        pass

    @abstractmethod
    async def login_by_mobile(self):
        pass

    @abstractmethod
    async def login_by_cookies(self):
        pass


class AbstractStore(ABC):
    @abstractmethod
    async def store_content(self, content_item: Dict):
        pass

    @abstractmethod
    async def store_comment(self, comment_item: Dict):
        pass

    # TODO support all platform
    # only xhs is supported, so @abstractmethod is commented
    # @abstractmethod
    async def store_creator(self, creator: Dict):
        pass


class AbstractStoreImage(ABC):
    # TODO: support all platform
    # only weibo is supported
    # @abstractmethod
    async def store_image(self, image_content_item: Dict):
        pass


class AbstractApiClient(ABC):
    @abstractmethod
    async def request(self, method, url, **kwargs):
        pass

    @abstractmethod
    async def update_cookies(self, browser_context: BrowserContext):
        pass

    @abstractmethod
    async def pong(self):
        pass
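These ABCs are the seams between platform crawlers, login flows, storage backends and HTTP clients; each platform package supplies one implementation of each. To show how small a conforming storage backend can be, here is a hedged sketch of an AbstractStore that appends records to JSON-lines files (this is not the project's real csv/db/json store code):

import json
from pathlib import Path
from typing import Dict

import aiofiles

from base.base_crawler import AbstractStore


class JsonLinesStore(AbstractStore):
    """Minimal AbstractStore sketch: one JSON object per line, per record type."""

    def __init__(self, out_dir: str = "data/demo"):
        self.out_dir = Path(out_dir)
        self.out_dir.mkdir(parents=True, exist_ok=True)

    async def _append(self, name: str, item: Dict):
        async with aiofiles.open(self.out_dir / f"{name}.jsonl", mode="a", encoding="utf-8") as f:
            await f.write(json.dumps(item, ensure_ascii=False) + "\n")

    async def store_content(self, content_item: Dict):
        await self._append("contents", content_item)

    async def store_comment(self, comment_item: Dict):
        await self._append("comments", comment_item)

    async def store_creator(self, creator: Dict):
        await self._append("creators", creator)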
config/baseconfig.py
# Base config
PLATFORM = "xhs"
KEYWORDS = "python,golang"
LOGIN_TYPE = "qrcode"  # qrcode or phone or cookie
COOKIES = ""
SORT_TYPE = "popularity_descending"  # See the enum values under media_platform.xxx.field; for now only supported for xhs
CRAWLER_TYPE = "search"  # Crawl type: search (keyword search) | detail (post detail) | creator (creator homepage data)

# Whether to enable IP proxy
ENABLE_IP_PROXY = False

# Size of the proxy IP pool
IP_PROXY_POOL_COUNT = 2

# Proxy IP provider name
IP_PROXY_PROVIDER_NAME = "kuaidaili"

# Set to True to not open a browser (headless mode)
# Set to False to open a browser
# For xhs: if QR-code login keeps failing, open the browser and pass the slider captcha manually
# For douyin: if login keeps failing, open the browser and check whether phone-number verification appears after scanning; pass it manually and retry
HEADLESS = True

# Whether to save the login state
SAVE_LOGIN_STATE = True

# Data storage option, three types supported: csv, db, json
SAVE_DATA_OPTION = "json"  # csv or db or json

# Browser user-data directory for the cached browser profile
USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name

# Start page for crawling, defaults to the first page
START_PAGE = 1

# Limit on the number of videos/posts to crawl
CRAWLER_MAX_NOTES_COUNT = 20

# Concurrency limit for the crawler
MAX_CONCURRENCY_NUM = 4

# Whether to crawl images; disabled by default
ENABLE_GET_IMAGES = False

# Whether to crawl comments; disabled by default
ENABLE_GET_COMMENTS = False

# Whether to crawl second-level (sub) comments; disabled by default, currently only supported for xhs
# If an older version of the project used db, add the table column per schema/tables.sql line 287
ENABLE_GET_SUB_COMMENTS = False

# List of xhs note IDs to crawl
XHS_SPECIFIED_ID_LIST = [
    "6422c2750000000027000d88",
    "64ca1b73000000000b028dd2",
    "630d5b85000000001203ab41",
    # ........................
]

# List of douyin IDs to crawl
DY_SPECIFIED_ID_LIST = [
    "7280854932641664319",
    "7202432992642387233"
    # ........................
]

# List of kuaishou IDs to crawl
KS_SPECIFIED_ID_LIST = [
    "3xf8enb8dbj6uig",
    "3x6zz972bchmvqe"
]

# List of bilibili video bvids to crawl
BILI_SPECIFIED_ID_LIST = [
    "BV1d54y1g7db",
    "BV1Sz4y1U77N",
    "BV14Q4y1n7jz",
    # ........................
]

# List of weibo post IDs to crawl
WEIBO_SPECIFIED_ID_LIST = [
    "4982041758140155",
    # ........................
]

# List of xhs creator IDs
XHS_CREATOR_ID_LIST = [
    "63e36c9a000000002703502b",
    # ........................
]
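HEADLESS, SAVE_LOGIN_STATE and USER_DATA_DIR together decide how the Playwright browser context is launched: with a persistent profile directory (USER_DATA_DIR, with %s replaced by the platform name) the login state survives between runs. A hedged sketch of how these values would typically map onto Playwright calls (not the project's actual launch_browser implementation):

import asyncio

from playwright.async_api import async_playwright

import config


async def demo():
    async with async_playwright() as p:
        # %s in USER_DATA_DIR is replaced by the platform name, e.g. "xhs_user_data_dir".
        user_data_dir = config.USER_DATA_DIR % config.PLATFORM
        browser_context = await p.chromium.launch_persistent_context(
            user_data_dir=user_data_dir,   # persists cookies/login state between runs
            headless=config.HEADLESS,      # set HEADLESS = False to solve captchas by hand
        )
        page = await browser_context.new_page()
        await page.goto("https://example.com")
        await browser_context.close()


asyncio.run(demo())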
config/db_config.py
import os

# redis config
REDIS_DB_HOST = "127.0.0.1"  # your redis host
REDIS_DB_PWD = os.getenv("REDIS_DB_PWD", "123456")  # your redis password

# mysql config
RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456")  # your relation db password
RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"

# sqlite3 config
# RELATION_DB_URL = f"sqlite://data/media_crawler.sqlite"
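Both passwords fall back to os.getenv defaults, so they can be supplied from the environment instead of being hard-coded, and RELATION_DB_URL is assembled from RELATION_DB_PWD at import time. A small sketch (the secret values are placeholders):

import os

# Export the secrets before the config package is imported; os.getenv picks them up.
os.environ["REDIS_DB_PWD"] = "my-redis-secret"      # placeholder
os.environ["RELATION_DB_PWD"] = "my-mysql-secret"   # placeholder

import config  # noqa: E402  (db_config builds RELATION_DB_URL at import time)

print(config.RELATION_DB_URL)  # mysql://root:my-mysql-secret@localhost:3306/media_crawler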
To be updated.
That wraps up this deconstruction of MediaCrawler; hopefully it is a useful reference.