Akka(37): Http:客户端操作模式

来源:互联网 时间:2017-11-22

   Akka-http的客户端连接模式除Connection-Level和Host-Level之外还有一种非常便利的模式:Request-Level-Api。这种模式免除了连接Connection的概念,任何时候可以直接调用singleRequest来与服务端沟通。下面我们用几个例子来示范singleRequest的用法:

 (for {

response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message"))

message <- Unmarshal(response.entity).to[String]

} yield message).andThen {

case Success(msg) => println(s"Received message: $msg")

case Failure(err) => println(s"Error: ${err.getMessage}")

}.andThen {case _ => sys.terminate()}

这是一个GET操作:用Http().singleRequest直接把HttpRequest发送给服务端uri并获取返回的HttpResponse。我们看到,整组函数的返回类型都是Future[?],所以用for-comprehension来把所有实际运算包嵌在Future运算模式内(context)。下面这个例子是客户端上传数据示范:

 (for {

entity <- Marshal("Wata hell you doing?").to[RequestEntity]

response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity))

message <- Unmarshal(response.entity).to[String]

} yield message).andThen {

case Success(msg) => println(s"Received message: $msg")

case Failure(err) => println(s"Error: ${err.getMessage}")

}.andThen {case _ => sys.terminate()}

以上是个PUT操作。我们需要先构建数据载体HttpEntity。格式转换函数Marshal也返回Future[HttpEntity],所以也可以包含在for语句内。关注一下这个andThen,它可以连接一串多个monadic运算,在不影响上游运算结果的情况下实现一些副作用计算。值得注意的是上面这两个例子虽然表现形式很简洁,但我们无法对数据转换过程中的异常及response的状态码等进行监控。所以我们应该把整个过程拆分成两部分:先获取response,再具体处理response,包括核对状态,处理数据等:

 case class Item(id: Int, name: String, price: Double)

def getItem(itemId: Int): Future[HttpResponse] = for {

response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))

} yield response

def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = {

futResp.andThen {

case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>

Unmarshal(entity).to[T]

.onComplete {

case Success(t) => println(s"Got response entity: ${t}")

case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")

}

case Success(_) => println("Exception in response!")

case Failure(err) => println(s"Response Failed: ${err.getMessage}")

}

}

extractEntity[Item](getItem(13))

现在这个extractEntity[Item](getItem(13))可以实现全过程的监控管理了。用同样的模式实现PUT操作:

 def putItem(item: Item): Future[HttpResponse] =

for {

reqEntity <- Marshal(item).to[RequestEntity]

response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))

} yield response

extractEntity[Item](putItem(Item(23,"Item#23", 46.0)))

.andThen { case _ => sys.terminate()}

当然,我们还是使用了前面几篇讨论里的Marshalling方式来进行数据格式的自动转换:

import de.heikoseeberger.akkahttpjson4s.Json4sSupport

import org.json4s.jackson

...

trait JsonCodec extends Json4sSupport {

import org.json4s.DefaultFormats

import org.json4s.ext.JodaTimeSerializers

implicit val serilizer = jackson.Serialization

implicit val formats = DefaultFormats ++ JodaTimeSerializers.all

}

object JsConverters extends JsonCodec

...

import JsConverters._

implicit val jsonStreamingSupport = EntityStreamingSupport.json()

.withParallelMarshalling(parallelism = 8, unordered = false)

如果我们需要对数据交换过程进行更细致的管控,用Host-Level-Api会更加适合。下面我们就针对Host-Level-Api构建一个客户端的工具库:

class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)

(implicit sys: ActorSystem, mat: ActorMaterializer) {

import sys.dispatcher

private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =

Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings)

//单一request

def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = {

Source.single(req -> 1)

.via(cnnPool)

.runWith(Sink.head).flatMap {

case (Success(resp), _) => Future.successful(resp)

case (Failure(fail), _) => Future.failed(fail)

}

}

//组串request

def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {

Source(reqs.zipWithIndex.toMap)

.via(cnnPool)

.runFold(SortedMap[Int, Future[HttpResponse]]()) {

case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))

case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f))

}.flatMap { m => Future.sequence(m.values) }

}

}

下面是一种比较安全的模式:使用了queue来暂存request从而解决因发送方与接收方速率不同所产生的问题:

class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)

(qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew)

(implicit sys: ActorSystem, mat: ActorMaterializer) {

import sys.dispatcher

private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] =

Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings)

val queue =

Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy)

.via(cnnPool)

.to(Sink.foreach({

case ((Success(resp), p)) => p.success(resp)

case ((Failure(e), p)) => p.failure(e)

})).run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {

val responsePromise = Promise[HttpResponse]()

queue.offer(request -> responsePromise).flatMap {

case QueueOfferResult.Enqueued => responsePromise.future

case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))

case QueueOfferResult.Failure(ex) => Future.failed(ex)

case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))

}

}

}

下面是这些工具函数的具体使用示范:

 val settings = ConnectionPoolSettings(sys)

.withMaxConnections(8)

.withMaxOpenRequests(8)

.withMaxRetries(3)

.withPipeliningLimit(4)

val pooledClient = new PooledClient("localhost",8011,settings)

def getItemByPool(itemId: Int): Future[HttpResponse] = for {

response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))

} yield response

extractEntity[Item](getItemByPool(13))

def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = {

val reqs = itemIds.map { id =>

HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id")

}

val rets = (for {

responses <- pooledClient.orderedResponses(reqs)

} yield responses)

rets

}

val futResps = getItemsByPool(List(3,5,7))

futResps.andThen {

case Success(listOfResps) => {

listOfResps.foreach { r =>

r match {

case HttpResponse(StatusCodes.OK, _, entity, _) =>

Unmarshal(entity).to[Item]

.onComplete {

case Success(t) => println(s"Got response entity: ${t}")

case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")

}

case _ => println("Exception in response!")

}

}

}

case _ => println("Failed to get list of responses!")

}

val queuedClient = new QueuedRequestsClient("localhost",8011,settings)()

def putItemByQueue(item: Item): Future[HttpResponse] =

for {

reqEntity <- Marshal(item).to[RequestEntity]

response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))

} yield response

extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0)))

.andThen { case _ => sys.terminate()}

下面是本次讨论的示范源代码:

服务端代码:

import akka.actor._

import akka.stream._

import akka.http.scaladsl.Http

import akka.http.scaladsl.server.Directives._

import de.heikoseeberger.akkahttpjson4s.Json4sSupport

import org.json4s.jackson

trait JsonCodec extends Json4sSupport {

import org.json4s.DefaultFormats

import org.json4s.ext.JodaTimeSerializers

implicit val serilizer = jackson.Serialization

implicit val formats = DefaultFormats ++ JodaTimeSerializers.all

}

object JsConverters extends JsonCodec

object TestServer extends App with JsonCodec {

implicit val httpSys = ActorSystem("httpSystem")

implicit val httpMat = ActorMaterializer()

implicit val httpEC = httpSys.dispatcher

import JsConverters._

case class Item(id: Int, name: String, price: Double)

val messages = path("message") {

get {

complete("hello, how are you?")

} ~

put {

entity(as[String]) {msg =>

complete(msg)

}

}

}

val items =

(path("item" / IntNumber) & get) { id =>

get {

complete(Item(id, s"item#$id", id * 2.0))

}

} ~

(path("item") & put) {

entity(as[Item]) {item =>

complete(item)

}

}

val route = messages ~ items

val (host, port) = ("localhost", 8011)

val bindingFuture = Http().bindAndHandle(route,host,port)

println(s"Server running at $host $port. Press any key to exit ...")

scala.io.StdIn.readLine()

bindingFuture.flatMap(_.unbind())

.onComplete(_ => httpSys.terminate())

}

客户端源代码: 

import akka.actor._

import akka.http.scaladsl.settings.ConnectionPoolSettings

import akka.stream._

import akka.stream.scaladsl._

import akka.http.scaladsl.Http

import akka.http.scaladsl.model._

import scala.util._

import de.heikoseeberger.akkahttpjson4s.Json4sSupport

import org.json4s.jackson

import scala.concurrent._

import akka.http.scaladsl.unmarshalling.Unmarshal

import akka.http.scaladsl.unmarshalling._

import akka.http.scaladsl.marshalling.Marshal

import scala.collection.SortedMap

import akka.http.scaladsl.common._

trait JsonCodec extends Json4sSupport {

import org.json4s.DefaultFormats

import org.json4s.ext.JodaTimeSerializers

implicit val serilizer = jackson.Serialization

implicit val formats = DefaultFormats ++ JodaTimeSerializers.all

}

object JsConverters extends JsonCodec

class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)

(implicit sys: ActorSystem, mat: ActorMaterializer) {

import sys.dispatcher

private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =

Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings)

def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = {

Source.single(req -> 1)

.via(cnnPool)

.runWith(Sink.head).flatMap {

case (Success(resp), _) => Future.successful(resp)

case (Failure(fail), _) => Future.failed(fail)

}

}

def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {

Source(reqs.zipWithIndex.toMap)

.via(cnnPool)

.runFold(SortedMap[Int, Future[HttpResponse]]()) {

case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))

case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f))

}.flatMap { m => Future.sequence(m.values) }

}

}

class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)

(qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew)

(implicit sys: ActorSystem, mat: ActorMaterializer) {

import sys.dispatcher

private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] =

Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings)

val queue =

Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy)

.via(cnnPool)

.to(Sink.foreach({

case ((Success(resp), p)) => p.success(resp)

case ((Failure(e), p)) => p.failure(e)

})).run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {

val responsePromise = Promise[HttpResponse]()

queue.offer(request -> responsePromise).flatMap {

case QueueOfferResult.Enqueued => responsePromise.future

case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))

case QueueOfferResult.Failure(ex) => Future.failed(ex)

case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))

}

}

}

object ClientRequesting extends App {

import JsConverters._

implicit val sys = ActorSystem("sysClient")

implicit val mat = ActorMaterializer()

implicit val ec = sys.dispatcher

implicit val jsonStreamingSupport = EntityStreamingSupport.json()

.withParallelMarshalling(parallelism = 8, unordered = false)

case class Item(id: Int, name: String, price: Double)

def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = {

futResp.andThen {

case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>

Unmarshal(entity).to[T]

.onComplete {

case Success(t) => println(s"Got response entity: ${t}")

case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")

}

case Success(_) => println("Exception in response!")

case Failure(err) => println(s"Response Failed: ${err.getMessage}")

}

}

(for {

response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message"))

message <- Unmarshal(response.entity).to[String]

} yield message).andThen {

case Success(msg) => println(s"Received message: $msg")

case Failure(err) => println(s"Error: ${err.getMessage}")

} //.andThen {case _ => sys.terminate()}

(for {

entity <- Marshal("Wata hell you doing?").to[RequestEntity]

response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity))

message <- Unmarshal(response.entity).to[String]

} yield message).andThen {

case Success(msg) => println(s"Received message: $msg")

case Failure(err) => println(s"Error: ${err.getMessage}")

} //.andThen {case _ => sys.terminate()}

def getItem(itemId: Int): Future[HttpResponse] = for {

response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))

} yield response

extractEntity[Item](getItem(13))

def putItem(item: Item): Future[HttpResponse] =

for {

reqEntity <- Marshal(item).to[RequestEntity]

response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))

} yield response

extractEntity[Item](putItem(Item(23,"Item#23", 46.0)))

.andThen { case _ => sys.terminate()}

val settings = ConnectionPoolSettings(sys)

.withMaxConnections(8)

.withMaxOpenRequests(8)

.withMaxRetries(3)

.withPipeliningLimit(4)

val pooledClient = new PooledClient("localhost",8011,settings)

def getItemByPool(itemId: Int): Future[HttpResponse] = for {

response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))

} yield response

extractEntity[Item](getItemByPool(13))

def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = {

val reqs = itemIds.map { id =>

HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id")

}

val rets = (for {

responses <- pooledClient.orderedResponses(reqs)

} yield responses)

rets

}

val futResps = getItemsByPool(List(3,5,7))

futResps.andThen {

case Success(listOfResps) => {

listOfResps.foreach { r =>

r match {

case HttpResponse(StatusCodes.OK, _, entity, _) =>

Unmarshal(entity).to[Item]

.onComplete {

case Success(t) => println(s"Got response entity: ${t}")

case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")

}

case _ => println("Exception in response!")

}

}

}

case _ => println("Failed to get list of responses!")

}

val queuedClient = new QueuedRequestsClient("localhost",8011,settings)()

def putItemByQueue(item: Item): Future[HttpResponse] =

for {

reqEntity <- Marshal(item).to[RequestEntity]

response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))

} yield response

extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0)))

.andThen { case _ => sys.terminate()}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

相关阅读:
Top