You can create a SageMaker notebook in one of two ways. The first is to go to the AWS Management Console, open the SageMaker service, and create a notebook instance there.
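Alternatively, the same setup can be scripted with the AWS SDK. Here is a minimal sketch using boto3; the notebook name, instance type, region, and IAM role ARN are hypothetical placeholders you would replace with your own values:

import boto3

# All values below are placeholders -- substitute your own name, type, region, and role.
sagemaker = boto3.client('sagemaker', region_name='us-east-1')
sagemaker.create_notebook_instance(
    NotebookInstanceName='summit-notebook',   # hypothetical notebook name
    InstanceType='ml.t3.medium',              # any supported notebook instance type
    RoleArn='arn:aws:iam::123456789012:role/YourSageMakerRole'  # hypothetical role ARN
)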
First, we import the libraries we will need:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time
# Create a GlueContext and get its underlying Spark session
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
# Load the raw and reference datasets from the Glue Data Catalog as DynamicFrames
raw_data = glueContext.create_dynamic_frame.from_catalog(database="summitdb", table_name="raw2022")
reference_data = glueContext.create_dynamic_frame.from_catalog(database="summitdb", table_name="reference_data")
# Inspect the inferred schema of each DynamicFrame
raw_data.printSchema()
reference_data.printSchema()

# Compare record counts
print('raw_data (Count) = ' + str(raw_data.count()))
print('reference_data (Count) = ' + str(reference_data.count()))

# Preview a few rows of each dataset
raw_data.toDF().show(5)
reference_data.toDF().show(5)
# Register raw_data as a temporary view so it can be queried with Spark SQL
raw_data.toDF().createOrReplaceTempView("temp_raw_data")
# Run a SQL query that keeps only the 'Running' activities
runningDF = spark.sql("select * from temp_raw_data where activity_type = 'Running'")
print("Running (count) : " + str(runningDF.count()))
runningDF.show(5)
# Run a SQL query that keeps only the 'Working' activities
workingDF = spark.sql("select * from temp_raw_data where activity_type = 'Working'")
print("Working (count) : " + str(workingDF.count()))
workingDF.show(5)
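Because temp_raw_data is an ordinary Spark SQL view, any query works against it. As a purely illustrative example, a group-by shows the row count for every activity type at once:

# Illustrative: count rows per activity_type using the same temp view
spark.sql("select activity_type, count(*) as cnt from temp_raw_data group by activity_type").show()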
# The same filters can be written with Glue's Filter transform.
# The predicate receives one DynamicRecord and returns True to keep it.
def filter_function(dynamicRecord):
    if dynamicRecord['activity_type'] == 'Running':
        return True
    else:
        return False
# Apply the named predicate with the Filter transform
runningDF = Filter.apply(frame=raw_data, f=filter_function)
print("Running (count) : " + str(runningDF.count()))

# A lambda works just as well
workingDF = Filter.apply(frame=raw_data, f=lambda x: x['activity_type'] == 'Working')
print("Working (count) : " + str(workingDF.count()))
# Join the raw data to the reference data on track_id
joined_data = Join.apply(raw_data, reference_data, 'track_id', 'track_id')
joined_data.printSchema()

# Drop the partition columns Glue derived from the S3 folder structure
joined_data_clean = DropFields.apply(frame=joined_data, paths=['partition_0', 'partition_1', 'partition_2', 'partition_3'])
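As a quick sanity check, printing the schema again should show that the partition columns are gone:

joined_data_clean.printSchema()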
# Write the cleaned, joined data back to S3 as parquet
try:
    datasink = glueContext.write_dynamic_frame.from_options(
        frame=joined_data_clean, connection_type="s3",
        connection_options={"path": "s3://yourname-datalake-demo-bucket/data/processed-data/"},
        format="parquet")
    print('Transformed data written to S3')
except Exception as ex:
    print('Something went wrong')
    print(ex)
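To verify the write, one option is to read the parquet files straight back with Spark. A minimal sketch, assuming the same bucket path as above:

# Sketch: read the freshly written parquet back and show a few rows
check_df = spark.read.parquet("s3://yourname-datalake-demo-bucket/data/processed-data/")
check_df.show(5)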
# Start the Glue crawler so the processed data is added to the catalog,
# then poll its state until it begins stopping
glueclient = boto3.client('glue', region_name='us-east-1')
response = glueclient.start_crawler(Name='summitcrawler')
print('---')
crawler_state = ''
while crawler_state != 'STOPPING':
    response = glueclient.get_crawler(Name='summitcrawler')
    crawler_state = str(response['Crawler']['State'])
    time.sleep(1)
print('Crawler : Stopped')
print('---')
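The loop above returns as soon as the crawler reports STOPPING. If you prefer to wait until the crawler is fully idle again, a slightly more defensive sketch (same client, with a hypothetical five-minute timeout) polls for the READY state instead:

# Sketch: wait until the crawler returns to READY, with a simple timeout
deadline = time.time() + 300  # hypothetical 5-minute timeout
while time.time() < deadline:
    state = glueclient.get_crawler(Name='summitcrawler')['Crawler']['State']
    if state == 'READY':
        break
    time.sleep(5)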
# List the tables the crawler created in summitdb
time.sleep(3)
print('** Summitdb has the following tables **')
response = glueclient.get_tables(
    DatabaseName='summitdb',
)
for table in response['TableList']:
    print(table['Name'])
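Once the processed table appears in the list, it can be loaded the same way as the raw data. The table name below is hypothetical; the crawler derives it from the S3 folder name, so check the printed list above for the actual value:

# 'processed_data' is a hypothetical table name -- use the name printed above
processed = glueContext.create_dynamic_frame.from_catalog(database="summitdb", table_name="processed_data")
processed.printSchema()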