Python+MySQL爬取船讯网AIS静态数据

2023-11-08 21:10

本文主要是介绍Python+MySQL爬取船讯网AIS静态数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 注意:学习前请自行掌握Python和MySQL基础知识。

  • 以下代码为原创,完整且可执行。

1、了解船讯网的信息

  • 打开船讯网界面 https://www.shipxy.com
  • 随机搜索一只船舶—设置—网络—Getship—预览。根据自己的需要,选择一会需要保存的数据项。

  • 切换至“标头”,圈起来的都是要用的。

2、准备工具

  • Pycharm (爬取数据用)+MySQL workbench(存储数据用)
  • 需要爬取的船舶mmsi数据(可以是csv,也可以是MySQL数据表)
  • 需要爬取的船舶mmsi表
  •  创建需要存储数据的表

  •  需要的Python库
    import requests
    import pymysql
    import threading
    import concurrent.futures

    3、编写代码

import requests
import pymysql
import threading
import concurrent.futures
import time #导入time模块class CompanyShipInDatabase:def __init__(self):self.conn = Noneself.cursor = Noneself.session = requests.Session()def create_database_connection(self):# 创建数据库连接self.conn = pymysql.connect(host="localhost", port=3306, user="root", password="your_password", database="your_database", charset="utf8")self.cursor = self.conn.cursor()def close_database_connection(self):# 关闭数据库连接if self.cursor:self.cursor.close()if self.conn:self.conn.close()def in_database(self, data_list):# 准备要插入或更新的数据update_data = []insert_data = []for item in data_list:mmsi = item['mmsi']select_sql = "SELECT mmsi FROM static_data WHERE mmsi = %s"self.cursor.execute(select_sql, (mmsi,))existing_record = self.cursor.fetchone()if existing_record:# 如果记录已存在,执行更新操作update_data.append((item['imo'], item['name'], item['callsign'], item['length'], item['width'], item['trail'], item['draught'], mmsi))else:# 如果记录不存在,执行插入操作insert_data.append((item['mmsi'], item['imo'], item['name'], item['callsign'], item['length'], item['width'], item['trail'], item['draught']))if update_data:# 批量执行更新操作update_sql = '''UPDATE static_dataSET imo = %s, name = %s, callsign = %s, length = %s, width = %s, trail = %s, draught = %sWHERE mmsi = %s'''self.cursor.executemany(update_sql, update_data)if insert_data:# 批量执行插入操作insert_sql = '''INSERT INTO static_data(mmsi, imo, name, callsign, length, width, trail, draught)VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'''self.cursor.executemany(insert_sql, insert_data)# 提交数据库事务self.conn.commit()def make_http_request(self, mmsi, headers):# 发送HTTP请求获取数据url = f'https://www.shipxy.com/ship/GetShip?mmsi={mmsi}'try:response = self.session.get(url, headers=headers)if response.status_code == 200:result = response.json()return resultelse:return Noneexcept Exception as e:print(f"HTTP请求过程中发生错误: {str(e)}")return Nonedef fetch_ship_data(self, mmsi_group, headers):# 获取船舶数据data = []for item in mmsi_group:mmsi = item[0]result = self.make_http_request(mmsi, headers)if result and 'data' in result and len(result['data']) > 0:data.append(result['data'][0])print(f"已成功获取 mmsi: {mmsi}")else:print(f"mmsi: {mmsi} 未找到数据")return data#主程序def company_ship_in_database(self):# 主程序入口self.create_database_connection()mmsi_group = self.get_mmsi()data = []Cookie = 'your_Cookie'  # 请替换成你的Cookieheaders = {'User-Agent': 'your_User-Agent',  # 请替换成你的User-Agent'Cookie': Cookie,}#记录程序开始的时间start_time = time.time()# 使用线程池并发获取数据num_threads = 8  # 可根据需要调整线程数chunk_size = len(mmsi_group) // num_threadswith concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:futures = []for i in range(num_threads):start = i * chunk_sizeend = (i + 1) * chunk_size if i < num_threads - 1 else len(mmsi_group)future = executor.submit(self.fetch_ship_data, mmsi_group[start:end], headers)futures.append(future)for future in concurrent.futures.as_completed(futures):data.extend(future.result())#记录数据爬取结束的时间data_fetch_end_time = time.time()print(f"已完成查询,共获取 {len(data)} 条数据")print(data)# 批量插入或更新数据self.in_database(data)#记录数据写入数据库结束的时间data_write_end_time = time.time()self.close_database_connection()#计算程序运行时间并打印elapsed_time = data_fetch_end_time - start_timeprint(f"数据获取时间:0.0569 秒")elapsed_time = data_write_end_time - data_fetch_end_timeprint(f"数据写入数据库时间:0.0569 秒")return datadef get_mmsi(self):# 获取要查询的MMSI列表mmsi_sql = "select distinct mmsi from your_mmsi order by mmsi"self.cursor.execute(mmsi_sql)mmsi_group = self.cursor.fetchall()return mmsi_groupif __name__ == "__main__":company_ship = CompanyShipInDatabase()data = company_ship.company_ship_in_database()
  • 以上代码为完整代码,替换成对应的参数即可运行。
  • 代码设计时优先考虑'运行速率',采用多线程池和减少循环的手段。因为爬取数据涉及的数据量庞大,再加上本人的电脑垃圾,所以以最低的时间爬取同等的信息非常重要。
  • 代码的实际运行速度。

                ## 4828条船舶数据从爬取到写入数据库仅用了48s。

                ## 43405条船舶数据从爬取到写入数据库仅用348s。

  •  结果(我只取了9个数据,大家根据自己的需求筛选就行)

这篇关于Python+MySQL爬取船讯网AIS静态数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/372527

相关文章

Python调用Orator ORM进行数据库操作

《Python调用OratorORM进行数据库操作》OratorORM是一个功能丰富且灵活的PythonORM库,旨在简化数据库操作,它支持多种数据库并提供了简洁且直观的API,下面我们就... 目录Orator ORM 主要特点安装使用示例总结Orator ORM 是一个功能丰富且灵活的 python O

Python使用国内镜像加速pip安装的方法讲解

《Python使用国内镜像加速pip安装的方法讲解》在Python开发中,pip是一个非常重要的工具,用于安装和管理Python的第三方库,然而,在国内使用pip安装依赖时,往往会因为网络问题而导致速... 目录一、pip 工具简介1. 什么是 pip?2. 什么是 -i 参数?二、国内镜像源的选择三、如何

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

Python如何实现PDF隐私信息检测

《Python如何实现PDF隐私信息检测》随着越来越多的个人信息以电子形式存储和传输,确保这些信息的安全至关重要,本文将介绍如何使用Python检测PDF文件中的隐私信息,需要的可以参考下... 目录项目背景技术栈代码解析功能说明运行结php果在当今,数据隐私保护变得尤为重要。随着越来越多的个人信息以电子形

使用 sql-research-assistant进行 SQL 数据库研究的实战指南(代码实现演示)

《使用sql-research-assistant进行SQL数据库研究的实战指南(代码实现演示)》本文介绍了sql-research-assistant工具,该工具基于LangChain框架,集... 目录技术背景介绍核心原理解析代码实现演示安装和配置项目集成LangSmith 配置(可选)启动服务应用场景

使用Python快速实现链接转word文档

《使用Python快速实现链接转word文档》这篇文章主要为大家详细介绍了如何使用Python快速实现链接转word文档功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 演示代码展示from newspaper import Articlefrom docx import

oracle DBMS_SQL.PARSE的使用方法和示例

《oracleDBMS_SQL.PARSE的使用方法和示例》DBMS_SQL是Oracle数据库中的一个强大包,用于动态构建和执行SQL语句,DBMS_SQL.PARSE过程解析SQL语句或PL/S... 目录语法示例注意事项DBMS_SQL 是 oracle 数据库中的一个强大包,它允许动态地构建和执行

Python Jupyter Notebook导包报错问题及解决

《PythonJupyterNotebook导包报错问题及解决》在conda环境中安装包后,JupyterNotebook导入时出现ImportError,可能是由于包版本不对应或版本太高,解决方... 目录问题解决方法重新安装Jupyter NoteBook 更改Kernel总结问题在conda上安装了