""" Example consumer for SQS queue fed by Firehose. This script will only stop for keyboard interrupts or killing the process. """ import json import boto.sqs import sys import time import base64 import zlib import logging, logging.handlers import pprint #Global variables for SQS connection information confFileName = "fhConConf.json" baseOutFileName = "firehoselog.txt" queueName = '' aws_key ='' aws_secret_key = '' aws_region = '' def read(logger, queue): """ This method reads a message from the queue, removes it, and returns a list of Dictionaries """ #Read single message containing event batch try: rs = queue.get_messages(num_messages=1) m = rs[0] except: logger.exception ('No available messages in queue') return None #Remove message from queue queue.delete_message(m) #Returns decompressed, base64decoded Dictionary uzipmsg = zlib.decompress(base64.b64decode(m.get_body()), zlib.MAX_WBITS | 16) #print uzipmsg mjson = json.loads(uzipmsg) return mjson def write(data_dict, logger): """ This method needs to be completed by the client. It's content is totally based on how the data will be stored or otherwise processed """ # For now we will print the data logger.info('%s', data_dict) # Logger is here in case there are exceptions def init_logger(logLevel): """ Inits the logger and returns the logger object """ global baseOutFileName lgr = logging.getLogger('sqsReader') lgr.setLevel(logging.INFO) handler = logging.FileHandler(filename=baseOutFileName) handler.setLevel(logLevel) lgr.addHandler(handler) return lgr def initVars(logger): """ This method loads the configs from the config file. Please add another json object to the config file to include your database connection info. """ global aws_key global aws_secret_key global aws_region global queueName global confFileName #Read config file and set global variables try: conf = json.load(open(confFileName, 'r')) queueName = conf['conn_conf']['sqs_queue_name'] aws_key = conf['conn_conf']['aws_key'] aws_secret_key = conf['conn_conf']['aws_secret_key'] aws_region = conf['conn_conf']['aws_region'] except Exception, e: logger.error('Config File unable to be opened %s', e) sys.exit() if __name__ == '__main__': #Init the logger and global vars logger = init_logger(logging.INFO) initVars(logger) print boto.Version #Get the SQS connection try: conn = boto.sqs.connect_to_region(aws_region, aws_access_key_id=aws_key, aws_secret_access_key=aws_secret_key) q = conn.get_queue(queueName) except Exception, e: logger.error('Unable to connect to queue %s', e) sys.exit() #Iterate until messages are exhausted while True: evt = read(logger, q) #If there are no available messages, sleep a second and try again if evt is not None: write(evt, logger) else: time.sleep(1) conn.close()