Amazon-Kinesis [SOLVED]: Read from Kinesis is giving empty records when run using previous sequence number or timestamp


    Cloudy Point
    Keymaster

    Question

    I am trying to read the messages pushed to a Kinesis stream with the get_records() and get_shard_iterator() APIs.

    My producer keeps pushing records as it processes them, and my consumer runs as a cron job every 30 minutes. I store the sequence number of the last message read in my database and, on the next run, request an AFTER_SEQUENCE_NUMBER shard iterator with that sequence number. The first run successfully reads all messages in the stream, but after new messages are pushed, the second run does not return them.
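A minimal sketch of the resume step described above, assuming the last processed sequence number per shard is kept in a plain dict (`checkpoints`, a hypothetical stand-in for the MongoDB document). It only builds the keyword arguments for boto3's `get_shard_iterator`; falling back to `TRIM_HORIZON` (the oldest record still retained) when no checkpoint exists is an assumption, not part of the original code.

```python
from typing import Dict, Optional


def shard_iterator_request(stream_name: str, shard_id: str,
                           checkpoints: Dict[str, str]) -> dict:
    """Build kwargs for kinesis_client.get_shard_iterator(**kwargs).

    `checkpoints` is a hypothetical mapping of ShardId -> last processed
    SequenceNumber (the original post keeps this state in MongoDB).
    """
    request = {"StreamName": stream_name, "ShardId": shard_id}
    last_seq: Optional[str] = checkpoints.get(shard_id)
    if last_seq is None:
        # Nothing processed yet: start from the oldest retained record.
        request["ShardIteratorType"] = "TRIM_HORIZON"
    else:
        # Resume strictly after the last record that was processed.
        request["ShardIteratorType"] = "AFTER_SEQUENCE_NUMBER"
        request["StartingSequenceNumber"] = last_seq
    return request
```

With a real client this would be used as `kinesis_client.get_shard_iterator(**shard_iterator_request(name, shard_id, checkpoints))["ShardIterator"]`. The iterator only marks a position; it does not guarantee that the first get_records() call on it returns data.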

    I also tried AT_TIMESTAMP, storing the timestamp that the producer includes in each message and passing it on the next run. Again, the first run processes all messages, and from the second run on I get empty records.
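A sketch of the timestamp variant, under two assumptions: the stored value uses the same `%Y-%m-%dT%H:%M:%S.%f` format as the question's code, and it represents UTC. Kinesis positions an `AT_TIMESTAMP` iterator by each record's `ApproximateArrivalTimestamp` (when Kinesis received the record), not by any field inside the payload, so a producer-side `ts` may not line up with where the iterator lands. The helper names here are hypothetical.

```python
from datetime import datetime, timezone

TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"  # format used by the question's code


def timestamp_iterator_request(stream_name: str, shard_id: str,
                               stored_ts: str) -> dict:
    """Build kwargs for get_shard_iterator with ShardIteratorType=AT_TIMESTAMP.

    Assumes `stored_ts` is a naive UTC string in TS_FORMAT; it is converted
    to a timezone-aware datetime, which boto3 accepts for `Timestamp`.
    """
    ts = datetime.strptime(stored_ts, TS_FORMAT).replace(tzinfo=timezone.utc)
    return {
        "StreamName": stream_name,
        "ShardId": shard_id,
        "ShardIteratorType": "AT_TIMESTAMP",
        "Timestamp": ts,
    }


def checkpoint_from_record(record: dict) -> str:
    """Return the value to store after processing `record`.

    Uses the server-side ApproximateArrivalTimestamp (an aware datetime in
    boto3 responses) instead of a producer-supplied field in the payload.
    """
    arrival = record["ApproximateArrivalTimestamp"].astimezone(timezone.utc)
    return arrival.strftime(TS_FORMAT)
```

Because `AT_TIMESTAMP` starts at the first record at or after the given time, resuming from the last processed record's arrival time returns that record again; storing the `SequenceNumber` and using `AFTER_SEQUENCE_NUMBER` avoids that duplicate.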

    I am not sure where I am going wrong. I would appreciate it if someone could help me with this.

    The code below uses the timestamp approach; the sequence-number version is otherwise the same.

    import datetime
    import json

    import boto3

    # SETTINGS, mongo_coll, process_message and logger are defined elsewhere
    # in the application and are not shown here.


    def listen_to_kinesis_stream():
        kinesis_client = boto3.client('kinesis', region_name=SETTINGS['region_name'])
        stream_response = kinesis_client.describe_stream(StreamName=SETTINGS['kinesis_stream'])

        for shard_info in stream_response['StreamDescription']['Shards']:
            # Last stored timestamp for this shard, defaulting to the Unix epoch.
            kinesis_stream_status = mongo_coll.find_one({'_id': "DOC_ID"})
            last_read_ts = kinesis_stream_status.get('state', {}).get(
                shard_info['ShardId'],
                datetime.datetime(1970, 1, 1).strftime("%Y-%m-%dT%H:%M:%S.%f"))

            # Position an iterator at the first record at or after that timestamp.
            shard_iterator = kinesis_client.get_shard_iterator(
                StreamName=SETTINGS['kinesis_stream'],
                ShardId=shard_info['ShardId'],
                ShardIteratorType='AT_TIMESTAMP',
                Timestamp=last_read_ts)

            get_response = kinesis_client.get_records(ShardIterator=shard_iterator['ShardIterator'], Limit=1)
            if len(get_response['Records']) == 0:
                continue

            message = json.loads(get_response['Records'][0]['Data'])
            process_resp = process_message(message)
            if process_resp['success'] is False:
                print(process_resp)
            # Save the producer's timestamp of the processed message for this shard.
            mongo_coll.update_one({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
            print("Processed {0}".format(message))

            # Follow the iterator chain one record at a time; an empty batch ends the loop.
            while 'NextShardIterator' in get_response:
                get_response = kinesis_client.get_records(ShardIterator=get_response['NextShardIterator'], Limit=1)
                if len(get_response['Records']) == 0:
                    break

                message = json.loads(get_response['Records'][0]['Data'])
                process_resp = process_message(message)
                if process_resp['success'] is False:
                    print(process_resp)
                mongo_coll.update_one({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
                print("Processed {0}".format(message))

        logger.debug("Processed all messages from Kinesis stream")
        print("Processed all messages from Kinesis stream")
    


    Cloudy Point
    Keymaster

    Accepted Answer

    According to my discussion with an AWS technical support engineer, get_records() can return an empty batch of records even when more data exists further along the shard, so it is not a good idea to break when len(get_response['Records']) == 0.
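A minimal sketch of a read loop that does not stop at the first empty batch. It assumes a boto3 Kinesis client, or any object with the same `get_records` method. As an extra stopping condition that the answer does not mention, it checks the `MillisBehindLatest` field of the GetRecords response, which is 0 once the iterator has caught up with the newest record; a missing `NextShardIterator` means the shard has been closed. The function name and the `max_calls` and `pause_seconds` parameters are hypothetical.

```python
import time
from typing import Any, Iterator


def iter_shard_records(client: Any, shard_iterator: str, batch_limit: int = 100,
                       max_calls: int = 1000,
                       pause_seconds: float = 0.2) -> Iterator[dict]:
    """Yield records from one shard without stopping at empty batches.

    Iteration ends when the reader has caught up (MillisBehindLatest == 0),
    when the shard is closed (no NextShardIterator), or after `max_calls`
    GetRecords requests, whichever comes first.
    """
    calls = 0
    while shard_iterator and calls < max_calls:
        response = client.get_records(ShardIterator=shard_iterator,
                                      Limit=batch_limit)
        calls += 1
        # An empty Records list does not mean the shard has no more data.
        for record in response["Records"]:
            yield record
        if response.get("MillisBehindLatest") == 0:
            break  # caught up with the newest record in the shard
        shard_iterator = response.get("NextShardIterator")
        # Stay under the per-shard read limit (5 GetRecords calls per second).
        time.sleep(pause_seconds)
```

The `max_calls` guard keeps a cron-driven run bounded even if the stopping fields never appear.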

    The better approach suggested was to keep a counter of the maximum number of messages to read in a run and exit the loop once that many messages have been read.
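The counter approach can be sketched as follows, assuming JSON payloads, a handler passed in as a callable (standing in for the question's `process_message`), and the last processed `SequenceNumber` as the checkpoint for the next run's `AFTER_SEQUENCE_NUMBER` iterator. The function name and the `max_messages`, `max_calls`, and `pause_seconds` parameters are hypothetical; the `max_calls` cap is an added assumption so that a run ends even when the shard holds fewer than `max_messages` new records.

```python
import json
import time
from typing import Any, Callable, Optional, Tuple


def read_up_to(client: Any, shard_iterator: str,
               handle: Callable[[dict], None], max_messages: int = 500,
               max_calls: int = 100,
               pause_seconds: float = 0.2) -> Tuple[int, Optional[str]]:
    """Process at most `max_messages` records from one shard in this run.

    Empty batches are skipped rather than treated as the end of the data.
    Returns (records processed, last processed SequenceNumber or None) so
    the caller can persist the sequence number for the next run.
    """
    processed, last_seq, calls = 0, None, 0
    while shard_iterator and processed < max_messages and calls < max_calls:
        # GetRecords accepts a Limit of at most 10,000 records.
        response = client.get_records(
            ShardIterator=shard_iterator,
            Limit=min(10000, max_messages - processed))
        calls += 1
        for record in response["Records"]:
            handle(json.loads(record["Data"]))  # Data is bytes in boto3
            last_seq = record["SequenceNumber"]
            processed += 1
        shard_iterator = response.get("NextShardIterator")
        time.sleep(pause_seconds)  # respect the per-shard read rate
    return processed, last_seq
```

Capping `Limit` at the remaining count keeps a single batch from overshooting `max_messages`.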

    Source: https://stackoverflow.com/questions/46544716/read-from-kinesis-is-giving-empty-records-when-run-using-previous-sequence-numbe
    Author: kausal_malladi
    This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
