Параллелизм данных в своевременном потоке данных

Перевод | Автор оригинала: Frank McSherry

В предыдущем посте был описан изящный алгоритм Нго и др., А затем описано, как можно реализовать его в своевременном потоке данных. Было несколько бит кода Rust, несколько мест, где детали были замалчены, а затем утверждение, что теперь это была реализация аккуратного алгоритма с параллельными данными.

В этом посте мы рассмотрим, как происходит бит параллелизма данных, и подробно рассмотрим некоторые из движущихся частей.

Давайте начнем сверху, с некоторого кода, который мы не видели в предыдущем посте: основного метода:

fn main() {
    let workers = 4; // whatever you like here!
    let mut guards = Vec::new();
    for communicator in ProcessCommunicator::new_vector(workers) {
        guards.push(thread::Builder::new()
                        .name(format!("worker thread {}", communicator.index()))
                        .scoped(move || triangles(communicator))
                        .unwrap());
    }
}

Здесь мы определяем желаемое количество рабочих, просим ProcessCommunicator (кто бы то ни было) создать новый вектор (чего-то), и для каждого элемента этого вектора мы запускаем поток. Теперь элементы этого вектора были предположительно названы коммуникатором, и действительно, каждый из них реализует трэйту коммуникатора. Метод с заданной областью - это то место, где запускается поток, и, как вы можете видеть, единственный ввод, который он принимает, - это один из этих коммуникаторов. Что они могли сделать?

Коммуникатор

Трэйта Communicator обеспечивает единственную точку контакта между работниками в своевременном потоке данных. У него довольно простое определение, давайте посмотрим на него:

pub trait Communicator {
    fn index(&self) -> u64;
    fn peers(&self) -> u64;
    fn new_channel<T:Send+Columnar+Any>(&mut self)
        -> (Vec<Box<Pushable<T>>>, Box<Pullable<T>>);
}

Единственная функциональность, которую коммуникатор предоставляет работникам, - это сообщить им их личность, количество других работников и установить каналы с другими работниками. Метод new_channel возвращает некоторые целевые объекты, в которые рабочий может отправлять типизированные данные (по одному на каждого рабочего), и один источник, из которого рабочий может извлекать данные.

Когда работник извлекает данные из источника, он получает данные, отправленные другими работниками в соответствующие им цели. Ожидается, что рабочие создадут ту же последовательность каналов; все пойдет боком, если нет.

Это все. Если работники хотят общаться другими способами, им нужно создать несколько каналов.

Использование коммуникаторов

Давайте посмотрим на последний фрагмент кода из предыдущего поста, метод треугольников, упомянутый чуть выше, в котором мы определяем вычисление потока данных и запускаем его. Я собираюсь основывать это на коде, который в настоящее время существует в репозитории, который имеет несколько синтаксических отличий от предыдущего поста.

fn triangles<C: Communicator>(communicator: C) {

    // load up our fragment of the graph
    let graph = Rc::new(RefCell::new( /* load up mmap'd file */ ));

    // define extenders for a -> b and  (a, b) -> c, respectively
    let b_extend = vec![&graph.extend_using(|| { |&a| a as u64 } )];
    let c_extend = vec![&graph.extend_using(|| { |&(a,_)| a as u64 }),
                        &graph.extend_using(|| { |&(_,b)| b as u64 })]

    // create a new root dataflow context
    let mut root = GraphRoot::new(communicator);
    let mut input = {
        let mut builder = root.new_subgraph();
        let (input, stream) = builder.new_input();

        // enable, extend, extend
        stream.enable(builder)
              .extend(b_extend)
              .extend(c_extend);

        input  // return the input handle
    };

    // iterate until done
    while root.step() {
        // introduce input
    }
}

Возможно, самое важное, что нужно отметить в этом методе, - это то, что это логика для каждого коммуникатора. Каждый воркер будет вызывать этот метод независимо, поэтому им нужно убедиться, что они действуют согласованно. В основном это сводится к построению каналов в определенном последовательном порядке, а не к тому, чтобы делать что-то еще, пока другие работники ждут вас.

В идеале код не слишком опасный. Первые несколько строк относятся к предыдущему посту, где мы загружаем некоторые графики и определяем «расширители префиксов», используемые для управления вышеупомянутым аккуратным алгоритмом.

Однако есть несколько очень важных линий, поэтому давайте рассмотрим их более внимательно.

Создание корня графа потока данных

Точка, в которой граф потока данных впервые появляется, - это когда мы помещаем наш надежный коммуникатор в GraphRoot. Это важная строка кода:

let mut root = GraphRoot::new(communicator);

На самом деле GraphRoot довольно прост. Это простейший реализатор трейта GraphBuilder, основная роль которого в жизни - возвращать ссылки на коммуникатор, чтобы мы могли создавать каналы.

Есть гораздо более продвинутые и интересные разработчики GraphBuilder. Посмотрим сейчас!

Создание подграфа потока данных

Следующий фрагмент кода создает подграф потока данных.

let mut subgraph = root.new_subgraph();

Метод new_subgraph(), определенный трейтом GraphBuilder, возвращает SubgraphBuilder. Это тот продвинутый и интересный разработчик GraphBuilder, о котором я упоминал выше.

SubgraphBuilder предоставляет доступ к коммуникатору и возможность вызывать new_subgraph, но также имеет нетривиальную реализацию метода add_scope <S: Scope> (scope: S).

Конечно, я уверен, что вы все помните, что такое Scope, верно? (щелкните! сделайте это. боже… щелкните! щелкните!).

Напоминание об областях

Свойство Scope определяет методы, которые необходимо использовать различным компонентам графика своевременного потока данных для обсуждения хода выполнения на графике. Область видимости может объяснить своему родительскому объекту, сколько входов и выходов у нее есть, какие типы сообщений она может планировать отправлять по этим выходам, и, учитывая шанс, может быть, проделать небольшую работу, чтобы добиться прогресса, о чем она может затем сообщить. его родитель.

По ссылке выше вы найдете более подробную информацию и многое другое о протоколе отслеживания прогресса.

Наш первый пример области видимости фактически находится в следующей строке кода:

let (input, stream) = subgraph.new_input();

Метод ввода предоставляется с помощью трэйта расширения в средствах реализации Graph Builder. Давай увидим это.

pub trait InputExtensionTrait<G: GraphBuilder> {
    fn new_input<D:Data>(&mut self) -> (InputHelper<G::Timestamp, D>,
                                        Stream<G::Timestamp, D>);
}

Хорошо, возможно, это было слишком много информации. Мы видим, что new_input действительно возвращает две вещи, и, по-видимому, это InputHelper и Stream, но что это такое, в настоящее время остается полной загадкой. Кроме того, что означает весь этот шум G::Timestamp?

И последнее: G::Timestamp - это связанный тип построителя графов G. Каждый построитель графов имеет связанную временную метку, и все области, которые допускает построитель графиков, должны использовать эту временную метку. И InputHelper, и Stream определены в терминах конкретной общей временной метки, заданной параметром G. Каждый из них также имеет параметр второго типа, D, который представляет собой тип данных, которые будут передаваться входными данными.

Реализация ввода не особо очевидна и не проясняет, поэтому я подведу итоги. Метод выполняет несколько функций, включая создание InputHelper (для отправки данных) и Stream (для подключения потока данных). Важно отметить, что он также создает InputScope, который реализует Scope и дает обещания о том, какие входные временные метки запечатаны. Он добавлен в конструктор подграфов.

Реализация ввода также захватывает канал от коммуникатора и подключает его так, чтобы отправленные данные направлялись непосредственно потребителям Stream. За исключением отслеживания количества отправленных записей, которое требуется InputScope, вся логика находится за реализациями каналов.

Intermission: параллелизм данных

Давайте поразмышляем над тем, где мы находимся. Мы определили некоторую инфраструктуру, которая позволит каждому воркеру отправлять записи в вычисление потока данных, и, если мы будем продолжать в том же духе, обмениваться этими записями с другими воркерами. Хотя большая часть нашего обсуждения здесь звучит так, будто мы просто говорим об одном потоке выполнения, это именно то, что мы хотим.

Одно из основных достоинств программирования с параллельными данными заключается в том, что мы можем определять наши вычисления, как если бы они были однопоточными, зная, что если мы раскрутим произвольное количество копий рабочих процессов и обмениваемся данными между ними соответствующим образом, тот же результат выходит на другом конце.

Этот шаблон программирования действительно имеет ограничения, в основном то, что все коммуникации должны быть явными и что структура коммуникации (сам граф потока данных) определяется заранее, но с учетом этих ограничений нам не нужно слишком много думать о параллелизме. Это действительно хорошие новости.

Добавление еще нескольких областей

Вернемся к работе. Наш рабочий должен определить, что он должен делать с этим входным потоком теперь, когда он у него есть.

На этом этапе мы переходим к немного загадочной детали нашей текущей реализации, которая связана с серьезностью Rust в отношении совместного доступа к изменяемому состоянию. Rust действительно очень серьезно относится к этой теме.

Построитель графа имеет множество изменяемых состояний, и нам нужно четко понимать, когда мы его используем, а когда не используем. Мы явно попросили строителя создать ввод для нас, что довольно ясно. В то же время мы, вероятно, предпочли бы не включать ссылку на построитель в каждый вызов метода, но мы должны знать, кто в настоящее время за это отвечает.

На данный момент у нас есть понятие ActiveStream, который представляет собой информацию о потоке (в основном имя и список слушателей, к которому можно присоединиться) плюс построитель графов. ActiveStream владеет этим построителем графов в смысле Rust, поэтому он знает, что имеет эксклюзивный изменяемый доступ к своим сервисам.

Мы создаем активные потоки из неактивных потоков, вызывая enable с аргументом построителя графов.

stream.enable(builder)
      .extend(b_extend)
      .extend(c_extend);

Вы можете посмотреть на это и сказать: «Эй, строитель ушел?» В данном случае да. Однако вы всегда можете получить его обратно из активного потока; мы просто случайно отбросили результаты в этом случае.

К тому же, это была чужая, а не моя идея: есть общая реализация.

impl<G: GraphBuilder> GraphBuilder for &mut G {
    // lots of "fn func(&mut self) { (**self).func() }"
}

который говорит, что мы можем использовать изменяемую ссылку &mut G везде, где мы бы использовали G: GraphBuilder. Так что мы могли бы заменить builder там на &mut builder, не рискуя потерять его из виду.

Собственно добавление дополнительных областей

Написание расширения несколько раз - это своего рода отговорка как пример дополнительных возможностей. Не так уж и полезно.

В предыдущем посте я пропустил реализацию StreamPrefixExtender, трэйта, обеспечивающего действия в ActiveStream, включая такие вещи, как подсчет количества расширений, предложение расширений и проверка расширений. Давайте рассмотрим одну из этих реализаций.

Я собираюсь использовать предложение как простейшую реализацию. Два других примерно такие же, но с немного большей внутренней логикой. Предлагаемая реализация использует унарный метод расширения, который я написал, обертывая реализацию области с одним входом и одним выходом. Мы перейдем к его реализации через мгновение, но давайте посмотрим его аргументы, чтобы понять, что ему нужно знать.

impl<P, E, G, PE> StreamPrefixExtender<G, P, E> for Rc<RefCell<PE>>
where P: Data+Columnar,
      E: Data+Columnar,
      G: GraphBuilder,
      PE: PrefixExtender<P, E>+'static {
    fn propose(&self, stream: ActiveStream<G, P>) -> ActiveStream<G, (P, Vec<E>)> {
        let clone = self.clone();
        let exch = Exchange::new(|x| hash(x)); // <-- minor lie
        stream.unary(exch, format!("Propose"), move |handle| {
            let extender = clone.borrow();
            while let Some((time, data)) = handle.input.pull() {
                let mut session = handle.output.session(&time);
                for datum in data {  // send each extension
                    session.give(p, extender.propose(&p));
                }
            }
        })
    }

Унарный метод принимает три аргумента: описание того, как нужно обмениваться входными данными (чтобы убедиться, что данные поступают к нужному исполнителю), строка с именем оператора и логика, которая принимает дескриптор, с помощью которого входные данные могут быть прочитаны и выведены. послал.

Если мы задумаемся на мгновение, то на самом деле не так уж много нужно сказать об операторе: *

  1. Какие записи должны идти к какому рабочему.
  2. Что рабочий должен делать с материалами, когда они поступают.

В приведенном выше примере мы просто хотим последовательно направлять префиксы одному и тому же исполнителю, поэтому мы используем хеш префикса. При представлении некоторых префиксов правильнее всего расширить каждый из них и отправить пару префиксов и расширений. Больше нечего сказать о логике.

*: На самом деле есть еще кое-что, о чем нужно знать, а именно об уведомлении, которое является чрезвычайно важной частью своевременного потока данных, но мы еще не дошли до этого. Последующий пост, точно.

Унарная область видимости

Унарный метод скрывает множество шаблонов, поэтому мы можем сосредоточиться на логике оператора потока данных.

Что на самом деле происходит внутри этого метода? Ничего особо сложного. Это около 90 строк кода. Когда вызывается унарный, он использует первый аргумент, exch выше, для преобразования канала коммуникатора в простой интерфейс push / pull. Он регистрирует «выталкивающую» часть этого потока и прикрепляет «вытягивающую» часть к своему объекту-дескриптору. Он также подготавливает ActiveStream для вывода и подключает вывод дескриптора к (подлежащему заполнению) списку заинтересованных слушателей. Наконец, он создает UnaryScope, которому принадлежит дескриптор, который при необходимости будет вызывать для него логику пользователя и сообщать количество записей, полученных и отправленных в его родительскую область.

Это был довольно длинный абзац, так что, может быть, все не так просто. Но это тоже не так уж сложно.

Приводим дела в движение

Последний фрагмент кода выглядит так:

while root.step() {
    // introduce input
}

Это выглядит довольно просто и безвредно, но именно это приводит в движение все вычисления потока данных.

Вызов root.step() рекурсивно просматривает области, спрашивая каждую о достигнутом прогрессе. Здесь каждая область осматривается, говорит: «Привет, у меня есть данные!», Выполняет небольшую работу, а затем сообщает своей родительской области, что она сделала. Возвращаемое значение указывает, осталось ли сделать еще работу.

В нашем случае области начнут перемещать входные записи и начнут предлагать расширения и выполнять все элементы логики, которые мы запрограммировали. Они будут продолжать работать, пока входы остаются открытыми, а операторы сообщают, что они еще не завершены. , а протокол отслеживания прогресса сообщает о неразрешенной работе.

Важно отметить, что весь этот бизнес написан так, чтобы каждый из этих рабочих работал независимо от других. Хотя они неявно координируются через обмен данными и протокол отслеживания прогресса, нам просто нужно включить каждый из них и запустить каждый из них, пока он не остановится.

В идеале эта остановка происходит раньше, когда работает больше сотрудников, но ... ну, я не хочу портить сюрприз.