Bạn thực hiện tạo SageMaker Notebook bằng 2 cách (chọn 1 trong 2):
Truy cập AWS Management Console
Chọn Notebooks
Nhập tên notebook là notebook
Đợi khoảng 2-3 phút để quá trình khởi tạo notebook hoàn tất.
Bạn Run đoạn code đầu để khởi tạo Session
Trước tiên, chúng ta tải file notebook từ First Cloud Journey.
Vậy là bạn đã hoàn thành khởi tạo 1 Interactive Session
Truy cập vào AWS Glue Studio
Trong giao diện Jobs
Hoàn thành tạo Notebook.
Hoàn thành khởi tạo session.
Đầu tiên, chúng ta sẽ import libraries
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time
# Create the Glue context and its underlying Spark session for this notebook.
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
# Load the two source tables from the Glue Data Catalog (database "summitdb")
# as DynamicFrames. These tables must already exist (created by a crawler).
raw_data = glueContext.create_dynamic_frame.from_catalog(database = "summitdb", table_name = "raw2022")
reference_data = glueContext.create_dynamic_frame.from_catalog(database = "summitdb", table_name = "reference_data")
# Inspect each dataset: schema, row count, and a 5-row sample.
raw_data.printSchema()
reference_data.printSchema()
print('raw_data (Count) = ' + str(raw_data.count()))
print('reference_data (Count) = ' + str(reference_data.count()))
raw_data.toDF().show(5)
reference_data.toDF().show(5)
# Register raw_data as a temporary view so it can be queried with Spark SQL.
raw_data.toDF().createOrReplaceTempView("temp_raw_data")
# Select only the records whose activity_type is 'Running', then show a sample.
runningDF = spark.sql("select * from temp_raw_data where activity_type = 'Running'")
print("Running (count) : " + str(runningDF.count()))
runningDF.show(5)
# Select only the records whose activity_type is 'Working', then show a sample.
workingDF = spark.sql("select * from temp_raw_data where activity_type = 'Working'")
print("Working (count) : " + str(workingDF.count()))
workingDF.show(5)
def filter_function(dynamicRecord):
    """Predicate for Glue's Filter transform: keep only 'Running' records.

    Args:
        dynamicRecord: a Glue DynamicRecord (dict-like) expected to have
            an 'activity_type' field.

    Returns:
        bool: True when activity_type is exactly 'Running', else False.
    """
    # Return the comparison directly instead of an if/else that
    # returns True/False explicitly.
    return dynamicRecord['activity_type'] == 'Running'
# Same 'Running' filter as above, this time using Glue's Filter transform
# with the named filter_function predicate (overwrites the Spark SQL result).
runningDF = Filter.apply(frame = raw_data, f = filter_function)
print("Running (count) : " + str(runningDF.count()))
# 'Working' filter expressed as an inline lambda predicate.
workingDF = Filter.apply(frame = raw_data, f = lambda x:x['activity_type']=='Working')
print("Working (count) : " + str(workingDF.count()))
# Join the raw events with the reference data on track_id, then drop the
# partition columns added by the crawler so they are not written to the sink.
joined_data = Join.apply(raw_data,reference_data, 'track_id', 'track_id')
joined_data.printSchema()
joined_data_clean = DropFields.apply(frame = joined_data, paths = ['partition_0','partition_1','partition_2','partition_3'])
# Write the joined, cleaned dataset to S3 as parquet.
# NOTE(review): "yourname-datalake-demo-bucket" looks like a placeholder --
# confirm it is replaced with a real bucket name before running.
try:
    datasink = glueContext.write_dynamic_frame.from_options(
        frame = joined_data_clean, connection_type = "s3",
        connection_options = {"path": "s3://yourname-datalake-demo-bucket/data/processed-data/"},
        format = "parquet")
    print('Transformed data written to S3')
except Exception as ex:
    # Best-effort: report the failure and fall through to the crawler step.
    print('Something went wrong')
    print(ex)
# Run the Glue crawler so the freshly written parquet data is catalogued,
# then list the tables now present in summitdb.
glueclient = boto3.client('glue',region_name='us-east-1')
response = glueclient.start_crawler(Name='summitcrawler')
print('---')
# Poll once per second until the crawler finishes. The crawler state
# cycles RUNNING -> STOPPING -> READY; the original loop waited for
# exactly 'STOPPING', which can be missed between 1-second polls
# (leaving the loop spinning forever), so READY is accepted as well.
crawler_state = ''
while (crawler_state not in ('STOPPING', 'READY')):
    response = glueclient.get_crawler(Name='summitcrawler')
    crawler_state = str(response['Crawler']['State'])
    time.sleep(1)
print('Crawler : Stopped')
print('---')
# Brief pause so the catalog reflects the crawler's newly created tables.
time.sleep(3)
print('** Summitdb has following tables**')
response = glueclient.get_tables(
    DatabaseName='summitdb',
)
for table in response['TableList']:
    print(table['Name'])