Chuyển đến nội dung chính

Môt số operation trong pyspark RDD

 Trong RDD viết tắt bởi Resilient Distributed Dataset pyspark có một số operation chú ý và RDDs là những thành phần không thể thay đổi được, chúng được thực thi trên đa node và thực thi một cách song song parrallel. 

1. Count() - đếm.  - Số lượng đơn vị element trong RDD trả về.

Ví dụ: 

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

và khi chaỵ thì output ra: 

Number of elements in RDD → 8.

 2. Collect() - Tất cả các thành phần trong RDD được trả về nguyên vặn: 

Ví dụ: 

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Và output ra là: 

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

3. foreach()

Nó trả về kết quả các thành phần khớp với điều kiện của function bên trong foreach, 

ví dụ: 

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Và output ra là: 

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

4. filter()

Trả về thành phần thỏa mãn function bên trong filter, ví dụ: 

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Và output ra là những string có chữ spark 

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

5. map() là apply 1 function cho mỗi thành phần trong RDD, 

from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)

Vi dụ trên ta map một key value cho mỗi element có giá trị là 1

Output ra sẽ là: 

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

6. Reduce()

Ta lấy ví dụ trước: 

from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)

Kết quả output ra là: 

Adding all the elements -> 15

Trong hàm trên thì nó sẽ cộng: 1 + 2 + 3 + 4 + 5 lại với nhau. 

Vậy reduce nó sẽ thực thi viêc cộng dòn binary lại  với nhau. 

7. join 

Ví dụ: 

from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)

output ra là : 

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

Trả về một pair của những thành phần match với key và những giá trị của key đó. 

8. cache()

Ấn định RDD này với việc lưu trự mặc định là trên memory 

from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)

Và output ra là: 

Words got cached -> True

Nhận xét

Bài đăng phổ biến từ blog này

Trang web medium.com chết, vì sao ?

 Medium.com là trang web có những bài viết về IT, phần mềm và công nghệ nói chung rất phổ biến và chất lượng.  Nhưng khoảng 1 tuần nay không vào được bằng cả mạng viettel và 4G của Mobi phone.  Dân It thường tham khảo các bài viết trên trang này. Nhiều bài viết hay và chất lượng. là một cơ sở dữ liệu lớn cho dân IT nhưng đã bị chặn.  Các bài viết này thường thì có tính quy chuẩn và dài hơn, các topic có chất lượng hơn so với stackoverflow.  Nhưng các subdomain thì vẫn truy cập được như:  https://about.medium.com Bạn có thể tham khảo lý do vì sao medium.com bị chặn ở Việt nam. tại trang tinh tế:  Nói chung mình không thích điều này.  https://tinhte.vn/thread/website-medium-khong-truy-cap-duoc-la-do-website-chet-hay-chan-ip-viet-nam-nhi.3231608/

GitHub Actions là gì ? Làm quen với GitHub Actions.

  Như ta đã biết github là nới chứa source code nổi tiếng thế giới hiện nay, ngoài github còn có gitlab, bitbucket, codecommit, ... Cơ bản github miễn phí cho người dùng developer, nếu nhu cầu sử dụng nhiều repo cũng như project có nhiều thanh viên developer tham gia thì bạn có thể mua bản nâng cao.  Về tiến trình CI/CD process, chúng ta có thể biết tới như Jenkins, Team City, Codepipeline trên AWS, ...  GitHub Actions mới ra đời gần đây , ngày 13 tháng 11 năm 2019, GitHub Actions ra phiên bản đầu tiên, trước đó khoảng 1 năm bản beta ra đời.  Tham khảo tại đây : https://github.blog/2019-08-08-github-actions-now-supports-ci-cd/ GitHub Actions khá đơn giản, khi tiến trình build, test và deploy được viết trong một file có định dạng yaml nằm trong thư mục .github/workflows/ trong chính source code của bạn.  GitHub Actions là một event-driven nghĩa là chương trình chạy một loạt các dòng lệnh khi bạn nhận được 1 event, ví dụ mỗi lần một developer nào đó tạo một pull request cho một repositor

Cách sử dụng sys.argv trong python.

Cách sử dụng sys.argv trong python. sys.argv là môt danh sách [list] trong python, nó được sư dụng khi bạn chạy một lệnh command-line nào đó trên hệ thống. Và argument này được đẩy vào script python để thực thi khi chạy câu lệnh. Ví dụ: python sys.argv arg1 arg2 Trước tiên bạn phải import mô đun sys trong script. import sys print "This is the name of the script: " , sys . argv [ 0 ] print "Number of arguments: " , len ( sys . argv ) print "The arguments are: " , str ( sys . argv ) Tên của script này : sysargv.py Số lượng arg là : 1 Arg là : ['sysargv.py'] python test1020.py 111 This is the name of the script:  test1020.py Number of arguments:  2 The arguments are:  ['test1020.py', '111']