PySpark snippets

Misc

Extracting a single micro-batch from a streaming DataFrame as a static DataFrame

Useful for debugging purposes.

from typing import Optional
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.streaming.query import StreamingQuery


def capture_single_microbatch(
    spark: SparkSession,
    stream_df: DataFrame,
    *,
    output_mode: str = "append",
) -> Optional[DataFrame]:
    """Capture a single micro-batch and return it as a static DataFrame.

    The stream is run with a one-shot trigger, so at most one micro-batch is
    processed. Inside the ``foreachBatch`` callback the batch is persisted
    *before* it is counted, so the count materializes the cache and the
    returned DataFrame does not have to be recomputed from the streaming
    source once the query has terminated.

    Args:
        spark: The active Spark session. Not used by the current
            implementation; kept for backward compatibility.
        stream_df: The streaming DataFrame to capture from.
        output_mode: Output mode passed to ``writeStream.outputMode``.
            Defaults to ``"append"``. Streams with aggregations and no
            watermark generally need ``"complete"`` or ``"update"``.

    Returns:
        Optional[DataFrame]: The captured micro-batch as a persisted static
        DataFrame, or None if no non-empty batch was processed. Call
        ``unpersist()`` on the result when you are done with it.

    Raises:
        StreamingQueryException: If the streaming query fails
            (propagated by ``awaitTermination``).
    """
    captured_batch: Optional[DataFrame] = None

    # NOTE(review): this relies on foreachBatch invoking the callback in this
    # driver process (classic PySpark), so the nonlocal assignment is visible
    # here. Confirm this holds for your deployment (e.g. Spark Connect).
    def capture_batch(micro_batch_df: DataFrame, batch_id: int) -> None:
        nonlocal captured_batch
        # Persist first: persist()/cache() are lazy, so the count() below is
        # the action that actually fills the cache. Counting before caching
        # (as before) left the cache empty and forced a recomputation later.
        persisted = micro_batch_df.persist()
        if persisted.count() > 0:
            captured_batch = persisted
        else:
            # Empty batch: nothing to return, release the cached data.
            persisted.unpersist()

    # trigger(once=True) processes at most one micro-batch and then stops.
    # (availableNow=True is its newer replacement but may run several batches.)
    query = (
        stream_df.writeStream
        .trigger(once=True)
        .foreachBatch(capture_batch)
        .outputMode(output_mode)
        .start()
    )

    # Block until the single micro-batch has been processed.
    query.awaitTermination()

    return captured_batch

# Example usage: `spark` (the active SparkSession) and `stream_df` (a
# streaming DataFrame) are assumed to already exist in your session.
my_df = capture_single_microbatch(spark=spark, stream_df=stream_df)

Local pytest

When running pytest tests with PySpark, you may find that PySpark does not correctly detect which Python installation to use.

This can be fixed by (temporarily) setting the `PYSPARK_PYTHON` environment variable in the `__init__.py` located at the root of your test folder:

import os
import sys

# Tell PySpark to use the same interpreter that is running pytest.
# sys.executable resolves to that interpreter on any machine, instead of a
# hard-coded, machine-specific path such as a micromamba environment.
os.environ["PYSPARK_PYTHON"] = sys.executable