Nothing Special   »   [go: up one dir, main page]

5 Spark Low Level-8

Download as pdf or txt
Download as pdf or txt
You are on page 1of 84

Big Data

Programation en Spark

Algassimou diallo

November 10, 2023


Transparents inspirés de Gianluca Quercini
Introduction
Big Data
Introduction

Introduction
• Il existe deux options pour écrire une application Spark :
• Programmation de bas niveau, en utilisant des opérations sur une
structure de données de bas niveau appelée Resilient Distributed
Dataset (RDD).
• Programmation de haut niveau, en utilisant des bibliothèques de haut
niveau, telles que SparkSQL et Structured Streaming.

Algassimou Diallo 3
Big Data
Introduction

Programation Spart bas niveau


• Un programme Spark utilise un objet appelé SparkContext.
• SparkContext représente une connexion à un cluster.
• Initialisation du SparkContext

from pyspark import SparkCon f, SparkContext


conf = SparkConf().setMaster(..).setAppName(..)
sc = SparkContext(conf = conf)

Algassimou Diallo 4
Big Data
Introduction

• Un programme Spark est une séquence d’opérations invoquées sur le


SparkContext (sc).
• Ces opérations manipulent un type spécial de structure de données,
appelé
Resilient Distributed Dataset (RDD) (ensemble de données distribuées
résilientes)

Algassimou Diallo 4
Big Data
Introduction

Resilient Distributed Dataset (RDD


• Un ensemble de données distribuées résilient, ou simplement RDD, est
une collection d’objets distribués immuables, immuable et distribuée
d’objets
• Les données de chaque RDD sont réparties entre plusieurs partitions.
• Chaque partition réside sur un nœud du cluster.
• Deux partitions peuvent résider sur le même nœud

Algassimou Diallo 5
Big Data
Introduction

Algassimou Diallo 5
Big Data
Introduction

Algassimou Diallo 5
Big Data
Introduction

Création d’une RDD


• A partir d’une collection en memoire (une liste, un set)

sc.parallelize([1, 5, 3, 2, 6, 7])

Cette méthode est utilisée pour le débogage et le prototypage sur de


petits ensembles de données
• à partir d’une source de données sur disque (par exemple, un fichier ou
une base de données)

Algassimou Diallo 6
Big Data
Introduction

sc.textFile("hdfs://sar01:9000/data/sample_text.txt")

Cette méthode est utilisée en production pour traiter de grands


ensembles de données.

Algassimou Diallo 6
Big Data
Introduction

Nombre de partitions
• RDDs créés avec parallelize
• Mode local : nombre de cœurs sur les machines locales.
• Mode cluster : nombre total de cœurs sur tous les noeuds d’exécution,
ou 2, le plus grand des deux.
• RDD créés à partir de fichiers stockés dans HDFS
• Nombre de blocs HDFS dans le fichier d’entrée, ou 2, la valeur la plus
élevée étant retenue.

Algassimou Diallo 7
Big Data
Introduction

Transformation RDD
• Une transformation est une opération qui prend en charge un ou
plusieurs RDD et renvoie un nouveau RDD.
• Une transformation est appliquée en parallèle sur chaque partition.

Algassimou Diallo 8
Big Data
Introduction

Algassimou Diallo 8
Big Data
Introduction

Transformation RDD: map


Une fonction 𝑓 et un RDD
< 𝑥(𝑖) | 0 ≤ 𝑖 ≤ 𝑛 > returns < 𝑓(𝑥(𝑖)) | 0 ≤ 𝑖 ≤ 𝑛 >.

Algassimou Diallo 9
Big Data
Introduction

La partition i du RDD d’entrée se trouve sur le même nœud que la


partition i du RDD de sortie.

Algassimou Diallo 9
Big Data
Introduction

Transformation RDD: flatMap


flatMap est utilisé à la place de map lorsque la fonction f renvoie une liste
et que les résultats doivent être aplatis.soient aplatis.

Algassimou Diallo 10
Big Data
Introduction

Transformation RDD: filter


Une fonction 𝑓 et un RDD
< 𝑥(𝑖) | 0 ≤ 𝑖 ≤ 𝑛 > returns < 𝜉 | 0 ≤ 𝑖 ≤ 𝑛, 𝑝(𝑥(𝑖)) est vrai >.

Algassimou Diallo 11
Big Data
Introduction

Transformation RDD: union


prend en compte deux RDD et renvoie un nouveau RDD contenant les
éléments du premier et du second RDD avec des répétitions.

Algassimou Diallo 12
Big Data
Introduction

Algassimou Diallo 12
Big Data
Introduction

Transformation RDD: distinct


prend un RDD en entrée et renvoie un nouveau RDD contenant les
éléments du RDD en entrée sans répétitions.

Algassimou Diallo 13
Big Data
Introduction

Contrairement aux transformations précédentes, les transformations


distinctes entraînent un brassage des données.

Algassimou Diallo 13
Big Data
Introduction

Brassage des données


A quelle partition appartient l’élément 23 dans le RDD obtenue après
l’application de la transformation distincte ?

Algassimou Diallo 14
Big Data
Introduction

Algassimou Diallo 14
Big Data
Introduction

Brassage des données


A quelle partition appartient l’élément 23 dans le RDD obtenue après
l’application de la transformation distincte ?

Algassimou Diallo 14
Big Data
Introduction

L’élément 23 appartient à la partition 3.


Lors du brassage, la partition de destination p d’un élément K dans un
RDD à n partitions est calculée de la manière suivante RDD avec n
partitions est calculée comme suit :

𝑝 = hashCode(𝐾) mod 𝑛

Algassimou Diallo 14
Big Data
Introduction

Transformation RDD: intersection


prend en compte un ou deux RDD et renvoie un nouveau RDD contenant
les éléments présents dans les deux RDD.

Algassimou Diallo 15
Big Data
Introduction

Algassimou Diallo 15
Big Data
Introduction

Transformation Narrow
Une transformation “Narrow” est une transformation dans laquelle
chaque partition du RDD de sortie dépend d’au plus une partition du RDD
d’entrée
• Les transformations “Narrow” sont peu coûteuses.
• Pas besoin de communication entre les executors
Lesquelles des transformations ci-dessus sont “Narrow” ?

Algassimou Diallo 16
Big Data
Introduction

Transformation Narrow
Une transformation “Narrow” est une transformation dans laquelle
chaque partition du RDD de sortie dépend d’au plus une partition du RDD
d’entrée
• Les transformations “Narrow” sont peu coûteuses.
• Pas besoin de communication entre les executors
filter, map, flatMap and union sont des transformations “Narrow"

Algassimou Diallo 16
Big Data
Introduction

Transformation Wide
Une transformation “Wide” est une transformation dans laquelle chaque
partition du RDD de sortie de sortie peut dépendre de plusieurs partitions
du RDD d’entrée.
• Les transformations “Wide” sont plus coûteuses.
• Les executor doivent communiquer.
• Les données sont réparties sur le cluster.
Lesquelles des transformations ci-dessus sont “Wide” ?

Algassimou Diallo 17
Big Data
Introduction

Transformation Wide
Une transformation “Wide” est une transformation dans laquelle chaque
partition du RDD de sortie de sortie peut dépendre de plusieurs partitions
du RDD d’entrée.
• Les transformations “Wide” sont plus coûteuses.
• Les executor doivent communiquer.
• Les données sont réparties sur le cluster.
distinct and intersection sont des transformations “Wide"

Algassimou Diallo 17
Big Data
Introduction

RDD actions
Une action est une opération qui prend en charge un RDD et renvoie une
valeur au pilote après avoir effectué un calcul de l’ensemble de données.

Algassimou Diallo 18
Big Data
Introduction

RDD actions
Une action est une opération qui prend en charge un RDD et renvoie une
valeur au pilote après avoir effectué un calcul de l’ensemble de données.
• Le résultat d’une action est envoyé au programe pilote.
• Si le résultat est une liste de valeurs, toutes les valeurs sont envoyées au
programe pilote.
• Le résultat d’une action peut également être écrit sur le disque(le
système de fichiers local ou sur HDFS)

Algassimou Diallo 18
Big Data
Introduction

Action RDD: reduce


prend en entrée un RDD et une fonction f et applique la fonction par
paire à tous les éléments du RDD d’entrée.

Algassimou Diallo 19
Big Data
Introduction

• La fonction f doit prendre 2 arguments.


• Le type de la valeur retournée par la fonction f doit être le que le type
des éléments du RDD d’entrée

Algassimou Diallo 19
Big Data
Introduction

Action RDD: collect


prend en compte un RDD et renvoie la liste des éléments de ce RDD.

Algassimou Diallo 20
Big Data
Introduction

Collect Sans Danger?


Quels sont les risques, s’il y en a, lors de l’invocation de collect() sur un
grand RDD ?

Algassimou Diallo 21
Big Data
Introduction

Collect Sans Danger?


Quels sont les risques, s’il y en a, lors de l’invocation de collect() sur un
grand RDD ?
• Trafic réseau élevé.
• La mémoire du pilote n’est peut-être pas suffisante pour stocker tous
les éléments RDD

Algassimou Diallo 21
Big Data
Introduction

Action RDD: count


prend un RDD et renvoie le nombre d’éléments dans le RDD.

Algassimou Diallo 22
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["computer science", "geology", \


"chemistry", "biology", "astronomy"])
r2 = r1.map(lambda x: x.capitalize())

Algassimou Diallo 23
Big Data
Introduction

Algassimou Diallo 23
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["computer science", "geology", \


"chemistry", "biology", "astronomy"])
r2 = r1.map(lambda x: x.capitalize())

• r2 est un RDD (résultat d’une transformation).


• r2 a autant d’éléments que r1.

Algassimou Diallo 23
Big Data
Introduction

• Chaque élément de r2 est une chaîne de r1 dont la première lettre est en


en majuscules.

Algassimou Diallo 23
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["computer science", "geology", \


"chemistry", "biology", "astronomy"])
r2 = r1.filter(lambda x: len(x) > 10)

Algassimou Diallo 24
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["computer science", "geology", \


"chemistry", "biology", "astronomy"])
r2 = r1.filter(lambda x: len(x) > 10)

• r2 est un RDD (résultat d’une transformation).


• r2 a moins d’éléments que r1.
• r2 ne contient que les éléments de r1 qui ont plus de 10 caractères

Algassimou Diallo 24
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["computer science", "geology", \


"chemistry", "biology", "astronomy"])
r2 = r1.reduce(lambda x, y: "{} - {}".format(x, y))

Algassimou Diallo 25
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["computer science", "geology", \


"chemistry", "biology", "astronomy"])
r2 = r1.reduce(lambda x, y: "{} - {}".format(x, y))

• r2 est une chaîne de caractères, pas un RDD (résultat d’une action).


• r2 est la chaîne “computer science - geology - chemistry - biology -
astronomy”.

Algassimou Diallo 25
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["computer science", "geology", \


"chemistry", "biology", "astronomy"])
r2 = r1.reduce(lambda x, y: [x + y])

Algassimou Diallo 26
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["computer science", "geology", \


"chemistry", "biology", "astronomy"])
r2 = r1.reduce(lambda x, y: [x + y])

Le code est incorrect, car le type de retour (liste) de la fonction reduce est
différent du type des éléments RDD en entrée (string).

Algassimou Diallo 26
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["author", "title", "edition"])


r2 = r1.flatMap(lambda x: [c for c in x])

Algassimou Diallo 27
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["author", "title", "edition"])


r2 = r1.flatMap(lambda x: [c for c in x])

• r2 est un RDD (résultat d’une transformation).


• Chaque élément de r2 est une lettre d’une chaîne de caractères
contenue dans r1.

Algassimou Diallo 27
Big Data
Introduction

Comprendre le code
Que fait le code suivant ?

r1 = sc.parallelize(["author", "title", "edition"])


r2 = r1.flatMap(lambda x: [c for c in x])

• r2 est un RDD (résultat d’une transformation).


• Chaque élément de r2 est une lettre d’une chaîne de caractères
contenue dans r1.
• En quoi cela si nous avions utilisé map au lieu de flatMap ?

Algassimou Diallo 27
Big Data
Introduction

Clé-Valeur RDD
RDD dont chaque élément est une paire (k, v ), k étant la clé et v la valeur.
• Les RDD clé-valeur sont des éléments de base importants dans de
nombreuses applications.
• Les RDD clé-valeur prennent en charge toutes les transformations et
actions qui peuvent être appliquées aux RDD ordinaires.
• Les RDD clé-valeur prennent en charge des transformations et des
actions spéciales.

Algassimou Diallo 28
Big Data
Introduction

transformation RDD clé-valeur: reduceByKey


prend un RDD avec des paires (K,V) et une fonction f et renvoie un
nouveau RDD de paires (K,V ) où les valeurs de chaque clé sont agrégées à
l’aide de la fonction f qui doit être de type (𝑉 , 𝑉 ) → 𝑉

Algassimou Diallo 29
Big Data
Introduction

Algassimou Diallo 29
Big Data
Introduction

• Le RDD d’entrée comporte un certain nombre de partitions n.


• Aucune hypothèse ne peut être faite sur l’appartenance d’un élément à
une partition.
• Le RDD renvoyé par reduceByKey est partitionné par hachage. Chaque
élément appartient à une partition précise.
• Le numéro de partition p d’une paire (K,V) est calculé comme suit:
𝑝 = hashCode(𝐾) mod num partitions

Algassimou Diallo 29
Big Data
Introduction

transformation RDD clé-valeur: groupByKey


prend un RDD avec des paires (K, V) et renvoie un nouveau RDD de paires
(K, Iterable).

Algassimou Diallo 30
Big Data
Introduction

transformation RDD clé-valeur: mapValues


prend un RDD avec des paires (K, V) et une fonction f et renvoie un
nouveau RDD où la fonction f est appliquée à chaque valeur V.

Algassimou Diallo 31
Big Data
Introduction

Exemple: Nombre de mots


def word_count(input_file):
text = sc.textFile(input_file)
return text.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda x, y: x+y)

• La fonction textFile lit un fichier texte dans un RDD.

Algassimou Diallo 32
Big Data
Introduction

• Deux transformations “Narrow” (flatMap et map) et une transformation


Wide (reduceByKey).

Algassimou Diallo 32
Big Data
Introduction

Exemple: Nombre de mots


def word_count(input_file):
text = sc.textFile(input_file)
return text.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda x, y: x+y)

• La fonction textFile lit un fichier texte dans un RDD.

Algassimou Diallo 32
Big Data
Introduction

• Deux transformations “Narrow” (flatMap et map) et une transformation


Wide (reduceByKey).
Spark maintient un plan d’exécution logique (appelé RDD lineage) décrit
comme un graphe acyclique dirigé (DAG).

Algassimou Diallo 32
Big Data
Introduction

RDD lineage
Spark dispose d’un planificateur DAG
qui divise le graphe en plusieurs
“Stage” (étapes).

Algassimou Diallo 33
Big Data
Introduction

RDD lineage Stages


• Les séquences de transformations
“Narrow” sont regroupées en une
seule étape.
• Les transformations “Wide”
déclenchent toujours une nouvelle
étape.

Algassimou Diallo 34
Big Data
Introduction

Les étapes qui


n’ont pas de
dépendance
peuvent être
exécutées en
parallèle

Algassimou Diallo 34
Big Data
Introduction

RDD lineage Tasks


• Le planificateur DAG soumet les
étapes au planificateur de tâches.
• Il crée autant de tâches qu’il y a de
partitions dans le RDD.
• Les tâches sont exécutées en
parallèle.

Algassimou Diallo 35
Big Data
Introduction

RDD lineage: tolérance au faute

Algassimou Diallo 36
Big Data
Introduction

Que faire lorsqu’une partition est


perdue ?

Algassimou Diallo 36
Big Data
Introduction

• Les partitions perdues peuvent


être recalculées grâce au graphe
• Il n’est pas nécessaire d’enregistrer
les résultats intermédiaires sur le
disque.

Algassimou Diallo 36
Big Data
Introduction

Évaluation paresseuse
• Dans Spark, les transformations sont évaluées paresseusement.
• Lorsqu’une transformation est invoquée, Spark ne l’exécute pas
immédiatement. Les transformations ne sont exécutées que lorsque la
première action est appelée
• Un RDD peut être considéré comme un ensemble d’instructions sur la
manière de calculer les données que nous construisons au moyen de
transformations.

Algassimou Diallo 37
Big Data
Introduction

• L’évaluation paresseuse permet de réduire le nombre de passages


nécessaires pour charger et transformer les données.

Algassimou Diallo 37
Big Data
Introduction

Évaluation paresseuse: exemple


lines = sc.textFile("./data/logfile.txt")
exceptions = lines.filter(lambda line : "exception" in line)
nb_lines = exceptions.count()
print("Number of exception lines ", nb_lines)

Que se passe-t-il si Spark exécute immédiatement chaque


transformation ?

Algassimou Diallo 38
Big Data
Introduction

Algassimou Diallo 38
Big Data
Introduction

Évaluation paresseuse: exemple


lines = sc.textFile("./data/logfile.txt")
exceptions = lines.filter(lambda line : "exception" in line)
nb_lines = exceptions.count()
print("Number of exception lines ", nb_lines)

Que se passe-t-il si Spark exécute immédiatement chaque


transformation ?
• L’invocation de sc.textFile() ne charge pas immédiatement les données.

Algassimou Diallo 38
Big Data
Introduction

• La transformation filter() n’est pas appliquée lorsqu’elle est invoquée.


• Les transformations ne sont appliquées que lorsque l’action count() est
invoquée.
• Seules les données qui répondent à la contrainte du filtre sont chargées
à partir du fichier.
Sans l’évaluation paresseuse, nous aurions chargé dans la mémoire
principale tout l’intégralité du contenu du fichier d’entrée.

Algassimou Diallo 38
Big Data
Introduction

Évaluation paresseuse: consequence


• Le code suivant invoque deux actions : lesquelles ?
• Que se passe-t-il lorsque nous invoquons la deuxième action ?

lines = sc.textFile("./data/logfile.txt")
exceptions = lines.filter(lambda line : "exception" in line)
nb_lines = exceptions.count()
exceptions.collect()

Algassimou Diallo 39
Big Data
Introduction

Algassimou Diallo 39
Big Data
Introduction

Évaluation paresseuse: consequence


• Le code suivant invoque deux actions : lesquelles ?
• Que se passe-t-il lorsque nous invoquons la deuxième action ?

lines = sc.textFile("./data/logfile.txt")
exceptions = lines.filter(lambda line : "exception" in line)
nb_lines = exceptions.count()
exceptions.collect()

Algassimou Diallo 39
Big Data
Introduction

• Avec l’évaluation paresseuse, les transformations sont calculées chaque


fois qu’une action est invoquée sur un RDD donné.
• Dans l’exemple precedent, toutes les transformations sont calculées
lorsque nous invoquons la fonction count() et la fonction collect().

Algassimou Diallo 39
Big Data
Introduction

Évaluation paresseuse: consequence


• Le code suivant invoque deux actions : lesquelles ?
• Que se passe-t-il lorsque nous invoquons la deuxième action ?

lines = sc.textFile("./data/logfile.txt")
exceptions = lines.filter(lambda line : "exception" in line)
nb_lines = exceptions.count()
exceptions.collect()

Algassimou Diallo 39
Big Data
Introduction

• Avec l’évaluation paresseuse, les transformations sont calculées chaque


fois qu’une action est invoquée sur un RDD donné.
• Dans l’exemple precedent, toutes les transformations sont calculées
lorsque nous invoquons la fonction count() et la fonction collect().
Pour éviter de calculer les transformations plusieurs fois, nous pouvons
persister les données

Algassimou Diallo 39
Big Data
Introduction

Persistance des données


• La persistance des données consiste à mettre en cache le résultat des
transformations.
• Soit en mémoire principale (par défaut),
• soit sur disque
• soit les deux.
• Si un nœud du cluster tombe en panne, Spark recalcule les partitions
persistées.

Algassimou Diallo 40
Big Data
Introduction

• Nous pouvons répliquer les partitions persistantes sur d’autres nœuds


afin de récupérer des pannes sans avoir à recalculer

lines = sc.textFile("./data/logfile.txt")
exceptions = lines.filter(lambda line : "exception" in line)
exceptions.persist(StorageLevel.MEMORY_AND_DISK)
nb_lines = exceptions.count()
exceptions.collect()

• persist() est appelée juste avant la première action.

Algassimou Diallo 40
Big Data
Introduction

• persist() ne force pas l’évaluation des transformations.


• unpersist() peut être appelé pour expulser les partitions persistées.

Algassimou Diallo 40

You might also like