PySpark CheatSheet
- Initialize a Spark session:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(__file__).getOrCreate()  # __file__ only exists in scripts; pass a literal name in a notebook or REPL
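A few builder options commonly set at startup, as a minimal sketch; the app name, master URL, and config value below are illustrative, not required:

spark = (
    SparkSession.builder
    .appName("my_app")                            # illustrative app name
    .master("local[*]")                           # run locally on all cores
    .config("spark.sql.shuffle.partitions", "8")  # example tuning knob
    .getOrCreate()
)
spark.stop()  # release the session's resources when finished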
- Create a DataFrame:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

columns = ["programming_language", "users_count"]
data = [("Python", 100000), ("Java", 70000), ("C++", 50000)]

rdd = spark.sparkContext.parallelize(data)  # Resilient Distributed Dataset
df_v1 = rdd.toDF(columns)  # DataFrame; without arguments, columns default to _1, _2, ...

df_v2 = spark.createDataFrame(data).toDF(*columns)

row_data = map(lambda row: Row(*row), data)
df_v3 = spark.createDataFrame(row_data, columns)

schema = StructType([
    StructField(name="programming_language", dataType=StringType(), nullable=True),
    StructField(name="users_count", dataType=IntegerType(), nullable=True),
])
df_v4 = spark.createDataFrame(data=data, schema=schema)
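Two more creation paths worth keeping at hand, sketched assuming pandas is installed; they reuse columns and data from above:

import pandas as pd

# From a list of dicts: Spark infers the column names from the keys
df_v5 = spark.createDataFrame([
    {"programming_language": "Python", "users_count": 100000},
    {"programming_language": "Java", "users_count": 70000},
])

# From a pandas DataFrame
pdf = pd.DataFrame(data, columns=columns)
df_v6 = spark.createDataFrame(pdf)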
- View the data / schema of a DataFrame:
df_v1.printSchema()  # column names, types, and nullability
df_v1.show()         # first 20 rows in tabular form
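A few more inspection calls, reusing df_v1 from above:

df_v1.show(n=5, truncate=False)  # custom row count, no cell truncation
df_v1.columns                    # list of column names
df_v1.dtypes                     # list of (name, type) pairs
df_v1.count()                    # row count (triggers a job)
df_v1.describe().show()          # basic statistics for numeric columns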
- Read files into a DataFrame:
df_csv = spark.read.csv("/path/to/file.csv")
df_txt = spark.read.text("/path/to/file.txt")
df_json = spark.read.json("/path/to/file.json")
df_parquet = spark.read.parquet("/path/to/file.parquet")
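Readers accept per-format options; a sketch with illustrative paths, reusing the schema defined earlier:

# CSV with a header row and inferred column types
df_csv = (
    spark.read
    .option("header", True)       # first line holds the column names
    .option("inferSchema", True)  # sample the file to guess column types
    .csv("/path/to/file.csv")
)

# Enforce an explicit schema instead of inferring one
df_json = spark.read.schema(schema).json("/path/to/file.json")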
- Cast a column to a different type:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

df_v1 = df.withColumn("age", df.age.cast(IntegerType()))
df_v2 = df.withColumn("age", df.age.cast("int"))
df_v3 = df.withColumn("age", df.age.cast("integer"))
df_v4 = df.select(col("age").cast("int").alias("age"))

df.createOrReplaceTempView("my_table")
df_v5 = spark.sql("SELECT INT(age) AS age FROM my_table")
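One more equivalent via selectExpr, plus a quick check that the cast took effect; the df and its age column are assumed from the examples above:

df_v6 = df.selectExpr("CAST(age AS INT) AS age")  # SQL cast without a temp view
df_v6.dtypes  # [('age', 'int')] confirms the new column type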
- Write files:
# The path names a folder, not a file: Spark writes one or more part files into it
df.write.mode("overwrite").option("header", True).csv("/path/to/output/folder")
df.write.format("csv").mode("overwrite").option("header", True).save("/path/to/output/folder")
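Writes to other formats follow the same pattern; a sketch with an illustrative output path, partitioning on the programming_language column from earlier:

(
    df.write
    .mode("overwrite")
    .partitionBy("programming_language")  # one subfolder per distinct value
    .parquet("/path/to/output/parquet")
)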