Friday, March 25, 2016

Leveraging the power of asynchronous programming in Java 8


Introduction



The emergence of multi-core processors gives us the opportunity to execute asynchronous and parallel tasks on modern machines. Alongside functional-style programming, the Java 8 designers put considerable effort into the language so that developers can write powerful asynchronous tasks. Java 7 had already responded with the fork/join framework, which lets you slice tasks into Futures backed by thread executor pools, but that still did not feel like enough to leverage multi-core architectures for increasingly complex problems. Java 8 introduces CompletableFuture<T> as a solution. Together with the functional style offered by the language, these constructs give it real power. Not using them would be a sin!

Let's check it out:

public CompletableFuture<Tire> assembleGeneralTires() {
		CompletableFuture<Tire> tireFuture = new CompletableFuture<>();
		new Thread(() -> {
			Tire tire = getTire("general");
			tireFuture.complete(tire);
		}).start();
		return tireFuture;
}

//Access the value as:

CompletableFuture<Tire> assembleGeneralTires = assembler.assembleGeneralTires();
Tire tire = assembleGeneralTires.get();

Everything seems fine until something trips up and an error occurs. So, what do we do when an error comes? We have completeExceptionally(Exception e) for that:

public CompletableFuture<Tire> assembleGeneralTiresCautiously() {
		CompletableFuture<Tire> tireFuture = new CompletableFuture<>();
		new Thread(() -> {
			try {
				Tire tire = getTire("general");
				tireFuture.complete(tire);
			}
			catch (Exception e) {
				tireFuture.completeExceptionally(e);
			}
		}).start();
		return tireFuture;
}

Can we improve this a bit?
Yes. Java 8's CompletableFuture has some factory methods, so we don't always need to manually wire up the thread as we did before.

public CompletableFuture<Tire> assembleGeneralTiresFactory() {
		return CompletableFuture.supplyAsync(() -> getTire("general"));
	}

supplyAsync() accepts a Supplier as its argument. If you are wondering what a Supplier is, it is a functional interface whose single method takes no arguments and hands you back a T.
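A minimal illustration, reusing the getTire helper from the earlier example (this is only a sketch to show the shape of the interface):

//a Supplier<Tire> wraps a no-argument computation that produces a Tire
Supplier<Tire> generalTireSupplier = () -> getTire("general");

//supplyAsync runs the supplier on the common ForkJoinPool (when no executor is given) and completes the future with its result
CompletableFuture<Tire> tireFuture = CompletableFuture.supplyAsync(generalTireSupplier);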

Leverage non-blocking async operations

Let's start with a lousy sequential operation.

public int[] getAverageVehicleSpeeds(){
		Vehicle carLamborghini = new Vehicle("lamborghini", "car", 485);
		Vehicle truck = new Vehicle("volvo", "truck", 190);
		Vehicle van = new Vehicle("toyota", "van", 255);
		Vehicle bike = new Vehicle("harley", "bike", 360);		
		
		List<Vehicle> vehicles = Arrays.asList(carLamborghini,truck,van,bike);
		int[] speeds = vehicles.stream().map(t->t.getAverageSpeed()).mapToInt(t->t.intValue()).toArray();
		
		return speeds;
}

public class Vehicle {
....
......

public int getAverageSpeed(){
		Util.delay(); //a delay is simulated
		return new Random().nextInt(topSpeed); //note: a fixed seed here would always yield the same value
	}
}

Since this operation is inherently sequential, it can perform terribly sluggishly. If we think in terms of improving it, the first thing that comes to mind is parallel streams.

....
List<Vehicle> vehicles = Arrays.asList(carLamborghini,truck,van,bike);
int[] speeds = vehicles.parallelStream().map(t->t.getAverageSpeed()).mapToInt(t->t.intValue()).toArray();
....

This may perform better, but we can't be sure how it would scale, say when we add many more vehicles in the future. Let's try our CompletableFuture way too.

List<CompletableFuture<Integer>> speedsFuture = vehicles.stream()
				.map(t -> CompletableFuture
						.supplyAsync(() -> t.getAverageSpeed()))
				.collect(Collectors.toList());

//This gives us a list of CompletableFutures, which means each of these operations on the vehicles starts asynchronously

//now let's take their values; to do that
int[] speeds = speedsFuture.stream().map(t->t.join()).mapToInt(t->t.intValue()).toArray();


This will scale a bit better (trust this: people have run the numbers against parallel streams, and CompletableFuture does better when things crowd up). But to get the real advantage we have another weapon: executors. CompletableFuture.supplyAsync(supplier, executor) accepts an executor as a second parameter, so we can build a finely tuned executor for our machine based on the number of processor cores.

For optimal CPU utilization we can estimate the number of threads with the following formula (refer to Java Concurrency in Practice):
Nthreads = NCPU * UCPU * (1 + W/C)
For a 4-core CPU (NCPU = 4), 100% target utilization (UCPU = 1) and a wait-time/compute-time ratio (W/C) of 50, that gives 4 * 1 * (1 + 50) ≈ 200 threads in the pool for optimal use of the system resources.


Executor executor = Executors.newFixedThreadPool(Math.min(vehicles.size(), 200), new ThreadFactory() {
	        @Override
	        public Thread newThread(Runnable r) {
	            Thread t = new Thread(r);
	            t.setDaemon(true);
	            return t;
	        }
});
//we cap the pool at 200 so the application doesn't inadvertently crash by spawning an unbounded number of threads

//then just simply pass the executor to supplyAsync

List<CompletableFuture<Integer>> speedsFuture = vehicles.stream()
				.map(t -> CompletableFuture
						.supplyAsync(() -> t.getAverageSpeed(),executor))
				.collect(Collectors.toList());
		
int[] speeds = speedsFuture.stream().map(t->t.join()).mapToInt(t->t.intValue()).toArray();

//the advantage here is that we can fine-tune the executor, which gives us more control

What if we want to pipeline asynchronous tasks? Suppose we want to find the engine type of each vehicle based on its average speed. We have to use 'thenCompose'.

List<CompletableFuture<Engine>> enginesFutures = vehicles.stream()
				.map(t -> CompletableFuture
						.supplyAsync(() -> t.getAverageSpeed(),executor))
				.map(f->f.thenCompose(p-> CompletableFuture.supplyAsync(()-> Vehicle.getEngine(p),executor)))
				.collect(Collectors.toList());
		
List<Engine> engines = enginesFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());

Even though this may look like a convoluted piece of code, it is a simple concept: to feed the result of one asynchronous operation into another asynchronous operation that depends on it, we simply 'thenCompose'.

What if we want to mix asynchronous and synchronous steps? Then we use 'thenApply'. Assume our 'getEngine' is not a long-running or I/O operation and is synchronous.


List<CompletableFuture<Engine>> enginesFutures = vehicles.stream()
		.map(t -> CompletableFuture
				.supplyAsync(() -> t.getAverageSpeed(),executor))
		.map(f->f.thenApply(Vehicle::getEngine)).collect(Collectors.toList());
List<Engine> engines = enginesFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());


That is a little peek at what asynchronous Java 8 has to offer. Explore the API at your own will, and it is sure to be an exciting ride.

Wednesday, November 11, 2015

Increasing query performance with Secondary Indexes in AWS Dynamodb

Introduction

AWS DynamoDB is a NoSQL database that can scale to your requirements while providing high availability and durability. A good introduction can be found here.

Problem statement

We hit a stumbling block where one of our tables contained close to a million records and was expected to grow to several multiples of its current size over time, and we needed queries that return fast responses from this data set. We had been using scan, which performs a full table scan in order to return results regardless of the number of records in the database. The problem with this approach is that the response is very slow, and it is guaranteed to get even slower as the record count grows huge.

The Event table is a typical table that collects event data over a period of time. It is a legacy table, so we could not modify it or introduce any hash keys.

Attempts to improve the performance


This is how our scan looked initially, against the Event table:


DynamoDBScanExpression scanExpression = new DynamoDBScanExpression();
Map<String, Condition> scanFilter = new HashMap<String, Condition>();
Condition condition = new Condition().withComparisonOperator(ComparisonOperator.EQ.toString())
		.withAttributeValueList(new AttributeValue().withS(attributeValue));
scanFilter.put(attributeType, condition);
scanExpression.setScanFilter(scanFilter);
PaginatedScanList<Event> scan = mapper.scan(Event.class, scanExpression);
//Event is the model class mapped to the Event table in DynamoDB





Results with scan

On average it ranged somewhere between 25 and 30 seconds, which is very sluggish.

Parallel Scan

We rewrote some of the queries using parallel scan, which considerably improved the performance, but the application still felt a bit sluggish in terms of responsiveness. How to write parallel scans is described here.

DynamoDBScanExpression scanExpression = new DynamoDBScanExpression();
Map<String, Condition> scanFilter = new HashMap<String, Condition>();
Condition condition = new Condition().withComparisonOperator(ComparisonOperator.EQ.toString())
		.withAttributeValueList(new AttributeValue().withS(attributeValue));
scanFilter.put(attributeType, condition);
scanExpression.setScanFilter(scanFilter);
PaginatedParallelScanList<T> scan = mapper.parallelScan(classType, scanExpression, totalSegments);


As you can see, we used a parallel scan, which splits the work over the large quantity of data divided into a number of segments. When issuing a parallel scan you need to specify the number of segments into which the table should be divided.
Another caveat of this approach is that we have to keep fine-tuning the number of segments as the table grows, which becomes a bit of a maintenance pain for the application.

Results with parallel scan


On average it ranged somewhere between 10 and 12 seconds with a fine-tuned number of segments (20-25).

A Solution

So the real solution had to be looked for elsewhere, and there it was: Secondary Indexes. How secondary indexes work exactly can be found in the above link. You can create an index over a subset of the table's (Event's) fields and issue a query or scan against that index. Since most of our queries are about finding activity information for the current day or yesterday, we created a Global Secondary Index with all the fields needed in the query result, making "occurDate" the hash key.

Eg. occurDate = "2015-06-04" 

When you create an index with a hash key, all records with the same hash key value are stored in a separate bucket for that key. For example, all activity records for 2015-06-04 go under the hash key "2015-06-04", all activity records for 2015-06-05 under "2015-06-05", and so on. You can also define a composite key for an index by adding a range key attribute alongside the hash key, such as another date field ("valid_until") or a number ("hits"). In our case there wasn't a need for one.
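Purely as an illustration, a composite key schema for such an index might look like this (we did not actually add one; the "occurTime" range key below is only a stand-in):

ArrayList<KeySchemaElement> compositeKeySchema = new ArrayList<KeySchemaElement>();
//hash (partition) key: all records sharing this value land in the same bucket
compositeKeySchema.add(new KeySchemaElement().withAttributeName("occurDate").withKeyType(KeyType.HASH));
//range (sort) key: orders records within that bucket
compositeKeySchema.add(new KeySchemaElement().withAttributeName("occurTime").withKeyType(KeyType.RANGE));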

Creating Secondary Index

Creating index via code


AmazonDynamoDBClient ddbClient;
DynamoDBMapper mapper;
ddbClient = AmazonDynamoDBConnection.getDynamoDBClient();
mapper = new DynamoDBMapper(ddbClient);
ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("occurDate").withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName("channelNo").withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName("eventType").withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName("eventStatus").withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName("isActive").withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName("isAlarm").withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName("serialNumber").withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName("occurTime_24").withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName("failureCount").withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName("occurTime").withAttributeType("S"));
/*You need to tell explicitly which attributes should be projected into the index; alternatively you can use ProjectionType.ALL, but be aware that additional attributes cost space and incur cost during reads and writes as well*/
Projection p = new Projection().withProjectionType(ProjectionType.INCLUDE).withNonKeyAttributes(
		"channelNo", "eventType", "eventStatus", "isActive", "isAlarm", "serialNumber",
		"occurTime_24", "failureCount", "occurTime");
//define the hash key
ArrayList<KeySchemaElement> indexKeySchema = new ArrayList<KeySchemaElement>();
indexKeySchema.add(new KeySchemaElement().withAttributeName("occurDate").withKeyType(KeyType.HASH));
//And you can specify the read and write capacity
CreateGlobalSecondaryIndexAction action = new CreateGlobalSecondaryIndexAction().withIndexName("occurDateIndex")
		.withProjection(p).withKeySchema(indexKeySchema)
		.withProvisionedThroughput(new ProvisionedThroughput()
				.withReadCapacityUnits((long) 500).withWriteCapacityUnits((long) 100));
GlobalSecondaryIndexUpdate gsiu = new GlobalSecondaryIndexUpdate().withCreate(action);
//and tell against which table the index is created
UpdateTableRequest uReq = new UpdateTableRequest().withGlobalSecondaryIndexUpdates(gsiu).withTableName("Event")
		.withAttributeDefinitions(attributeDefinitions);


//all good, finally create index
UpdateTableResult updateTable = ddbClient.updateTable(uReq);
You should see that new index creation has started on your table; this can be viewed in your AWS console. Depending on the number of records this process takes a while. Once the index is ready you should see its status as "Active" in the index list.

Issuing query against index

Now, with our index ("occurDateIndex") created, we can issue queries against it and see how effectively the index responds.




AmazonDynamoDBClient ddbClient;
DynamoDBMapper mapper;
ddbClient = AmazonDynamoDBConnection.getDynamoDBClient();
mapper = new DynamoDBMapper(ddbClient);
DynamoDB dynamoDB = new DynamoDB(ddbClient);
Table table = dynamoDB.getTable("Event");
Index index = table.getIndex("occurDateIndex");
ItemCollection<QueryOutcome> items = null;
QuerySpec querySpec = new QuerySpec();
/*since we know against which date we are going to issue the query, we specify it as our hash value,
so that the query can immediately spot the bucket on which it needs to concentrate its work*/
querySpec.withHashKey("occurDate", "2015-06-04").withMaxResultSize(20000)
		.withFilterExpression("serialNumber = :v_serialNumber and channelNo = :v_channel")
		.withValueMap(new ValueMap()
				.withString(":v_serialNumber", "1B0111DPAYF8TG6").withString(":v_channel", "1"));
items = index.query(querySpec);
PageIterable<Item, QueryOutcome> pages = items.pages();
List<String> list = new ArrayList<>();
items.forEach(t -> list.add(t.getJSONPretty("eventStatus")));
With a million records in the database, our average responses ranged from 3 to 3.5 seconds with the index, where it was nearly 25 seconds with table scans. This is a dramatic gain in performance. And we don't have to worry about query performance as the database grows, since our typical queries are narrowed to a single bucket, so response times are likely to stay in the 3-4 second range.

NB: All time measurements include the delay over the wire (network latency), though we kept to an AWS node close to our location. The motive for showing the (average) timings is just to demonstrate the improved performance, not to present benchmarks.

Friday, September 18, 2015

Resolving java.lang.UnsatisfiedLinkError: Native Library already loaded ...

 java.lang.UnsatisfiedLinkError

It was a bit of a relief when this issue was resolved, but it tormented us for a while before we figured out how to fix it. The solution was no big science; in fact it is very simple and straightforward.

What exactly is this about?

When you run two separate applications in your Tomcat container and both of them have to load a shared native wrapper, you are very likely to encounter this issue. It comes down to a single line placed wherever the application starts; for a web application, a simple context listener class, as sketched below.

System.loadLibrary("excalibur"); 

Hereby you tell the listener to load the native library on application startup. On a Windows system this will load excalibur.dll from a path in "java.library.path"; on a Linux system it will be a file named libexcalibur.so in "java.library.path". In Linux you can configure this path as follows:

export LD_LIBRARY_PATH=/path/to/whereyourlinklibrary

In turn, this path is picked up and added to the existing java.library.path.

Behind the scenes, Tomcat's class loaders work to bring this library into the JVM. So if you use the same approach to load the same library in another web application running in the container, you are in for a surprise. Why? Tomcat uses a separate class loader for each web application, and the JVM won't allow the same native library to be loaded more than once via another class loader. Hence you will get a polite message like this:

java.lang.UnsatisfiedLinkError: Native Library ... already loaded in another classloader

How exactly Tomcat class loaders work behind the scenes is explained here. So, how can you work around this?

Use Tomcat shared class loader

Tomcat's shared class loader makes this possible: it loads the library only once per container and allows it to be shared across all the web applications in that container. First, you need to completely remove your wrapper jar, the one you built to invoke methods from libexcalibur.so, from your web applications. Say this wrapper library is called knightofjustice.jar. In your Maven build, if you have marked it as a dependency, remember to set its scope to provided so it won't be packaged into the lib folder of your web application.


<dependency>
 <artifactId>knightofjustice</artifactId>
 <groupId>org.rome.empire</groupId>
 <version>1.0.0</version>
 <scope>provided</scope>
</dependency> 

Then move your precious knightofjustice.jar into the territory of the Tomcat\lib folder. This makes Tomcat load knightofjustice whenever the container starts.

OK, now how do our applications find out whether the libexcalibur.so link library is available in the JVM and start using it?

One guy (probably a descendant of King Arthur) has written a utility library, available here; download it and build the source code to get a dll-bootstrapper jar. Move this jar into the same Tomcat\lib folder along with a property file called dll-bootstrapper.properties, in which you tell Tomcat which link libraries should be loaded, like our libexcalibur.so.

Just add a line in dll-bootstrapper.properties

dll.0=excalibur

Remember: excalibur, NOT libexcalibur.so. On a Linux-based system this will be interpreted correctly as libexcalibur.so.

OR,

you can simply add a class with a static initializer like this (with the hard-coded library name):

public class DLLBootstrapper {

	static {
		System.loadLibrary("excalibur");
	}

	public static void main(String args[]) {
		System.out.println("Loaded");
	}

}

Then compile the class and simply drop it inside the Tomcat\lib folder; all good to go!

OK, good. Additionally, your web application needs to reference the bootstrapper class so that the static initializer that loads libexcalibur.so actually runs.

So, go back to your web application and add this line in a context listener, probably in the contextInitialized method:


Class.forName("msm.DLLBootstrapper");

//Now, you can call your wrapper library classes to invoke methods from it such as

HolyGrail grail = new HolyGrail();
grail.serve();
Some important things to remember
Never keep your wrapper jar inside your web application; if you do, Tomcat may try to load it using the application class loader, and you will still get the "Native Library already loaded ..." message.

If you still get java.lang.UnsatisfiedLinkError, print java.library.path and check whether your libraryxxx.so resides in one of the paths printed; if it does not, adjust accordingly, or explicitly set System.setProperty("java.library.path","/path/to/foldercontainingthelibrary").

String property = System.getProperty("java.library.path");

//use the platform's path separator (':' on Linux, ';' on Windows)
StringTokenizer parser = new StringTokenizer(property, File.pathSeparator);

while (parser.hasMoreTokens()) {
	System.err.println(parser.nextToken());
}

Monday, August 31, 2015

How Swagger helped me with AWS API Gateway


Motive and background

Recently I stumbled onto a problem where I had to apply some authentication on top of a considerably large legacy codebase. Time was pretty limited, so I had to look for a quick solution. Then I was introduced to API Gateway from Amazon, which from the outset looked like a perfect fit for my problem. Why? Firstly, my API was running on EC2, and secondly, I did not want a convoluted authentication mechanism; I could simply set each of my API endpoints to demand an access token before it is invoked. But the API interface was quite large, and I did not want to hand-write an invocation point for each method. And then Swagger came along.
What is AWS API Gateway? It is a managed service platform that enables you to write and publish your own APIs, or integrate with third-party APIs, and still scale; not least, it offers monitoring services such as logs. It can act as a façade in front of your existing API and add additional functionality with a few Lambda functions. The getting started guide helped me to a good extent.
How did Swagger come into the picture? You have a system exposing some REST APIs, and you may well not control which language they were written in; at some point you want to talk to them, and if your system could talk to any of these APIs in a language-agnostic way while still understanding what is going on, wouldn't that be great? That is exactly what Swagger does. Swagger works on a Swagger definition: from a definition you can generate an implementation (using the Swagger code generation tool), or you can generate a definition file from an existing API, which in turn is used by clients. So, using Swagger, I generated the definition and then used that definition file to import the API resource points into AWS API Gateway (these are simply the methods of my existing API). Don't get confused by "resource"; it is just the AWS way of marking the methods of an API.

Overview of Implementation

Essentially my steps are based on this well-written guide; typically it is a 3-step process:
  1. Add Swagger's dependencies to your project.
  2. Hook Swagger into your JAX-RS application configuration.
  3. Configure and initialize Swagger.
My API was written using RESTEasy; this is how I started, by adding the Maven dependency:
<dependency>
  <groupId>io.swagger</groupId>
  <artifactId>swagger-jaxrs</artifactId>
  <version>1.5.0</version>
</dependency>

Then you can either let Swagger scan your root context, or add a provider class to your RESTEasy providers. I chose to add the provider configuration (the scanning alternative is sketched after the servlet config below).
<servlet>
	<servlet-name>Jersey2Config</servlet-name>
	<servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
	<init-param>
		<param-name>api.version</param-name>
		<param-value>1.0.0</param-value>
	</init-param>
	<init-param>
		<param-name>swagger.api.basepath</param-name>
		<param-value>http://localhost:8080/api</param-value>
	</init-param>
	<load-on-startup>2</load-on-startup>
</servlet>
Just a servlet with no path mapping, which will be started during the application startup.
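The other route mentioned above, letting Swagger scan the root context, is usually done with a BeanConfig initialized at startup; a hedged sketch, assuming swagger-core 1.5.x (the bootstrap servlet and the resource package name are illustrative):

public class SwaggerBootstrap extends HttpServlet {
	@Override
	public void init(ServletConfig config) throws ServletException {
		super.init(config);
		BeanConfig beanConfig = new BeanConfig();
		beanConfig.setVersion("1.0.0");
		beanConfig.setSchemes(new String[] { "http" });
		beanConfig.setHost("localhost:8080");
		beanConfig.setBasePath("/api");
		beanConfig.setResourcePackage("com.example.api"); //package holding the annotated resources
		beanConfig.setScan(true); //triggers the classpath scan at startup
	}
}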

Are we done? Yes, but only with the setup; you still need to decorate your API with annotations so that Swagger can pick them up. Let's do it! It would typically look as follows:

@Path("/ping")
@Api(value = "/ping", description = "All is well")
public class PingService {

private static final Logger LOGGER = LoggerFactory
.getLogger(PingService.class);

@GET
@ApiOperation(value = "Just ping", notes = "just ping.", position = 1)
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "system down"),
@ApiResponse(code = 404, message = "No available") })
public Response ping() {
LOGGER.info("Ping service active");
return new Response(ResponseStatus.SUCCESS,
"Welcome to TeletecAO2RWService");
}

}

And you have to annotate all the model objects as well.
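Typically that means @ApiModel/@ApiModelProperty on the model classes; a brief hedged sketch against the Response model used above (the fields are only illustrative):

@ApiModel(description = "Standard service response")
public class Response {

	@ApiModelProperty(value = "SUCCESS or FAILURE", required = true)
	private ResponseStatus status;

	@ApiModelProperty(value = "Human readable message")
	private String message;

	//constructors, getters and setters omitted
}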

So what do we have now? We can fetch the Swagger definition file from the context root of the API, e.g. http://localhost:8080/api/swagger.json, which will look something like this very basic one:


{
"apiVersion": "",
"apis": [
],
"basePath": "http://192.168.1.1:8000",
"models": {
},
"resourcePath": "/api",
"swaggerVersion": "1.2"
}




while a full-fledged definition may look like this

Well, Swagger does the work for you: it crawls from your root context and finds all API paths from there. At this point you don't have to worry about the daunting size of the definition file it generates.

So how do we get this imported into our API Gateway? We have the Swagger importer: all you do is get the aws-apigateway-swagger-importer-{version}-dependencies.jar into a local folder and run the following:

./aws-api-import.sh --create path/to/swagger.json --profile optimusprime

Remember, the "optimusprime" is the profile that contains the valid access key and secret keys for you aws api gateway. but one thing to note that, you may to run it on your Linux machine with AWS CLI installed. And if you are to build this from maven scratch, build it from Linux machine itself. I had waste an hour debugging my build and moving the my EC2 RHEL instance. Script aws-api-import.sh will struggle to execute in your Linux environment if you do so.

If everything worked fine and the importer ran properly, you will now see the resource points listed in your AWS console. If you are feeling relaxed at this point, all that is left is to finish off the rest of the AWS API work.

Click on these methods and map them to your API. In my case, since the API was on EC2, I chose AWS service proxy as the integration type. Before this, I created an ARN for my existing service using a role; how exactly to do that you can find here.



Add authentication


Go to your resource, select the method (GET/POST or whatever), and select "Method Request".

(Screenshots: the method's "Method Request" pane and its security settings.)

You can select AWS-IAM and select the security token; at least that is all I had to do. If in your case you insist that the client pass some header values, you can add them and then wire up a Lambda function to validate them (Lambda functions are interesting too; I suggest you take a look at them). Then just navigate through the rest of the flow to do the mapping etc. Once all done, just deploy your API via the deploy tool.

So what is the URL that you will be invoking? It is the ARN, which you can find in the "Method Request"; it will be something like this:

arn:aws:execute-api:us-west-2:900272013338:02mubm2sui/*/GET/department

There is pretty well-written documentation from AWS for this (if I can understand it, your grandma definitely will); try this.

So that's how the rover landed on Mars; kinda cool IMHO. AWS API Gateway seems to be an amazingly feature-rich tool. For a while I was wondering whether it is an orchestration tool such as Mule, but I hardly think that was the intention of API Gateway; probably not.

Getting the definition file for APIs written with other frameworks such as Jersey should be equally straightforward.

 Common Errors


1. ERROR - Could not load AWS configuration. Please run 'aws configure' - This is because the importer could not find the config file, which should be located at {user.home}/.aws/config, so you may have to create one. To do this you need to install the AWS CLI if you have not already done so; instructions are provided here. Once done, you need to configure a profile via
aws configure --profile optimusprime

you will be prompted for your access and secret keys.

2. ERROR -  Cross-account pass role is not allowed. (Service: null; Status Code: 403; Error Code: null; Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx)

This is because you have forgotten to put your account id in the "x-amazon-apigateway-integration" section of the Swagger definition file.

Once you have done that it should look something like the following (just replace the account id with your proper account id):

.....
"x-amazon-apigateway-integration" : {
                    "type" : "aws",
                    "uri" : "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:865555555:function:myFunction/invocations",
                    "httpMethod" : "POST",
                    "credentials" : "arn:aws:iam::865555555:role/lambda_exec_role",
                    "requestTemplates" : {
                        "application/json" : "json request template 2",
                        "application/xml" : "xml request template 2"
                    }
.....

3. ERROR-  Invalid ARN specified in the request
This again usually arises from a wrong account id or format; remember your credentials should be something like "arn:aws:iam::865555555:role/lambda_exec_role", with the exact number of colons.

4. Everything went fine, but I still can't see the API listed in my console.
Nothing to worry about; it might have been listed under another AWS region. Check the uri: if you are under us-west-2, the above uri should be

.....
"uri" : "arn:aws:apigateway:us-west-2:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:865555555:function:myFunction/invocations"
.....

 

 

Sunday, August 23, 2015

Writing Authority Connector for Apache ManifoldCF



My interest in Apache ManifoldCF has been growing, so this time I decided to spend some time writing about an authority connector for ManifoldCF. Writing an authority connector is pretty much the same as writing a repository connector, but the aim of such a connector is to retrieve access tokens for a user against the repository. One thing to keep in mind is a default assumption made by the ManifoldCF framework: if you don't specify any authority connector for your repository connector, ManifoldCF assumes Active Directory is in charge, and in those cases the access token is an Active Directory SID. A more complete description is available here.

Security Model





How does ManifoldCF use an authority connector? The framework invokes all the authority connectors configured in ManifoldCF and retrieves tokens against each of those repositories. When you invoke the authority service, available at http://<host>:8345/mcf-authority-service/UserACLs, it scrapes all the tokens against these repositories. Say you have the following authority connectors configured: JIRAAuthorityConnector, ActiveDirectoryAuthorityConnector and LDAPAuthorityConnector; if you pass a username, the authority connectors which understand this username will return the access tokens for it. Finally all these tokens are amalgamated and returned. Moreover, there is something called authority groups: when you create an authority connector you have to create it under an authority group, and an authority connector belongs to exactly one authority group. This provides some separation, meaning tokens are valid only within the group. A complete explanation of how ManifoldCF works is available at the following location, which is pretty good.



OK, how can you get the access tokens for a user? They can be retrieved from http://localhost:8345/mcf-authority-service/UserACLs?username=leagueofshadows, and the tokens are returned either as access tokens or deny tokens; if both are present, a deny token wins over any access token.
Sample:

AUTHORIZED:amazons3
TOKEN:myauthoritygroup:kuhajeyan

Overview of writing an authority connector

So typically you start by extending the base connector org.apache.manifoldcf.authorities.authorities.BaseAuthorityConnector, and just as with the repository connector it is about implementing/overriding some methods. The ones you will most likely have to implement are the following:

Method - What it should do
getAuthorizationResponse() - Obtain the authorization response, given a user name
outputConfigurationHeader() - Output the head-section part of an authority connection ConfigParams editing page
outputConfigurationBody() - Output the body-section part of an authority connection ConfigParams editing page
processConfigurationPost() - Receive and process form data from an authority connection ConfigParams editing page
viewConfiguration() - Output the viewing HTML for an authority connection ConfigParams object

And,
Connect - some connection key values are initialized here.
Check - periodically checks the connection status; a meaningful, readable string is returned to inform the user/admin about the connection status at that instant.
isConnected - returns a boolean telling whether the connection is alive.
viewConfiguration - called when the configuration is displayed in view mode.
outputConfigurationHeader - called to render the header section of the configuration editing page.
outputConfigurationBody - called to render the body section of the configuration editing page.
processConfigurationPost - called to process the configuration values when they are saved and posted.
getAuthorizationResponse - gets the access tokens for a username against the repository.
getDefaultAuthorizationResponse - gets the default access tokens for the repository.

Mainly we need to look into the implementation of getAuthorizationResponse here. How you return the access tokens and how you want them formatted (ultimately it should be a string array) is entirely up to you. A typical, very simple implementation would look like this:
@Override
public AuthorizationResponse getAuthorizationResponse(String userName)
		throws ManifoldCFException {
	// checkUserExists is a connector-specific helper that asks the repository about the user
	if (checkUserExists(userName))
		return new AuthorizationResponse(new String[] { userName },
				AuthorizationResponse.RESPONSE_OK);
	return RESPONSE_USERNOTFOUND;
}
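The RESPONSE_USERNOTFOUND constant above is just a pre-built deny response; a hedged sketch of how it might be declared (the "DEAD_AUTHORITY" deny token is only illustrative):

private static final AuthorizationResponse RESPONSE_USERNOTFOUND =
		new AuthorizationResponse(new String[] { "DEAD_AUTHORITY" },
				AuthorizationResponse.RESPONSE_USERNOTFOUND);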

A fully implemented version of the code is available at this location.

Monday, August 3, 2015

Repository Connector - Apache ManifoldCF

Writing Repository connector for Apache ManifoldCF

Apache ManifoldCF is a framework that lets you connect to source repositories and index their documents; it has a built-in security model that allows your target repository to reflect the source security model. The target repository is where the indexes will reside. More information about the technical structure of ManifoldCF can be found here. My aim is to walk through writing a repository connector; I have chosen the Atlassian Confluence repository as the example, and we will use the Confluence REST API to retrieve the Confluence contents.

ManifoldCF provides a framework that allows you to write a repository connector, which is a class that will be invoked by jobs running on a schedule. Through this class the framework also lets you wire up UI elements such as forms. For example, if you want to write a repository connector for Confluence, you need some way of telling ManifoldCF the Confluence API URL of the server and the credentials needed to connect; those values come from the relevant UI forms. To write a repository connector you start by inheriting from the base connector class BaseRepositoryConnector provided by ManifoldCF itself. There are a few methods for which you need to provide an implementation.

You can get the source code that is built against ManifoldCF 1.8 here.

Methods to be overridden and implemented

connect() - public void connect(ConfigParams configParams). This method lets you make the connection to the source repository; configParams is sent from the UI form of the repository connector, and you can use these values to make the connection.


check() - public String check() throws ManifoldCFException. This method lets you check whether the connection is valid with respect to the values you collected via the connect method. It returns a string describing the validity of the current connection; for example, if you cannot make the connection, you can simply return "Connection Failed".


isConnected() - public boolean isConnected(). Returns true if the current connection is alive; this is used by the framework when running a job.

addSeedDocuments - public void addSeedDocuments(ISeedingActivity activities,
                                                DocumentSpecification spec, long startTime, long endTime,
                                                int jobMode) throws ManifoldCFException, ServiceInterruption

This does the actual job of collecting content identifiers from the source repository. The retrieved identifiers are called seeds; processDocuments later uses these seeds to extract metadata and index the documents.

getDocumentVersions - public String[] getDocumentVersions(String[] documentIdentifiers,
                                                DocumentSpecification spec) throws ManifoldCFException,
                                                ServiceInterruption

The framework uses these version strings to check whether a document needs to be re-crawled; usually the version string is the last-modified date of the document, as in the sketch below.
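A hedged sketch of what such an implementation often boils down to (fetchLastModified is a made-up helper that would call the Confluence REST API):

@Override
public String[] getDocumentVersions(String[] documentIdentifiers,
		DocumentSpecification spec) throws ManifoldCFException, ServiceInterruption {
	String[] versions = new String[documentIdentifiers.length];
	for (int i = 0; i < documentIdentifiers.length; i++) {
		//using the last-modified timestamp as the version string, so an unchanged
		//document keeps the same version and is skipped on the next crawl
		versions[i] = fetchLastModified(documentIdentifiers[i]);
	}
	return versions;
}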


processDocuments - public void processDocuments(String[] documentIdentifiers,
                                                String[] versions, IProcessActivity activities,
                                                DocumentSpecification spec, boolean[] scanOnly)
                                                throws ManifoldCFException, ServiceInterruption

This method takes the seeds, extracts the metadata and indexes each content item; the results are typically pushed to your target repository, such as Solr.

viewConfiguration - public void viewConfiguration(IThreadContext threadContext,
                                                IHTTPOutput out, Locale locale, ConfigParams parameters)
                                                throws ManifoldCFException, IOException

A UI utility method; typically you fill the page with the values that were saved on an earlier occasion, such as the URL and API credentials persisted in the context (usually the values you retrieved when you first tried to connect, via processConfigurationPost). The method is called when the UI displays values in "view" mode.

outputConfigurationHeader - public void outputConfigurationHeader(IThreadContext threadContext,
                                                IHTTPOutput out, Locale locale, ConfigParams parameters,
                                                List<String> tabsArray) throws ManifoldCFException, IOException

A UI method invoked by the framework to populate the header details in the UI. The implementation typically adds the connector's tab names to the default ones.

processConfigurationPost - public String processConfigurationPost(IThreadContext threadContext,
                                                IPostParameters variableContext, ConfigParams parameters)
                                                throws ManifoldCFException

Here you save the values posted from the UI, such as the API URL, API credentials etc. You retrieve the values from variableContext and save them back into parameters.

viewSpecification - public void viewSpecification(IHTTPOutput out, Locale locale,
                                                DocumentSpecification ds) throws ManifoldCFException, IOException


When you want to view the Job specification details of the repository connector, this method will be invoked.

processSpecificationPost - public String processSpecificationPost(IPostParameters variableContext,
                                                DocumentSpecification ds) throws ManifoldCFException

Identical to processConfigurationPost, but the posted values are relevant to the job rather than the repository connection. You may process values such as custom parameters for your API queries here.

outputSpecificationBody - public void outputSpecificationBody(IHTTPOutput out, Locale locale,
                                                DocumentSpecification ds, String tabName)
                                                throws ManifoldCFException, IOException

This method is invoked to render the body section of the job's specification editing page.

outputSpecificationHeader - public void outputSpecificationHeader(IHTTPOutput out, Locale locale,
                                                DocumentSpecification ds, List<String> tabsArray)
                                                throws ManifoldCFException, IOException


Identical to outputConfigurationHeader, but this is for the job specification.

Structure of a Repository connector.

How does ManifoldCF recognize a new connector? It all works on OSGi: you create a jar file containing your repository connector and a security connector (which will be looked at later) and drop it into the connector libraries folder. Once ManifoldCF starts it will automatically pick up your new connector, and you should definitely watch the manifoldcf.log file, which can be found in the logs folder.

1. Create the project: just extend the POM from the parent ManifoldCF project and you will have most of the necessary dependencies imported. A sample pom file may look like this.

2. Resource files: these are the typical HTML and JavaScript files that you need to make available on the classpath so that the framework can pick them up; for example, editConfiguration_conf_server.html will be loaded to present your repository connection details. You explicitly reference these files in the relevant UI methods described above.

connect - method
super.connect(configParams);

confprotocol = params.getParameter(ConfluenceConfig.CONF_PROTOCOL_PARAM);
confhost = params.getParameter(ConfluenceConfig.CONF_HOST_PARAM);
confport = params.getParameter(ConfluenceConfig.CONF_PORT_PARAM);
confpath = params.getParameter(ConfluenceConfig.CONF_PATH_PARAM);
confsoapapipath = params.getParameter(ConfluenceConfig.CONF_SOAP_API_PARAM);
clientid = params.getParameter(ConfluenceConfig.CLIENT_ID_PARAM);
clientsecret = params.getObfuscatedParameter(ConfluenceConfig.CLIENT_SECRET_PARAM);

confproxyhost = params.getParameter(ConfluenceConfig.CONF_PROXYHOST_PARAM);
confproxyport = params.getParameter(ConfluenceConfig.CONF_PROXYPORT_PARAM);
confproxydomain = params.getParameter(ConfluenceConfig.CONF_PROXYDOMAIN_PARAM);
confproxyusername = params.getParameter(ConfluenceConfig.CONF_PROXYUSERNAME_PARAM);
confproxypassword = params.getObfuscatedParameter(ConfluenceConfig.CONF_PROXYPASSWORD_PARAM);

try {
	getConfluenceService();
} catch (ManifoldCFException e) {
	Logging.connectors.error(e);
}


Take the values available in configParams and use them to connect to the Confluence server.

check –
try {
	return checkConnection();
} catch (ServiceInterruption e) {
	Logging.connectors.error("Error ", e);
	return "Connection temporarily failed: ";
} catch (ManifoldCFException e) {
	Logging.connectors.error("Error ", e);
	return "Connection failed: ";
}

A separate thread is instantiated to check whether the connection is valid, but it is not necessary to do this via a thread.

protected String checkConnection() throws ManifoldCFException,
		ServiceInterruption {
	String result = "Unknown";
	getConfluenceService();
	CheckConnectionThread t = new CheckConnectionThread(getSession(), service);
	try {
		t.start();
		t.finishUp();
		result = t.result;
	} catch (InterruptedException e) {
		t.interrupt();
		throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
				ManifoldCFException.INTERRUPTED);
	} catch (java.net.SocketTimeoutException e) {
		handleIOException(e);
	} catch (InterruptedIOException e) {
		t.interrupt();
		handleIOException(e);
	} catch (IOException e) {
		handleIOException(e);
	} catch (ResponseException e) {
		handleResponseException(e);
	}

	return result;
}

addSeedDocuments –
GetSeedsThread t = new GetSeedsThread(getSession(), confDriveQuery);
try {
	t.start();

	boolean wasInterrupted = false;
	try {
		XThreadStringBuffer seedBuffer = t.getBuffer();

		while (true) {
			String contentKey = seedBuffer.fetch();
			if (contentKey == null)
				break;
			// Add the pageID to the queue
			activities.addSeedDocument(contentKey);
		}
	} catch (InterruptedException e) {
		wasInterrupted = true;
		throw e;
	} catch (ManifoldCFException e) {
		if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
			wasInterrupted = true;
		throw e;
	} finally {
		if (!wasInterrupted)
			t.finishUp();
	}
} catch (InterruptedException e) {
	t.interrupt();
	throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
			ManifoldCFException.INTERRUPTED);
} catch (java.net.SocketTimeoutException e) {
	handleIOException(e);
} catch (InterruptedIOException e) {
	t.interrupt();
	handleIOException(e);
} catch (IOException e) {
	handleIOException(e);
} catch (ResponseException e) {
	handleResponseException(e);
}

Here again a new thread is created to add the seeds, but the framework does not require you to do so.

processDocuments –
for (int i = 0; i < documentIdentifiers.length; i++) {
	String nodeId = documentIdentifiers[i];
	String version = versions[i];

	long startTime = System.currentTimeMillis();
	String errorCode = "FAILED";
	String errorDesc = StringUtils.EMPTY;
	Long fileSize = null;
	boolean doLog = false;

	try {
		if (Logging.connectors != null) {
			Logging.connectors.debug("Confluence "
					+ ": Processing document identifier '" + nodeId + "'");
		}

		if (!scanOnly[i]) {
			if (version != null) {
				doLog = true;

				try {
					errorCode = processConfluenceDocuments(nodeId,
							activities, version, fileSize);
				} catch (Exception e) {
					if (Logging.connectors != null) {
						Logging.connectors.error(e);
					}
				}

			} else {
				activities.deleteDocument(nodeId);
			}
		}
	} finally {
		if (doLog)
			activities.recordActivity(new Long(startTime),
					ACTIVITY_READ, fileSize, nodeId, errorCode,
					errorDesc, null);
	}
}

You simply loop through the available document identifiers and do whatever is relevant, such as extracting the metadata.
To keep this brief, I have omitted the other methods, but you can look at the source code to follow the rest.