Akka(39): Http:File streaming-文件交换

2024-04-09 04:48
文章标签 http 交换 file 39 akka streaming

本文主要是介绍Akka(39): Http:File streaming-文件交换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  所谓文件交换指的是Http协议中服务端和客户端之间文件的上传和下载。Akka-http作为一种系统集成工具应该具备高效率的数据交换方式包括文件交换和数据库表行的上传下载。Akka-http的数据交换模式支持流式操作:代表交换数据可以是一种无限长度流的元素。这种模式首先解决了纯Http大数据通过Multipart传输所必须进行的数据分段操作和复杂的消息属性设定等需要的技术门槛,再者用户还可以很方便的使用Akka-stream对数据进行深度处理,免去了数据转换的麻烦。更重要的是:Akka-http还支持reactive-stream,可以避免由传输速率所产生的种种问题。在本篇我们讨论利用Akka-http进行文件的双向传递。

 任何文件的内容储存格式无论在硬盘、内存或者数据线上都是一堆bytes。文件交换流程包括读取文件里的bytes,传送这些bytes,最终把这些bytes写入文件。我们看到这里每个环节操作目标都是bytes,所以可能在程序里是不需要任何数据转换过程的。Akka提供了一组文件读写函数,如下:

  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =fromPath(f, chunkSize, startPosition = 0)def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] =Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource")))def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] =toPath(f, options, startPosition = 0)def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] =Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))

我们看到:fromPath类型是Source[ByteSgtring,_],toPath类型是Sink[ByteString,_],直接就是流型式,应该可以直接放入Http消息的Entity中,如下:

  def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {def loadFile = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get(filePath)FileIO.fromPath(file, chunkSize).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}limitableByteSource(loadFile)}

fileStream是Source[ByteString,_]可以直接放进Entity:

  val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/text")val textData = HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/Users/tiger-macpro/downloads/A4.TIF",256))

我们把fileStream放入了HttpRequest中。对于HttpResponse可以用下面的方式:

 val route = pathPrefix("file") {(get & path("text" / Remaining)) { fp =>withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/users/tiger-macpro/" + fp, 256)))}

注意:complete进行了HttpResponse的构建。因为Entity.dataByes就是Source[ByteString,_],所以我们可以直接把它导入Sink:

          entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))).onComplete { case _ => println(s"Download file saved to: $destPath") }

上面我们提过FileIO.toPath就是一个Sink。由于我们的目的是大型的文件交换,所以无论上传下载都使用了withoutSizeLimit:

 val route = pathPrefix("file") {(get & path("exchange" / Remaining)) { fp =>withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/users/tiger-macpro/" + fp, 256)))}} ~(post & path("exchange")) {withoutSizeLimit {extractDataBytes { bytes =>val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))onComplete(fut) { _ =>complete(s"Save upload file to: $destPath")}}}}

好了下面的示范代码里对字符型或二进制文件都进行了交换的示范操作:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity._
import java.nio.file._object FileServer extends App {implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherdef fileStream(filePath: String, chunkSize: Int) = {def loadFile = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get(filePath)FileIO.fromPath(file, chunkSize).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}limitableByteSource(loadFile)}val destPath = "/users/tiger-macpro/downloads/A4-1.TIF"val route = pathPrefix("file") {(get & path("exchange" / Remaining)) { fp =>withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/users/tiger-macpro/" + fp, 256)))}} ~(post & path("exchange")) {withoutSizeLimit {extractDataBytes { bytes =>val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))onComplete(fut) { _ =>complete(s"Save upload file to: $destPath")}}}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._
import java.nio.file._
import akka.util.ByteString
import scala.util._object FileClient extends App {implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherdef downloadFileTo(request: HttpRequest, destPath: String) = {val futResp = Http(sys).singleRequest(request)futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))).onComplete { case _ => println(s"Download file saved to: $destPath") }case Success(r@HttpResponse(code, _, _, _)) =>println(s"Download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download file!")case Failure(err) => println(s"Download failed: ${err.getMessage}")}}val dlFile = "Downloads/readme.txt"val downloadText = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile)downloadFileTo(downloadText, "/users/tiger-macpro/downloads/sample.txt")scala.io.StdIn.readLine()val dlFile2 = "Downloads/image.png"val downloadText2 = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile2)downloadFileTo(downloadText2, "/users/tiger-macpro/downloads/sample.png")scala.io.StdIn.readLine()def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {def loadFile = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get(filePath)FileIO.fromPath(file, chunkSize).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}limitableByteSource(loadFile)}val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/exchange")val textData = HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/Users/tiger-macpro/downloads/readme.txt",256))uploadFile(uploadText,textData)scala.io.StdIn.readLine()sys.terminate()}






这篇关于Akka(39): Http:File streaming-文件交换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887207

相关文章

解决JavaWeb-file.isDirectory()遇到的坑问题

《解决JavaWeb-file.isDirectory()遇到的坑问题》JavaWeb开发中,使用`file.isDirectory()`判断路径是否为文件夹时,需要特别注意:该方法只能判断已存在的文... 目录Jahttp://www.chinasem.cnvaWeb-file.isDirectory()遇

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

VMWare报错“指定的文件不是虚拟磁盘“或“The file specified is not a virtual disk”问题

《VMWare报错“指定的文件不是虚拟磁盘“或“Thefilespecifiedisnotavirtualdisk”问题》文章描述了如何修复VMware虚拟机中出现的“指定的文件不是虚拟... 目录VMWare报错“指定的文件不是虚拟磁盘“或“The file specified is not a virt

Node.js 中 http 模块的深度剖析与实战应用小结

《Node.js中http模块的深度剖析与实战应用小结》本文详细介绍了Node.js中的http模块,从创建HTTP服务器、处理请求与响应,到获取请求参数,每个环节都通过代码示例进行解析,旨在帮... 目录Node.js 中 http 模块的深度剖析与实战应用一、引言二、创建 HTTP 服务器:基石搭建(一

Python如何实现 HTTP echo 服务器

《Python如何实现HTTPecho服务器》本文介绍了如何使用Python实现一个简单的HTTPecho服务器,该服务器支持GET和POST请求,并返回JSON格式的响应,GET请求返回请求路... 一个用来做测试的简单的 HTTP echo 服务器。from http.server import HT

提示:Decompiled.class file,bytecode version如何解决

《提示:Decompiled.classfile,bytecodeversion如何解决》在处理Decompiled.classfile和bytecodeversion问题时,通过修改Maven配... 目录问题原因总结问题1、提示:Decompiled .class file,China编程 bytecode

BUUCTF靶场[web][极客大挑战 2019]Http、[HCTF 2018]admin

目录   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 [web][HCTF 2018]admin 考点:弱密码字典爆破 四种方法:   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 访问环境 老规矩,我们先查看源代码

【Linux】应用层http协议

一、HTTP协议 1.1 简要介绍一下HTTP        我们在网络的应用层中可以自己定义协议,但是,已经有大佬定义了一些现成的,非常好用的应用层协议,供我们直接使用,HTTP(超文本传输协议)就是其中之一。        在互联网世界中,HTTP(超文本传输协议)是一个至关重要的协议,他定义了客户端(如浏览器)与服务器之间如何进行通信,以交换或者传输超文本(比如HTML文档)。

《数据结构(C语言版)第二版》第八章-排序(8.3-交换排序、8.4-选择排序)

8.3 交换排序 8.3.1 冒泡排序 【算法特点】 (1) 稳定排序。 (2) 可用于链式存储结构。 (3) 移动记录次数较多,算法平均时间性能比直接插入排序差。当初始记录无序,n较大时, 此算法不宜采用。 #include <stdio.h>#include <stdlib.h>#define MAXSIZE 26typedef int KeyType;typedef char In

如何确定 Go 语言中 HTTP 连接池的最佳参数?

确定 Go 语言中 HTTP 连接池的最佳参数可以通过以下几种方式: 一、分析应用场景和需求 并发请求量: 确定应用程序在特定时间段内可能同时发起的 HTTP 请求数量。如果并发请求量很高,需要设置较大的连接池参数以满足需求。例如,对于一个高并发的 Web 服务,可能同时有数百个请求在处理,此时需要较大的连接池大小。可以通过压力测试工具模拟高并发场景,观察系统在不同并发请求下的性能表现,从而