Chuyển đến nội dung chính

Ví dụ đơn giản về Spark script

 Hôm này đụng lại Spark trong việc xử lý data transformation. 

Bài này nói về  tổng quan về Spark API, spark được xây dựng trên nền tảng distributed datasets. Nó chứa đựng các đối tượng tùy ý. Bạn tạo một dataset từ dữ liệu ngoài, rồi áp nó vào xử lý theo chế độ đồng thời nhiều luồng parallel. Các block của Spark được gọi là RDD(Resilient Distributed Datasets) API, trong RDD API có 2 kiểu xử lý là : transformations và actions 

- Transformations: là những dataset được định nghĩa dựa trên các dataset trước đó. Một vài transformation trên RDD là : flatMap(), map(), reduceKey(), filter() và sortByKey()

- Actions: là khởi đầu thực thi 1 job trong cluster. Một vài actions trên RDD là: 

 count(), collect(), first(), max(), reduce() và ...

Ở high-level RDD API nó cung cấp : DataFrame API và Machine Learning API. 

Sử dụng hàm createDataFrame()


val data = Seq(('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
)

val columns = Seq("firstname","middlename","lastname","dob","gender","salary")
df = spark.createDataFrame(data), schema = columns).toDF(columns:_*)

Ta sử dụng thư viện PyPark trong Python: 


import pyspark
sc = pyspark.SparkContext('local[*]')

txt = sc.textFile('file:////usr/share/doc/python/copyright')
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())

Chúng ta có thể ghi kết quả vào file results.txt

with open('results.txt', 'w') as file_obj:
    file_obj.write(f'Number of lines: {txt.count()}\n')
    file_obj.write(f'Number of lines with python: {python_lines.count()}\n')


Ví dụ RDD API :

Word count (đếm từ )

Trong ví dụ này ta xài một vài transformations để xây dựng dataset của String, int gọi là counts và lưu chúng vào 1 file. 

text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...") 


Trong đoạn code trên counts dùng hàm lambda của python và load text_file từ HDFS và xử lý. 

Ví dụ DataFrame API: 

Text Search: 


textFile = sc.textFile("hdfs://...")

# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()

Xử lý data đơn giản: 


# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
  .read \
  .format("jdbc") \
  .option("url", url) \
  .option("dbtable", "people") \
  .load()

# Looks the schema of this DataFrame.
df.printSchema()

# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()

# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")

Ví dụ về Machine Learning: 

# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()

Nhận xét

Bài đăng phổ biến từ blog này

Trang web medium.com chết, vì sao ?

 Medium.com là trang web có những bài viết về IT, phần mềm và công nghệ nói chung rất phổ biến và chất lượng.  Nhưng khoảng 1 tuần nay không vào được bằng cả mạng viettel và 4G của Mobi phone.  Dân It thường tham khảo các bài viết trên trang này. Nhiều bài viết hay và chất lượng. là một cơ sở dữ liệu lớn cho dân IT nhưng đã bị chặn.  Các bài viết này thường thì có tính quy chuẩn và dài hơn, các topic có chất lượng hơn so với stackoverflow.  Nhưng các subdomain thì vẫn truy cập được như:  https://about.medium.com Bạn có thể tham khảo lý do vì sao medium.com bị chặn ở Việt nam. tại trang tinh tế:  Nói chung mình không thích điều này.  https://tinhte.vn/thread/website-medium-khong-truy-cap-duoc-la-do-website-chet-hay-chan-ip-viet-nam-nhi.3231608/

GitHub Actions là gì ? Làm quen với GitHub Actions.

  Như ta đã biết github là nới chứa source code nổi tiếng thế giới hiện nay, ngoài github còn có gitlab, bitbucket, codecommit, ... Cơ bản github miễn phí cho người dùng developer, nếu nhu cầu sử dụng nhiều repo cũng như project có nhiều thanh viên developer tham gia thì bạn có thể mua bản nâng cao.  Về tiến trình CI/CD process, chúng ta có thể biết tới như Jenkins, Team City, Codepipeline trên AWS, ...  GitHub Actions mới ra đời gần đây , ngày 13 tháng 11 năm 2019, GitHub Actions ra phiên bản đầu tiên, trước đó khoảng 1 năm bản beta ra đời.  Tham khảo tại đây : https://github.blog/2019-08-08-github-actions-now-supports-ci-cd/ GitHub Actions khá đơn giản, khi tiến trình build, test và deploy được viết trong một file có định dạng yaml nằm trong thư mục .github/workflows/ trong chính source code của bạn.  GitHub Actions là một event-driven nghĩa là chương trình chạy một loạt các dòng lệnh khi bạn nhận được 1 event, ví dụ mỗi lần một developer nào đó tạo một pull request cho một repositor

Cách sử dụng sys.argv trong python.

Cách sử dụng sys.argv trong python. sys.argv là môt danh sách [list] trong python, nó được sư dụng khi bạn chạy một lệnh command-line nào đó trên hệ thống. Và argument này được đẩy vào script python để thực thi khi chạy câu lệnh. Ví dụ: python sys.argv arg1 arg2 Trước tiên bạn phải import mô đun sys trong script. import sys print "This is the name of the script: " , sys . argv [ 0 ] print "Number of arguments: " , len ( sys . argv ) print "The arguments are: " , str ( sys . argv ) Tên của script này : sysargv.py Số lượng arg là : 1 Arg là : ['sysargv.py'] python test1020.py 111 This is the name of the script:  test1020.py Number of arguments:  2 The arguments are:  ['test1020.py', '111']