如何进行大文件传输?

article/2025/8/18 2:17:09

本文首发微信公众号:码上观世界

网络文件传输的应用场景很多,如网络聊天的点对点传输、文件同步网盘的上传与下载、文件上传到分布式文件存储器等,其传输速度主要受限于网络带宽、存储器大小、CPU处理速度以及磁盘读写速度,尤其是网络带宽。本文主要讨论通常情况下数十GB规模大小的文件传输的优化方式,对于更大规模的文件容量建议考虑人工硬盘运输,毕竟基于公路运输的方式不仅带宽大而且成本低。

文件传输涉及到客户端、中间网络和服务器,常用的传输协议有HTTP(s)、(S)FTP和TCP(UDP)协议等,对于客户端用户来讲,能够起作用的地方不大,所以本文就两种基本的场景来讨论文件传输在客户端的优化方式:基于HTTP协议的非结构化文件传输和基于TCP协议的结构化文件传输。

基于HTTP协议的非结构化文件传输

最常用的文件上传是基于HTTP POST。观察浏览器的请求头数据可知,文件的二进制数据被置于请求body里面,也就是说在上传文件过程中,客户端是一次性将文件内容加载到内存,如果文件过大,浏览器很可能会崩溃,加上HTTP请求连接本身有超时时间限制,所以这种方式不适合传输大文件。

所以一种自然的方式就是手写符合规范的HTTP协议跟服务端通信:

上面的示例代码相比通过浏览器上传文件方式显得自由度更大,但是问题也更多,比如OutputStream将数据写入到PosterOutputStream内部缓冲区,而该缓冲区只有当调用HttpURLConnection的getInputStream方法之后才会发送到Socket流中。所以当文件过大(也许几十MB)就会导致内存溢出,即使通过调用flush方法也无济于事,因为PosterOutputStream的flush方法是空操作,什么都不干!幸运的是HttpURLConnection提供的setFixedLengthStreamingMode方法能够获取到自动刷新流缓存的StreamingOutputStream。虽然这种方式能够解决问题,但是还可能会遇到其他大大小小的坑,而且上述方式还是过于原始,使用Apache HttpClient能够轻易实现上述功能:

HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setDefaultCredentialsProvider(credsProvider);
RequestConfig requestConfig = RequestConfig.custom().setCookieSpec(CookieSpecs.DEFAULT).build();
CloseableHttpClient httpClient = httpClientBuilder.build();
File file = new File(filePath);
HttpPut httpPut = new HttpPut(url);
FileEntity fileEntity = new FileEntity(file);
httpPut.setEntity(fileEntity);
FileInputStream fileInputStream = new FileInputStream(file);
InputStreamEntity reqEntity = new InputStreamEntity(fileInputStream, file.length());
//post.setEntity(reqEntity);
HttpResponse response = httpClient.execute(httpPut);
String content = EntityUtils.toString(response.getEntity());

示例代码中,HttpClient帮我们封装了协议相关的所有内容。对于文件传输FileEntity 和InputStreamEntity 都可以使用,不同的是,InputStreamEntity 用了流传输的方式,我们需要做的是就是验证这两种方式是否存在文件过大导致的内存溢出问题。先看FileEntity ,直接翻到代码DefaultBHttpClientConnection :

class DefaultBHttpClientConnection extends BHttpConnectionBase{
......
public void sendRequestEntity(final HttpEntityEnclosingRequest request)
throws HttpException, IOException {Args.notNull(request, "HTTP request");ensureOpen();final HttpEntity entity = request.getEntity();if (entity == null) {return;}final OutputStream outstream = prepareOutput(request);entity.writeTo(outstream);outstream.close();}
......
}
class FileEntity{
......
public void writeTo(final OutputStream outstream) throws IOException {
Args.notNull(outstream, "Output stream");final InputStream instream = new FileInputStream(this.file);try {final byte[] tmp = new byte[OUTPUT_BUFFER_SIZE];int l;while ((l = instream.read(tmp)) != -1) {outstream.write(tmp, 0, l);}outstream.flush();} finally {instream.close();}
}
......
}

再看看InputStreamEntity:

class InputStreamEntity{
......
public void writeTo(final OutputStream outstream) throws IOException {
Args.notNull(outstream, "Output stream");final InputStream instream = this.content;try {final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];int l;if (this.length < 0) {// consume until EOFwhile ((l = instream.read(buffer)) != -1) {outstream.write(buffer, 0, l);}} else {// consume no more than lengthlong remaining = this.length;while (remaining > 0) {l = instream.read(buffer, 0, (int)Math.min(OUTPUT_BUFFER_SIZE, remaining));if (l == -1) {break;}outstream.write(buffer, 0, l);remaining -= l;}}} finally {instream.close();}
}
......
}

可见FileEntity 和InputStreamEntity使用了相同的outstream,其生成方式为:

class BHttpConnectionBase{
......protected OutputStream prepareOutput(final HttpMessage message) throws HttpException {final long len = this.outgoingContentStrategy.determineLength(message);return createOutputStream(len, this.outbuffer);}protected OutputStream createOutputStream(final long len,final SessionOutputBuffer outbuffer) {if (len == ContentLengthStrategy.CHUNKED) {return new ChunkedOutputStream(2048, outbuffer);} else if (len == ContentLengthStrategy.IDENTITY) {return new IdentityOutputStream(outbuffer);} else {return new ContentLengthOutputStream(outbuffer, len);}}
......
}

这里以ContentLengthOutputStream为例来看数据是如何发送到Socket流中的:

class ContentLengthOutputStream{private final SessionOutputBuffer out;
......public void write(final byte[] b, final int off, final int len) throws IOException {if (this.closed) {throw new IOException("Attempted write to closed stream.");}if (this.total < this.contentLength) {final long max = this.contentLength - this.total;int chunk = len;if (chunk > max) {chunk = (int) max;}this.out.write(b, off, chunk);this.total += chunk;}
}
......
}
class SessionOutputBufferImpl{private OutputStream outstream;
......public void write(final byte[] b, final int off, final int len) throws IOException {if (b == null) {return;}// Do not want to buffer large-ish chunks// if the byte array is larger then MIN_CHUNK_LIMIT// write it directly to the output streamif (len > this.fragementSizeHint || len > this.buffer.capacity()) {// flush the bufferflushBuffer();// write directly to the out streamstreamWrite(b, off, len);this.metrics.incrementBytesTransferred(len);} else {// Do not let the buffer grow unnecessarilyfinal int freecapacity = this.buffer.capacity() - this.buffer.length();if (len > freecapacity) {// flush the bufferflushBuffer();}// bufferthis.buffer.append(b, off, len);}
}
private void flushBuffer() throws IOException {final int len = this.buffer.length();if (len > 0) {streamWrite(this.buffer.buffer(), 0, len);this.buffer.clear();this.metrics.incrementBytesTransferred(len);}
}
private void streamWrite(final byte[] b, final int off, final int len) throws IOException {Asserts.notNull(outstream, "Output stream");this.outstream.write(b, off, len);
}
......
}
class SocketOutputStream {
......public void write(byte b[], int off, int len) throws IOException {socketWrite(b, off, len);}
......
}

通过上面关键代码可见,不管用哪一种Entity,当缓冲区满了就自动flush到Socket,理论上都可以进行大文件传输,只要超时时间允许,两者并没有什么特别的不同。

基于TCP协议的结构化文件传输

基于HTTP协议的文件传输,虽然通过流的方式能解决大文件传输问题,但是基于应用层协议毕竟效率不到,时间消耗仍是个大问题,尽管可以通过文件拆分,并行处理,但需要服务器端的配合才能完成(比如将小文件还原,断点续传等)。这里讨论的多文件传输到分布式系统不需要对服务端再做改造就能直接使用,天然具备并行处理能力。对于结构化文件传输的使用场景多用于数据迁移,比如从数据库系统或者文件系统传输到大数据存储计算平台。这里以将本地的CSV文件上传到HDFS为例,需要解决的是如何对文件拆分。虽然对非结构化,半结构化文件因为涉及到分隔符问题,对于文件拆分有点儿难度,但对规范化格式的文件,问题倒不大,但考虑让问题描述更简洁,这里不考虑文件拆分,只考虑一个文件(比如文件夹下已经拆分后的某个文件)的传输问题。该问题模型可以描述为:

引入Channel是为了解决File和HDFS存取速率不匹配的问题,通过Channel连接File读过程和HDFS写过程:当Channel缓存满的时候,File等待HDFS读取之后再开始写入Channel,HDFS读取之后File再写入Channel,两者通过信号量机制协调,HDFS每次写入都是一个独立的文件。关键代码实现如下:

File端读取数据到Channel:

public void readCSV(String filePath, String fieldDelimiter) {
......BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"), 8192);CsvReader csvReader = new CsvReader(reader);csvReader.setDelimiter(fieldDelimiter.charAt(0));String[] parseRows;while ((parseRows = splitBufferedReader(csvReader)) != null) {//Record 为文件中一行数据记录,由Column组成Record record = createRecord(parseRows);this.buffer.add(record);if (this.buffer.size() >= MemoryChannel.bufferSize) {this.channel.pushAll(this.buffer);this.buffer.clear();}}this.channel.pushAll(this.buffer);this.buffer.clear();
......}
//基于内存的Channel实现
class  MemoryChannel{private ArrayBlockingQueue<Record> queue;private ReentrantLock lock;private Condition notInsufficient, notEmpty;
......//将File读取端将记录push到Channelpublic void pushAll(final Collection<Record> rs) {Validate.notNull(rs);Validate.noNullElements(rs);try {lock.lockInterruptibly();while (!this.queue.isEmpty()) {notInsufficient.await(200L, TimeUnit.MILLISECONDS);}this.queue.addAll(rs);notEmpty.signalAll();} catch (InterruptedException e) {throw new RuntimeException("pushAll", e);} finally {lock.unlock();}}......
}
class HdfsWriteService{
......
public void writeFile(String fieldDelimiter) {FileOutputFormat outFormat = new TextOutputFormat();outFormat.setOutputPath(jobConf, outputPath);outFormat.setWorkOutputPath(jobConf, outputPath);List<Record> recordList= new ArrayList(MemoryChannel.bufferSize);this.channel.pullAll(recordList);RecordWriter writer = outFormat.getRecordWriter(fileSystem, jobConf, outputPath.toString(), Reporter.NULL);for (Record record : recordList) {//将Record记录组装成HDFS的TEXT行记录,列分隔符可自定义Text recordResult = new Text(StringUtils.join(mergeColumn(record), fieldDelimiter));writer.write(NullWritable.get(), recordResult);}writer.close(Reporter.NULL);
}
......
//基于内存的Channel实现
class  MemoryChannel{
......//HDFS写入端从Channel中Pull记录public void pullAll(Collection<Record> rs) {assert rs != null;rs.clear();try {lock.lockInterruptibly();while (this.queue.drainTo(rs, bufferSize) <= 0) {notEmpty.await(200L, TimeUnit.MILLISECONDS);}notInsufficient.signalAll();} catch (InterruptedException e) {throw new RuntimeException("pullAll", e);} finally {lock.unlock();}
}
......
}

上述方式是实现多文件并行传输的基础,每个独立Channel的传输过程互不影响,即使当前Chanel过程失败,也可以独立重跑恢复。

END


http://chatgpt.dhexx.cn/article/R2SviNlE.shtml

相关文章

大文件分片上传

前言 前端进行文件大小分割 &#xff0c;按10M分割进行分片上传&#xff0c;使用的项目还是前面文档介绍的poi同一个项目 另一篇poi导出文章,使用的同一个项目 poi的使用和工具类&#xff08;一&#xff09; 开发 1、maven依赖 <!--文件分片上传使用到依赖 start --&g…

HTTP传输大文件

一 概述 早期网络传输的文件非常小&#xff0c;只是一些几K大小的文本和图片&#xff0c;随着网络技术的发展&#xff0c;传输的不仅有几M的图片&#xff0c;还有可以达到几G和几十G的视频。 在这些大文件传输的情况下&#xff0c;100M的光纤或者4G移动网络都会因为网络压力导致…

使用python读取大文件

读取文件时&#xff0c;如果文件过大&#xff0c;则一次读取全部内容到内存&#xff0c;容易造成内存不足&#xff0c;所以要对大文件进行批量的读取内容。 python读取大文件通常两种方法&#xff1a;第一种是利用yield生成器读取&#xff1b;第二种是&#xff1a;利用open()自…

前端必学 - 大文件上传如何实现

前端必学 - 大文件上传如何实现 写在前面问题分析开始操作一、文件如何切片二、得到原文件的hash值三、文件上传四、文件合并 技术点总结【重要】一、上传文件&#xff1f;二、显示进度三、暂停上传四、Hash有优化空间吗&#xff1f;五、限制请求个数六、拥塞控制&#xff0c;动…

Linux如何快速生成大文件

微信搜索&#xff1a;“二十同学” 公众号&#xff0c;欢迎关注一条不一样的成长之路 dd命令 dd if/dev/zero offile bs1M count20000 会生成一个20G的file 文件&#xff0c;文件内容为全0&#xff08;因从/dev/zero中读取&#xff0c;/dev/zero为0源&#xff09;。 此命令可…

java 处理大文件

目的&#xff1a; 前几天在开发过程中遇到一个需求: 读取一个大约5G的csv文件内容&#xff0c;将其转化为对象然后存储到redis中, 想着直接开大内存直接load 进入到内存中就行了&#xff0c;结果可想而知,5G的文件 &#xff0c;Xmx 开到10G都没有解决&#xff0c;直接out of Me…

5、Linux:如何将大文件切割成多份小文件

最近&#xff0c;在做数据文件的导入操作时&#xff0c;发现有些文本文件太大了&#xff0c;需要将这样的大文件切分成多个小文件进行操作。那么&#xff0c;Linux 中如何将大文件切割成许多的小文件呢&#xff1f;在此记录一下。 Linux 提供了 split 命令可以轻松实现大文件的…

大文件传输有哪些方式可用?大文件传输有哪些方式?

大文件传输有哪些方式可用&#xff1f;大文件传输有哪些方式&#xff1f;互联网时代&#xff0c;速度决定效率。在企业生产过程中需要进行信息数据交换、搬运。这时就需要进行大文件传输。方方面面的行业都要涉及到大文件传输。例如影视行业需要每天进行视频素材的传输&#xf…

简道云-第5章-流程

title: 简道云-第5章-流程 date: 2022-06-13 22:21:29 tags: 简道云 categories: 简道云 简道云-第5章-流程 背景介绍 简道云三个基本项目表单、流程以及仪表。关于它们的介绍可以参照官方文档表单 vs 流程表单 vs 仪表盘。 「流程表单」&#xff1a;填报数据&#xff0c;并带…

阿里云【达摩院特别版·趣味视觉AI训练营】笔记2

阿里云【趣味视觉AI训练营】笔记2 一、笔记说明二、正文2.1 人体分割实验2.2 图像人脸融合实验 三、转载说明 一、笔记说明 本博客专栏《阿里云【达摩院特别版趣味视觉AI训练营】》的所有文章均为趣味视觉AI训练营的学习笔记&#xff0c;当前【达摩院特别版趣味视觉AI训练营】…

笔记本简单使用eNSP的云连接外网

文章目录 前言一、连接拓扑图二、配置cloud 三、配置pc测试是否能连接外网 前言 很多时候ping不通的原因不是网卡问题&#xff0c;而是配置没有设置好 一、连接拓扑图 二、配置cloud 绑定信息为UDP然后点击增加 绑定信息 笔记本电脑可以选择WiFi-ip&#xff0c;有本地连接可以…

头歌-信息安全技术-用Python实现自己的区块链、支持以太坊的云笔记服务器端开发、编写并测试用于保存云笔记的智能合约、支持以太坊的云笔记小程序开发基础

头歌-信息安全技术-用Python实现自己的区块链、支持以太坊的云笔记服务器端开发、编写并测试用于保存云笔记的智能合约、支持以太坊的云笔记小程序开发基础 一、用Python实现自己的区块链1、任务描述2、评测步骤(1)打开终端&#xff0c;输入两行代码即可评测通过 二、支持以太坊…

华为云HCS解决方案笔记HUAWEI CLOUD Stack【面试篇】

目录 HCS方案 一、定义 1、特点 2、优点 二、云服务 1、云管理 2、存储服务 3、网络服务 4、计算服务 5、安全服务 6、灾备服务 7、容器服务 三、应用场景 四、HCS功能层 五、OpenStack网络平面规划 六、ManageOne运维面 1、首页 2、集中监控 3、资源拓扑 …

关于玄武集团MOS云平台的使用笔记

对于该平台感兴趣的可以自己下载开发文档看一下&#xff0c;附上地址: https://download.csdn.net/download/qq_39380192/11182359 1、根据开发手册&#xff0c;MOS云平台给用户提供了关于各种通信服务的接口&#xff0c;用户可以通过调用相关的接口来实现一下几点功能&#x…

巧用git commit搭建云笔记+历史记录本

一、整理笔记的必要性 长期学习过程中&#xff0c;我发现人脑并不擅长记忆&#xff0c;它更擅长思考问题。程序员每天都要学习很多知识&#xff0c;学得快&#xff0c;忘得快很正常。很多东西并不需要记住&#xff0c;况且知识那么多&#xff0c;怎么可能全部记住&#xff1f;…

Aliyun 学习笔记(二)阿里云物联网平台介绍

文章目录 1 阿里云物联网平台1.1 设备接入1.2 设备管理1.3 安全能力1.4 规则引擎 1 阿里云物联网平台 根据阿里云物联网平台文档可以了解到所有有关阿里云物联网平台的介绍。 阿里云物联网平台为设备提供安全可靠的连接通信能力&#xff0c;向下连接海量设备&#xff0c;支撑…

《没道云笔记》开发手记

基本配置 Client&#xff1a;Android Servlet&#xff1a;SAE&#xff08;PHPMySQLStorage&#xff09; Period&#xff1a;2 weeks 项目分析 1.Model: Article.class{int id;String username;String title;String time;String content;} Bean.calss{int[] ids;String u…

《物联网 - 机智云开发笔记》第2章 设备驱动开发

开发板&#xff1a;GoKit3开发板&#xff08;STM32F103&#xff09; 在上一章节&#xff0c;笔者带领大家已经将机智云平台玩起来&#xff0c;本节内容讲带领大家经进一步开发。 在开始讲解之前&#xff0c;有必要先了解的机智云的平台架构。 从上面的架构图可以看到&#xf…

云笔记的使用感受和选择

市场上有很多文章针对云笔记的选择&#xff0c;但经过下载发现可能存在很多虚假广告【求生欲&#xff1a;其实可能是个人使用感受不佳仅表示个人观点】。 为什么选择云笔记 个人比较喜欢(❤ ω ❤)记录学习笔记和生活中的东西。之前选择有道云笔记&#xff0c;但因为最近打开…

基于分布式的云笔记实现(参考某道云笔记)

注&#xff1a; 1&#xff09;云笔记代码可在github上下载&#xff0c;如果对您有用&#xff0c;记得star一下。 2&#xff09;依赖jar包可在以下地址下载jar包&#xff0c;密码&#xff1a;yvkj&#xff0c;放到web/lib下即可 3&#xff09;hdfs配置参考网址 4&#xff09…