๐Ÿ“ฆ keyurkhant / StreamGuard

๐Ÿ“„ spark_stream.py ยท 42 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType
from pyspark.sql.functions import udf
from textblob import TextBlob

# Sentiment analysis function
def analyze_sentiment(text):
    analysis = TextBlob(text)
    return 'positive' if analysis.sentiment.polarity > 0 else 'negative' if analysis.sentiment.polarity < 0 else 'neutral'

# Define UDF
analyze_sentiment_udf = udf(analyze_sentiment, StringType())

# Spark session
spark = SparkSession.builder \
    .appName("TweetSentimentAnalysis") \
    .getOrCreate()

# Kafka stream
tweets = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "tweets") \
    .load()

# Define schema
schema = StructType().add("sentiment", StringType()).add("text", StringType())

# Process tweets
tweets_df = tweets.selectExpr("CAST(value AS STRING)").select(from_json("value", schema).alias("data")).select("data.*")
processed_df = tweets_df.withColumn("sentiment_analysis", analyze_sentiment_udf(tweets_df.text))

# Write results to S3
query = processed_df.writeStream \
    .format("json") \
    .option("path", "s3://your-bucket/tweet-sentiments/") \
    .option("checkpointLocation", "s3://your-bucket/checkpoints/") \
    .start()

query.awaitTermination()