Introducere în Spark
Descarcă și instalează Java Runtime Environment (JRE) de la Oracle de pe site-ul oficial. Alege versiunea pe 64 de biți deoarece aceasta este cea mai sigură alegere.
Pentru a testa dacă a fost instalată cu succes, folosește comanda:
java -version
Descarcă Spark și Hadoop de pe site-ul oficial Apache Software Foundation. Se recomandă versiunea 3.2.1 pentru Spark și versiunea 2.7 pentru Hadoop, pe care le poți obține ambele dintr-un singură arhivă făcând clic aici.
Dacă utilizezi Microsoft Windows, descarcă instrumentele winutils din repozitoriul oficial. După ce descarci întreaga arhivă și o deschizi, copiază directorul hadoop-2.7.1 din arhivă în directorul spark și redenumește-l doar în hadoop, astfel încât să ai directorul hadoop/bin în interiorul directorului spark.
Copiază totul din spark/hadoop/bin/ în spark/bin și înlocuiește orice fișiere dacă este necesar.
Configurează următoarele variabile de mediu:
SPARK_HOMEcu valoarea către directorulspark.HADOOP_HOMEcu valoarea%SPARK_HOME%/hadoop.JAVA_HOMEcu valoareaC:\Program Files\Java\jre1.8.0_321.PYSPARK_PYTHONcu valoareapython.
Pentru a testa dacă pyspark a fost instalat cu succes, folosește comanda:
winutils ls </code > <code python> from pyspark.sql import SparkSession spark = SparkSession.builder.appName('My Amazing App').getOrCreate()
from pyspark.sql.types import StructType, StructField, StringType, IntegerType schema = StructType([ StructField(name = 'col_a', dataType = StringType(), nullable = True), StructField('col_b', IntegerType(), True), StructField('col_c', StringType(), True), ]) data = [ ('a', 1, 'a-1'), ('b', 2, 'b-2'), ('c', 3, 'c-3'), ]
rdd = spark.sparkContext.parallelize(data) # resilient distributed dataset df = spark.createDataFrame(rdd, schema) df.show()
df.createOrReplaceTempView('my_table') res_df = spark.sql('select col_a from my_table') res_df.show()
pandas_df = df.toPandas() pandas_df
pandas_df.to_csv('df.csv', index = False)
read_df = spark.read.option('header', 'true').csv('df.csv') read_df.show()
read_df.repartition(1).write.option('header', 'true').csv('folder_df')
read_folder_df = spark.read.option('header', 'true').csv('folder_df') read_folder_df.show()