Akka(39): Http:File streaming-文件交换

2024-04-09 04:48
文章标签 http 交换 file 39 akka streaming

本文主要是介绍Akka(39): Http:File streaming-文件交换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  所谓文件交换指的是Http协议中服务端和客户端之间文件的上传和下载。Akka-http作为一种系统集成工具应该具备高效率的数据交换方式包括文件交换和数据库表行的上传下载。Akka-http的数据交换模式支持流式操作:代表交换数据可以是一种无限长度流的元素。这种模式首先解决了纯Http大数据通过Multipart传输所必须进行的数据分段操作和复杂的消息属性设定等需要的技术门槛,再者用户还可以很方便的使用Akka-stream对数据进行深度处理,免去了数据转换的麻烦。更重要的是:Akka-http还支持reactive-stream,可以避免由传输速率所产生的种种问题。在本篇我们讨论利用Akka-http进行文件的双向传递。

 任何文件的内容储存格式无论在硬盘、内存或者数据线上都是一堆bytes。文件交换流程包括读取文件里的bytes,传送这些bytes,最终把这些bytes写入文件。我们看到这里每个环节操作目标都是bytes,所以可能在程序里是不需要任何数据转换过程的。Akka提供了一组文件读写函数,如下:

  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =fromPath(f, chunkSize, startPosition = 0)def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] =Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource")))def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] =toPath(f, options, startPosition = 0)def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] =Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))

我们看到:fromPath类型是Source[ByteSgtring,_],toPath类型是Sink[ByteString,_],直接就是流型式,应该可以直接放入Http消息的Entity中,如下:

  def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {def loadFile = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get(filePath)FileIO.fromPath(file, chunkSize).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}limitableByteSource(loadFile)}

fileStream是Source[ByteString,_]可以直接放进Entity:

  val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/text")val textData = HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/Users/tiger-macpro/downloads/A4.TIF",256))

我们把fileStream放入了HttpRequest中。对于HttpResponse可以用下面的方式:

 val route = pathPrefix("file") {(get & path("text" / Remaining)) { fp =>withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/users/tiger-macpro/" + fp, 256)))}

注意:complete进行了HttpResponse的构建。因为Entity.dataByes就是Source[ByteString,_],所以我们可以直接把它导入Sink:

          entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))).onComplete { case _ => println(s"Download file saved to: $destPath") }

上面我们提过FileIO.toPath就是一个Sink。由于我们的目的是大型的文件交换,所以无论上传下载都使用了withoutSizeLimit:

 val route = pathPrefix("file") {(get & path("exchange" / Remaining)) { fp =>withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/users/tiger-macpro/" + fp, 256)))}} ~(post & path("exchange")) {withoutSizeLimit {extractDataBytes { bytes =>val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))onComplete(fut) { _ =>complete(s"Save upload file to: $destPath")}}}}

好了下面的示范代码里对字符型或二进制文件都进行了交换的示范操作:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity._
import java.nio.file._object FileServer extends App {implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherdef fileStream(filePath: String, chunkSize: Int) = {def loadFile = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get(filePath)FileIO.fromPath(file, chunkSize).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}limitableByteSource(loadFile)}val destPath = "/users/tiger-macpro/downloads/A4-1.TIF"val route = pathPrefix("file") {(get & path("exchange" / Remaining)) { fp =>withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/users/tiger-macpro/" + fp, 256)))}} ~(post & path("exchange")) {withoutSizeLimit {extractDataBytes { bytes =>val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))onComplete(fut) { _ =>complete(s"Save upload file to: $destPath")}}}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._
import java.nio.file._
import akka.util.ByteString
import scala.util._object FileClient extends App {implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherdef downloadFileTo(request: HttpRequest, destPath: String) = {val futResp = Http(sys).singleRequest(request)futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))).onComplete { case _ => println(s"Download file saved to: $destPath") }case Success(r@HttpResponse(code, _, _, _)) =>println(s"Download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download file!")case Failure(err) => println(s"Download failed: ${err.getMessage}")}}val dlFile = "Downloads/readme.txt"val downloadText = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile)downloadFileTo(downloadText, "/users/tiger-macpro/downloads/sample.txt")scala.io.StdIn.readLine()val dlFile2 = "Downloads/image.png"val downloadText2 = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile2)downloadFileTo(downloadText2, "/users/tiger-macpro/downloads/sample.png")scala.io.StdIn.readLine()def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {def loadFile = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get(filePath)FileIO.fromPath(file, chunkSize).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}limitableByteSource(loadFile)}val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/exchange")val textData = HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/Users/tiger-macpro/downloads/readme.txt",256))uploadFile(uploadText,textData)scala.io.StdIn.readLine()sys.terminate()}






这篇关于Akka(39): Http:File streaming-文件交换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887207

相关文章

在java中如何将inputStream对象转换为File对象(不生成本地文件)

《在java中如何将inputStream对象转换为File对象(不生成本地文件)》:本文主要介绍在java中如何将inputStream对象转换为File对象(不生成本地文件),具有很好的参考价... 目录需求说明问题解决总结需求说明在后端中通过POI生成Excel文件流,将输出流(outputStre

C语言实现两个变量值交换的三种方式

《C语言实现两个变量值交换的三种方式》两个变量值的交换是编程中最常见的问题之一,以下将介绍三种变量的交换方式,其中第一种方式是最常用也是最实用的,后两种方式一般只在特殊限制下使用,需要的朋友可以参考下... 目录1.使用临时变量(推荐)2.相加和相减的方式(值较大时可能丢失数据)3.按位异或运算1.使用临时

使用C语言实现交换整数的奇数位和偶数位

《使用C语言实现交换整数的奇数位和偶数位》在C语言中,要交换一个整数的二进制位中的奇数位和偶数位,重点需要理解位操作,当我们谈论二进制位的奇数位和偶数位时,我们是指从右到左数的位置,本文给大家介绍了使... 目录一、问题描述二、解决思路三、函数实现四、宏实现五、总结一、问题描述使用C语言代码实现:将一个整

Go语言中最便捷的http请求包resty的使用详解

《Go语言中最便捷的http请求包resty的使用详解》go语言虽然自身就有net/http包,但是说实话用起来没那么好用,resty包是go语言中一个非常受欢迎的http请求处理包,下面我们一起来学... 目录安装一、一个简单的get二、带查询参数三、设置请求头、body四、设置表单数据五、处理响应六、超

Java实现将byte[]转换为File对象

《Java实现将byte[]转换为File对象》这篇文章将通过一个简单的例子为大家演示Java如何实现byte[]转换为File对象,并将其上传到外部服务器,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言1. 问题背景2. 环境准备3. 实现步骤3.1 从 URL 获取图片字节数据3.2 将字节数组

如何使用Docker部署FTP和Nginx并通过HTTP访问FTP里的文件

《如何使用Docker部署FTP和Nginx并通过HTTP访问FTP里的文件》本文介绍了如何使用Docker部署FTP服务器和Nginx,并通过HTTP访问FTP中的文件,通过将FTP数据目录挂载到N... 目录docker部署FTP和Nginx并通过HTTP访问FTP里的文件1. 部署 FTP 服务器 (

Qt实现发送HTTP请求的示例详解

《Qt实现发送HTTP请求的示例详解》这篇文章主要为大家详细介绍了如何通过Qt实现发送HTTP请求,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1、添加network模块2、包含改头文件3、创建网络访问管理器4、创建接口5、创建网络请求对象6、创建一个回复对

springMVC返回Http响应的实现

《springMVC返回Http响应的实现》本文主要介绍了在SpringBoot中使用@Controller、@ResponseBody和@RestController注解进行HTTP响应返回的方法,... 目录一、返回页面二、@Controller和@ResponseBody与RestController

解决JavaWeb-file.isDirectory()遇到的坑问题

《解决JavaWeb-file.isDirectory()遇到的坑问题》JavaWeb开发中,使用`file.isDirectory()`判断路径是否为文件夹时,需要特别注意:该方法只能判断已存在的文... 目录Jahttp://www.chinasem.cnvaWeb-file.isDirectory()遇

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一