Ví dụ đơn giản về Spark script

 Hôm này đụng lại Spark trong việc xử lý data transformation. 

Bài này nói về  tổng quan về Spark API, spark được xây dựng trên nền tảng distributed datasets. Nó chứa đựng các đối tượng tùy ý. Bạn tạo một dataset từ dữ liệu ngoài, rồi áp nó vào xử lý theo chế độ đồng thời nhiều luồng parallel. Các block của Spark được gọi là RDD(Resilient Distributed Datasets) API, trong RDD API có 2 kiểu xử lý là : transformations và actions 

- Transformations: là những dataset được định nghĩa dựa trên các dataset trước đó. Một vài transformation trên RDD là : flatMap(), map(), reduceKey(), filter() và sortByKey()

- Actions: là khởi đầu thực thi 1 job trong cluster. Một vài actions trên RDD là: 

 count(), collect(), first(), max(), reduce() và ...

Ở high-level RDD API nó cung cấp : DataFrame API và Machine Learning API. 

Sử dụng hàm createDataFrame()


val data = Seq(('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
)

val columns = Seq("firstname","middlename","lastname","dob","gender","salary")
df = spark.createDataFrame(data), schema = columns).toDF(columns:_*)

Ta sử dụng thư viện PyPark trong Python: 


import pyspark
sc = pyspark.SparkContext('local[*]')

txt = sc.textFile('file:////usr/share/doc/python/copyright')
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())

Chúng ta có thể ghi kết quả vào file results.txt

with open('results.txt', 'w') as file_obj:
    file_obj.write(f'Number of lines: {txt.count()}\n')
    file_obj.write(f'Number of lines with python: {python_lines.count()}\n')


Ví dụ RDD API :

Word count (đếm từ )

Trong ví dụ này ta xài một vài transformations để xây dựng dataset của String, int gọi là counts và lưu chúng vào 1 file. 

text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...") 


Trong đoạn code trên counts dùng hàm lambda của python và load text_file từ HDFS và xử lý. 

Ví dụ DataFrame API: 

Text Search: 


textFile = sc.textFile("hdfs://...")

# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()

Xử lý data đơn giản: 


# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
  .read \
  .format("jdbc") \
  .option("url", url) \
  .option("dbtable", "people") \
  .load()

# Looks the schema of this DataFrame.
df.printSchema()

# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()

# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")

Ví dụ về Machine Learning: 

# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()

Nhận xét

Bài đăng phổ biến từ blog này

Trang web medium.com chết, vì sao ?

Cách sử dụng sys.argv trong python.

Thiết kế một RESTful API bằng python và flask.