Môt số operation trong pyspark RDD

 Trong RDD viết tắt bởi Resilient Distributed Dataset pyspark có một số operation chú ý và RDDs là những thành phần không thể thay đổi được, chúng được thực thi trên đa node và thực thi một cách song song parrallel. 

1. Count() - đếm.  - Số lượng đơn vị element trong RDD trả về.

Ví dụ: 

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

và khi chaỵ thì output ra: 

Number of elements in RDD → 8.

 2. Collect() - Tất cả các thành phần trong RDD được trả về nguyên vặn: 

Ví dụ: 

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Và output ra là: 

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

3. foreach()

Nó trả về kết quả các thành phần khớp với điều kiện của function bên trong foreach, 

ví dụ: 

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Và output ra là: 

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

4. filter()

Trả về thành phần thỏa mãn function bên trong filter, ví dụ: 

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Và output ra là những string có chữ spark 

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

5. map() là apply 1 function cho mỗi thành phần trong RDD, 

from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)

Vi dụ trên ta map một key value cho mỗi element có giá trị là 1

Output ra sẽ là: 

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

6. Reduce()

Ta lấy ví dụ trước: 

from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)

Kết quả output ra là: 

Adding all the elements -> 15

Trong hàm trên thì nó sẽ cộng: 1 + 2 + 3 + 4 + 5 lại với nhau. 

Vậy reduce nó sẽ thực thi viêc cộng dòn binary lại  với nhau. 

7. join 

Ví dụ: 

from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)

output ra là : 

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

Trả về một pair của những thành phần match với key và những giá trị của key đó. 

8. cache()

Ấn định RDD này với việc lưu trự mặc định là trên memory 

from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)

Và output ra là: 

Words got cached -> True

Nhận xét

Bài đăng phổ biến từ blog này

Trang web medium.com chết, vì sao ?

Cách sử dụng sys.argv trong python.

Thiết kế một RESTful API bằng python và flask.