curl多线程大批量分片下载大文件源码示例

2024-03-18 15:48

本文主要是介绍curl多线程大批量分片下载大文件源码示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

这段时间,一直在探索使用curl多线程来下载一系列的大文件的可行性方法。下面是我探索的结果:

1.将大文件分为许多小片段,比如20M一个片段(当然这个值可以配置,比如100M一个片段,取决于你的业务需求),使用http range来下载这些片段;

2.使用预先生成的线程池来连续不间断地执行文件片段的下载,这个线程池也就是一个固定线程个数的普通线程池,使用互斥锁和信号量来保护任务队列,各worker线程抢占式获取要下载的任务;

3.文件的下载使用磁盘文件形式,当然你也可以修改为保存在内存中。

4.由于easy handle不能跨线程调用,我们只能是一个线程函数里一个curl easy handle,用完就释放掉。

5.如果是小文件(2M以内的,比如html,img之类的),建议使用单线程中的curl multi handle的异步调用架构,后续我会将相关代码放到网上。

由于一个文件分为多个分片,每个分片可能被不同的线程下载,每个分片完成的次序并不固定,这和Bittorrent有些相似,所以,文件的关闭是个需要审慎考虑的问题。怎样处理,才能保证所有的文件全都及时安全关闭呢?及时的要求是,整个文件一下载,最好就把它关闭,以免占用文件描述符。

在目前demo的设计中,我还没有找到一个很好的思路来将它们及时关闭。欢迎大牛指点良策。

源码示例的功能:

目前的代码中,设置了是否启用http range下载的宏开关,同时定义了每个分片大小是20M。在开启range分片下载时,一个63M的文件,将会分为 63 /20 = 3 + 1 =4个分片。因而在线程池中可能会被4个线程下载。而目前线程池我设置了30个线程。

如果没有开启range分片下载,就是一个线程下载一个文件,这个文件有可能是2M,也有可能是400M,还有可能是2.8G,所以会造成大多数线程空闲,个别线程累死,这对合理高效利用计算资源是不利的,所以,我提倡尽可能使用range下载大文件。

下面是源码

//g++ -g test_pool.cpp thread_pool.cpp -o test_pool -lpthread -lcurl
//
#include <unistd.h>
#include <libgen.h>
#include <curl/curl.h>
#include <string>
#include <vector>
#include <map>
#include "thread_pool.h"using namespace std;#define  HTTP_RANGE    0 //是否开启http range下载功能?
const long seg_size = 20 * 1024 * 1024; //文件分片阈值大小是100MB, 超过该大小的文件将会强制分片下载static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
map<string, int>  download_map; //下载任务的完成情况, key是url或是文件名, val是分片数, 用于主线程监视class CloseInfo {public:CloseInfo(FILE* f, string& s): fp(f), url(s) {}~CloseInfo(){}public:FILE*    fp;string   url;
};
vector<CloseInfo*> closed_vec; //所有打开的文件列表需要最后继中关闭, 因为在分片下载的情况下,无法保证在所有的分片都下载完成的情况下关闭文件//这里必须是struct而不能是class
struct JobInfo{JobInfo(): fp(NULL), startPos(0), stopPos(0), ranged(false) {}FILE*    fp;  //本地文件句柄long     startPos;long     stopPos;string   url;bool     ranged; //是否需要http range下载?
};static void set_JobInfo(JobInfo* ji, FILE* f, long s, long e, string u, bool r){ji->fp = f;ji->startPos = s;ji->stopPos = e;ji->url = u;ji->ranged = r;
}long get_download_file_length (const char *url)
{double file_len = 0;CURL *handle = curl_easy_init ();curl_easy_setopt (handle, CURLOPT_URL, url);curl_easy_setopt (handle, CURLOPT_FOLLOWLOCATION, 1L);curl_easy_setopt (handle, CURLOPT_MAXREDIRS, 3L);curl_easy_setopt (handle, CURLOPT_AUTOREFERER, 1L);curl_easy_setopt (handle, CURLOPT_HEADER, 0L);   //只需要header头curl_easy_setopt (handle, CURLOPT_NOBODY, 1L);   //不需要bodycurl_easy_setopt (handle, CURLOPT_FORBID_REUSE, 1);curl_easy_setopt (handle, CURLOPT_USERAGENT, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"); //user-agentif (curl_easy_perform (handle) == CURLE_OK){curl_easy_getinfo (handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &file_len);}else{file_len = -1;}curl_easy_cleanup(handle);return file_len;
}//每个线程在下载每个文件分片时,都会回调该函数
static size_t write_data(void* ptr, size_t size, size_t nmemb, void* userdata){JobInfo* ji = (JobInfo*) userdata;bool ranged = ji->ranged;size_t written;//要分片下载的大文件, 需要设置http range域if(ranged){//多线程写同一个文件, 需要加锁pthread_mutex_lock (&g_mutex);if(ji->startPos + size * nmemb <= ji->stopPos){fseek(ji->fp, ji->startPos, SEEK_SET);written = fwrite(ptr, size, nmemb, ji->fp);ji->startPos += size * nmemb;}else{fseek(ji->fp, ji->startPos, SEEK_SET);written = fwrite(ptr, 1, ji->stopPos - ji->startPos + 1, ji->fp);ji->startPos = ji->stopPos;}pthread_mutex_unlock (&g_mutex);}else{written = fwrite(ptr, size, nmemb, ji->fp);}return written;
}void* job_process(void* arg){CURL* curl;CURLcode res;JobInfo* ji = (JobInfo*)arg;char range[64] = { 0 };if(ji->ranged) {snprintf (range, sizeof(range), "%ld-%ld", ji->startPos, ji->stopPos);printf("range: [%s]\n", range);}curl = curl_easy_init();curl_easy_setopt(curl, CURLOPT_URL, ji->url.c_str());curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_data);curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void*)ji);curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 3L);curl_easy_setopt(curl, CURLOPT_AUTOREFERER, 1L);curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1L);curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 5L);curl_easy_setopt(curl, CURLOPT_USERAGENT, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"); //user-agentcurl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 1L);//curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);if(ji->ranged)curl_easy_setopt(curl, CURLOPT_RANGE, range);res = curl_easy_perform(curl);if(CURLE_OK != res){printf("[%d] %s\n", res, ji->url.c_str());}curl_easy_cleanup(curl);//TODO: 回馈下载分片的完成情况....string url = ji->url;pthread_mutex_lock(&g_mutex);download_map[url]--;printf("[-1]%s\n", url.c_str());pthread_mutex_unlock(&g_mutex);return NULL;
}//从url列表得到一个相应的下载任务列表
int get_job_queue (vector<string>& urls, vector<JobInfo*>& jobs){FILE* fp;long file_len = 0L, start, stop, seg_num;bool ranged = false;JobInfo* ji;vector<string>::iterator it;for(it = urls.begin(); it != urls.end(); ++it){string url = *it;file_len = get_download_file_length (url.c_str());if(file_len <= 0) continue;printf("[%ld] %s\n", file_len, url.c_str());//对应每个url, 打开一个本地文件const char* fn = basename((char*)url.c_str());string full_path(fn);full_path = "./" + full_path; //设为当前路径下面fp = fopen(full_path.c_str(), "wb"); //在此统一打开文件if(NULL == fp)  continue;//构造关闭文件向量CloseInfo* ci = new CloseInfo(fp, url);closed_vec.push_back(ci);//加入全局任务监听映射int additional = (file_len % seg_size == 0) ? 0 : 1;int seg_total = ranged ? (file_len < seg_size ? 1 : (file_len/seg_size + additional)) : 1;download_map[url] = seg_total;printf("[+1]%s, seg total %d\n", url.c_str(), seg_total);#if HTTP_RANGE//根据文件大小, 确定是否分片?if(file_len < seg_size){start = 0;stop = file_len - 1;ji = new JobInfo();set_JobInfo(ji, fp, start, stop, url, ranged);jobs.push_back(ji);}else{//分片下载,先确定分片个数ranged = true;seg_num = (long)file_len / seg_size;printf("filesize[%ld], segsize[%ld], seg num: %ld\n", file_len, seg_size, seg_num);for(int i = 0; i <= seg_num; i++){if(i < seg_num){start = i * seg_size;stop = (i + 1) * seg_size - 1;}else{if(file_len % seg_size != 0){start = i * seg_size;stop = file_len - 1;}elsebreak;}ji = new JobInfo();set_JobInfo(ji, fp, start, stop, url, ranged);jobs.push_back(ji);}}
#elsestart = 0;stop = file_len - 1;ji = new JobInfo();set_JobInfo(ji, fp, start, stop, url, ranged);jobs.push_back(ji);
#endif}return 0;
}int main(int argc, char* argv[]){vector<string> urls_vec;urls_vec.push_back("http://dlsw.baidu.com/sw-search-sp/soft/da/17519/BaiduYunGuanjia_4.8.3.1409021519.exe");//10M// urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-server-ppc64el.template"); //67M// urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04.1-server-amd64+mac.template"); //88Murls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04.1-server-powerpc.template"); //64M// urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04.1-server-ppc64el.template"); //67M// urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-desktop-amd64+mac.iso"); //962Murls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-desktop-amd64+mac.iso.zsync"); //1.9M// urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-server-amd64+mac.iso"); //566M// urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-server-amd64+mac.template");  //88M// urls_vec.push_back("http://cdimage.ubuntu.com/releases/14.04/release/ubuntu-14.04-server-powerpc.iso"); //654M//创建线程越多, cpu占用率越大, 下载效率会有一定提高CThreadPool* pool = new CThreadPool(30);vector<JobInfo*> ji_vec;get_job_queue(urls_vec, ji_vec);vector<JobInfo*>::iterator it;for(it = ji_vec.begin(); it != ji_vec.end(); ++it){JobInfo* ji = *it;pool->pool_add_job(job_process, (void*)ji);}//这个时间仅是一个粗略值,当有线程下载文件未完成时,这个worker线程是不会退出的, master线程会一直等待, 直到所有的worker线程都退出//此处更好的思路是统计所有的任务是否下载完成, 再准备退出//usleep(2000 * 1000 * 1000); //2000map<string, int>::iterator mit;int finished = 0;size_t map_size = download_map.size();printf("map size %ld\n", map_size);while(1){finished = 0;pthread_mutex_lock(&g_mutex);for(mit = download_map.begin(); mit != download_map.end(); ++mit)if(!mit->second)  finished++;pthread_mutex_unlock(&g_mutex);if(finished == map_size) {printf("finish all task =====================>\n");break;}else{usleep(30 * 1000 * 1000);}}download_map.clear();delete pool;urls_vec.clear();//注意所含的jobinfo已经在线程处理完后就删除了,这里不用再单独删除.JobInfo对象析构的时候,会关闭打开的文件.//另外线程池销毁时,会将剩下没有处理的任务删除ji_vec.clear();//打开的文件,要记得全部关闭vector<CloseInfo*>::iterator cit;for(cit = closed_vec.begin(); cit != closed_vec.end(); ++cit){CloseInfo* ci = *cit;if(ci->fp != NULL){fclose(ci->fp);ci->fp = NULL;}printf("[closed] %s\n", ci->url.c_str());delete ci;}closed_vec.clear();return 0;
}

注意需依赖libcurl和lpthread库, 要先安装好libcurl库.另外,固定线程池的源码参见我另一篇博客,这里仅是替换了test_pool.cpp的代码, 改为curl下载文件.

简单C++线程池包装类源码示例

运行截图



后续改进的地方:

1.文件关闭方式

2.个别情况下线程函数下载超时的问题

这篇关于curl多线程大批量分片下载大文件源码示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/822843

相关文章

Lipowerline5.0 雷达电力应用软件下载使用

1.配网数据处理分析 针对配网线路点云数据,优化了分类算法,支持杆塔、导线、交跨线、建筑物、地面点和其他线路的自动分类;一键生成危险点报告和交跨报告;还能生成点云数据采集航线和自主巡检航线。 获取软件安装包联系邮箱:2895356150@qq.com,资源源于网络,本介绍用于学习使用,如有侵权请您联系删除! 2.新增快速版,简洁易上手 支持快速版和专业版切换使用,快速版界面简洁,保留主

Android多线程下载见解

通过for循环开启N个线程,这是多线程,但每次循环都new一个线程肯定很耗内存的。那可以改用线程池来。 就以我个人对多线程下载的理解是开启一个线程后: 1.通过HttpUrlConnection对象获取要下载文件的总长度 2.通过RandomAccessFile流对象在本地创建一个跟远程文件长度一样大小的空文件。 3.通过文件总长度/线程个数=得到每个线程大概要下载的量(线程块大小)。

springboot家政服务管理平台 LW +PPT+源码+讲解

3系统的可行性研究及需求分析 3.1可行性研究 3.1.1技术可行性分析 经过大学四年的学习,已经掌握了JAVA、Mysql数据库等方面的编程技巧和方法,对于这些技术该有的软硬件配置也是齐全的,能够满足开发的需要。 本家政服务管理平台采用的是Mysql作为数据库,可以绝对地保证用户数据的安全;可以与Mysql数据库进行无缝连接。 所以,家政服务管理平台在技术上是可以实施的。 3.1

高仿精仿愤怒的小鸟android版游戏源码

这是一款很完美的高仿精仿愤怒的小鸟android版游戏源码,大家可以研究一下吧、 为了报复偷走鸟蛋的肥猪们,鸟儿以自己的身体为武器,仿佛炮弹一样去攻击肥猪们的堡垒。游戏是十分卡通的2D画面,看着愤怒的红色小鸟,奋不顾身的往绿色的肥猪的堡垒砸去,那种奇妙的感觉还真是令人感到很欢乐。而游戏的配乐同样充满了欢乐的感觉,轻松的节奏,欢快的风格。 源码下载

为什么要做Redis分区和分片

Redis分区(Partitioning)和分片(Sharding)是将数据分布在多个Redis实例或多个节点上的做法。这种技术用于提高性能、可扩展性和可用性。以下是执行Redis分区和分片的主要原因: 1. **提高吞吐量**:    - 通过将数据分散到多个节点,可以并行处理更多的操作,从而提高整体吞吐量。 2. **内存限制**:    - 单个Redis实例的内存是有限的。分区允许数据

基于Java医院药品交易系统详细设计和实现(源码+LW+调试文档+讲解等)

💗博主介绍:✌全网粉丝10W+,CSDN作者、博客专家、全栈领域优质创作者,博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌💗 🌟文末获取源码+数据库🌟 感兴趣的可以先收藏起来,还有大家在毕设选题,项目以及论文编写等相关问题都可以给我留言咨询,希望帮助更多的人  Java精品实战案例《600套》 2023-2025年最值得选择的Java毕业设计选题大全:1000个热

MySQL理解-下载-安装

MySQL理解: mysql:是一种关系型数据库管理系统。 下载: 进入官网MySQLhttps://www.mysql.com/  找到download 滑动到最下方:有一个开源社区版的链接地址: 然后就下载完成了 安装: 双击: 一直next 一直next这一步: 一直next到这里: 等待加载完成: 一直下一步到这里

美容美发店营销版微信小程序源码

打造线上生意新篇章 一、引言:微信小程序,开启美容美发行业新纪元 在数字化时代,微信小程序以其便捷、高效的特点,成为了美容美发行业营销的新宠。本文将带您深入了解美容美发营销微信小程序,探讨其独特优势及如何助力商家实现业务增长。 二、微信小程序:美容美发行业的得力助手 拓宽客源渠道:微信小程序基于微信社交平台,轻松实现线上线下融合,帮助商家快速吸引潜在客户,拓宽客源渠道。 提升用户体验:

风水研究会官网源码系统-可展示自己的领域内容-商品售卖等

一款用于展示风水行业,周易测算行业,玄学行业的系统,并支持售卖自己的商品。 整洁大气,非常漂亮,前端内容均可通过后台修改。 大致功能: 支持前端内容通过后端自定义支持开启关闭会员功能,会员等级设置支持对接官方支付支持添加商品类支持添加虚拟下载类支持自定义其他类型字段支持生成虚拟激活卡支持采集其他站点文章支持对接收益广告支持文章评论支持积分功能支持推广功能更多功能,搭建完成自行体验吧! 原文

53、Flink Interval Join 代码示例

1、概述 interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join; interval Join 算子的水位线会取两条流中水位线的最小值; interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准; interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,