Batch Processing

Reverse Engineering Data-flow in a Data Platform with Thousands of tables?

Ever been tasked to inherit, or migrate, an existing (legacy) Data Platform? There are numerous Open Source (Hadoop/Sqoop, Schedulix …) and Commercial tools (BMC Control-M, Appliedalgo.com, Stonebranch, etc.) which can help you operate the Data Platform – they typically give you a multitude of platform services:
• Job Scheduling
• ETL
• Load Balancing & Grid Computing
• Data Dictionary / Catalogue
• Execution tracking (track/persist job parameters & output)

A typical large-scale application has hundreds to thousands of input data files, queries, and intermediate/output data tables.
[Figure: DataPlatform_DataflowMapping]

The Open Source and Commercial packages mentioned above facilitate operation of a Data Platform. Tools which help generate ERD diagrams typically rely on PK-FK relationships being defined – but, of course, more often than not this is not the case. Example? Here’s how you can drag-drop tables in Microsoft SQL Server onto a canvas to create an ERD – https://www.youtube.com/watch?v=BNx1TYItQn4
[Figure: DataPlatform_DataflowMapping_Who]

If you’re tasked to inherit or migrate such a Data Platform, the first order of business is to manually map out the data flow. Why? Because to put in a fix or an enhancement, you first need to understand the data flow before any work can commence.

And, that’s a very expensive, time consuming proposition.

There are different ways to tackle the problem. Here’s one (not-so-smart) option:
• Manually review database queries and stored procedures
• Manually review application source code and extract from it embedded SQL statements

Adding to the complexity:
• Dynamic SQL
• Object Relational Mapper (ORM)

The more practical approach would be to employ a SQL Profiler: capture the SQL statements executed, then trace the flow manually. Even then, this typically requires experienced developers to get the job done (which doesn’t help when you want to keep cost down and delivery lead time as short as possible). Such an undertaking is inherently risky – you can’t really estimate how long it’ll take to map out the flow until you actually do it.

There’s one free command-line utility, MsSqlDataflowMapper, from appliedalgo.com which can help. Basically, MsSqlDataflowMapper takes a SQL Profiler trace file (xml) as input and analyzes the captured SQL statements, looking for INSERTs and UPDATEs. It then automatically dumps the data flow to a flow chart (HTML 5). Behind the scenes, it uses SimpleFlowDiagramLib from Gridwizard to plot the flow chart – https://gridwizard.wordpress.com/2015/03/31/simpleflowdiagramlib-simple-c-library-to-serialize-graph-to-xml-and-vice-versa/

Limitations?
• Microsoft SQL Server only (to get around this, you can build your own tool that captures SQL statements against Oracle/Sybase/MySQL, etc., analyzes them, looks for INSERTs and UPDATEs, then routes the result to SimpleFlowDiagramLib to plot the flow chart – see the sketch after this list)
• MsSqlDataflowMapper operates at table level. It identifies source/destination tables in the process of mapping out the flow. However, it doesn’t provide field-level lineage (which source tables does a particular field in an output table come from?)
• The tool does NOT automatically *group* related tables into different regions of the diagram (this requires a lot more intelligence in the construction of the tool – as we all know, parsing SQL is actually a very complex task! https://gridwizard.wordpress.com/2014/11/08/looking-for-a-sql-parser-for-c-dotnet). At the end of the day, it still takes a skilled developer to make sense of the flow.
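If you do end up rolling your own mapper for another database (as suggested in the first limitation above), the core idea is modest: scan the captured statements for writes, pull out the destination and source tables, and emit edges. Below is a minimal, hypothetical C# sketch along those lines – the regexes are deliberately naive (they only catch simple INSERT INTO … SELECT and UPDATE … FROM patterns), the table names are made up, and plotting the resulting edges with SimpleFlowDiagramLib (or any other graph library) is left out.

    using System;
    using System.Collections.Generic;
    using System.Text.RegularExpressions;

    // Naive table-level data-flow extraction from captured SQL statements.
    // Sketch only - real-world SQL (dynamic SQL, CTEs, MERGE, ORM output)
    // needs a proper parser.
    static class DataflowSketch
    {
        // "INSERT INTO <table>" and "UPDATE <table>" are treated as destinations.
        static readonly Regex Dest = new Regex(
            @"\b(?:INSERT\s+INTO|UPDATE)\s+(?<t>[\w\.\[\]]+)", RegexOptions.IgnoreCase);

        // "FROM <table>" and "JOIN <table>" are treated as sources.
        static readonly Regex Src = new Regex(
            @"\b(?:FROM|JOIN)\s+(?<t>[\w\.\[\]]+)", RegexOptions.IgnoreCase);

        // Returns (sourceTable, destinationTable) edges found in the statements.
        public static IEnumerable<Tuple<string, string>> ExtractEdges(IEnumerable<string> sqlStatements)
        {
            foreach (var sql in sqlStatements)
            {
                var destMatch = Dest.Match(sql);
                if (!destMatch.Success) continue;          // not a write - ignore
                string dest = destMatch.Groups["t"].Value;

                foreach (Match m in Src.Matches(sql))
                {
                    string src = m.Groups["t"].Value;
                    if (!string.Equals(src, dest, StringComparison.OrdinalIgnoreCase))
                        yield return Tuple.Create(src, dest);
                }
            }
        }

        static void Main()
        {
            var edges = ExtractEdges(new[]
            {
                "INSERT INTO dbo.PnL SELECT * FROM dbo.Trades t JOIN dbo.Prices p ON t.Id = p.TradeId",
                "UPDATE dbo.Positions SET Qty = s.Qty FROM dbo.StagingPositions s"
            });
            foreach (var e in edges)
                Console.WriteLine(e.Item1 + " -> " + e.Item2);   // e.g. dbo.Trades -> dbo.PnL
        }
    }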

Happy Coding!


Realtime Analytics: Just how many CPUs/processors would you need if your Market Data Service is feeding thousands of ticks per second?

The Problem Statement

Ok, here’s the problem statement. You are building the application infrastructure for a group of 100 traders. The process starts with a market data feed. A PriceServer stands between your corporate network and external market data vendors – Bloomberg, Reuters, Morningstar, Xignite, etc. – pumping in price ticks at a rate of 3k per second. CalcServers sitting behind it pick up the ticks, run some calculations, then publish the results to trading/risk management screens.

The question is:

Just how many CPUs would you need in your server farm if your Market Data Service is feeding thousands of ticks per second?

 

Architecture

Different Loading Conditions - Different Tools

The problem we have at hand falls under CASE 3 or CASE 4, where the update frequency is high and the permitted distribution latency is low – whether the calculations are short or lengthy.

Consider this hypothetical architecture: no scheduler, no load balancer, no caching or persistence support, minimal logging (such as those found in tools like appliedalgo.com, BMC Control-M, or Autosys). The analogy is Space Shuttle construction – eliminate any component or weight you don’t need.

PriceServer, CalcServer and SimpleTradingScreen all feed directly off RabbitMQ. The intention is to minimize latency/overhead along the execution path from PriceServer, to CalcServer, and finally to SimpleTradingScreen.

[Figure: CalcFarm Architecture]

How many processors do we need in our CalcFarm? The maths is actually not that complicated. PriceServer feeds the message bus (RabbitMQ) at a rate of 3k ticks per second. That’s 1/3 ms per update. If the CalcServer farm fails to process a single tick (dequeue from RabbitMQ, run the calc, then publish the result back to RabbitMQ) within 1/3 ms, it lags behind PriceServer. If the CalcServer farm takes 1 ms to process one tick, then it’s three times slower than PriceServer. Essentially, every additional 1 ms spent processing a published tick requires 3 additional CPUs in the CalcServer farm for the farm to keep pace with PriceServer’s rate of 3k ticks/sec.

Imagine your calculations take 100 ms on average to execute. That’s equivalent to 300 additional CPUs – or 37.5 servers with 8 processors each.
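The sizing arithmetic is simple enough to put in a few lines of C# (the numbers below are just the figures from this example):

    using System;

    class CalcFarmSizing
    {
        static void Main()
        {
            double ticksPerSecond = 3000;   // PriceServer publish rate
            double msPerTick      = 100;    // average calc time per tick, in ms
            int    coresPerServer = 8;

            // Each CPU can handle (1000 / msPerTick) ticks per second, so to keep
            // aggregate throughput >= the tick rate:
            double cpusNeeded    = ticksPerSecond * msPerTick / 1000.0;   // = 300
            double serversNeeded = cpusNeeded / coresPerServer;           // = 37.5

            Console.WriteLine("CPUs needed:    " + cpusNeeded);
            Console.WriteLine("Servers needed: " + serversNeeded);
        }
    }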

What you are doing here is essentially two things:

(a) balance the data velocity supported by each component along the overall execution chain, and

(b) minimize latency along the execution path between two points: PriceServer and SimpleTradingScreen.

 

Source Code? 

Want to experiment with this hands-on? We’ve uploaded the source code to GitHub. Everything is built in .NET 4.5/C#. Note that you need to install RabbitMQ first.

https://github.com/gridwizard/CalcFarm.git

YouTube: http://youtu.be/FfwxRcZiEFw

One aspect worthy of special attention is that serialization/deserialization is a high-traffic area – it happens every time you queue or dequeue to/from RabbitMQ. The general consensus is that System.Runtime.Serialization.DataContractSerializer and System.Xml.Serialization.XmlSerializer are relatively slow. For demo purposes, we’ve chosen NetSerializer – a free serializer from Codeproject.com. Source on GitHub: https://github.com/tomba/netserializer
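To make the hot path concrete, here is a minimal, hypothetical sketch of serializing a tick with NetSerializer and publishing it to RabbitMQ. The PriceTick type and queue name are invented for illustration, and the exact NetSerializer and RabbitMQ.Client entry points vary slightly between versions – see the Git repository above for the actual implementation.

    using System;
    using System.IO;
    using NetSerializer;        // https://github.com/tomba/netserializer
    using RabbitMQ.Client;

    [Serializable]
    public class PriceTick      // hypothetical message type, for illustration only
    {
        public string Symbol;
        public double Price;
        public long PublishedUtcTicks;
    }

    class PublishSketch
    {
        static void Main()
        {
            // NetSerializer wants the full list of message types up front.
            var serializer = new Serializer(new[] { typeof(PriceTick) });

            var factory = new ConnectionFactory { HostName = "localhost" };   // = QueueUrl
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("PriceTicks", false, false, false, null);

                var tick = new PriceTick
                {
                    Symbol = "IBM",
                    Price = 123.45,
                    PublishedUtcTicks = DateTime.UtcNow.Ticks
                };

                using (var ms = new MemoryStream())
                {
                    serializer.Serialize(ms, tick);                 // compact binary, fast
                    channel.BasicPublish("", "PriceTicks", null, ms.ToArray());
                }
            }
        }
    }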

REF:
(a) http://maxondev.com/serialization-performance-comparison-c-net-formats-frameworks-xmldatacontractserializer-xmlserializer-binaryformatter-json-newtonsoft-servicestack-text/
(b) http://www.codeproject.com/Articles/351538/NetSerializer-A-Fast-Simple-Serializer-for-NET

 

Start CalcServer Farm?

STEP 1. RabbitMQ

Please download and install RabbitMQ, and make sure the service is running.

STEP 2. Start CalcServer

From command prompt, navigate to \CalcServer\bin\Release\

Simply type CalcServer. There are only three settings in app.config:

  <appSettings>
    <!-- Point at RabbitMQ -->
    <add key="QueueUrl" value="localhost"/>
    <!-- For debugging only. If DetailLog=true, every price tick received is logged and printed
         to screen. Leave it false; otherwise it will slow down CalcServer significantly. -->
    <add key="DetailLog" value="false"/>
    <!-- Maximum size of the thread pool. Note: threads and concurrent instances of CalcServer
         on the same physical machine share the same physical resources, so configure this wisely. -->
    <add key="MaxThreadPoolSize" value="8"/>
  </appSettings>
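For reference, reading these keys is just the standard .NET configuration API – roughly like this (the defaults shown are illustrative, not necessarily what CalcServer actually does):

    using System.Configuration;   // requires a reference to System.Configuration

    static class CalcServerSettings
    {
        // Illustrative only - reads the three app.config keys shown above.
        public static readonly string QueueUrl =
            ConfigurationManager.AppSettings["QueueUrl"] ?? "localhost";
        public static readonly bool DetailLog =
            bool.Parse(ConfigurationManager.AppSettings["DetailLog"] ?? "false");
        public static readonly int MaxThreadPoolSize =
            int.Parse(ConfigurationManager.AppSettings["MaxThreadPoolSize"] ?? "8");
    }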

STEP 3. Start PriceServer

From command prompt, navigate to \PriceServer\bin\Release\

Simply type PriceServer. There are only four settings in app.config:

  <appSettings>
    <add key="QueueUrl" value="localhost"/>
    <add key="DetailLog" value="false"/>
    <!-- Max publish rate -->
    <add key="PerSecPublishThrottle" value="3000"/>
    <!-- Max cumulative # of publishes before stopping -->
    <add key="MaxCountPublishesCulmulative" value="0"/>
  </appSettings>

STEP 4. Start SimpleTradingScreen

From command prompt, navigate to \SimpleTradingScreen\bin\Release\

Simply type SimpleTradingScreen. There are only two settings in app.config; I’m not going to repeat what’s already been covered.

  <appSettings>
    <add key="QueueUrl" value="localhost"/>
    <add key="DetailLog" value="false"/>
  </appSettings>

 

How do we know that it works?

  • Tested on an Intel 2.6GHz (single-processor) machine with 4GB RAM (a very low-end dev test machine, so if it can feed through 3k ticks/sec, so can yours).
  • CalcServer dumps performance statistics into “CalcServerStatistics”.

Basically,

  1. CountPxUpdateRecv is the number of ticks CalcServer has picked up from RabbitMQ.
  2. CountCalcCompleted is the number of calculations (dequeue + calc + publish result) that have been completed.
  3. Gap = CountPxUpdateRecv – CountCalcCompleted

If CalcServer is able to keep pace with PriceServer, what you’d see is the Gap stabilizing.
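One way such counters can be maintained without adding contention on the hot path is with interlocked increments – a sketch only (the counter names match the statistics above; everything else is hypothetical rather than the actual CalcServer source):

    using System;
    using System.Threading;

    // Illustrative lock-free counters for the statistics described above.
    class CalcServerStats
    {
        long _countPxUpdateRecv;      // CountPxUpdateRecv
        long _countCalcCompleted;     // CountCalcCompleted

        public void OnTickReceived()  { Interlocked.Increment(ref _countPxUpdateRecv); }
        public void OnCalcCompleted() { Interlocked.Increment(ref _countCalcCompleted); }

        // Gap = CountPxUpdateRecv - CountCalcCompleted
        public long Gap
        {
            get
            {
                return Interlocked.Read(ref _countPxUpdateRecv)
                     - Interlocked.Read(ref _countCalcCompleted);
            }
        }

        // Called periodically (e.g. once per second) to append a line to the statistics log.
        public string Snapshot()
        {
            return string.Format("{0:O} Recv={1} Done={2} Gap={3}",
                DateTime.UtcNow,
                Interlocked.Read(ref _countPxUpdateRecv),
                Interlocked.Read(ref _countCalcCompleted),
                Gap);
        }
    }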

The following result was produced with PerSecPublishThrottle = 3000 (i.e. 3,000 ticks per second).

Try setting this to, for example, 10000. With a single instance of CalcServer running you will start to see the Gap widening (i.e. the Gap keeps increasing most of the time).

[Figure: CalcServerStatistics]

 

We’ve also written a simple NUnit test case.

It simply kicks off CalcServer and PriceServer, lets them sit for a few minutes, then examines the CalcServer performance dump file \CalcServer\bin\Release\CalcServerStatistics.log.

Then it asserts that the “Average Gap” is < 10k.

It’s the same as manually examining CalcServerStatistics.log and confirming that the Gap isn’t monotonically increasing, which would be an indication that CalcServer can’t keep up with PriceServer.
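In outline, the test looks something like this (the paths and the log-parsing logic here are hypothetical; the actual test lives in the Git repository):

    using System;
    using System.Diagnostics;
    using System.IO;
    using System.Linq;
    using System.Threading;
    using NUnit.Framework;

    [TestFixture]
    public class CalcFarmSmokeTest
    {
        [Test]
        public void AverageGapStaysBelowThreshold()
        {
            // Kick off CalcServer and PriceServer (paths are illustrative).
            using (var calc = Process.Start(@"CalcServer\bin\Release\CalcServer.exe"))
            using (var price = Process.Start(@"PriceServer\bin\Release\PriceServer.exe"))
            {
                Thread.Sleep(TimeSpan.FromMinutes(2));   // let them run for a while
                price.Kill();
                calc.Kill();
            }

            // Hypothetical parsing: assume each statistics line ends with "Gap=<n>".
            var gaps = File.ReadLines(@"CalcServer\bin\Release\CalcServerStatistics.log")
                           .Where(line => line.Contains("Gap="))
                           .Select(line => long.Parse(
                               line.Split(new[] { "Gap=" }, StringSplitOptions.None)[1].Trim()))
                           .ToList();

            Assert.Less(gaps.Average(), 10000.0, "CalcServer is falling behind PriceServer");
        }
    }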

[Figure: CalcFarmNUnitTest]

 

In addition to the Gap, which is a measure of throughput disparity, latency can also be measured, simply by adding timestamps to the messages.

latency = timestamp (SimpleTradingScreen recv) – timestamp (PriceServer publish)
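For example, if the publisher puts a UTC timestamp on every message, the subscriber can compute end-to-end latency on arrival. A trivial sketch (it assumes the two hosts’ clocks are synchronized, e.g. same machine or NTP):

    using System;

    static class LatencySketch
    {
        // PriceServer: call just before publishing and put the value on the message.
        public static long StampUtcTicks()
        {
            return DateTime.UtcNow.Ticks;
        }

        // SimpleTradingScreen: call right after deserializing the message.
        public static double LatencyMs(long publishedUtcTicks)
        {
            return TimeSpan.FromTicks(DateTime.UtcNow.Ticks - publishedUtcTicks).TotalMilliseconds;
        }
    }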

Happy Coding!


Grid Computing: Performance Tuning

There are many things you can do to improve the performance of your grid, and a couple of things you should check before throwing money at additional hardware – CPUs, memory, faster disks, or more nodes.

Before we proceed, it’s important to emphasize that, in general, the Grid is only responsible for:

  • Scheduling of Jobs and tracking their execution status
  • Distribution of Jobs to nodes

If your jobs are not running fast enough, there are two possibilities:

  • Your job code has not been sufficiently optimized, or there’s a genuine performance issue in your job code.
  • Bottleneck in your Grid infrastructure

The following is a quick checklist for the latter, using the graphics and grid architecture from Applied Algo (https://appliedalgo.com) as an example.

[Figure: Grid Computing – Performance Tuning]

When troubleshooting performance issues in general, you start by asking yourself these two questions:

  • Where is the bottleneck?
  • What is the nature of the problem – is the CPU maxing out? Memory? Or disk (thrashing? paging a lot? http://www.programmerinterview.com/index.php/operating-systems/how-virtual-memory-works)? Is your job code moving too much data across the different tiers?

Scheduling Settings

Multiple jobs referencing the same data on the same external data source? You may be better off running them sequentially – one after another.

Input Data – Bottleneck in Database Tier?

  • Partitioning strategy: jobs referencing data residing in the same table or database? Partition your data across multiple tables, data files on separate disks, or multiple database instances/SQL clusters.
  • SQL optimization: review the query execution plan; add primary keys, indexes and foreign keys to optimize joins.

Grid Load Balancer

  • Node affinity: run fast jobs in one node group, slow jobs in another.
  • Throttling settings? If a node is already working too hard (a long disk queue, for example), pushing it by queuing up jobs it cannot start won’t help.

Nodes

  • Is the actual job code running on the nodes optimized?
  • Node proximity to input data – slow link? Does your grid reside in the Cloud? What about your data source? Are you sending too much data over the wire?
  • Perform preliminary operations (filtering, simple aggregations, for example) on the input data in SQL BEFORE fetching it into the nodes, to minimize traffic and optimize consumption.
  • Excessive logging?
  • Excessive threading won’t help – you’d just get a lot of context switches. Try to limit the number of threads to the number of CPUs (see the sketch below).
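For example, with the TPL this cap is a one-liner (a generic sketch, not code from any particular grid product):

    using System;
    using System.Threading.Tasks;

    class NodeWorkSketch
    {
        static void Main()
        {
            var workItems = new int[1000];   // placeholder for the node's real work items

            // Cap the number of worker threads at the number of logical CPUs
            // to avoid excessive context switching.
            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            };

            Parallel.ForEach(workItems, options, item =>
            {
                // ... run the actual job code for this item ...
            });
        }
    }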

Happy Coding!

Big Data, Small Data, Grid Computing, Cloud Computing, Distributed Computation vs Distributed Persistence

Hadoop (http://hadoop.apache.org) has gained a lot of popularity in recent years – and has claimed the throne in Grid Computing. There has been a lot of confusion about what’s meant by Grid, Load Balancing, Big Data, Cloud, etc. There are grids geared towards persisting, parsing and analyzing non-relational data (social media, web scraping, etc.) – Hadoop is one such example. There are software vendors that cater for simple Enterprise workflow (i.e. scheduling, job chaining) – BMC Control-M and Schedulix, for instance (here’s a decent survey: http://www.softpanorama.org/Admin/job_schedulers.shtml). There are also data platforms geared towards numerical and quantitative analysis (data in relational format), such as the Applied Algo ETL Suite. How do we decide what’s suitable for what purpose?

What is Big Data?

We will start with this easy one – Big Data is just Big Data. Wikipedia says Big Data is simply a lot of data: “Big data is high volume, high velocity, and/or high variety information assets … Big data is the term for a collection of data sets so large and complex that it becomes difficult to process using on-hand database management tools or traditional data processing applications.” http://en.wikipedia.org/wiki/Big_data

That’s not very helpful, and we need a better answer than that, don’t we? Big Data is…

a. Not terabytes of data, but petabytes of data.

b. A data platform which allows efficient storage (and retrieval) of that amount of data. Typically, this means distributed storage.

c. A Grid Computing platform – which allows parallel processing against “Big Data” (or “Small Data”).

Find out whether your data size is in the range of hundreds of GBs, TBs, or really in the PB range. Ask questions: where is my data coming from? Am I purging and archiving old, obsolete entries from my database?

There are many “Big Data” articles merely trying to establish a use case for “Big Data” technologies where they’re not needed. Example: http://www.syoncloud.com/case_study

First off, 50GB is not “Big Data”. Secondly, in my humble experience with brokerages, hedge funds and investment banks, nobody has “Big Data” (and Hadoop) – except edge cases such as exchanges, or firms mining the Internet.

The so-called “Traditional Approach” in that article – intra-application feeds via csv/xml/Excel/txt – is a tried and true approach still practiced in many of the biggest investment banks; support personnel love it, because in the event of an issue they can simply track down the problem by opening the feed file. Further, there are shops where market data, positions and risks are communicated among the different components in the application chain by way of a message bus (Tibco Rendezvous, for instance), and smaller infrastructures where the components talk over web services or sockets.

There are also cases where a convincing use case is presented – imagine mining companies collecting seismic data sets over a wide geographical area (many probes sending in periodic updates):

http://www.teradata.com/articles/Data-Analytics-for-Oil-and-Gas-The-Business-Case/?utm_content=buffer3b455&utm_medium=social&utm_source=twitter.com&utm_campaign=buffer

Unless you’re Big Brother, or your business is social media, news/media mining, retail (Amazon, for example), credit cards, security audits (scans and surveillance can generate a lot of data), real-time analysis of data streams from mobile and wearable devices (ask why you’re persisting the data), IoT (Internet of Things), etc. – that is, a firm dealing in public, unstructured data – don’t rush off to a Big Data implementation.

“Don’t use Hadoop – your data isn’t that big” by Chris Stucchio:

http://www.chrisstucchio.com/blog/2013/hadoop_hatred.html

What is Load Balancing?

If you look around, “Load Balancing” is typically used in the context of “Network Load Balancing” – which isn’t “Grid Computing”: http://en.wikipedia.org/wiki/Load_balancing_(computing)

What is a Grid?

1. Distributed Computation – calculation/computation can be distributed across the machines in a server farm. Distribution algorithms: server status (CPU/memory/disk activity), round robin, node affinity, etc. – and proximity to input data.

Hadoop’s MapReduce algorithm (excellent illustration here: http://ayende.com/blog/4435/map-reduce-a-visual-explanation) is closely tied to “Distributed Persistence”, where slaves (Task Trackers) running on multiple machines, fetching input data from multiple Data Nodes, execute concurrently towards a single goal – i.e. stateful distribution of computation load for incremental analytics. This is a particularly good fit for Big Data analysis: social media (e.g. Facebook), news mining (e.g. Reuters).

Stateless distribution, by contrast, distributes load without consideration of a node’s proximity to the input data.

2. Distributed Persistence – (a) input data can be fetched from multiple sources residing on multiple machines, and (b) output data can be persisted on multiple machines. Hadoop HDFS is one such implementation, where the NameNode acts as the central registry of all DataNodes and the DataNodes store data locally on disk. NoSQL (http://nosql-database.org)/MongoDB and Hadoop represent the leaders in the recent movement towards persistence in non-relational formats.

3. Scheduling (Aka “Batch Processing”, “Workload Automation”)

The scheduler determines when to execute jobs. Most modern schedulers support job chaining: parent-child job hierarchies and conditional execution, with execution tracking. The word “Scheduling” in the context of Quantitative Finance carries a quite different meaning from general IT (Infrastructure), where it is referred to as “Workload Automation” or “Batch Processing” – for example, daily clean-up of log folders, shrinking databases, cleaning import/export folders, purging stale data, scanning sensitive folder permissions, etc.: Infrastructure/Platform related tasks in general. By contrast, a typical EOD (End of Day) batch on a hedge fund/investment bank derivatives trading desk includes:

STEP 1. Mark to Market

STEP 2. Pnl Calculation

STEP 3. EOD Pricing

STEP 4. Stressing and Scenario Analysis

STEP 5. Downstream Feed

Quartz is the most prominent Open Source library at the time of writing, supporting both job scheduling and load balancing (aka “clustering”), with specific limitations (for example, with Quartz.NET, jobs must be .NET classes that implement the IJob interface). Further, all the Open Source libraries lack a GUI and persistence of anything (schedules, execution status/timestamps/history, parameters, output results, etc.). Standalone solutions: BMC Control-M (commercial, approx. >USD 20k base), Applied Algo ETL Suite (commercial, ~USD 1,000 base, depending on how many nodes are in the server farm), Schedulix (Open Source + consulting fees).
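For reference, the Quartz.NET constraint mentioned above looks roughly like this in practice (a minimal sketch against the 2.x-era API; the job name, group and cron expression are arbitrary):

    using Quartz;
    using Quartz.Impl;

    // Every Quartz.NET job must be a .NET class implementing IJob.
    public class MarkToMarketJob : IJob
    {
        public void Execute(IJobExecutionContext context)
        {
            // ... job code here, e.g. STEP 1 of the EOD batch above ...
        }
    }

    class SchedulerSketch
    {
        static void Main()
        {
            ISchedulerFactory factory = new StdSchedulerFactory();
            IScheduler scheduler = factory.GetScheduler();
            scheduler.Start();

            var job = JobBuilder.Create<MarkToMarketJob>()
                                .WithIdentity("MarkToMarket", "EodBatch")
                                .Build();

            // Run every weekday at 17:30, for example.
            var trigger = TriggerBuilder.Create()
                                        .WithCronSchedule("0 30 17 ? * MON-FRI")
                                        .Build();

            scheduler.ScheduleJob(job, trigger);
        }
    }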

4. GUI

A Grid is NOT a GUI, and a GUI isn’t a mandatory component of a Grid. But one observation is the absence of a GUI in Cloud-based solutions. For Hadoop, the web-based user interfaces live under separate projects (Pig and Oozie) which you need to download, install and configure separately, and they are geared towards Hadoop specialists.

What is Cloud?

Cloud doesn’t mean “Grid” – it simply means a hosted platform. But a Grid can be hosted in “The Cloud”. Google Compute Engine, for example, is a Grid hosted in The Cloud: https://developers.google.com/compute/docs/load-balancing/

The financial sector has been slow to adopt due to regulatory and compliance constraints, but we’re seeing incremental adoption. AWS Nasdaq Finqloud: http://aws.amazon.com/solutions/case-studies/nasdaq-finqloud/

Before anyone embarks on a Cloud implementation – is your data accessible from The Cloud? If not, you can forget it.

What’s Out There?

Here’s a list of Open Source and Commercial “Grids” and “Schedulers” – some run in The Cloud, others are meant for local installation. Hadoop is no doubt the leader in Big Data (despite some bad press on usability, and thus overall implementation cost: http://www.forbes.com/sites/danwoods/2012/07/27/how-to-avoid-a-hadoop-hangover) with legions of vendors building on top of it (Hortonworks, Cloudera, Microsoft HDInsight, etc.). You also have BMC Control-M (http://bmc.com) for traditional Enterprise workflow, and the Applied Algo ETL Suite (https://appliedalgo.com), designed and priced for smaller application teams – in particular, analytic/quantitative analysis with automatic persistence of numerical output in tabular format (relational database).

https://appliedalgo.com/appliedalgoweb/Doc/CompetitiveAnalysis/AppliedAlgo.Scheduler_LoadBalancer.FeatureComparison.htm

Is Cloud preferred over The Grid?

Simply put,

  • Cloud = a Grid that resides on the Internet, and
  • Grid = a Grid that resides on the Intranet.

Despite Cloud advocates pushing the idea that the Cloud is the next big thing and everything should run in the Cloud (http://gavinbadcock.wordpress.com/2012/11/22/cloud-vs-grid-computing-why-are-we-leaving-the-grid-behind), it isn’t everything. Well, still, is it most of it?

Well, in terms of data size in bytes, perhaps – after all, with social media and unstructured public information/raw data, the Internet is infinitely vast. However, financial/accounting/statistical data is still stored in relational formats/databases, and for security/compliance reasons and technical practicality it’s going to remain this way. The Private Grid isn’t going away.

Now, in terms of speed, anyone who claims calculations run faster in The Cloud is just clueless. I am not even going to attempt to argue for or against it!

This brings us to the core question: would you [Rent] your apartment, or [Buy] it? The right answer depends on your purpose and intent; despite all the hype, the Cloud isn’t for everything. Renting isn’t always cheaper than buying.

Simple math:

  • Google Compute Engine (Cloud): USD 1,000 per month base + implementation cost for scheduling/integration/GUI
  • BMC Control-M: USD 20k base + 1,000 schedules @ USD 200 each (i.e. USD 200k)
  • Schedulix: free software + USD 10k for initial setup/consulting
  • Applied Algo ETL Suite: USD 1,250 + hardware + initial implementation

Over a one-year tenure, while BMC Control-M may be a bit expensive, Google Compute Engine isn’t really a lot more cost effective. Give it a couple of years and even BMC may come out cheaper than Google Compute Engine – with one catch: Private Grids generally require dedicated staff to maintain, which adds to operating cost. So here, Google Compute Engine may still be cheaper than BMC Control-M. However, you still can’t beat Schedulix and Applied Algo. Further, if your data is not accessible from IP addresses outside the corporate firewall, a Cloud-based solution is simply not viable. General rule of thumb: for both security and performance, your data should be close to your Grid.

  • Google Compute Engine (Cloud) = USD 12k + development cost (scheduling/GUI/persistence, etc. – say 365 MD to get it built) = USD 150k
  • BMC Control-M = USD 250k
  • Schedulix = USD 20k (free software; hardware + consulting)
  • Applied Algo ETL Suite = USD 2k + hardware = USD 10k

Notice, none of the above is “Big Money” – to put it in perspective, USD 200k is equivalent to just two mid-range developers for one year. The question is, would you pay USD 200k for a grid?

Fundamentally, when you hear advocates claiming the Cloud is cheaper than a Private Grid – cross-examine the maths. Pay attention to the overall life-cycle cost, not just the up-front/monthly recurring cost.

More inspiration: http://java.dzone.com/articles/compute-grids-vs-data-grids