Bogdan's Docs

Table of Contents

Ideas

  • Artificial Intelligence
    • Bond Pricing
    • Kubernetes Inference
    • Knowledge Bot
    • Ready to Use Models
    • Satellite Damage Detection

Curs IoT si AI (ro) Resources

  • Links
    • Artificial Intelligence
  • Recipes
    • MacOS
Bogdan's Docs
Docs ยป cheat-sheets:pyspark

PySpark CheatSheet

  • Initialize Spark session:
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName(__file__).getOrCreate()
  • Create a DataFrame:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField. StringType, IntegerType
 
columns = [ "programming_language", "users_count" ]
data = [("Python", 100000), ("Java", 70000), ("C++", 50000)]
 
rdd = spark.sparkContext.parallelize(data) # Resilient Distributed Dataset
df_v1 = rdd.toDF() # Data Frame
 
df_v2 = spark.createDataFrame(data).toDF(*columns)
 
row_data = map(lambda row: Row(*row), data)
df_v3 = spark.createDataFrame(row_data, columns)
 
schema = StructType([
  StructField(name="programming_language", dataType=StringType(), nullable=True),
  StructField(name="users_count", dataType=IntegerType(), nullable=True),
])
df_v4 = spark.createDataFrame(data=data, schema=schema)
  • View data / structure of DataFrame
df_v1.printSchema()
df_v1.show()
  • Read files and convert to DataFrame
df_csv = spark.read.csv("/path/to/file.csv")
df_txt = spark.read.text("/path/to/file.txt")
df_json = spark.read.json("/path/to/file.json")
df_parquet = spark.read.parquet("/path/to/file.parquet")
  • Cast column to different types
from pyspark.sql.types import IntegerType
df_v1 = df.withColumn("age", df.age.cast(IntegerType()))
df_v2 = df.withColumn("age", df.age.cast("int"))
df_v3 = df.withColumn("age", df.age.cast("integer"))
 
df_v4 = df.select(col("age").cast("int").alias("age"))
 
df.createOrReplaceTempView("my_table")
df_v5 = spark.sql("SELECT INT(age) as age FROM my_table")
  • Write files
df.write.mode("overwrite").option("header", True).csv("/path/to/folder/which/will/host/the/csv/file")
df.write.format("csv").mode("overwrite").option("header", True).save("/path/to/folder/which/will/host/the/csv/file")
Previous Next