Складні типи, I/O та підготовка даних у Spark

Моделі та методи обробки великих даних

Ігор Мірошниченко

КНУ імені Тараса Шевченка, ФІТ

Що попереду

План лекції

flowchart LR
    A["Обробка<br/>null"] --> B["explode та<br/>складні типи"]
    B --> C["Читання<br/>даних"]
    C --> D["Запис<br/>даних"]
    D --> E["JDBC:<br/>бази даних"]
    E --> G["Практика"]
    style A fill:#f9b928,stroke:#333,stroke-width:3px

На минулій лекції ми розглянули основи Spark: створення DataFrame, типи даних, select, filter, withColumn, when/otherwise, рядкові та датні функції, groupBy, joins, SQL.

Сьогодні — нові теми: обробка пропущених значень, розгортання масивів та словників (explode), детальне читання/запис даних із різних джерел, JDBC.

Обробка null-значень

Проблема пропущених даних

employees = spark.createDataFrame([
    (1, "Олена Петренко", "Data Science", 45000, "Київ,Львів", "2022-03-15"),
    (2, "Андрій Коваленко", "Engineering", 52000, "Харків", None),
    (3, "Марія Шевченко", None, 48000, "Одеса,Київ,Дніпро", "2023-01-10"),
    (4, "Петро Бондаренко", "Engineering", None, "Львів", "2020-11-20"),
    (5, "Ірина Мельник", "Marketing", 42000, None, "2021-08-05"),
    (6, "Сергій Ткаченко", None, None, "Київ", None),
], schema=["id", "name", "department", "salary", "cities", "start_date"])

employees.show(truncate=False)
+---+----------------+------------+------+-----------------+----------+
|id |name            |department  |salary|cities           |start_date|
+---+----------------+------------+------+-----------------+----------+
|1  |Олена Петренко  |Data Science|45000 |Київ,Львів       |2022-03-15|
|2  |Андрій Коваленко|Engineering |52000 |Харків           |NULL      |
|3  |Марія Шевченко  |NULL        |48000 |Одеса,Київ,Дніпро|2023-01-10|
|4  |Петро Бондаренко|Engineering |NULL  |Львів            |2020-11-20|
|5  |Ірина Мельник   |Marketing   |42000 |NULL             |2021-08-05|
|6  |Сергій Ткаченко |NULL        |NULL  |Київ             |NULL      |
+---+----------------+------------+------+-----------------+----------+

isNull, isNotNull, isnan

# Рядки з null у department
employees.filter(F.col("department").isNull()).show(truncate=False)
+---+---------------+----------+------+-----------------+----------+
|id |name           |department|salary|cities           |start_date|
+---+---------------+----------+------+-----------------+----------+
|3  |Марія Шевченко |NULL      |48000 |Одеса,Київ,Дніпро|2023-01-10|
|6  |Сергій Ткаченко|NULL      |NULL  |Київ             |NULL      |
+---+---------------+----------+------+-----------------+----------+
# Рядки БЕЗ null у salary
employees.filter(F.col("salary").isNotNull()).show(truncate=False)
+---+----------------+------------+------+-----------------+----------+
|id |name            |department  |salary|cities           |start_date|
+---+----------------+------------+------+-----------------+----------+
|1  |Олена Петренко  |Data Science|45000 |Київ,Львів       |2022-03-15|
|2  |Андрій Коваленко|Engineering |52000 |Харків           |NULL      |
|3  |Марія Шевченко  |NULL        |48000 |Одеса,Київ,Дніпро|2023-01-10|
|5  |Ірина Мельник   |Marketing   |42000 |NULL             |2021-08-05|
+---+----------------+------------+------+-----------------+----------+
  • isNull() — перевірка на null
  • isNotNull() — перевірка на не-null
  • isnan() — перевірка на NaN (тільки для числових стовпців типу float/double)

coalesce: заміна null

employees.select(
    "name",
    F.coalesce(F.col("department"), F.lit("Не визначено")).alias("department"),
    F.coalesce(F.col("salary"), F.lit(0)).alias("salary"),
    F.coalesce(F.col("cities"), F.lit("Невідомо")).alias("cities"),
).show(truncate=False)
+----------------+------------+------+-----------------+
|name            |department  |salary|cities           |
+----------------+------------+------+-----------------+
|Олена Петренко  |Data Science|45000 |Київ,Львів       |
|Андрій Коваленко|Engineering |52000 |Харків           |
|Марія Шевченко  |Не визначено|48000 |Одеса,Київ,Дніпро|
|Петро Бондаренко|Engineering |0     |Львів            |
|Ірина Мельник   |Marketing   |42000 |Невідомо         |
|Сергій Ткаченко |Не визначено|0     |Київ             |
+----------------+------------+------+-----------------+
  • coalesce(col1, col2, ...) — повертає перше не-null значення зі списку
  • Аналог COALESCE у SQL, fill_null у Polars

fillna та dropna

# fillna — заміна null для конкретних стовпців
employees.fillna({
    "department": "Не визначено",
    "salary": 0,
    "start_date": "1970-01-01",
}).show(truncate=False)
+---+----------------+------------+------+-----------------+----------+
|id |name            |department  |salary|cities           |start_date|
+---+----------------+------------+------+-----------------+----------+
|1  |Олена Петренко  |Data Science|45000 |Київ,Львів       |2022-03-15|
|2  |Андрій Коваленко|Engineering |52000 |Харків           |1970-01-01|
|3  |Марія Шевченко  |Не визначено|48000 |Одеса,Київ,Дніпро|2023-01-10|
|4  |Петро Бондаренко|Engineering |0     |Львів            |2020-11-20|
|5  |Ірина Мельник   |Marketing   |42000 |NULL             |2021-08-05|
|6  |Сергій Ткаченко |Не визначено|0     |Київ             |1970-01-01|
+---+----------------+------------+------+-----------------+----------+
# dropna — видалення рядків з null
# how="any" — хоча б один null (за замовчуванням)
# how="all" — усі null
# subset — тільки вказані стовпці
employees.dropna(subset=["department", "salary"]).show(truncate=False)
+---+----------------+------------+------+----------+----------+
|id |name            |department  |salary|cities    |start_date|
+---+----------------+------------+------+----------+----------+
|1  |Олена Петренко  |Data Science|45000 |Київ,Львів|2022-03-15|
|2  |Андрій Коваленко|Engineering |52000 |Харків    |NULL      |
|5  |Ірина Мельник   |Marketing   |42000 |NULL      |2021-08-05|
+---+----------------+------------+------+----------+----------+

Обробка null при when/otherwise

employees.select(
    "name",
    "salary",
    F.when(F.col("salary").isNull(), F.lit("Немає даних"))
     .when(F.col("salary") >= 50000, F.lit("Високий"))
     .when(F.col("salary") >= 40000, F.lit("Середній"))
     .otherwise(F.lit("Базовий"))
     .alias("salary_grade"),
    F.when(
        F.col("salary").isNull() | F.col("salary").isNull(),
        F.lit(0)
    ).otherwise(F.col("salary")).alias("salary_safe"),
).show(truncate=False)
+----------------+------+------------+-----------+
|name            |salary|salary_grade|salary_safe|
+----------------+------+------------+-----------+
|Олена Петренко  |45000 |Середній    |45000      |
|Андрій Коваленко|52000 |Високий     |52000      |
|Марія Шевченко  |48000 |Середній    |48000      |
|Петро Бондаренко|NULL  |Немає даних |0          |
|Ірина Мельник   |42000 |Середній    |42000      |
|Сергій Ткаченко |NULL  |Немає даних |0          |
+----------------+------+------------+-----------+

Попередження

when(col > 50000) для null поверне null, не False! Завжди перевіряйте null першою умовою.

explode та складні типи

Нагадування: складні типи Spark

graph TD
    A["Складні типи"] --> C1["ArrayType<br/>[1, 2, 3]"]
    A --> C2["MapType<br/>{key: value}"]
    A --> C3["StructType<br/>{name: ..., age: ...}"]
    C1 --> E1["explode → окремі рядки"]
    C2 --> E2["explode → key + value"]
    C3 --> E3["col('struct.field')"]
    style A fill:#f9b928,stroke:#333
    style E1 fill:#d9f6ec,stroke:#28a87d
    style E2 fill:#d9f6ec,stroke:#28a87d
    style E3 fill:#d9f6ec,stroke:#28a87d

На минулій лекції ми створювали складні типи. Сьогодні — навчимось їх розгортати та обробляти.

explode: розгортання ArrayType

df_skills = spark.createDataFrame([
    ("Олена", ["Python", "SQL", "Spark"]),
    ("Андрій", ["Scala", "Java"]),
    ("Марія", ["Python", "R", "SQL", "Tableau"]),
], schema=StructType([
    StructField("name", StringType()),
    StructField("skills", ArrayType(StringType())),
]))

df_skills.select(
    "name",
    F.explode("skills").alias("skill"),
).show(truncate=False)
+------+-------+
|name  |skill  |
+------+-------+
|Олена |Python |
|Олена |SQL    |
|Олена |Spark  |
|Андрій|Scala  |
|Андрій|Java   |
|Марія |Python |
|Марія |R      |
|Марія |SQL    |
|Марія |Tableau|
+------+-------+

flowchart LR
    A["Олена: [Python, SQL, Spark]"] --> B["Олена — Python<br/>Олена — SQL<br/>Олена — Spark"]
    style A fill:#f9b928,stroke:#333
    style B fill:#d9f6ec,stroke:#28a87d

explode_outer vs explode

df_with_empty = spark.createDataFrame([
    ("Олена", ["Python", "SQL"]),
    ("Андрій", []),
    ("Марія", None),
], schema=StructType([
    StructField("name", StringType()),
    StructField("skills", ArrayType(StringType())),
]))

explode — видаляє порожні:

df_with_empty.select(
    "name", F.explode("skills").alias("skill"),
).show()
+-----+------+
| name| skill|
+-----+------+
|Олена|Python|
|Олена|   SQL|
+-----+------+

explode_outer — зберігає з null:

df_with_empty.select(
    "name", F.explode_outer("skills").alias("skill"),
).show()
+------+------+
|  name| skill|
+------+------+
| Олена|Python|
| Олена|   SQL|
|Андрій|  NULL|
| Марія|  NULL|
+------+------+

split + explode: нормалізація даних

# Comma-separated cities → окремі рядки
employees.filter(F.col("cities").isNotNull()).select(
    "id",
    "name",
    F.explode(F.split("cities", ",")).alias("city"),
    "salary",
).show(truncate=False)
+---+----------------+------+------+
|id |name            |city  |salary|
+---+----------------+------+------+
|1  |Олена Петренко  |Київ  |45000 |
|1  |Олена Петренко  |Львів |45000 |
|2  |Андрій Коваленко|Харків|52000 |
|3  |Марія Шевченко  |Одеса |48000 |
|3  |Марія Шевченко  |Київ  |48000 |
|3  |Марія Шевченко  |Дніпро|48000 |
|4  |Петро Бондаренко|Львів |NULL  |
|6  |Сергій Ткаченко |Київ  |NULL  |
+---+----------------+------+------+
# Повний патерн з explode_outer (зберігає рядки без міст)
employees.select(
    "id", "name",
    F.explode_outer(F.split("cities", ",")).alias("city"),
    "salary",
)

Порада

split()ArrayTypeexplode() → окремі рядки — стандартний патерн для нормалізації comma-separated значень.

explode для MapType

students = spark.createDataFrame([
    ("Олена", {"math": 95, "code": 90, "stat": 88}),
    ("Андрій", {"math": 78, "code": 95}),
    ("Марія", {"math": 92, "code": 85, "stat": 91}),
], schema=StructType([
    StructField("name", StringType()),
    StructField("scores", MapType(StringType(), IntegerType())),
]))

students.select(
    "name",
    F.explode("scores").alias("subject", "score"),
).show(truncate=False)
+------+-------+-----+
|name  |subject|score|
+------+-------+-----+
|Олена |stat   |88   |
|Олена |code   |90   |
|Олена |math   |95   |
|Андрій|code   |95   |
|Андрій|math   |78   |
|Марія |stat   |91   |
|Марія |code   |85   |
|Марія |math   |92   |
+------+-------+-----+
  • Для MapType explode створює два стовпці: key та value
  • .alias("subject", "score") — перейменування обох одразу

Доступ до MapType та ArrayType

students.select(
    "name",
    F.col("scores")["math"].alias("math"),
    F.col("scores")["code"].alias("code"),
    F.map_keys("scores").alias("subjects"),
    F.map_values("scores").alias("values"),
    F.size("scores").alias("num_subjects"),
).show(truncate=False)
+------+----+----+------------------+------------+------------+
|name  |math|code|subjects          |values      |num_subjects|
+------+----+----+------------------+------------+------------+
|Олена |95  |90  |[stat, code, math]|[88, 90, 95]|3           |
|Андрій|78  |95  |[code, math]      |[95, 78]    |2           |
|Марія |92  |85  |[stat, code, math]|[91, 85, 92]|3           |
+------+----+----+------------------+------------+------------+
Функція Опис
col("map")["key"] Значення за ключем
map_keys() Масив усіх ключів
map_values() Масив усіх значень
size() Кількість елементів (Map або Array)
col("array")[idx] Елемент масиву за індексом
array_contains() Чи містить масив значення

Доступ до StructType

orders = spark.createDataFrame([
    (1, "Олена", ("Ноутбук", 1, 25000.0)),
    (2, "Андрій", ("Телефон", 2, 12000.0)),
    (3, "Марія", ("Планшет", 1, 15000.0)),
], schema=StructType([
    StructField("order_id", IntegerType()),
    StructField("customer", StringType()),
    StructField("item", StructType([
        StructField("product", StringType()),
        StructField("quantity", IntegerType()),
        StructField("price", DoubleType()),
    ])),
]))
# Доступ через крапку
orders.select(
    "order_id",
    "customer",
    F.col("item.product").alias("product"),
    F.col("item.quantity").alias("qty"),
    F.col("item.price").alias("price"),
    (F.col("item.quantity") * F.col("item.price")).alias("total"),
).show(truncate=False)
+--------+--------+-------+---+-------+-------+
|order_id|customer|product|qty|price  |total  |
+--------+--------+-------+---+-------+-------+
|1       |Олена   |Ноутбук|1  |25000.0|25000.0|
|2       |Андрій  |Телефон|2  |12000.0|24000.0|
|3       |Марія   |Планшет|1  |15000.0|15000.0|
+--------+--------+-------+---+-------+-------+
# Розгортання всіх полів структури
orders.select("order_id", "customer", "item.*").show(truncate=False)
+--------+--------+-------+--------+-------+
|order_id|customer|product|quantity|price  |
+--------+--------+-------+--------+-------+
|1       |Олена   |Ноутбук|1       |25000.0|
|2       |Андрій  |Телефон|2       |12000.0|
|3       |Марія   |Планшет|1       |15000.0|
+--------+--------+-------+--------+-------+

Вкладені структури з масивами

complex_orders = spark.createDataFrame([
    (1, "Олена", [("Ноутбук", 1, 25000.0), ("Мишка", 1, 500.0)]),
    (2, "Андрій", [("Телефон", 2, 12000.0)]),
], schema=StructType([
    StructField("order_id", IntegerType()),
    StructField("customer", StringType()),
    StructField("items", ArrayType(StructType([
        StructField("product", StringType()),
        StructField("quantity", IntegerType()),
        StructField("price", DoubleType()),
    ]))),
]))

# explode масиву структур → плоска таблиця
complex_orders.select(
    "order_id",
    "customer",
    F.explode("items").alias("item"),
).select(
    "order_id",
    "customer",
    "item.product",
    "item.quantity",
    "item.price",
    (F.col("item.quantity") * F.col("item.price")).alias("line_total"),
).show(truncate=False)
+--------+--------+-------+--------+-------+----------+
|order_id|customer|product|quantity|price  |line_total|
+--------+--------+-------+--------+-------+----------+
|1       |Олена   |Ноутбук|1       |25000.0|25000.0   |
|1       |Олена   |Мишка  |1       |500.0  |500.0     |
|2       |Андрій  |Телефон|2       |12000.0|24000.0   |
+--------+--------+-------+--------+-------+----------+

Читання даних: детально

CSV: повний набір опцій

Уявімо CSV-файл data.csv з роздільником ; та значенням N/A для пропусків:

name;department;salary;start_date
Олена Петренко;Data Science;45000;15.03.2022
Андрій Коваленко;Engineering;52000;01.07.2021
Марія Шевченко;;48000;10.01.2023
Петро Бондаренко;Engineering;N/A;20.11.2020
# CSV з нестандартним роздільником, null-значеннями, датами
df_csv = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("sep", ";")               # роздільник — крапка з комою
    .option("encoding", "UTF-8")      # кодування
    .option("nullValue", "N/A")       # "N/A" → null
    .option("inferSchema", True)      # автовизначення типів (повільно!)
    .load("data.csv")
)
# +------------------+-----------+------+----------+
# |              name| department|salary|start_date|
# +------------------+-----------+------+----------+
# |  Олена Петренко  |Data Scien…| 45000|15.03.2022|
# |Андрій Коваленко  |Engineering| 52000|01.07.2021|
# |  Марія Шевченко  |       NULL| 48000|10.01.2023|
# |Петро Бондаренко  |Engineering|  NULL|20.11.2020|
# +------------------+-----------+------+----------+

CSV: явна схема та дати

csv_schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("department", StringType()),
    StructField("salary", IntegerType()),
    StructField("start_date", StringType()),
])

df_csv = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("sep", ";")
    .option("nullValue", "N/A")
    .schema(csv_schema)               # явна схема — без inferSchema!
    .load("data.csv")
)

# Конвертація дати з українського формату
df_csv = df_csv.withColumn(
    "start_date",
    F.to_date("start_date", "dd.MM.yyyy")
)
# root
#  |-- name: string (nullable = false)
#  |-- department: string (nullable = true)
#  |-- salary: integer (nullable = true)
#  |-- start_date: date (nullable = true)

Порада

Явна схема через .schema()швидше (без зайвого проходу) і надійніше (контроль типів).

Повний довідник опцій CSV

Опція Опис Значення за замовчуванням
header Перший рядок — заголовки False
sep Роздільник стовпців ,
encoding Кодування файлу UTF-8
inferSchema Автовизначення типів False
nullValue Рядок, що інтерпретується як null ""
nanValue Рядок для NaN NaN
dateFormat Формат дати yyyy-MM-dd
timestampFormat Формат timestamp yyyy-MM-dd'T'HH:mm:ss
multiLine Значення на кількох рядках False
quote Символ лапок "
escape Символ екранування \
mode Поведінка при помилках: PERMISSIVE, DROPMALFORMED, FAILFAST PERMISSIVE

Читання JSON: вкладені структури

Уявімо файл orders.json (JSON Lines — один об’єкт на рядок):

{"order_id": 1, "customer": {"name": "Олена", "city": "Київ"}, "items": [{"product": "Ноутбук", "price": 25000}, {"product": "Мишка", "price": 500}]}
{"order_id": 2, "customer": {"name": "Андрій", "city": "Львів"}, "items": [{"product": "Телефон", "price": 12000}]}
df_json = spark.read.json("orders.json")
df_json.printSchema()
# root
#  |-- customer: struct
#  |    |-- city: string
#  |    |-- name: string
#  |-- items: array
#  |    |-- element: struct
#  |    |    |-- price: long
#  |    |    |-- product: string
#  |-- order_id: long
# Розгортання вкладеної структури
df_json.select(
    "order_id",
    F.col("customer.name").alias("customer_name"),
    F.col("customer.city").alias("city"),
    F.explode("items").alias("item"),
).select(
    "order_id", "customer_name", "city",
    F.col("item.product"), F.col("item.price"),
).show(truncate=False)
# +--------+-------------+-----+--------+-----+
# |order_id|customer_name| city| product|price|
# +--------+-------------+-----+--------+-----+
# |       1|        Олена| Київ| Ноутбук|25000|
# |       1|        Олена| Київ|   Мишка|  500|
# |       2|       Андрій|Львів| Телефон|12000|
# +--------+-------------+-----+--------+-----+

Читання з S3

# Налаштування конекту до S3
spark = (
    SparkSession.builder
    .appName("S3 Reader")
    .config("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY")
    .config("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .getOrCreate()
)

# Читання Parquet з S3
df = spark.read.parquet("s3a://my-bucket/data/events.parquet")

# Читання всіх файлів у папці
df = spark.read.parquet("s3a://my-bucket/data/events/")

# Читання з партиціюванням (Spark автоматично розпізнає partition columns)
df = spark.read.parquet("s3a://my-bucket/data/events/year=2024/month=01/")

Попередження

Протокол s3a:// (не s3:// або s3n://) — сучасний конектор для великих файлів.

JDBC: читання з бази даних

# Базове читання з PostgreSQL
URL = "jdbc:postgresql://localhost:5432/mydb"

df = (
    spark.read
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", "orders")
    .option("user", "postgres")
    .option("password", "secret")
    .option("driver", "org.postgresql.Driver")
    .load()
)
# SQL-запит замість таблиці — фільтрація на стороні бази
df = (
    spark.read
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", "(SELECT * FROM orders WHERE status = 'active') AS t")
    .option("user", "postgres")
    .option("password", "secret")
    .load()
)

Паралельне читання JDBC

# Паралельне читання — набагато швидше для великих таблиць
df = (
    spark.read
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", "orders")
    .option("user", "postgres")
    .option("password", "secret")
    .option("partitionColumn", "order_id")   # числовий стовпець
    .option("lowerBound", "1")                # мінімальне значення
    .option("upperBound", "1000000")          # максимальне значення
    .option("numPartitions", "8")             # паралельних з'єднань
    .option("fetchsize", "10000")             # рядків за один запит
    .load()
)

flowchart LR
    DB["PostgreSQL<br/>1M рядків"] --> P1["Partition 1<br/>id: 1 — 125K"]
    DB --> P2["Partition 2<br/>id: 125K — 250K"]
    DB --> P3["..."]
    DB --> P8["Partition 8<br/>id: 875K — 1M"]
    P1 --> S["Spark DataFrame<br/>(8 партицій)"]
    P2 --> S
    P3 --> S
    P8 --> S
    style DB fill:#f9b928,stroke:#333
    style S fill:#d9f6ec,stroke:#28a87d

Запис даних

Режими запису

df = spark.createDataFrame([
    ("Олена", "DS", 45000),
    ("Андрій", "Eng", 52000),
    ("Марія", "DS", 48000),
], schema=["name", "department", "salary"])

# overwrite — перезаписати (видаляє існуючі дані!)
df.write.mode("overwrite").parquet("output/employees")

# append — додати до існуючих
df.write.mode("append").parquet("output/employees")

# ignore — нічого не робити, якщо дані існують
df.write.mode("ignore").parquet("output/employees")

# error (за замовчуванням) — помилка, якщо дані існують
df.write.mode("error").parquet("output/employees")
Режим Поведінка Коли використовувати
overwrite Видаляє старі, записує нові Повне оновлення таблиці
append Додає до існуючих Інкрементальне завантаження
ignore Нічого не робить, якщо існують Ідемпотентні ETL
error Помилка, якщо існують За замовчуванням (безпечний)

Контроль кількості файлів та партиціювання

# Контроль кількості вихідних файлів
df.coalesce(1).write.mode("overwrite").parquet("output/one_file/")     # 1 файл
df.repartition(4).write.mode("overwrite").parquet("output/balanced/")  # 4 файли

# Розподіл по папках за значеннями стовпця (Partition Pruning при читанні)
df.write.mode("overwrite").partitionBy("city").parquet("output/sales/")
# → output/sales/city=Київ/part-00000.parquet
# → output/sales/city=Львів/part-00000.parquet
coalesce(n) repartition(n) partitionBy("col")
Що робить Зменшує кількість партицій Змінює кількість партицій Розподіляє по папках
Shuffle Ні Так Залежить
Коли Менше файлів при записі Рівномірний розподіл Фільтрація при читанні

Порада

Детально про repartition vs coalesce, партиціювання та проблему маленьких файлів — на наступній лекції.

Запис у JDBC

# Запис у PostgreSQL
URL = "jdbc:postgresql://localhost:5432/mydb"

(
    df.write
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", "employees")
    .option("user", "postgres")
    .option("password", "secret")
    .option("driver", "org.postgresql.Driver")
    .option("batchsize", "10000")       # рядків в одному INSERT
    .option("numPartitions", "4")       # паралельних з'єднань
    .mode("overwrite")
    .save()
)
Параметр Опис Рекомендація
batchsize Рядків за один INSERT 5000–10000
numPartitions Паралельних з’єднань для запису 2–8
truncate Очистити таблицю (TRUNCATE) перед записом True (швидше за DROP+CREATE)

Порада

mode("overwrite") + option("truncate", True) — зберігає індекси та обмеження таблиці.

Практика: ETL-пайплайн

Задача: обробка замовлень

raw_orders = spark.createDataFrame([
    (1, "Олена Петренко", "15.03.2024", "Ноутбук:25000,Мишка:500", "Київ", "Нова Пошта"),
    (2, "Андрій Коваленко", "22.03.2024", "Телефон:12000", None, "Укрпошта"),
    (3, "Марія Шевченко", "01.04.2024", "Планшет:15000,Чохол:300,Плівка:100", "Одеса", None),
    (4, "Петро Бондаренко", None, "Навушники:2000", "Львів", "Нова Пошта"),
    (5, "Ірина Мельник", "10.04.2024", "Ноутбук:30000,Сумка:1200", "Київ", "Самовивіз"),
], schema=["order_id", "customer", "order_date", "items_str", "city", "delivery"])

raw_orders.show(truncate=False)
+--------+----------------+----------+----------------------------------+-----+----------+
|order_id|customer        |order_date|items_str                         |city |delivery  |
+--------+----------------+----------+----------------------------------+-----+----------+
|1       |Олена Петренко  |15.03.2024|Ноутбук:25000,Мишка:500           |Київ |Нова Пошта|
|2       |Андрій Коваленко|22.03.2024|Телефон:12000                     |NULL |Укрпошта  |
|3       |Марія Шевченко  |01.04.2024|Планшет:15000,Чохол:300,Плівка:100|Одеса|NULL      |
|4       |Петро Бондаренко|NULL      |Навушники:2000                    |Львів|Нова Пошта|
|5       |Ірина Мельник   |10.04.2024|Ноутбук:30000,Сумка:1200          |Київ |Самовивіз |
+--------+----------------+----------+----------------------------------+-----+----------+

ETL: обробка null та дат

cleaned = (
    raw_orders
    .withColumn("city", F.coalesce(F.col("city"), F.lit("Невідомо")))
    .withColumn("delivery", F.coalesce(F.col("delivery"), F.lit("Не вказано")))
    .withColumn("order_date", F.to_date("order_date", "dd.MM.yyyy"))
    .withColumn("order_date", F.coalesce(F.col("order_date"), F.current_date()))
)
cleaned.show(truncate=False)
+--------+----------------+----------+----------------------------------+--------+----------+
|order_id|customer        |order_date|items_str                         |city    |delivery  |
+--------+----------------+----------+----------------------------------+--------+----------+
|1       |Олена Петренко  |2024-03-15|Ноутбук:25000,Мишка:500           |Київ    |Нова Пошта|
|2       |Андрій Коваленко|2024-03-22|Телефон:12000                     |Невідомо|Укрпошта  |
|3       |Марія Шевченко  |2024-04-01|Планшет:15000,Чохол:300,Плівка:100|Одеса   |Не вказано|
|4       |Петро Бондаренко|2026-04-01|Навушники:2000                    |Львів   |Нова Пошта|
|5       |Ірина Мельник   |2024-04-10|Ноутбук:30000,Сумка:1200          |Київ    |Самовивіз |
+--------+----------------+----------+----------------------------------+--------+----------+

ETL: розгортання товарів

# "Ноутбук:25000,Мишка:500" → окремі рядки з назвою та ціною
items_flat = (
    cleaned
    .select(
        "order_id", "customer", "order_date", "city", "delivery",
        F.explode(F.split("items_str", ",")).alias("item_raw"),
    )
    .withColumn("product", F.split("item_raw", ":")[0])
    .withColumn("price", F.split("item_raw", ":")[1].cast("double"))
    .drop("item_raw")
)
items_flat.show(truncate=False)
+--------+----------------+----------+--------+----------+---------+-------+
|order_id|customer        |order_date|city    |delivery  |product  |price  |
+--------+----------------+----------+--------+----------+---------+-------+
|1       |Олена Петренко  |2024-03-15|Київ    |Нова Пошта|Ноутбук  |25000.0|
|1       |Олена Петренко  |2024-03-15|Київ    |Нова Пошта|Мишка    |500.0  |
|2       |Андрій Коваленко|2024-03-22|Невідомо|Укрпошта  |Телефон  |12000.0|
|3       |Марія Шевченко  |2024-04-01|Одеса   |Не вказано|Планшет  |15000.0|
|3       |Марія Шевченко  |2024-04-01|Одеса   |Не вказано|Чохол    |300.0  |
|3       |Марія Шевченко  |2024-04-01|Одеса   |Не вказано|Плівка   |100.0  |
|4       |Петро Бондаренко|2026-04-01|Львів   |Нова Пошта|Навушники|2000.0 |
|5       |Ірина Мельник   |2024-04-10|Київ    |Самовивіз |Ноутбук  |30000.0|
|5       |Ірина Мельник   |2024-04-10|Київ    |Самовивіз |Сумка    |1200.0 |
+--------+----------------+----------+--------+----------+---------+-------+

ETL: аналітика та запис

# Статистика по містах
items_flat.groupBy("city").agg(
    F.count("*").alias("items_sold"),
    F.round(F.sum("price"), 2).alias("total_revenue"),
    F.round(F.avg("price"), 2).alias("avg_price"),
    F.countDistinct("order_id").alias("orders"),
).orderBy(F.desc("total_revenue")).show(truncate=False)
+--------+----------+-------------+---------+------+
|city    |items_sold|total_revenue|avg_price|orders|
+--------+----------+-------------+---------+------+
|Київ    |4         |56700.0      |14175.0  |2     |
|Одеса   |3         |15400.0      |5133.33  |1     |
|Невідомо|1         |12000.0      |12000.0  |1     |
|Львів   |1         |2000.0       |2000.0   |1     |
+--------+----------+-------------+---------+------+
# Топ товарів
items_flat.groupBy("product").agg(
    F.count("*").alias("times_ordered"),
    F.round(F.sum("price"), 2).alias("total_revenue"),
).orderBy(F.desc("total_revenue")).show()
+---------+-------------+-------------+
|  product|times_ordered|total_revenue|
+---------+-------------+-------------+
|  Ноутбук|            2|      55000.0|
|  Планшет|            1|      15000.0|
|  Телефон|            1|      12000.0|
|Навушники|            1|       2000.0|
|    Сумка|            1|       1200.0|
|    Мишка|            1|        500.0|
|    Чохол|            1|        300.0|
|   Плівка|            1|        100.0|
+---------+-------------+-------------+
# Запис результатів
items_flat.write.mode("overwrite").partitionBy("city").parquet("output/orders_clean")

Підсумок

Що ми вивчили

  1. Обробка null: isNull, coalesce, fillna, dropna
  2. explode для ArrayType та MapType
  3. explode_outer для збереження null-рядків
  4. split + explode: нормалізація даних
  5. Доступ до StructType через крапку
  1. CSV: повний набір опцій
  2. JSON: вкладені структури
  3. S3: протокол s3a://
  4. JDBC: читання та паралельне читання
  5. Запис: режими, partitionBy, JDBC

Ресурси

Домашнє завдання

  1. Створіть DataFrame замовлень (>10 рядків) з полями: order_id, customer_name, order_date (рядок у форматі dd.MM.yyyy), items (comma-separated: “товар:ціна”), city, delivery_method — з кількома null-значеннями
  2. Null: обробіть пропущені значення через coalesce, fillna, dropna
  3. explode: нормалізуйте items через split + explode, витягніть назву та ціну товару
  4. Агрегації: загальна сума по містах, середній чек по днях, топ-3 товари за виручкою
  5. Дати: конвертуйте з dd.MM.yyyy, обчисліть кількість днів від замовлення до сьогодні
  6. when/otherwise: створіть рівень замовлення (>10000 — “VIP”, >5000 — “Стандарт”, інше — “Базовий”)
  7. Запишіть результат у Parquet з partitionBy("city")
  8. Побудуйте 2+ візуалізації

Дякую за увагу!



Матеріали курсу

ihor.miroshnychenko@knu.ua

Data Mirosh

@ihormiroshnychenko

@aranaur