Developer Blog

Tips and tricks for developers and IT enthusiasts

PySpark | Cookbook

Websites

  • The Blaze Ecosystem (Blaze)
  • Dask: Flexible library for parallel computing in Python.
  • DataShape: Data layout language for array programming. 
  • Odo: Shapeshifting for your data. It efficiently migrates data from the source to the target through a network of conversions.

Reading Text Files

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Demo") \
    .config("spark.demo.config.option", "demo-value") \
    .getOrCreate()

df = spark.read.csv("input.csv", header=True, sep="|")

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example') 
sql_sc = SQLContext(sc)

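p_df = pd.read_csv('file.csv')  # assuming the file contains a header
# p_df = pd.read_csv('file.csv', names=['column 1', 'column 2'])  # if no header
s_df = sql_sc.createDataFrame(p_df)  # convert the pandas DataFrame to a Spark DataFrame

# Parse the file as an RDD, keeping the first two fields of every line that has at least two
sc.textFile("file.csv") \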
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

# Inspect the malformed lines (those with at most one field)
sc.textFile("input.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
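
The two RDD snippets above split each line on commas and then keep either the lines with at least two fields or the lines with at most one. A minimal sketch of the same logic on an in-memory list, without Spark, may help to check the filter conditions. The sample lines and the helper name `split_fields` are made up for illustration and are not part of the original recipe.

```python
# Hypothetical sample input standing in for the lines of file.csv
lines = ["1,alice,berlin", "2,bob", "broken-line", ""]


def split_fields(line):
    """Split a line on commas, as the lambda in the RDD pipeline does."""
    return line.split(",")


rows = [split_fields(line) for line in lines]

# Lines with at least two fields: keep the first two as a tuple
pairs = [(row[0], row[1]) for row in rows if len(row) > 1]

# Lines with at most one field: candidates for malformed input
malformed = [row for row in rows if len(row) <= 1]

print(pairs)
print(malformed)
```

Note that a plain `split(",")` does not honor quoted fields, so a quoted value containing a comma would be split in two. The `spark.read.csv` reader handles quoting.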
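
# Let the CSV reader drop malformed rows; `schema` is a StructType like the one defined in the next section
spark.read.csv(
    "input.csv", header=True, mode="DROPMALFORMED", schema=schema
)

# Equivalent call using the builder-style reader API
(spark.read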
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("input.csv"))

Read CSV file with known structure

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

# Spark 1.x variant: needs the external spark-csv package; `sqlContext` is an existing SQLContext
(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("input.csv"))