PySpark Streaming: преобразование потоковых данных для анализа в реальном времени
вступление
В PySpark DStreams (дискретизированные потоки) — это фундаментальная абстракция, предоставляемая Spark Streaming для обработки потоков данных в реальном времени. DStreams представляет собой последовательность RDD (устойчивых распределенных наборов данных), где каждый RDD содержит данные за определенный интервал времени.
В PySpark доступны различные преобразования для DStream, которые позволяют вам манипулировать и обрабатывать потоковые данные. Вот некоторые из часто используемых преобразований:
карта (функция):
Применяет данную функцию к каждому элементу DStream и возвращает новый DStream, состоящий из преобразованных элементов.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Define the transformation function def transform_text(text): # Convert each line to uppercase return text.upper() # Apply the map transformation to convert the lines to uppercase transformed_lines = lines.map(transform_text) # Print the transformed lines transformed_lines.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
В этом примере мы сначала создаем StreamingContext
с пакетным интервалом в 1 секунду. Затем мы создаем DStream lines
из потока TCP на локальном хосте с портом 9999.
Затем мы определяем функцию преобразования transform_text()
, которая преобразует каждую строку в верхний регистр. Преобразование map()
затем применяется к lines
DStream с использованием функции transform_text()
в качестве функции сопоставления.
Наконец, мы используем pprint()
для вывода преобразованных строк на консоль. Контекст потоковой передачи запускается с помощью ssc.start()
, а ssc.awaitTermination()
используется для поддержания работы контекста потоковой передачи до тех пор, пока он не будет явно остановлен.
Вы можете запустить этот сценарий и предоставить входные данные на порту localhost:9999, чтобы увидеть преобразованные строки в верхнем регистре, напечатанные потоковым приложением.
плоская карта (функция):
Подобно преобразованию map
, но каждый входной элемент может быть сопоставлен с нулем или более выходными элементами. Он применяет функцию к каждому элементу и возвращает новый DStream со сглаженными результатами.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Define the transformation function def split_words(line): # Split each line into words return line.split() # Apply the flatMap transformation to split lines into words words = lines.flatMap(split_words) # Print the words words.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Мы определяем функцию преобразования split_words()
, которая разбивает каждую строку на слова. Преобразование flatMap()
затем применяется к lines
DStream с использованием функции split_words()
в качестве функции сопоставления.
фильтр(функция):
Фильтрует элементы DStream на основе заданной функции. Он возвращает новый DStream только с элементами, удовлетворяющими условию.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Define the filter condition function def filter_condition(line): # Filter lines that contain the word "spark" return "spark" in line.lower() # Apply the filter transformation to filter lines filtered_lines = lines.filter(filter_condition) # Print the filtered lines filtered_lines.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Мы определяем функцию условия фильтра filter_condition()
, которая проверяет, присутствует ли слово «искра» в каждой строке. Затем к lines
DStream применяется преобразование filter()
с использованием функции filter_condition()
в качестве функции фильтрации.
союз (другой поток):
Возвращает новый DStream, содержащий объединение элементов из текущего DStream и otherStream
.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create two DStreams from TCP streams (localhost:9999 and localhost:8888) lines1 = ssc.socketTextStream("localhost", 9999) lines2 = ssc.socketTextStream("localhost", 8888) # Apply the union transformation to combine the two DStreams combined_lines = lines1.union(lines2) # Print the combined lines combined_lines.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Мы применяем преобразование union()
для объединения двух DStream lines1
и lines2
в один DStream combined_lines
.
передел():
Хотя PySpark Streaming не обеспечивает прямого преобразования repartition()
, вы все равно можете добиться повторного разбиения на разделы в PySpark Streaming с помощью преобразования transform()
.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Define the repartition function def repartition_rdd(rdd): # Repartition the RDD into 4 partitions return rdd.repartition(4) # Apply the transform transformation to repartition the RDD repartitioned_lines = lines.transform(repartition_rdd) # Print the number of partitions def print_partitions(rdd): num_partitions = rdd.getNumPartitions() print(f"Number of partitions: {num_partitions}") repartitioned_lines.foreachRDD(print_partitions) # Start the streaming context ssc.start() ssc.awaitTermination()
мы определяем функцию repartition_rdd()
, которая использует метод repartition()
в RDD для перераспределения его на 4 раздела. Затем мы применяем преобразование transform()
к DStream lines
, используя функцию repartition_rdd()
.
Чтобы проверить количество разделов в перераспределенном СДР, мы определяем отдельную функцию print_partitions()
, которая печатает количество разделов в СДР. Мы используем foreachRDD()
для применения этой функции к каждому RDD в repartitioned_lines
DStream.
считать():
Возвращает новый DStream, где каждый элемент является числом элементов в соответствующем RDD исходного DStream.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Apply the count transformation to count the number of lines line_count = lines.count() # Print the line count line_count.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Мы применяем преобразование count()
к DStream lines
для подсчета количества строк в каждом пакете.
уменьшать():
Возвратите новый DStream из одноэлементных RDD, объединив элементы в каждом RDD исходного DStream с помощью функции func (которая принимает два аргумента и возвращает один). Функция должна быть ассоциативной и коммутативной, чтобы ее можно было вычислять параллельно.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Apply the reduce transformation to compute the sum of numbers sum_of_numbers = lines.reduce(lambda x, y: int(x) + int(y)) # Print the sum of numbers sum_of_numbers.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Мы применяем преобразование reduce()
к DStream lines
для вычисления суммы чисел. В этом случае лямбда-функция (lambda x, y: int(x) + int(y))
используется для преобразования каждого элемента в целое число и выполнения операции суммирования.
уменьшить по ключу (функция):
Применяет функцию сокращения к элементам каждого ключа в DStream. Он возвращает новый DStream с уменьшенными значениями.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Split each line into words words = lines.flatMap(lambda line: line.split()) # Create key-value pairs with word as the key and count as the value word_counts = words.map(lambda word: (word, 1)) # Apply the reduceByKey transformation to get the count of each word word_count_totals = word_counts.reduceByKey(lambda x, y: x + y) # Print the word count totals word_count_totals.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Каждую строку мы разбиваем на слова с помощью преобразования flatMap()
. Затем мы создаем пары ключ-значение, где слово является ключом, а счетчик изначально устанавливается равным 1 с помощью преобразования map()
.
Наконец, мы применяем преобразование reduceByKey()
к word_counts
DStream, чтобы вычислить общее количество каждого слова. Лямбда-функция (lambda x, y: x + y)
используется для суммирования значений для каждого ключа.
countByValue():
При вызове DStream элементов типа K возвращайте новый DStream пар (K, Long), где значением каждого ключа является его частота в каждом RDD исходного DStream.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Apply the countByValue transformation to count the occurrences of each value value_counts = lines.countByValue() # Print the value counts value_counts.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Мы применяем преобразование countByValue()
к DStream lines
. Это преобразование подсчитывает количество вхождений каждого уникального значения в потоке.
присоединиться():
При вызове двух потоков DStream из пар (K, V) и (K, W) возвращает новый поток DStream из пар (K, (V, W)) со всеми парами элементов для каждого ключа.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create the first DStream from a TCP stream (localhost:9999) lines1 = ssc.socketTextStream("localhost", 9999) # Create the second DStream from a TCP stream (localhost:8888) lines2 = ssc.socketTextStream("localhost", 8888) # Create key-value pairs in the first DStream key_value_pairs1 = lines1.map(lambda line: line.split()).map(lambda words: (words[0], words[1])) # Create key-value pairs in the second DStream key_value_pairs2 = lines2.map(lambda line: line.split()).map(lambda words: (words[0], words[2])) # Join the two DStreams joined_stream = key_value_pairs1.join(key_value_pairs2) # Print the joined stream joined_stream.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Мы выполняем необходимые преобразования для создания пар ключ-значение в каждом DStream. В этом случае мы разбиваем каждую строку на слова и сопоставляем их с парами ключ-значение.
Наконец, мы применяем преобразование join()
к key_value_pairs1
DStream, объединяя его с key_value_pairs2
DStream. Полученный joined_stream
будет содержать объединенные записи на основе совпадающих ключей.
когруппа():
При вызове DStream из пар (K, V) и (K, W) возвращает новый DStream из (K, Seq[V], Seq[W]) кортежей.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create the first DStream from a TCP stream (localhost:9999) lines1 = ssc.socketTextStream("localhost", 9999) # Create the second DStream from a TCP stream (localhost:8888) lines2 = ssc.socketTextStream("localhost", 8888) # Create key-value pairs in the first DStream key_value_pairs1 = lines1.map(lambda line: line.split()).map(lambda words: (words[0], words[1])) # Create key-value pairs in the second DStream key_value_pairs2 = lines2.map(lambda line: line.split()).map(lambda words: (words[0], words[2])) # Cogroup the two DStreams cogrouped_stream = key_value_pairs1.cogroup(key_value_pairs2) # Print the cogrouped stream cogrouped_stream.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Мы выполняем необходимые преобразования для создания пар ключ-значение в каждом DStream. В этом случае мы разбиваем каждую строку на слова и сопоставляем их с парами ключ-значение.
Наконец, мы применяем преобразование cogroup()
к key_value_pairs1
DStream, объединяя его с key_value_pairs2
DStream. Полученный cogrouped_stream
будет содержать сгруппированные записи на основе совпадающих ключей. Каждая запись в сгруппированном потоке будет кортежем из ключа и двух последовательностей, содержащих все значения из каждого DStream.
преобразование (функция):
Позволяет применять к DStream произвольные преобразования RDD-RDD. Это преобразование можно использовать для доступа к базовым RDD DStream и применения любой операции RDD.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Define a transformation function to convert the lines to uppercase def to_uppercase(rdd): return rdd.map(lambda x: x.upper()) # Apply the transform transformation to convert the lines to uppercase uppercase_lines = lines.transform(to_uppercase) # Print the uppercase lines uppercase_lines.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
Мы определяем функцию преобразования to_uppercase
, которая принимает RDD в качестве входных данных и преобразует каждый элемент в верхний регистр, используя преобразование map()
.
Наконец, мы применяем преобразование transform()
к lines
DStream, передавая функцию to_uppercase
в качестве аргумента. Это преобразует каждый RDD в DStream, применяя функцию для преобразования строк в верхний регистр.
updateStateByKey (функция):
Возвращает новый DStream «состояния», в котором состояние для каждого ключа обновляется путем применения данной функции к предыдущему состоянию ключа и новым значениям для ключа. Это можно использовать для хранения произвольных данных состояния для каждого ключа.
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with batch interval of 1 second sc = SparkContext("local[*]", "StreamingExample") ssc = StreamingContext(sc, 1) # Enable checkpointing for stateful operations ssc.checkpoint("checkpoint_dir") # Create a DStream from a TCP stream (localhost:9999) lines = ssc.socketTextStream("localhost", 9999) # Parse the lines and create key-value pairs key_value_pairs = lines.flatMap(lambda line: line.split()).map(lambda word: (word, 1)) # Define the update function to accumulate the word counts def update_word_count(new_values, running_count): if running_count is None: running_count = 0 return sum(new_values, running_count) # Apply the updateStateByKey transformation to update the word counts word_counts = key_value_pairs.updateStateByKey(update_word_count) # Print the word counts word_counts.pprint() # Start the streaming context ssc.start() ssc.awaitTermination()
В этом примере мы создаем StreamingContext
с пакетным интервалом в 1 секунду. Мы также включаем контрольные точки, указав каталог контрольных точек с помощью ssc.checkpoint("checkpoint_dir")
. Контрольные точки необходимы для операций с отслеживанием состояния, таких как updateStateByKey()
.
Мы определяем функцию обновления update_word_count
, которая принимает новые значения (количество для текущей партии) и текущий счетчик (накопленный счет для предыдущих партий). Функция суммирует новые значения с текущим счетчиком, чтобы обновить общий счетчик.
Наконец, мы применяем преобразование updateStateByKey()
к key_value_pairs
DStream, передавая функцию update_word_count
в качестве аргумента. Это преобразование обновляет количество слов, сохраняя состояние на основе ключа.
Заключение
PySpark Streaming предоставляет мощную платформу для обработки потоковых данных в реальном времени с помощью механизма Spark. Мы изучили различные преобразования, которые можно применить к DStreams, такие как map()
, flatMap()
, filter()
, repartition()
, union()
, count()
, reduce()
, countByValue()
, reduceByKey()
, join()
, cogroup()
, transform()
и updateStateByKey()
.