PySpark Streaming: преобразование потоковых данных для анализа в реальном времени

вступление

В PySpark DStreams (дискретизированные потоки) — это фундаментальная абстракция, предоставляемая Spark Streaming для обработки потоков данных в реальном времени. DStreams представляет собой последовательность RDD (устойчивых распределенных наборов данных), где каждый RDD содержит данные за определенный интервал времени.

В PySpark доступны различные преобразования для DStream, которые позволяют вам манипулировать и обрабатывать потоковые данные. Вот некоторые из часто используемых преобразований:

карта (функция):

Применяет данную функцию к каждому элементу DStream и возвращает новый DStream, состоящий из преобразованных элементов.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Define the transformation function
def transform_text(text):
    # Convert each line to uppercase
    return text.upper()

# Apply the map transformation to convert the lines to uppercase
transformed_lines = lines.map(transform_text)

# Print the transformed lines
transformed_lines.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

В этом примере мы сначала создаем StreamingContext с пакетным интервалом в 1 секунду. Затем мы создаем DStream lines из потока TCP на локальном хосте с портом 9999.

Затем мы определяем функцию преобразования transform_text(), которая преобразует каждую строку в верхний регистр. Преобразование map() затем применяется к lines DStream с использованием функции transform_text() в качестве функции сопоставления.

Наконец, мы используем pprint() для вывода преобразованных строк на консоль. Контекст потоковой передачи запускается с помощью ssc.start(), а ssc.awaitTermination() используется для поддержания работы контекста потоковой передачи до тех пор, пока он не будет явно остановлен.

Вы можете запустить этот сценарий и предоставить входные данные на порту localhost:9999, чтобы увидеть преобразованные строки в верхнем регистре, напечатанные потоковым приложением.

плоская карта (функция):

Подобно преобразованию map, но каждый входной элемент может быть сопоставлен с нулем или более выходными элементами. Он применяет функцию к каждому элементу и возвращает новый DStream со сглаженными результатами.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Define the transformation function
def split_words(line):
    # Split each line into words
    return line.split()

# Apply the flatMap transformation to split lines into words
words = lines.flatMap(split_words)

# Print the words
words.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Мы определяем функцию преобразования split_words(), которая разбивает каждую строку на слова. Преобразование flatMap() затем применяется к lines DStream с использованием функции split_words() в качестве функции сопоставления.

фильтр(функция):

Фильтрует элементы DStream на основе заданной функции. Он возвращает новый DStream только с элементами, удовлетворяющими условию.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Define the filter condition function
def filter_condition(line):
    # Filter lines that contain the word "spark"
    return "spark" in line.lower()

# Apply the filter transformation to filter lines
filtered_lines = lines.filter(filter_condition)

# Print the filtered lines
filtered_lines.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Мы определяем функцию условия фильтра filter_condition(), которая проверяет, присутствует ли слово «искра» в каждой строке. Затем к lines DStream применяется преобразование filter() с использованием функции filter_condition() в качестве функции фильтрации.

союз (другой поток):

Возвращает новый DStream, содержащий объединение элементов из текущего DStream и otherStream.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create two DStreams from TCP streams (localhost:9999 and localhost:8888)
lines1 = ssc.socketTextStream("localhost", 9999)
lines2 = ssc.socketTextStream("localhost", 8888)

# Apply the union transformation to combine the two DStreams
combined_lines = lines1.union(lines2)

# Print the combined lines
combined_lines.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Мы применяем преобразование union() для объединения двух DStream lines1 и lines2 в один DStream combined_lines.

передел():

Хотя PySpark Streaming не обеспечивает прямого преобразования repartition(), вы все равно можете добиться повторного разбиения на разделы в PySpark Streaming с помощью преобразования transform().

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Define the repartition function
def repartition_rdd(rdd):
    # Repartition the RDD into 4 partitions
    return rdd.repartition(4)

# Apply the transform transformation to repartition the RDD
repartitioned_lines = lines.transform(repartition_rdd)

# Print the number of partitions
def print_partitions(rdd):
    num_partitions = rdd.getNumPartitions()
    print(f"Number of partitions: {num_partitions}")

repartitioned_lines.foreachRDD(print_partitions)

# Start the streaming context
ssc.start()
ssc.awaitTermination()

мы определяем функцию repartition_rdd(), которая использует метод repartition() в RDD для перераспределения его на 4 раздела. Затем мы применяем преобразование transform() к DStream lines, используя функцию repartition_rdd().

Чтобы проверить количество разделов в перераспределенном СДР, мы определяем отдельную функцию print_partitions(), которая печатает количество разделов в СДР. Мы используем foreachRDD() для применения этой функции к каждому RDD в repartitioned_lines DStream.

считать():

Возвращает новый DStream, где каждый элемент является числом элементов в соответствующем RDD исходного DStream.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Apply the count transformation to count the number of lines
line_count = lines.count()

# Print the line count
line_count.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Мы применяем преобразование count() к DStream lines для подсчета количества строк в каждом пакете.

уменьшать():

Возвратите новый DStream из одноэлементных RDD, объединив элементы в каждом RDD исходного DStream с помощью функции func (которая принимает два аргумента и возвращает один). Функция должна быть ассоциативной и коммутативной, чтобы ее можно было вычислять параллельно.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Apply the reduce transformation to compute the sum of numbers
sum_of_numbers = lines.reduce(lambda x, y: int(x) + int(y))

# Print the sum of numbers
sum_of_numbers.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Мы применяем преобразование reduce() к DStream lines для вычисления суммы чисел. В этом случае лямбда-функция (lambda x, y: int(x) + int(y)) используется для преобразования каждого элемента в целое число и выполнения операции суммирования.

уменьшить по ключу (функция):

Применяет функцию сокращения к элементам каждого ключа в DStream. Он возвращает новый DStream с уменьшенными значениями.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split())

# Create key-value pairs with word as the key and count as the value
word_counts = words.map(lambda word: (word, 1))

# Apply the reduceByKey transformation to get the count of each word
word_count_totals = word_counts.reduceByKey(lambda x, y: x + y)

# Print the word count totals
word_count_totals.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Каждую строку мы разбиваем на слова с помощью преобразования flatMap(). Затем мы создаем пары ключ-значение, где слово является ключом, а счетчик изначально устанавливается равным 1 с помощью преобразования map().

Наконец, мы применяем преобразование reduceByKey() к word_counts DStream, чтобы вычислить общее количество каждого слова. Лямбда-функция (lambda x, y: x + y) используется для суммирования значений для каждого ключа.

countByValue():

При вызове DStream элементов типа K возвращайте новый DStream пар (K, Long), где значением каждого ключа является его частота в каждом RDD исходного DStream.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Apply the countByValue transformation to count the occurrences of each value
value_counts = lines.countByValue()

# Print the value counts
value_counts.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Мы применяем преобразование countByValue() к DStream lines. Это преобразование подсчитывает количество вхождений каждого уникального значения в потоке.

присоединиться():

При вызове двух потоков DStream из пар (K, V) и (K, W) возвращает новый поток DStream из пар (K, (V, W)) со всеми парами элементов для каждого ключа.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create the first DStream from a TCP stream (localhost:9999)
lines1 = ssc.socketTextStream("localhost", 9999)

# Create the second DStream from a TCP stream (localhost:8888)
lines2 = ssc.socketTextStream("localhost", 8888)

# Create key-value pairs in the first DStream
key_value_pairs1 = lines1.map(lambda line: line.split()).map(lambda words: (words[0], words[1]))

# Create key-value pairs in the second DStream
key_value_pairs2 = lines2.map(lambda line: line.split()).map(lambda words: (words[0], words[2]))

# Join the two DStreams
joined_stream = key_value_pairs1.join(key_value_pairs2)

# Print the joined stream
joined_stream.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Мы выполняем необходимые преобразования для создания пар ключ-значение в каждом DStream. В этом случае мы разбиваем каждую строку на слова и сопоставляем их с парами ключ-значение.

Наконец, мы применяем преобразование join() к key_value_pairs1 DStream, объединяя его с key_value_pairs2 DStream. Полученный joined_stream будет содержать объединенные записи на основе совпадающих ключей.

когруппа():

При вызове DStream из пар (K, V) и (K, W) возвращает новый DStream из (K, Seq[V], Seq[W]) кортежей.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create the first DStream from a TCP stream (localhost:9999)
lines1 = ssc.socketTextStream("localhost", 9999)

# Create the second DStream from a TCP stream (localhost:8888)
lines2 = ssc.socketTextStream("localhost", 8888)

# Create key-value pairs in the first DStream
key_value_pairs1 = lines1.map(lambda line: line.split()).map(lambda words: (words[0], words[1]))

# Create key-value pairs in the second DStream
key_value_pairs2 = lines2.map(lambda line: line.split()).map(lambda words: (words[0], words[2]))

# Cogroup the two DStreams
cogrouped_stream = key_value_pairs1.cogroup(key_value_pairs2)

# Print the cogrouped stream
cogrouped_stream.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Мы выполняем необходимые преобразования для создания пар ключ-значение в каждом DStream. В этом случае мы разбиваем каждую строку на слова и сопоставляем их с парами ключ-значение.

Наконец, мы применяем преобразование cogroup() к key_value_pairs1 DStream, объединяя его с key_value_pairs2 DStream. Полученный cogrouped_stream будет содержать сгруппированные записи на основе совпадающих ключей. Каждая запись в сгруппированном потоке будет кортежем из ключа и двух последовательностей, содержащих все значения из каждого DStream.

преобразование (функция):

Позволяет применять к DStream произвольные преобразования RDD-RDD. Это преобразование можно использовать для доступа к базовым RDD DStream и применения любой операции RDD.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Define a transformation function to convert the lines to uppercase
def to_uppercase(rdd):
    return rdd.map(lambda x: x.upper())

# Apply the transform transformation to convert the lines to uppercase
uppercase_lines = lines.transform(to_uppercase)

# Print the uppercase lines
uppercase_lines.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

Мы определяем функцию преобразования to_uppercase, которая принимает RDD в качестве входных данных и преобразует каждый элемент в верхний регистр, используя преобразование map().

Наконец, мы применяем преобразование transform() к lines DStream, передавая функцию to_uppercase в качестве аргумента. Это преобразует каждый RDD в DStream, применяя функцию для преобразования строк в верхний регистр.

updateStateByKey (функция):

Возвращает новый DStream «состояния», в котором состояние для каждого ключа обновляется путем применения данной функции к предыдущему состоянию ключа и новым значениям для ключа. Это можно использовать для хранения произвольных данных состояния для каждого ключа.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with batch interval of 1 second
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 1)

# Enable checkpointing for stateful operations
ssc.checkpoint("checkpoint_dir")

# Create a DStream from a TCP stream (localhost:9999)
lines = ssc.socketTextStream("localhost", 9999)

# Parse the lines and create key-value pairs
key_value_pairs = lines.flatMap(lambda line: line.split()).map(lambda word: (word, 1))

# Define the update function to accumulate the word counts
def update_word_count(new_values, running_count):
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)

# Apply the updateStateByKey transformation to update the word counts
word_counts = key_value_pairs.updateStateByKey(update_word_count)

# Print the word counts
word_counts.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

В этом примере мы создаем StreamingContext с пакетным интервалом в 1 секунду. Мы также включаем контрольные точки, указав каталог контрольных точек с помощью ssc.checkpoint("checkpoint_dir"). Контрольные точки необходимы для операций с отслеживанием состояния, таких как updateStateByKey().

Мы определяем функцию обновления update_word_count, которая принимает новые значения (количество для текущей партии) и текущий счетчик (накопленный счет для предыдущих партий). Функция суммирует новые значения с текущим счетчиком, чтобы обновить общий счетчик.

Наконец, мы применяем преобразование updateStateByKey() к key_value_pairs DStream, передавая функцию update_word_count в качестве аргумента. Это преобразование обновляет количество слов, сохраняя состояние на основе ключа.

Заключение

PySpark Streaming предоставляет мощную платформу для обработки потоковых данных в реальном времени с помощью механизма Spark. Мы изучили различные преобразования, которые можно применить к DStreams, такие как map(), flatMap(), filter(), repartition(), union(), count(), reduce(), countByValue(), reduceByKey(), join(), cogroup(), transform() и updateStateByKey().

Мои опубликованные библиотеки PySpark

PySpark — Основы SQL

PySpark — встроенные функции

PySpark — API DataFrame

PySpark — Стриминг