Потоковое потоковое вещание AKKA с фьючерсами

У меня есть большой видеофайл, заложенный в таблице Кассандры. Я пытаюсь передать его обратно клиенту API с помощью Sourceпотоковой передачи.

Мой код обслуживания выглядит ниже,

def getShards(id: String, shards: Int) = {
  def getShardsInternal(shardNo: Int, shards: Future[Array[Byte]]): Future[Array[Byte]] = {
    if (shardNo == 0) shards
    else getShardsInternal(shardNo - 1, shards.flatMap(x => Database.ShardModel.find(id, shardNo)))
  }
  getShardsInternal(shards, Future.successful(Array()))
}

На моем маршруте HTTP AKKA я пытаюсь построить a Sourceиз возвращенного будущего, как показано ниже,

def getAsset = get {
  pathPrefix("asset") {
    parameters('id) { id =>
      complete {
        val f = mediaService.getMetadata(id).flatMap { x =>
          mediaService.getShards(id, x.shards)
        }
        Source.fromFuture(f)
      }
    }
  }
}

Я не уверен, как это Source.fromFutureделается для ответа. Будущее, которое передается, представляет собой, по существу, ряд фьючерсов с плоскими картами, которые, как ожидается, будут выполняться последовательно. Тем не менее, я не верю, что это вернется назад, как поток chteed byte обратно к клиенту.

Любые указатели на это будут высоко оценены.

РЕДАКТИРОВАТЬ 1 Я пытался сузить это дальше следующим:

get {
  pathPrefix("asset") {
    parameters('id) { id =>
      complete {
        Source.fromFuture {
          Future.successful("Hello".getBytes()).flatMap(x => Future.successful("World".getBytes()))
        }
      }
    }
  }
}

Я ожидал, что это вернется

[72,101,108,108,111,32,87,111,114,108,100]

Тем не менее, я получаю только результат последнего будущего, как показано ниже,

[[87,111,114,108,100]]

С уважением Meeraj

scala,akka,akka-stream,akka-http,

1

Ответов: 1


Преобразуйте Source[Array[Byte], NotUsed]в a Source[ByteString, NotUsed]и используйте HttpEntityс ContentTypes:

application/octet-stream

Здесь я использую ContentType.Binaryв качестве примера. Поскольку вы передаете видео, вам может потребоваться использовать complete(HttpEntity(ContentType.Binary(MediaTypes.`video/mpeg`), source)) соответствующий тип медиа . Например:

getShards

Обращаясь к вашему комментарию и обновлению, кажется, что вы хотите объединить результаты фьючерсов в flatMap: как вы обнаружили, Future.reduceLeftне делает этого. Используйте def getShards(id: String, shards: Int): Future[Array[Byte]] = { val futures = (1 to shards).map(Database.ShardModel.find(id, _)) Future.reduceLeft(futures)(_ ++ _) } вместо этого:

getShards

В качестве альтернативы вместо объединения результатов в один массив вы можете переопределить Future[List[Array[Byte]]]для возврата a Source, а затем создать Sourceиспользование flatMapConcat:

def getShards(id: String, shards: Int): Future[List[Array[Byte]]] = {
  val futures = (1 to shards).map(Database.ShardModel.find(id, _)).toList
  Future.sequence(futures)
}

def getAsset = get {
  pathPrefix("asset") {
    parameters('id) { id =>
      val f = mediaService.getMetadata(id).flatMap { x =>
        mediaService.getShards(id, x.shards)
      }
      val source =
        Source.fromFuture(f)
              .flatMapConcat(Source.apply)
              .map(ByteString.apply)
      complete(HttpEntity(/* a content type */, source))
    }
  }
}
Скала, Акка, Акка-поток, Акка-клиент,
Похожие вопросы
Яндекс.Метрика