Wednesday, November 11, 2015

Increasing query performance with Secondary Indexes in AWS DynamoDB

Introduction

AWS DynamoDB is a NoSQL database that scales to demand while providing high availability and durability. A good introduction can be found here

Problem statement

We hit a stumbling block: one of our tables holds close to a million records and is expected to grow to many times its current size over time, and we needed queries that return fast responses over this data set. Until now we had used scan, which performs a full table scan to produce results regardless of the number of records in the database. The problem with this approach is that responses are very slow, and they are guaranteed to get even slower as the record count grows huge.

The Event table collects event data over a period of time. It is a legacy table, so we could not modify it or introduce any hash keys.

Attempts to improve the performance


This is how our scan initially looked, issued against the Event table:


DynamoDBScanExpression scanExpression = new DynamoDBScanExpression();
Map<String, Condition> scanFilter = new HashMap<String, Condition>();
Condition condition = new Condition()
        .withComparisonOperator(ComparisonOperator.EQ.toString())
        .withAttributeValueList(new AttributeValue().withS(attributeValue));
scanFilter.put(attributeType, condition);
scanExpression.setScanFilter(scanFilter);
// Event is the model class mapped to the Event table in DynamoDB
PaginatedScanList<Event> scan = mapper.scan(Event.class, scanExpression);





Results with scan

On average, responses took somewhere between 25 and 30 seconds, which is very sluggish.

Parallel Scan

We re-wrote some of the queries using parallel scan, which considerably improved performance, but the application still felt sluggish relative to how responsive it should be. A guide to writing parallel scans can be found here.

DynamoDBScanExpression scanExpression = new DynamoDBScanExpression();
Map<String, Condition> scanFilter = new HashMap<String, Condition>();
Condition condition = new Condition()
        .withComparisonOperator(ComparisonOperator.EQ.toString())
        .withAttributeValueList(new AttributeValue().withS(attributeValue));
scanFilter.put(attributeType, condition);
scanExpression.setScanFilter(scanFilter);
PaginatedParallelScanList<Event> scan = mapper.parallelScan(Event.class, scanExpression, totalSegments);


As you can see, we used a parallel scan, which divides the work over the large data set by splitting the table into a number of segments and scanning them concurrently. When issuing a parallel scan you must specify the total number of segments into which the table is to be divided.
Another caveat of this approach is that we had to keep fine-tuning the number of segments as the table grew, which becomes a bit of a pain in terms of application maintenance.
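That fine-tuning could be sketched as a helper. This is hypothetical (not from our codebase), and the 50,000-items-per-segment figure is only an assumption chosen to land near the 20-25 segments we settled on for roughly a million records:

```java
// Hypothetical helper for picking a starting TotalSegments value.
// The items-per-segment figure and the cap are assumptions, not AWS guidance.
public class SegmentTuner {
    private static final long ITEMS_PER_SEGMENT = 50_000L; // assumed sweet spot
    private static final int MAX_SEGMENTS = 128;           // practical client-side cap

    public static int suggestSegments(long approxItemCount) {
        long segments = approxItemCount / ITEMS_PER_SEGMENT;
        // always scan with at least one segment, and never more than the cap
        return (int) Math.max(1, Math.min(MAX_SEGMENTS, segments));
    }
}
```

A value computed like this would still need adjusting against measured scan times, but it removes the need to hard-code a segment count that goes stale as the table grows.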

Results with parallel scan


On average it ranged between 10 and 12 seconds with a fine-tuned number of segments (20-25).

A Solution

So the real solution had to be looked for elsewhere, and that was Secondary Indexes. How secondary indexes work exactly can be found in the link above. You can create an index over a subset of the table's (Event) attributes and issue a query or scan against that index. Since most of our queries look up activity information for the current day or the previous day, we created a Global Secondary Index containing all the attributes that need to appear in the query results, with "occurDate" as the hash key.

Eg. occurDate = "2015-06-04" 

When you create an index with a hash key, all records sharing a hash key value are stored together in a separate bucket for that value. For example, all activity records for 2015-06-04 land under the hash key "2015-06-04", all activity records for 2015-06-05 under the hash key "2015-06-05", and so on. You can also define a composite key for an index by adding a range key attribute alongside the hash key, such as another date field ("valid_until") or a number ("hits"). In our case there was no need for one.
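Conceptually (this is an illustration, not DynamoDB's actual storage internals), the index behaves like a map from each occurDate value to the bucket of items stored under it, which is why a query for a single date only touches one bucket:

```java
import java.util.*;
import java.util.stream.*;

// Conceptual model of hash-key bucketing: each event is {occurDate, eventId},
// and the "index" groups event ids under their date. A lookup by date then
// reads exactly one bucket instead of walking every record.
public class HashBuckets {
    public static Map<String, List<String>> bucketByDate(List<String[]> events) {
        return events.stream().collect(Collectors.groupingBy(
                e -> e[0],                                        // occurDate is the hash key
                Collectors.mapping(e -> e[1], Collectors.toList())));
    }
}
```

With real DynamoDB the bucketing and lookup happen server-side, but the access pattern is the same: cost is proportional to the size of one date's bucket, not the whole table.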

Creating Secondary Index

Creating index via code


AmazonDynamoDBClient ddbClient = AmazonDynamoDBConnection.getDynamoDBClient();
DynamoDBMapper mapper = new DynamoDBMapper(ddbClient);

// Only attributes that appear in a key schema need definitions here; projected
// non-key attributes must not be listed, or DynamoDB rejects the update.
ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("occurDate").withAttributeType("S"));

/* You need to tell explicitly which attributes should be projected into the index.
   Alternatively you can use ProjectionType.ALL, but be aware that additional
   attributes will cost space and incur cost during reads and writes as well. */
Projection p = new Projection().withProjectionType(ProjectionType.INCLUDE)
        .withNonKeyAttributes("channelNo", "eventType", "eventStatus", "isActive",
                "isAlarm", "serialNumber", "occurTime_24", "failureCount", "occurTime");

// define the hash key
ArrayList<KeySchemaElement> indexKeySchema = new ArrayList<KeySchemaElement>();
indexKeySchema.add(new KeySchemaElement().withAttributeName("occurDate").withKeyType(KeyType.HASH));

// and you can specify the read and write capacity
CreateGlobalSecondaryIndexAction action = new CreateGlobalSecondaryIndexAction()
        .withIndexName("occurDateIndex")
        .withProjection(p)
        .withKeySchema(indexKeySchema)
        .withProvisionedThroughput(new ProvisionedThroughput()
                .withReadCapacityUnits(500L)
                .withWriteCapacityUnits(100L));
GlobalSecondaryIndexUpdate gsiu = new GlobalSecondaryIndexUpdate().withCreate(action);

// and tell against which table the index is created
UpdateTableRequest uReq = new UpdateTableRequest()
        .withGlobalSecondaryIndexUpdates(gsiu)
        .withTableName("Event")
        .withAttributeDefinitions(attributeDefinitions);

// all good, finally create the index
UpdateTableResult updateTable = ddbClient.updateTable(uReq);
You should see the new index creation start on your table; this can be viewed in your AWS console. Depending on the number of records, this process takes a while. Once the index is ready, its status shows as "Active" in the indexes table.
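Rather than watching the console, the wait for "Active" can be scripted. The sketch below keeps the polling logic generic: statusSupplier is a hypothetical stand-in for a DescribeTable call that reads the index's IndexStatus field.

```java
import java.util.function.Supplier;

// Generic polling loop for index creation. statusSupplier stands in for a
// DescribeTable call returning the GSI's status ("CREATING", "ACTIVE", ...).
public class IndexWaiter {
    public static boolean waitForActive(Supplier<String> statusSupplier,
                                        int maxAttempts, long sleepMillis) {
        for (int i = 0; i < maxAttempts; i++) {
            if ("ACTIVE".equals(statusSupplier.get())) {
                return true; // index is ready for queries
            }
            try {
                Thread.sleep(sleepMillis); // back off before polling again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false; // gave up before the index became active
    }
}
```

On a table with many records, the sleep interval would be seconds or minutes rather than milliseconds; the loop shape stays the same.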

Issuing query against index

Now, with our index ("occurDateIndex") created, we can issue queries against it and see how effectively the index responds.




AmazonDynamoDBClient ddbClient = AmazonDynamoDBConnection.getDynamoDBClient();
DynamoDB dynamoDB = new DynamoDB(ddbClient);
Table table = dynamoDB.getTable("Event");
Index index = table.getIndex("occurDateIndex");

/* Since we know which date we are querying for, we specify it as the hash key
   value, so the query can immediately locate the bucket on which it needs to
   concentrate its work. */
QuerySpec querySpec = new QuerySpec()
        .withHashKey("occurDate", "2015-06-04")
        .withMaxResultSize(20000)
        .withFilterExpression("serialNumber = :v_serialNumber and channelNo = :v_channel")
        .withValueMap(new ValueMap()
                .withString(":v_serialNumber", "1B0111DPAYF8TG6")
                .withString(":v_channel", "1"));
ItemCollection<QueryOutcome> items = index.query(querySpec);
List<String> list = new ArrayList<>();
items.forEach(t -> list.add(t.getJSONPretty("eventStatus")));
With a million records in the database, our average responses ranged from 3 to 3.5 seconds using the index, where they were nearly 25 seconds with table scans. This is a dramatic gain in terms of performance. And we no longer have to worry about query performance as the database grows, since our typical queries are narrowed to a single bucket, so response times are likely to remain in the 3-4 second range.

NB: All time measurements include the delay over the wire (network latency), even though we were using an AWS node close to our location. The timings (averages) are shown only to demonstrate the improved performance, not as benchmarks.