Bạn có thể tạo SageMaker Notebook theo 2 cách (chọn 1 trong 2):
Truy cập AWS Management Console

Chọn Notebooks

Nhập tên cho notebook là "notebook".

Đợi khoảng 2–3 phút để notebook được tạo xong.

Chạy (Run) đoạn code đầu tiên để khởi tạo Session.

Trước tiên, chúng ta tải file notebook từ First Cloud Journey.

Vậy là bạn đã hoàn thành việc khởi tạo một Interactive Session.

Truy cập vào AWS Glue Studio

Trong giao diện Jobs

Hoàn thành tạo Notebook.

Hoàn thành khởi tạo session.

Đầu tiên, chúng ta sẽ import các thư viện (libraries) cần thiết:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time

# Create (or reuse) the Spark context and wrap it in a Glue context; the
# Spark session is kept for running Spark SQL later in the notebook.
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# Load both source tables of the "summitdb" Glue Data Catalog database as
# DynamicFrames.
raw_data = glueContext.create_dynamic_frame.from_catalog(
    database="summitdb",
    table_name="raw2022",
)
reference_data = glueContext.create_dynamic_frame.from_catalog(
    database="summitdb",
    table_name="reference_data",
)

# Inspect each frame: schema first, then row counts, then a 5-row sample.
raw_data.printSchema()
reference_data.printSchema()

print(f'raw_data (Count) = {raw_data.count()}')
print(f'reference_data (Count) = {reference_data.count()}')

raw_data.toDF().show(5)
reference_data.toDF().show(5)

# Register raw_data as a temporary view so it can be queried with Spark SQL.
raw_data.toDF().createOrReplaceTempView("temp_raw_data")

# Select only the records whose activity_type is 'Running', then report the
# row count and show a 5-row sample.
runningDF = spark.sql("select * from temp_raw_data where activity_type = 'Running'")
print(f"Running (count) : {runningDF.count()}")
runningDF.show(5)

# Select only the records whose activity_type is 'Working', then report the
# row count and show a 5-row sample.
workingDF = spark.sql("select * from temp_raw_data where activity_type = 'Working'")
print(f"Working (count) : {workingDF.count()}")
workingDF.show(5)

def filter_function(dynamicRecord):
    """Return True when the record's ``activity_type`` field is 'Running'.

    Intended as the predicate ``f`` for ``Filter.apply``; ``dynamicRecord``
    is read by field name with ``[]``.
    """
    return dynamicRecord['activity_type'] == 'Running'
# The Running/Working selections again, this time with Glue's Filter
# transform applied directly to the DynamicFrame: once with the named
# predicate and once with an inline lambda.
runningDF = Filter.apply(frame=raw_data, f=filter_function)
print(f"Running (count) : {runningDF.count()}")

workingDF = Filter.apply(
    frame=raw_data,
    f=lambda record: record['activity_type'] == 'Working',
)
print(f"Working (count) : {workingDF.count()}")

# Equality-join the raw records with the reference data on track_id.
joined_data = Join.apply(raw_data, reference_data, "track_id", "track_id")
joined_data.printSchema()

# Remove the partition_0 .. partition_3 columns from the joined frame.
# NOTE(review): these look like crawler-generated S3 partition columns — confirm.
joined_data_clean = DropFields.apply(
    frame=joined_data,
    paths=[f"partition_{idx}" for idx in range(4)],
)


# Write the cleaned, joined data to S3 in Parquet format.
# NOTE(review): "yourname-datalake-demo-bucket" is a placeholder — use your own bucket.
try:
    datasink = glueContext.write_dynamic_frame.from_options(
        frame=joined_data_clean,
        connection_type="s3",
        connection_options={
            "path": "s3://yourname-datalake-demo-bucket/data/processed-data/",
        },
        format="parquet",
    )
except Exception as ex:
    # Best-effort: report the failure and let the notebook continue.
    print('Something went wrong')
    print(ex)
else:
    print('Transformed data written to S3')

# Start the "summitcrawler" crawler and block until its run has finished.
glueclient = boto3.client('glue', region_name='us-east-1')
response = glueclient.start_crawler(Name='summitcrawler')
print('---')

# A crawler run moves READY -> RUNNING -> STOPPING -> READY. Waiting only for
# the short-lived STOPPING state can loop forever if a 1-second poll misses it,
# and STOPPING is an intermediate state rather than the end of the run.
# Instead, wait until the crawler has left READY and then returned to it.
crawler_timeout_s = 30 * 60  # arbitrary upper bound so a stuck crawler cannot hang the notebook
crawler_deadline = time.monotonic() + crawler_timeout_s
crawler_started = False
crawler_state = ''
while True:
    response = glueclient.get_crawler(Name='summitcrawler')
    crawler_state = str(response['Crawler']['State'])
    if crawler_state != 'READY':
        crawler_started = True
    elif crawler_started:
        # Back to READY after running: the crawl is complete.
        break
    if time.monotonic() >= crawler_deadline:
        raise TimeoutError(
            f"Crawler 'summitcrawler' did not finish within {crawler_timeout_s} s "
            f"(last state: {crawler_state})"
        )
    time.sleep(1)
print('Crawler : Stopped')
print('---')

# Print the name of every table in the "summitdb" catalog database.
# GetTables returns results in pages (NextToken), so iterate with the boto3
# paginator instead of reading only the first response.
print('** Summitdb has following tables**')
for page in glueclient.get_paginator('get_tables').paginate(DatabaseName='summitdb'):
    for table in page['TableList']:
        print(table['Name'])



