Это больше всего похоже на это вопрос.
Я создаю конвейер в Dataflow 2.x, который принимает потоковый ввод из очереди Pubsub. Каждое поступающее сообщение необходимо передать в потоке через очень большой набор данных, поступающий из Google BigQuery, и к нему должны быть прикреплены все соответствующие значения (на основе ключа) перед записью в базу данных.
Проблема в том, что набор данных сопоставления из BigQuery очень велик - любая попытка использовать его в качестве побочного ввода не удастся, поскольку бегуны потока данных выдают ошибку «java.lang.IllegalArgumentException: ByteString будет слишком длинным». Я пробовал использовать следующие стратегии:
1) Боковой вход
- Как уже говорилось, данные сопоставления (по-видимому) слишком велики для этого. Если я ошибаюсь или есть обходной путь, дайте мне знать, потому что это было бы самым простым решением.
2) Сопоставление пар ключ-значение
- В этой стратегии я читаю данные BigQuery и сообщения Pubsub в первой части конвейера, а затем запускаю каждое через преобразования ParDo, которые изменяют каждое значение в парах PCollections на KeyValue. Затем я запускаю преобразование Merge.Flatten и преобразование GroupByKey, чтобы прикрепить соответствующие данные сопоставления к каждому сообщению.
- Проблема здесь в том, что для потоковой передачи данных требуется объединение окон с другими данными, поэтому мне приходится применять оконное управление и к большим, ограниченным данным BigQuery. Также требуется, чтобы стратегии работы с окнами были одинаковыми для обоих наборов данных. Но никакая оконная стратегия для ограниченных данных не имеет смысла, и несколько попыток работы с окнами, которые я сделал, просто отправляют все данные BQ в одном окне, а затем никогда не отправляют их снова. Его нужно присоединять к каждому входящему сообщению pubsub.
3) Вызов BQ прямо в ParDo (DoFn)
- Это казалось хорошей идеей - пусть каждый рабочий объявит статический экземпляр данных карты. Если его там нет, вызовите BigQuery напрямую, чтобы получить его. К сожалению, это каждый раз вызывает внутренние ошибки BigQuery (поскольку во всем сообщении просто написано «Внутренняя ошибка»). Подача заявки в службу поддержки Google привела к тому, что они сказали мне, что, по сути, «вы не можете этого сделать».
Кажется, эта задача не подходит для модели «досадно распараллеливаемой», так что я здесь не на то дерево?
РЕДАКТИРОВАТЬ:
Даже при использовании машины с большим объемом памяти в потоке данных и попытке сделать боковой ввод в виде карты я получаю сообщение об ошибке java.lang.IllegalArgumentException: ByteString would be too long
Вот пример (псевдо) кода, который я использую:
Pipeline pipeline = Pipeline.create(options);
PCollectionView<Map<String, TableRow>> mapData = pipeline
.apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
.apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn()))
.apply(View.asMap());
PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
.fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));
messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject data = new JSONObject(new String(c.element().getPayload()));
String key = getKeyFromData(data);
TableRow sideInputData = c.sideInput(mapData).get(key);
if (sideInputData != null) {
LOG.info("holyWowItWOrked");
c.output(new TableRow());
} else {
LOG.info("noSideInputDataHere");
}
}
}).withSideInputs(mapData));
Конвейер выдает исключение и завершается ошибкой перед регистрацией чего-либо из ParDo
.
Трассировки стека:
java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
View.asSingleton
,View.asMap
и т. Д.? Например -View.asSingleton
возьмет коллекцию PCollection с одним элементом и сделает ее видимой для ParDo.View.asMap
возьметPCollection<KV<K, V>>
и сделает его доступным какMap<K, V>
, но будет читать только те ключи, которые вам нужны. - person Ben Chambers   schedule 28.11.2017View.asIterable
, чтобы просмотреть каждую строку, чтобы проверить соответствие - person Taylor   schedule 28.11.2017View.asIterable
означает, что для каждого элемента вам необходимо прочитать (потенциально) все 10 ГБ. Это объясняет некоторые проблемы с производительностью. Можно ли использоватьView.asMap
илиView.asMultimap
? Для этого потребуется связать каждую строку с ключом поиска, но тогда вы сможете запрашивать эти элементы, не читая все. - person Ben Chambers   schedule 29.11.2017PCollection<K, V>
, используетеView.asMap
илиView.asMultimap
и получаете обратноPCollectionView<Map<K, V>>
илиPCollectionView<Map<K, Iterable<V>>
. Боковой ввод записывается с использованием индексированного формата, поэтому, когда вы делаетеcontext.sideInput(view).get(someKey)
, ему нужно прочитать только подмножество всех данных бокового ввода. - person Ben Chambers   schedule 29.11.2017View.asMap
, даже при использовании машин n1-highmem-16, по-прежнему вызывает ошибкуjava.lang.IllegalArgumentException: ByteString would be too long
. Я отредактирую вопрос с подробностями об этой попытке. - person Taylor   schedule 30.11.2017