"""Kafka -> Spark RDD -> GunClock demo (notebook script).

Steps, each echoed to stdout under its own banner:
  (1) trigger a web endpoint that produces tweet messages to Kafka, then read
      MESSAGE_COUNT messages from the topic;
  (2) flatten each message into one csv line;
  (3)-(5) build and transform a Spark RDD from those lines;
  (6) reduce to the latest dateTime and convert it from UTC to JST;
  (7) render that JST time with GunClock.

``sc`` (the SparkContext) is not defined in this file; it is expected to be
supplied by the notebook environment.
"""
import datetime
import re

import pytz
import requests
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from pytz import timezone

from gunclock.GunClock import GunClock

TWEET_TRIGGER_URL = 'https://gunclocks-rails.herokuapp.com/gunclocks/tweet'
KAFKA_TOPIC = 'lxau-gunman-topic'
KAFKA_BOOTSTRAP_SERVERS = [
    'steamer-01.srvs.cloudkafka.com:9093',
    'steamer-02.srvs.cloudkafka.com:9093',
    'steamer-03.srvs.cloudkafka.com:9093',
]
KAFKA_SSL_DIR = '/dbfs/FileStore/tables/4l13fnd71500225010104/'
MESSAGE_COUNT = 10          # number of Kafka messages collected before processing
HTTP_TIMEOUT_SECONDS = 60   # without a timeout requests.get may block forever

BANNER = "========================================================================================="
RULE = "--------------------------------------------------------------------------------"
SHORT_RULE = "-------------------------------------"


def parseCsvLine(line):
    """Split one csvdata line into its seven columns.

    A line has the layout ``#,seq,dateTime,user-id,tweet-message,partition,offset``.
    The first four columns are split off from the left and the last two from
    the right, so commas inside tweet-message stay part of the message.
    (Assumes only tweet-message can contain commas.)

    Raises:
        ValueError: if the line cannot be split into seven columns.
    """
    columns = line.split(",", 4)
    if len(columns) == 5:
        columns = columns[:4] + columns[4].rsplit(",", 2)
    if len(columns) != 7:
        raise ValueError("malformed csvdata line: %r" % (line,))
    return columns


def compareTime(t1, t2):
    """Return the later of two dateTime strings (t2 when they are equal).

    The comparison is lexicographic. That matches chronological order only
    while every value uses the same fixed-width 'YYYY-MM-DD HH:MM:SS +0000'
    layout seen in the sample row -- NOTE(review): confirm for all producers.
    """
    if t1 > t2:
        return t1
    else:
        return t2


# ---------------------------------------------------------------------------
# (1) trigger the producer and consume MESSAGE_COUNT messages
# ---------------------------------------------------------------------------
print(BANNER)
print('(1) request to produce Kafka message')
print("")

# The consumer is built before the producer is triggered so the HTTP request
# does not race the consumer's setup. NOTE(review): kafka-python joins the
# group on the first fetch, so with auto_offset_reset='latest' a group that
# has no committed offset yet may still skip messages produced before that.
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    group_id='my-group',
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    security_protocol='SSL',
    ssl_check_hostname=True,
    ssl_cafile=KAFKA_SSL_DIR + 'CARoot.pem',
    ssl_certfile=KAFKA_SSL_DIR + 'certificate.pem',
    ssl_keyfile=KAFKA_SSL_DIR + 'key.pem',
    auto_offset_reset='latest')

messages = []
try:
    print("request to produce Kafka message... [" + TWEET_TRIGGER_URL + "]")
    requests.get(TWEET_TRIGGER_URL, timeout=HTTP_TIMEOUT_SECONDS)
    print("wait for Kafka message...")
    # Blocks until MESSAGE_COUNT messages have arrived; each offset is
    # committed as soon as its message is read.
    for message in consumer:
        consumer.commit()
        messages.append(message)
        if len(messages) >= MESSAGE_COUNT:
            break
finally:
    # Leave the group and release the SSL connections even on failure.
    consumer.close()
print("")
print("kafka message received.")

# ---------------------------------------------------------------------------
# (2) flatten each message into one csv line
# ---------------------------------------------------------------------------
print(BANNER)
print('(2) listing as csvdata')
print(RULE)
csvdata = []
for messagenum, message in enumerate(messages):
    # Decode once at the I/O boundary, and flatten embedded newlines so each
    # record stays on one line. NOTE(review): newlines become a space here;
    # confirm this is the intended replacement.
    text = message.value.decode('utf-8').replace("\n", " ")
    csvline = u",".join([str(messagenum), text,
                         str(message.partition), str(message.offset)])
    print(csvline)
    csvdata.append(csvline)
print(RULE)
print("")
print("")

# ---------------------------------------------------------------------------
# (3) create RDD from csvdata
# ---------------------------------------------------------------------------
print(BANNER)
print("(3) create RDD from csvdata")
print("")
print('[python] rdd = sc.parallelize(csvdata).map(parseCsvLine)')
# Row example: [u'0', u'0001', u'2017-07-29 14:17:26 +0000', u'1210toki',
#               u'message...', u'3', u'4433']
rdd = sc.parallelize(csvdata).map(parseCsvLine)
print("")
print("[result: rdd]")
print(RULE)
print("#, seq, dateTime, user-id, tweet-message, partition, offset")
print(RULE)
for d in rdd.collect():
    preview = d[4][0:20].replace("\n", " ") + "..."
    print(", ".join(d[0:4] + [preview] + d[5:7]))
print(RULE)
print("")
print("")

# ---------------------------------------------------------------------------
# (4) reorder columns and sort by seq
# ---------------------------------------------------------------------------
print(BANNER)
print("(4) map(change column-order) & sort(by seq-number)")
print("")
print("[python] rdd2 = rdd.map(lambda d: d[0:2] + d[5:7] + d[2:5]).sortBy(lambda d: d[1])")
# New column order: #, seq, partition, offset, dateTime, user-id, tweet-message.
# seq is sorted as a string, which equals numeric order only while it is
# zero-padded to a fixed width (the sample shows u'0001') -- assumption.
rdd2 = rdd.map(lambda d: d[0:2] + d[5:7] + d[2:5]).sortBy(lambda d: d[1])
print("")
print("[result: rdd2]")
print(RULE)
print("#, seq, pt, ofs, dateTime, user-id,\ttweet-message")
print(RULE)
for d in rdd2.collect():
    print(", ".join(d[0:6]) + ",\t" + d[6][0:20].replace("\n", " ") + "...")
print(RULE)
print("")
print("")

# ---------------------------------------------------------------------------
# (5) select the dateTime column
# ---------------------------------------------------------------------------
print(BANNER)
print("(5) map(select column)")
print("")
print("[python] rdd_time = rdd2.map(lambda d: d[4])")
rdd_time = rdd2.map(lambda d: d[4])
print("")
print("[result: rdd_time]")
print(RULE)
print("dateTime")
print(RULE)
for d in rdd_time.collect():
    print(d)
print(RULE)
print("")
print("")

# ---------------------------------------------------------------------------
# (6) reduce to the latest dateTime and convert UTC -> JST
# ---------------------------------------------------------------------------
print(BANNER)
print("(6) reduce (compareTime)")
print("")
print("[python]")
print(SHORT_RULE)
print("def compareTime(t1, t2):")
print("    if t1 > t2:")
print("        return t1")
print("    else:")
print("        return t2")
print("")
print("guessedCurrentTime = rdd_time.reduce(compareTime)")
print(SHORT_RULE)
guessedCurrentTime = rdd_time.reduce(compareTime)
print("")
print("[result: guessedCurrentTime]")
print(RULE)
print(guessedCurrentTime)
print(RULE)

s = re.search(r'(....-..-.. ..:..:..)', guessedCurrentTime)
if s is None:
    raise ValueError("no 'YYYY-MM-DD HH:MM:SS' timestamp in %r" % (guessedCurrentTime,))
timeString = s.group(1)
nativeTime = datetime.datetime.strptime(timeString, '%Y-%m-%d %H:%M:%S')
# The trailing offset is not parsed; the time is treated as UTC, matching the
# '+0000' in the sample row. NOTE(review): confirm the producer always sends UTC.
utcTime = timezone('UTC').localize(nativeTime)
tztokyo = pytz.timezone('Asia/Tokyo')
jstTime = utcTime.astimezone(tztokyo)
print(RULE)
print(str(jstTime) + " [JST]")
print(RULE)
guessedCurrentJstTime = jstTime.strftime('%H:%M (%z)')
# Only the 'HH:MM' prefix is passed on. NOTE(review): the meaning of
# GunClock's first argument (18) is not visible in this file.
gunclockStr = GunClock(18, guessedCurrentJstTime[0:5]).toString()

# ---------------------------------------------------------------------------
# (7) render the GunClock
# ---------------------------------------------------------------------------
print("")
print("")
print(BANNER)
print("(7) GunClock")
print("")
print(RULE)
print(gunclockStr)
print(RULE)