Apache Beam в потоке данных большой боковой ввод

Это больше всего похоже на это вопрос.

Я создаю конвейер в Dataflow 2.x, который принимает потоковый ввод из очереди Pubsub. Каждое поступающее сообщение необходимо передать в потоке через очень большой набор данных, поступающий из Google BigQuery, и к нему должны быть прикреплены все соответствующие значения (на основе ключа) перед записью в базу данных.

Проблема в том, что набор данных сопоставления из BigQuery очень велик - любая попытка использовать его в качестве побочного ввода не удастся, поскольку бегуны потока данных выдают ошибку «java.lang.IllegalArgumentException: ByteString будет слишком длинным». Я пробовал использовать следующие стратегии:

1) Боковой вход

  • Как уже говорилось, данные сопоставления (по-видимому) слишком велики для этого. Если я ошибаюсь или есть обходной путь, дайте мне знать, потому что это было бы самым простым решением.

2) Сопоставление пар ключ-значение

  • В этой стратегии я читаю данные BigQuery и сообщения Pubsub в первой части конвейера, а затем запускаю каждое через преобразования ParDo, которые изменяют каждое значение в парах PCollections на KeyValue. Затем я запускаю преобразование Merge.Flatten и преобразование GroupByKey, чтобы прикрепить соответствующие данные сопоставления к каждому сообщению.
  • Проблема здесь в том, что для потоковой передачи данных требуется объединение окон с другими данными, поэтому мне приходится применять оконное управление и к большим, ограниченным данным BigQuery. Также требуется, чтобы стратегии работы с окнами были одинаковыми для обоих наборов данных. Но никакая оконная стратегия для ограниченных данных не имеет смысла, и несколько попыток работы с окнами, которые я сделал, просто отправляют все данные BQ в одном окне, а затем никогда не отправляют их снова. Его нужно присоединять к каждому входящему сообщению pubsub.

3) Вызов BQ прямо в ParDo (DoFn)

  • Это казалось хорошей идеей - пусть каждый рабочий объявит статический экземпляр данных карты. Если его там нет, вызовите BigQuery напрямую, чтобы получить его. К сожалению, это каждый раз вызывает внутренние ошибки BigQuery (поскольку во всем сообщении просто написано «Внутренняя ошибка»). Подача заявки в службу поддержки Google привела к тому, что они сказали мне, что, по сути, «вы не можете этого сделать».

Кажется, эта задача не подходит для модели «досадно распараллеливаемой», так что я здесь не на то дерево?

РЕДАКТИРОВАТЬ:

Даже при использовании машины с большим объемом памяти в потоке данных и попытке сделать боковой ввод в виде карты я получаю сообщение об ошибке java.lang.IllegalArgumentException: ByteString would be too long

Вот пример (псевдо) кода, который я использую:

    Pipeline pipeline = Pipeline.create(options);

    PCollectionView<Map<String, TableRow>> mapData = pipeline
            .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
            .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn())) 
            .apply(View.asMap());

    PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
            .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));

    messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            JSONObject data = new JSONObject(new String(c.element().getPayload()));
            String key = getKeyFromData(data);
            TableRow sideInputData = c.sideInput(mapData).get(key);
            if (sideInputData != null) {
                LOG.info("holyWowItWOrked");
                c.output(new TableRow());
            } else {
                LOG.info("noSideInputDataHere");
            }
        }
    }).withSideInputs(mapData));

Конвейер выдает исключение и завершается ошибкой перед регистрацией чего-либо из ParDo.

Трассировки стека:

java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
        com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
        com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

person Taylor    schedule 27.11.2017    source источник
comment
Какой вид SideInput вы используете? Не могли бы вы привести пример того, как вы это использовали?   -  person Ben Chambers    schedule 27.11.2017
comment
Вы рассматривали возможность использования Stateful ParDo? Если бы вы выполняли обработку в глобальном окне, это позволило бы вам сохранить значение из BigQuery в состоянии и использовать его для обработки каждого значения, поступившего из другого потока. Вам нужно будет использовать тот же подход Merge.Flatten, который вы упомянули, поскольку Stateful DoFn работает только с одной коллекцией входных данных.   -  person Ben Chambers    schedule 27.11.2017
comment
Для вашего первого комментария @BenChambers боковой ввод представляет собой большую таблицу сопоставления. Каждая строка содержит ключевую строку, которая может соответствовать данным во входящем сообщении Pubsub. Набор картографических данных меняется каждую неделю, но в настоящее время составляет ~ 40 миллионов строк (около 10 ГБ) и в течение недели полностью статичен и неизменен. Я сейчас смотрю документацию pardo с отслеживанием состояния и смотрю, может ли это быть жизнеспособным ...   -  person Taylor    schedule 28.11.2017
comment
Для боковых входов вы используете View.asSingleton, View.asMap и т. Д.? Например - View.asSingleton возьмет коллекцию PCollection с одним элементом и сделает ее видимой для ParDo. View.asMap возьмет PCollection<KV<K, V>> и сделает его доступным как Map<K, V>, но будет читать только те ключи, которые вам нужны.   -  person Ben Chambers    schedule 28.11.2017
comment
Я использовал View.asIterable, чтобы просмотреть каждую строку, чтобы проверить соответствие   -  person Taylor    schedule 28.11.2017
comment
Использование View.asIterable означает, что для каждого элемента вам необходимо прочитать (потенциально) все 10 ГБ. Это объясняет некоторые проблемы с производительностью. Можно ли использовать View.asMap или View.asMultimap? Для этого потребуется связать каждую строку с ключом поиска, но тогда вы сможете запрашивать эти элементы, не читая все.   -  person Ben Chambers    schedule 29.11.2017
comment
Вы хотите сказать, что я могу использовать статический боковой ввод размером 10 ГБ и вносить только необходимые значения в боковые входные данные для каждого входящего сообщения pubsub?   -  person Taylor    schedule 29.11.2017
comment
Если они запрограммированы, то да. Вы берете PCollection<K, V>, используете View.asMap или View.asMultimap и получаете обратно PCollectionView<Map<K, V>> или PCollectionView<Map<K, Iterable<V>>. Боковой ввод записывается с использованием индексированного формата, поэтому, когда вы делаете context.sideInput(view).get(someKey), ему нужно прочитать только подмножество всех данных бокового ввода.   -  person Ben Chambers    schedule 29.11.2017
comment
Звучит многообещающе. Я попробую это   -  person Taylor    schedule 29.11.2017
comment
Помещение большого бокового ввода в пары KeyValue и создание представления с использованием View.asMap, даже при использовании машин n1-highmem-16, по-прежнему вызывает ошибку java.lang.IllegalArgumentException: ByteString would be too long. Я отредактирую вопрос с подробностями об этой попытке.   -  person Taylor    schedule 30.11.2017
comment
Можете ли вы поделиться трассировкой стека из получаемого исключения IllegalArgumentException? Я хочу понять, где происходит слишком длинная ByteString.   -  person Ben Chambers    schedule 30.11.2017
comment
Есть мысли по этому поводу? У меня был открыт запрос в службу поддержки Google, и я безуспешно отправил электронное письмо разработчикам Apache Beam   -  person Taylor    schedule 07.12.2017
comment
Есть новости об этом? столкнувшись с той же проблемой   -  person tomer.z    schedule 10.11.2018


Ответы (1)


Ознакомьтесь с разделом "Шаблон: большие таблицы поиска в режиме потоковой передачи" в этой статье https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-2 (это может быть единственное жизнеспособное решение, поскольку ваш боковой ввод не помещается в память):

Описание:

Большая (в ГБ) таблица поиска должна быть точной и часто изменяется или не умещается в памяти.

Пример:

У вас есть информация о торговой точке от розничного продавца, и вам необходимо связать название позиции продукта с записью данных, содержащей productID. Во внешней базе данных хранятся сотни тысяч элементов, которые могут постоянно меняться. Кроме того, все элементы должны обрабатываться с использованием правильного значения.

Решение:

Используйте атрибут "Вызов внешних служб для обогащения данных ", но вместо вызова микросервиса напрямую вызовите оптимизированную для чтения базу данных NoSQL (такую ​​как Cloud Datastore или Cloud Bigtable).

Для каждого значения, которое нужно найти, создайте пару «ключ-значение», используя служебный класс KV. Сделайте GroupByKey для создания пакетов с одним и тем же типом ключа для вызова базы данных. В DoFn вызовите базу данных для этого ключа, а затем примените значение ко всем значениям, пройдя по итерации. Следуйте передовым методам создания экземпляров клиентов, как описано в разделе «Вызов внешних служб для обогащения данных».

Другие соответствующие шаблоны описаны в этой статье: https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1:

  • Шаблон: медленно меняющийся кеш поиска
  • Шаблон: вызов внешних служб для обогащения данных
person medvedev1088    schedule 24.12.2018