Лучший опыт

Изучаем Rust. Потоковая передача tar-архива.

Асинхронный поток для вас загадка? Чтобы полностью его освоить, напишите собственный. Асинхронное программирование стало доступным большинству разработчиков благодаря парадигме async/await. Код похож на классический блокирующий поток из-за создания компилятором сложного конечного автомата. Слоем ниже в Rust используются опрашивание, будильщики, фиксация. Этими компонентами не применяется синтаксис async/await, но при этом в полном объеме о
Изучаем Rust. Потоковая передача tar-архива...

Асинхронный поток для вас загадка? Чтобы полностью его освоить, напишите собственный.

Асинхронное программирование стало доступным большинству разработчиков благодаря парадигме async/await. Код похож на классический блокирующий поток из-за создания компилятором сложного конечного автомата.

Слоем ниже в Rust используются опрашивание, будильщики, фиксация. Этими компонентами не применяется синтаксис async/await, но при этом в полном объеме обеспечивается неблокирующая конкурентность. Отсутствие сгенерированного конечного автомата компенсируется написанием собственного, увеличением производительности, полным контролем над рабочим кодом.

Один из примеров, когда хочется и нужно быть на уровне опроса,  —  асинхронные потоки. Идеально интегрируясь в мир async/await, они остаются настоящей неблокирующей реализацией в фоновом режиме без больших накладных расходов.

Напишем поток фрагментов двоичного файла для исходящего tar-архива, создадим сложный конечный автомат, попробуем продемонстрировать итоговый результат и интегрировать его в процесс загрузки файлов в запущенный контейнер с API среды выполнения Docker.

Асинхронные потоки очень похожи на обычные итераторы, которыми часто проходят коллекции. Например, напишем обычный цикл for для идиоматического прохождения коллекции:

fn main() {
let numbers = vec![1, 2, 3, 4, 5];

for number in numbers.iter() {
println!("Number: {}", number);
}
}

Фактически это эквивалентно такому синтаксису, где временно созданный итератор  —  изменяемая переменная:

fn main() {
let numbers = vec![1, 2, 3, 4, 5];
let mut iter = numbers.iter();

while let Some(number) = iter.next() {
println!("Number: {}", number);
}
}

Проходимся по вектору, поэтому супербыстро. Следующей функцией не выполняется никаких операций ввода-вывода. А что, если итератором выдаются числа или сетевые подключения? Чтобы воспользоваться async/await, изменим пару строк:

#[tokio::main]
async fn main() {
let mut numbers = fetch_numbers();

while let Some(number) = numbers.next().await {
println!("{}", number);
}
}

Практически обычный итератор, на самом же деле это асинхронный поток.

Посмотрим, как появляются еще не реализованные типажи:

pub trait Iterator {
type Item;

fn next(&mut self) -> Option<Self::Item>;
}

pub trait Stream {
type Item;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
}

Сходство очевидно: и там и тут определяется возвращаемый тип элемента, сопоставляемый в цикле while, требуется код для указания в следующей функции следующего выдаваемого элемента.

Что же такое tar и как он связан с асинхронными потоками на Rust? tar  —  способ объединения файлов и каталогов в один файл. Этот файловый формат тесно связан с историей ленточных накопителей, бывших на заре компьютерных вычислений одной из форм хранения данных. Он предназначен для ленточного запоминающего устройства, а название  —  сокращение от Tape ARchive («Ленточный архив»).

Чтобы создать tar-архив, обрабатывается несколько файлов, для каждого из которых создается заголовок в 512 байт. Затем добавляется содержимое файла с дополнением, если необходимо, в 512 байт. Много файлов, значит, много операций ввода-вывода. Где, как не здесь, требуются асинхронные потоки?

Начнем с простого tar-заголовка ровно в 512 байт, которым кодируется много информации о едином файле Cargo.toml:

00000000: 4361 7267 6f2e 746f 6d6c 0000 0000 0000  Cargo.toml......
00000010: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000020: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000030: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000040: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000050: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000060: 0000 0000 3030 3030 3634 3400 3030 3031 ....0000644.0001
00000070: 3735 3000 3030 3031 3735 3000 3030 3030 750.0001750.0000
00000080: 3030 3030 3037 3000 3134 3537 3136 3533 0000070.14571653
00000090: 3534 3000 3031 3233 3435 0020 3000 0000 540.012345. 0...
000000a0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000000b0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000000c0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000000d0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000000e0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000000f0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000100: 0075 7374 6172 2020 0076 7363 6f64 6500 .ustar .vscode.
00000110: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000120: 0000 0000 0000 0000 0076 7363 6f64 6500 .........vscode.
00000130: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000140: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000150: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000160: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000170: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000180: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00000190: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000001a0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000001b0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000001c0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000001d0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000001e0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
000001f0: 0000 0000 0000 0000 0000 0000 0000 0000 ................

Глядя в документацию, попробуем расшифровать закодированное:

  • Название файла: Cargo.toml.
  • Разрешения файла: 0000644, закодировано как восьмеричная строка.
  • Пользователь и группа: 0001750, закодировано как восьмеричная строка.
  • Размер файла: 0000070, закодировано как восьмеричная строка -> 56 байт.
  • Временнáя метка файла: 14571653540, закодировано как восьмеричная строка -> 1709660000.
  • Контрольная сумма заголовка: 012345, закодировано как восьмеричная строка.

Другие, необязательные данные файла при генерировании заголовка игнорируем.

Напишем базовый код Rust. Сначала определяем структуры и пример их использования без фактической реализации, создаем новый экземпляр структуры TarArchive, добавляем названия файлов:

enum TarEntry {
File(String),
}

struct TarArchive {
entries: Vec<TarEntry>,
}

impl TarArchive {
fn new() -> Self {
Self { entries: Vec::new() }
}

fn append_file(&mut self, file: String) {
self.entries.push(TarEntry::File(file));
}
}

В архиве содержатся только записи, подлежащие архивации. Теперь добавляем возможность преобразования его в поток, так архив используется:

impl TarArchive {
...

fn into_stream(self, buffer_size: usize) -> TarStream {
TarStream::new(self.entries, buffer_size)
}
}

struct TarStream {
buffer_size: usize,
entries: VecDeque<TarEntry>,
}

impl TarStream {
fn new(entries: Vec<TarEntry>, buffer_size: usize) -> Self {
Self {
buffer_size: buffer_size / 512 * 512,
entries: entries.into(),
}
}
}

Поток почти создан. Недостает лишь самого кода типажа Stream, а реализации типажа требуется определение, что такое элемент потока:

enum TarChunk {
Header(String, Box<[u8; 512]>),
Data(Vec<u8>),
Padding(usize),
}

enum TarError {}
type TarResult<T> = Result<T, TarError>;

impl Stream for TarStream {
type Item = TarResult<TarChunk>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
}
}

Возвращается фрагмент с тремя вариантами:

  • заголовок header с названием файла и 512 байт полезной нагрузки;
  • данные data, то есть срез считанного файла, но согласно документации может содержаться и дополнение;
  • собственно дополнение padding  —  в конце каждого tar-архива всегда отправляется два пустых фрагмента.

Напишем из этой заготовки получателя. Возможно, достаточно вывести на консоль то, что происходит.

Следующим кодом генерируется tar-архив для трех файлов, показывается только ход выполнения в интерактивном режиме без записи архива:

#[tokio::main]
async fn main() {
let mut archive = TarArchive::new();

archive.append_file("enwiki-20230801-pages-meta-history27.xml-p74198591p74500204".to_owned());
archive.append_file("lubuntu-22.04.3-desktop-amd64.iso".to_owned());
archive.append_file("qemu-8.2.1.tar.xz".to_owned());

let mut stream = archive.into_stream(10 * 1024 * 1024);

while let Some(chunk) = stream.next().await {
match chunk {
Ok(TarChunk::Header(path, _)) => println!("\nheader {path}"),
Ok(TarChunk::Data(_)) => print!("."),
Ok(TarChunk::Padding(0)) => println!("\npadding 0"),
Ok(TarChunk::Padding(index)) => println!("padding {index}"),
Err(error) => println!("error: {:?}", error),
}

std::io::stdout().flush().unwrap();
}
}

Вот какой вывод получается на моем компьютере:

header enwiki-20230801-pages-meta-history27.xml-p74198591p74500204
.........................................................................
.........................................................................
.........................................................................
.........................................................................
.........................................................................
.........................................................................
.............................................................
header lubuntu-22.04.3-desktop-amd64.iso
.........................................................................
.........................................................................
.........................................................................
............................................................
header qemu-8.2.1.tar.xz
.............
padding 0
padding 1

Количество точек представляет собой фрагмент данных каждые 10 Мб, приложение запускается считанные секунды.

Чтобы достигать такого результата в реализации и знать, где мы в конечном автомате, добавим к потоку поле состояния:

struct TarStream {
state: TarState,
...
}

impl TarStream {
pub fn new(entries: Vec<TarEntry>, buffer_size: usize) -> Self {
Self {
state: TarState::init(),
...
}
}
}

impl Stream for TarStream {
...

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let self_mut = self.get_mut();

loop {
let mut state = TarState::completed();
mem::swap(&mut state, &mut self_mut.state);

let result = match state {
TarState::Init(state) => state.poll(cx),
TarState::Open(state) => state.poll(cx),
TarState::Header(state) => state.poll(cx),
TarState::Read(state) => state.poll(cx),
TarState::Padding(state) => state.poll(cx),
TarState::Completed(state) => state.poll(cx),
};

let (state, poll) = match result {
TarPollResult::ContinueLooping(state) => (state, None),
TarPollResult::ReturnPolling(state, poll) => (state, Some(poll)),
TarPollResult::NextEntry() => match self_mut.entries.pop_front() {
None => (TarState::padding(), None),
Some(entry) => (TarState::open(self_mut.buffer_size, entry), None),
},
};

self_mut.state = state;

if let Some(poll) = poll {
return poll;
}
}
}
}

Каждым вариантом состояния получается опрос, а возвращается результат опроса, на основе которого принимается решение: продолжить итерирование, вернуть найденный результат опроса или перейти к следующей записи. Каждый раз текущее состояние обновляется.

Но что именно представляет собой опрашивание? С одной стороны, нас опрашивают при реализации асинхронного потока, с другой  —  мы делегируем опрашивание другим опрашиваемым сущностям. Каждой функцией опроса принимается контекст, а возвращается вариант опроса.

В контексте содержится будильщик для возобновления блокированной задачи. Но важнее понять два варианта перечисления опроса: отложенный Pending и готовый Ready.

Отложенным указывается, что процесс опрашивания не завершен. Когда из функции получается этот вариант, нужно не опрашивать тут же заново, но подождать еще. Если получается готовый  —  искомое значение найдено.

Надеюсь, предыдущий листинг стал понятен. А зачем нужен конечный автомат? Представьте: кто-то вызывает функцию опроса, а вы возвращаете отложенный Pending. То есть обещаете уведомить вызывающего, когда можно будет продолжить. Когда это случится, уведомленный снова вызовет функцию опроса, и вам нужно будет определить, где вы с ним остановились. Вы отвечаете за отслеживание его продвижения. Шаблон конечного автомата  —  элегантный способ смоделировать это.

Чтобы обрабатывать создание tar-архива, в определенном конечном автомате содержатся состояния, смоделированные как варианты перечисления:

enum TarState {
Init(TarStateInit),
Open(TarStateOpen),
Header(TarStateHeader),
Read(TarStateRead),
Padding(TarStatePadding),
Completed(TarStateCompleted),
}

У каждого состояния имеется особое значение:

  • Init  —  начало обработки единого файла;
  • в Open хранится информация о начатом процессе открытия файла;
  • в Header содержится информация о начатом процессе считывания метаданных, например длина и разрешения;
  • в Read для каждого фрагмента отслеживается ход считывания из файла;
  • в Padding указывается, какая информация о дополнении уже выведена;
  • Completed  —  заключительный этап генерирования потока.

Принципиальный поток конечного автомата представлен на схеме, в случае какой-либо ошибки сразу переходим в состояние completed:

Разберемся сначала с исходным состоянием:

impl TarStateHandler for TarStateInit {
fn poll(self, _cx: &mut Context<'_>) -> TarPollResult {
TarPollResult::NextEntry()
}
}

Исходным состоянием почти ничего не делается, основное его назначение  —  указать потоку на переход к следующей записи. Процесс интерпретации результата, возвращаемого из любого состояния, обрабатывается самим потоком:

let (state, poll) = match result {
TarPollResult::ContinueLooping(state) => (state, None),
TarPollResult::ReturnPolling(state, poll) => (state, Some(poll)),
TarPollResult::NextEntry() => match self_mut.entries.pop_front() {
None => (TarState::padding(), None),
Some(entry) => (TarState::open(self_mut.buffer_size, entry), None),
},
};

В данном случае в результате выполнения NextEntry выбор следующей записи осуществляется из вектора. Если ничего не найдено, начинаем дополнение padding. В противном случае переходим в состояние open с вновь выбранной записью.

Состояние open чуть сложнее. Открытие файла  —  это операция, чреватая блокированием. Однако частью операционных систем она, похоже, поддерживается не в полной мере, и в Tokio стандартный вызов каким-то образом искусственно делается асинхронным:

pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
let path = path.as_ref().to_owned();
let std = asyncify(|| StdFile::open(path)).await?;

Ok(File::from_std(std))
}

Что это значит для нас? Трудности при опрашивании футуры, ведь для этого ее нужно сохранить как фиксированный и упакованный динамический объект:

struct TarStateOpen {
buffer_size: usize,
task: Pin<Box<dyn Future<Output = Result<(String, File), std::io::Error>> + Send>>,
}

impl TarStateOpen {
fn new(buffer_size: usize, entry: TarEntry) -> Self {
let task = async move {
match entry {
TarEntry::File(path) => match File::open(&path).await {
Ok(file) => Ok((path, file)),
Err(error) => Err(error),
},
}
};

Self {
buffer_size: buffer_size,
task: Box::pin(task),
}
}
}

impl TarStateHandler for TarStateOpen {
fn poll(mut self, cx: &mut Context<'_>) -> TarPollResult {
let (path, file) = match self.task.as_mut().poll(cx) {
Poll::Pending => return TarState::Open(self).pending(),
Poll::Ready(Err(error)) => return TarState::failed(TarError::IOFailed(error)),
Poll::Ready(Ok((path, file))) => (path, file),
};

TarStateHeader::new(self.buffer_size, path, file).poll(cx)
}
}

В конструкторе сохраняется фиксированная ссылка на футуру, возвращаемую из асинхронного блока. Футура не применяется с await, опросим ее позже в функции опроса.

Если мы получаем отложенный Pending, просто возвращаем то же состояние вместе с отложенным Pending результатом потока. Если получаем кортеж, передаем его в следующее состояние. Легко и просто.

По той же причине опросим футуру и в состоянии header. Метаданные извлекаются только блокирующим вызовом, а искусственная async-ификация появилась в Tokio:

pub async fn metadata(&self) -> io::Result<Metadata> {
let std = self.std.clone();
asyncify(move || std.metadata()).await
}

То есть нужно сохранить эту футуру вместе с файлом, полученном в предыдущем состоянии:

struct TarStateHeader {
buffer_size: usize,
path: String,
task: Pin<Box<dyn Future<Output = Result<(File, Metadata), std::io::Error>> + Send>>,
}

impl TarStateHeader {
fn new<'a>(buffer_size: usize, path: String, file: File) -> TarStateHeader {
let task = async move {
match file.metadata().await {
Ok(metadata) => Ok((file, metadata)),
Err(error) => Err(error),
}
};

Self {
path: path,
task: Box::pin(task),
buffer_size: buffer_size,
}
}
}

impl TarStateHandler for TarStateHeader {
fn poll(mut self, cx: &mut Context<'_>) -> TarPollResult {
let (file, metadata) = match self.task.as_mut().poll(cx) {
Poll::Pending => return TarState::Header(self).pending(),
Poll::Ready(Err(error)) => return TarState::failed(TarError::IOFailed(error)),
Poll::Ready(Ok(metadata)) => metadata,
};

let length: u64 = metadata.len();
let header: TarHeader = TarHeader::empty(self.path);

match header.write(&metadata) {
Ok(chunk) => TarState::read(self.buffer_size, file, length).ready(chunk),
Err(error) => TarState::failed(error),
}
}
}

Метаданными заполняется структура tar-заголовка, а файл передается на следующий этап. Как и в предыдущем состоянии, отложенный Pending результат мы возвращаем вместе с самим состоянием и идем дальше.

Как же tar-заголовок создается и преобразуется в фрагмент? Чтобы акцентировать внимание на основных этапах создания заголовка, часть кода я опустил:

struct TarHeader {
path: String,
data: Box<[u8; 512]>,
}

impl TarHeader {
...

fn write(mut self, metadata: &Metadata) -> TarResult<TarChunk> {
let data = &mut self.data;

Self::write_name(data, &self.path)?;
Self::write_mode(data, metadata)?;
Self::write_uid(data, 0)?;
Self::write_gid(data, 0)?;
Self::write_size(data, metadata)?;
Self::write_mtime(data, metadata)?;
Self::write_magic(data)?;
Self::write_type_flag(data)?;
Self::write_chksum(data)?;

Ok(self.into())
}
}

impl Into<TarChunk> for TarHeader {
fn into(self) -> TarChunk {
TarChunk::header(self.path, self.data)
}
}

Наконец, состояние чтения. Пожалуй, самое важное  —  ему уделяется большая часть процессорного времени  —  и самое опрашиваемое:

struct TarStateRead {
buffer_size: usize,
file: File,
left: usize,
completed: usize,
chunk: TarChunk,
offset: usize,
}

impl TarStateRead {
fn new(buffer_size: usize, file: File, length: u64) -> Self {
let left = length as usize / 512;
let available = buffer_size / 512;

let pages = std::cmp::min(available, left);
let pages = pages + if length as usize > 0 { 1 } else { 0 };

Self {
buffer_size: buffer_size,
file: file,
left: length as usize,
completed: 0,
chunk: TarChunk::data(pages),
offset: 0,
}
}

fn advance(self, bytes: usize) -> Self {
Self {
buffer_size: self.buffer_size,
file: self.file,
left: self.left - bytes,
completed: self.completed + bytes,
chunk: self.chunk,
offset: self.offset + bytes,
}
}

fn next(self) -> (TarChunk, Self) {
let left = self.left / 512;
let available = self.buffer_size / 512;

let pages = std::cmp::min(available, left);
let pages = pages + if self.left % 512 > 0 { 1 } else { 0 };

(
self.chunk,
Self {
buffer_size: self.buffer_size,
file: self.file,
left: self.left,
completed: self.completed,
chunk: TarChunk::data(pages),
offset: 0,
},
)
}
}

Здесь еще много полей, которые нужно проконтролировать. В состоянии содержатся считываемый файл, его длина, а также числа, которыми обозначаются текущая позиция, оставшиеся байты или смещения в пределах текущего фрагмента.

Примечателен и текущий фрагмент, еще не полностью заполненный или даже пустой. Определили для него две функции: advance и next. Первой увеличивается смещение в текущем фрагменте, второй создается новое состояние с возвращением предыдущего заполненного фрагмента.

Поскольку считывание  —  важнейшая часть потока, немного кода обработчику состояния тоже потребуется:

impl TarStateHandler for TarStateRead {
fn poll(mut self, cx: &mut Context<'_>) -> TarPollResult {
let pinned: Pin<&mut File> = Pin::new(&mut self.file);
let data = match self.chunk.offset(self.offset) {
Err(error) => return TarState::failed(error),
Ok(data) => data,
};

let mut buffer: ReadBuf<'_> = ReadBuf::new(data);
match pinned.poll_read(cx, &mut buffer) {
Poll::Pending => return TarState::Read(self).pending(),
Poll::Ready(Err(error)) => return TarState::failed(TarError::IOFailed(error)),
_ => (),
}

let read: usize = buffer.filled().len();
let advanced: TarStateRead = self.advance(read);

if advanced.left == 0 {
return TarState::init().ready(advanced.chunk);
}

if advanced.offset == advanced.chunk.len() {
let (chunk, state) = advanced.next();
return TarState::from(TarState::Read(state)).ready(chunk);
}

TarState::from(TarState::Read(advanced)).looping()
}
}

Основной поток аналогичен: когда данные не готовы, возвращаем отложенное Pending состояние, когда данные получаются  —  принимаем решения. Текущий фрагмент заполнен полностью? Считывание файла завершено? Ответы на эти вопросы сказываются на возвращаемом состоянии или потоке, в котором выдается фрагмент.

Что дальше? Padding, одно из простейших состояний с полем для индекса:

struct TarStatePadding {
index: usize,
}

impl TarStatePadding {
fn new() -> Self {
Self { index: 0 }
}

fn next(self) -> Self {
Self { index: self.index + 1 }
}
}

Два таких же отправляются в конце, поэтому обработчик состояний так прост:

impl TarStateHandler for TarStatePadding {
fn poll(self, _cx: &mut Context<'_>) -> TarPollResult {
match self.index {
0 => TarState::Padding(self.next()).ready(TarChunk::padding(0)),
index => TarState::completed().ready(TarChunk::padding(index)),
}
}
}

Отправив два дополнения padding, перемещаемся в состояние completed. Просто возвращаем None, указывая конец потока:

impl TarStateHandler for TarStateCompleted {
fn poll(self, _cx: &mut Context<'_>) -> TarPollResult {
TarPollResult::ReturnPolling(TarState::completed(), Poll::Ready(None))
}
}

Когда применяется async/await, весь этот код  —  только чуть лаконичнее  —  генерируется компилятором: сопровождаемость кода у него не в приоритете.

Попробуем этим асинхронным потоком загружать в контейнер Docker файлы. Воспользуемся легковесным HTTP-клиентом с сокетами Unix для взаимодействия с API среды выполнения Docker и расширим его возможности.

Сначала интегрируем поток tar-архива с таким же подлежащим реализации асинхронным потоком body из hyper. Вместо того чтобы передавать в hyper 8 Гб данных для запроса PUT, напишем поток фрагментов body:

pub struct TarBody {
inner: TarStream,
}

impl TarBody {
pub fn from(stream: TarStream) -> Self {
Self { inner: stream }
}
}

impl Body for TarBody {
type Data = Bytes;
type Error = DockerError;

fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let self_mut: &mut TarBody = self.get_mut();
let pointer: &mut TarStream = &mut self_mut.inner;
let inner: Pin<&mut TarStream> = Pin::new(pointer);

match inner.poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(chunk) => match chunk {
None => Poll::Ready(None),
Some(Err(error)) => Poll::Ready(Some(DockerError::raise_outgoing_archive_failed(error))),
Some(Ok(chunk)) => {
let data: Vec<u8> = chunk.into();
let frame: Frame<Bytes> = Frame::data(Bytes::from(data));

Poll::Ready(Some(Ok(frame)))
}
},
}
}
}

В 33 строках кода мы волшебным образом превратили tar-фрагмент в приемлемый для hyper байтовый фрейм, заодно обработав все ошибки. Интегрируем это в HTTP-клиент:

async fn container_upload(&self, id: &str, path: &str, archive: TarArchive) -> DockerResult<ContainerUpload> {
let url: String = format!("/v1.42/containers/{id}/archive?path={path}");
let connection: DockerConnection<TarBody> = DockerConnection::open(&self.socket).await?;

let stream: TarStream = archive.into_stream(256 * 1024);
let data: TarBody = TarBody::from(stream);

match connection.put(&url, data).await {
Ok(response) => match response.into_bytes().await {
Ok(_) => Ok(ContainerUpload::Succeeded),
Err(error) => Err(error),
},
Err(error) => match error {
DockerError::StatusFailed(url, status, response) => match status.as_u16() {
400 => Ok(ContainerUpload::BadParameter(response.into_error().await?)),
403 => Ok(ContainerUpload::PermissionDenied(response.into_error().await?)),
404 => Ok(ContainerUpload::NoSuchContainer(response.into_error().await?)),
500 => Ok(ContainerUpload::ServerError(response.into_error().await?)),
_ => Err(DockerError::StatusFailed(url, status, response)),
},
error => Err(error),
},
}
}

Принимаемый функцией upload поток tar-архива преобразуется в tar-поток body и передается в функцию put. Идеальная интеграция.

Как узнать, что обошлось без неожиданностей? Создадим контейнер, загрузим файлы, вычислим их хеши, а затем сравним с локальными файлами.

Мы описали очень подробную реализацию асинхронного потока, сделали нетривиальный конечный автомат, проверили в консоли создание tar-архива, а также доказали, что созданный tar полностью распознается в API среды выполнения Docker.

Код  —  здесь.