ACM DEBS Grand Challenge: Continuous Analytics on Geospatial Data Streams with WSO2 Complex Event Processor
Sachini Jayasekara, Srinath Perera, Miyuru Dayarathna, Sriskandarajah Suhothayan
WSO2 Inc.
Problem
o Dataset
  o Taxi rides collected in New York in 2013 [1]
  o Each line has timestamps, start and end locations, fare details, etc.
  o 13K cars, 173 million events
o 2 queries
  o Queries are based on 0.5 km and 0.25 km cells over New York
[1] Chris Whong (http://chriswhong.com/open-data/foil_nyc_taxi/)
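The slides do not say how a coordinate is mapped to a cell. A minimal sketch, assuming a square grid anchored at a north-west corner with fixed degree steps per 500 m. The origin (ORIGIN_LAT, ORIGIN_LON), the step constants and the default grid sizes are hypothetical placeholders, not values taken from the challenge specification:

```python
import math

# Hypothetical grid parameters (not from the slides): the grid's north-west
# corner and the approximate number of degrees spanned by 500 m near New York.
ORIGIN_LAT = 41.0
ORIGIN_LON = -74.5
DEG_LAT_PER_500M = 0.0045   # ~500 m of latitude
DEG_LON_PER_500M = 0.0060   # ~500 m of longitude at New York's latitude

def cell_id(lat, lon, cell_m=500, grid_size=None):
    """Map (lat, lon) to a (row, col) cell, or None if it falls outside the grid."""
    scale = cell_m / 500.0
    if grid_size is None:
        # Assumption: every grid covers the same area as 300 x 300 cells of 500 m.
        grid_size = int(300 / scale)
    # Rows grow southward, columns grow eastward.
    row = math.floor((ORIGIN_LAT - lat) / (DEG_LAT_PER_500M * scale))
    col = math.floor((lon - ORIGIN_LON) / (DEG_LON_PER_500M * scale))
    if 0 <= row < grid_size and 0 <= col < grid_size:
        return (row, col)
    return None
```

With cell_m=250 the same point lands in a finer cell; the pair of start and end cells of a trip can then serve as the route key for Query 1.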
CEP Operators
1. Filters or transformations (process a single event)
   from Ball[v>10] select .. insert into ..
2. Windows + aggregation (track a window of events by time or length)
   from Ball#window.time(30s) select avg(v) ..
3. Joins (join two event streams into one)
   from Ball#window.time(30s) as b join Players as p on p.v < b.v
4. Patterns (state machine implementation)
   from Ball[v>10], Ball[v<10]*, Ball[v>10] select ..
5. Event tables (map a database table as an event stream)
   define table HitV (v double) using .. db info ..
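To make the first two operators concrete outside Siddhi, here is a minimal Python sketch of a filter and a sliding time window computing avg(v). It only illustrates the semantics and is not how WSO2 CEP implements them; the names filter_events and TimeWindowAvg are hypothetical, and the window is driven by event timestamps in milliseconds rather than the system clock:

```python
from collections import deque

def filter_events(events, predicate):
    """Operator 1: keep only events matching the predicate, like Ball[v>10]."""
    return [e for e in events if predicate(e)]

class TimeWindowAvg:
    """Operator 2: sliding time window with avg(v), like Ball#window.time(30s)."""

    def __init__(self, length_ms):
        if length_ms <= 0:
            raise ValueError("window length must be positive")
        self.length_ms = length_ms
        self.window = deque()   # (timestamp, value) pairs, oldest first
        self.total = 0.0

    def on_event(self, ts, v):
        self.window.append((ts, v))
        self.total += v
        # Expire events length_ms or more older than the newest one
        # (the newest event itself never expires because length_ms > 0).
        while self.window[0][0] <= ts - self.length_ms:
            _, old = self.window.popleft()
            self.total -= old
        return self.total / len(self.window)
```

Keeping a running total makes each update cost proportional only to the number of expired events, rather than re-summing the whole window.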
Complex Event Processing
o See http://goo.gl/BaPFYA for more info.
Query 1: Frequent Routes
o Output the 10 most frequent routes in the last 30 minutes
o Output only when the result has changed (current time is derived from the event's timestamp attribute)
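A naive sketch of Query 1, assuming each trip arrives as (drop-off timestamp, route) in timestamp order, where a route is a (start cell, end cell) pair. It recomputes the top-k on every event with heapq.nlargest instead of the incremental FrequentK structure the slides mention, and it breaks ties by first-seen order, which may differ from the challenge's own rule. FrequentRoutes is a hypothetical name:

```python
import heapq
from collections import Counter, deque

class FrequentRoutes:
    """Top-k routes over a sliding event-time window, emitting only on change."""

    def __init__(self, k=10, window_ms=30 * 60 * 1000):
        self.k = k
        self.window_ms = window_ms
        self.trips = deque()     # (dropoff_ts, route), oldest first
        self.counts = Counter()
        self.last_top = None

    def on_trip(self, dropoff_ts, route):
        # "Current time" is the event's own timestamp, not the wall clock.
        self.trips.append((dropoff_ts, route))
        self.counts[route] += 1
        while self.trips[0][0] <= dropoff_ts - self.window_ms:
            _, old = self.trips.popleft()
            self.counts[old] -= 1
            if self.counts[old] == 0:
                del self.counts[old]
        top = [r for r, _ in heapq.nlargest(self.k, self.counts.items(),
                                            key=lambda rc: rc[1])]
        if top != self.last_top:
            self.last_top = top
            return top   # result changed: emit it
        return None      # unchanged: suppress output
```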
Query 2: Profitable Areas
o Find the cells that are most profitable for taxi drivers at the given moment.
o Profitability of a cell = median(fare + tip) over the last 15 minutes, divided by the number of taxi drivers who dropped off in that cell within the last 30 minutes and have not taken a new trip since.
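The profitability formula for a single cell can be written as a small function. The inputs are assumed to be already windowed: the (fare, tip) pairs from the last 15 minutes and the empty-taxi count from the last 30 minutes. Returning None when either is missing is an assumption, not something the slides specify:

```python
from statistics import median

def profitability(recent_fares_tips, empty_taxis):
    """Profitability of one cell: median(fare + tip) / number of empty taxis.

    recent_fares_tips: (fare, tip) pairs for the cell's trips in the last 15 minutes
    empty_taxis: taxis that dropped off in the cell in the last 30 minutes and
                 have not started a new trip since
    """
    if not recent_fares_tips or empty_taxis == 0:
        return None   # assumption: undefined profitability is reported as None
    return median(fare + tip for fare, tip in recent_fares_tips) / empty_taxis
```

For example, trips paying 12, 8 and 25 (fare + tip) with 2 empty taxis give 12 / 2 = 6.0.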
Optimizations
o WSO2 CEP
  o Object Pooling
  o Only keep required attributes (e.g., in windows)
o Algorithmic
  o String Lookup
  o Reusing windows
  o Avoid Join
  o FrequentK
  o Counting Pattern
  o Median (Bucket)
o Fully use the computer
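The slides do not explain "Median (Bucket)". One plausible reading, sketched here purely as an assumption, is a fixed-width histogram: each fare falls into a bucket, adding or expiring a value is O(1), and the median is found by scanning cumulative bucket counts, trading exactness for speed. BucketMedian, bucket_width and max_value are hypothetical:

```python
class BucketMedian:
    """Approximate sliding-window median over non-negative values using buckets."""

    def __init__(self, bucket_width=0.5, max_value=200.0):
        self.width = bucket_width
        self.counts = [0] * (int(max_value / bucket_width) + 1)
        self.n = 0

    def _index(self, value):
        # Values above max_value are clamped into the last bucket.
        return min(int(value / self.width), len(self.counts) - 1)

    def add(self, value):          # an event enters the window
        self.counts[self._index(value)] += 1
        self.n += 1

    def remove(self, value):       # an event expires from the window
        self.counts[self._index(value)] -= 1
        self.n -= 1

    def median(self):
        """Return the lower edge of the bucket holding the (lower) median."""
        if self.n == 0:
            return None
        target = (self.n + 1) // 2   # 1-based rank of the lower median
        seen = 0
        for i, c in enumerate(self.counts):
            seen += c
            if seen >= target:
                return i * self.width
```

The scan is bounded by the number of buckets rather than the window size, which is the property that would make such a structure attractive for a 15-minute window.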
Avoid Joins
o Q2 processes the median and the taxi count in parallel
o But the join that combines them is expensive because of the ordering it requires
o Instead, calculate the median, enrich the event with the result, use the enriched event to count empty taxis, then divide the median by the empty-taxi count without a join
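The enrichment idea can be sketched as two stages applied in sequence to the same event, so the second stage already sees the median and no join is needed. The lookup callables median_of_cell and empty_taxis_of_cell are hypothetical stand-ins for the windowed state each stage would maintain:

```python
def median_stage(event, median_of_cell):
    """Stage 1: look up the cell's current median and attach it to the event."""
    enriched = dict(event)
    enriched["median"] = median_of_cell(event["cell"])
    return enriched

def empty_taxi_stage(enriched, empty_taxis_of_cell):
    """Stage 2: the event already carries the median, so profit is computed here
    instead of joining a median stream with an empty-taxi stream."""
    out = dict(enriched)
    out["empty_taxis"] = empty_taxis_of_cell(enriched["cell"])
    out["profit"] = out["median"] / out["empty_taxis"] if out["empty_taxis"] else None
    return out

def process(events, median_of_cell, empty_taxis_of_cell):
    """Chain both stages per event; output keeps input order, so nothing is reordered."""
    for e in events:
        yield empty_taxi_stage(median_stage(e, median_of_cell), empty_taxis_of_cell)
```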
Taxi Counting Pattern Optimizations
o The query creates a state machine to track each taxi's state and updates counts accordingly
o Slow with the CEP pattern, as it searches all states to check for expiration
o Fixed by keeping states sorted by start time, so only the oldest states need checking (2X improvement)
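A minimal sketch of the sorted-state idea, assuming trip events arrive in drop-off-time order and, as a simplification, that a taxi stops being empty when its next trip event arrives. An OrderedDict keeps each taxi's empty state in drop-off order, so expiration pops from the front and stops at the first state still inside the 30-minute window, instead of scanning every state. EmptyTaxiCounter is a hypothetical name:

```python
from collections import Counter, OrderedDict

class EmptyTaxiCounter:
    """Count empty taxis per cell, keeping states ordered by drop-off time."""

    def __init__(self, window_ms=30 * 60 * 1000):
        self.window_ms = window_ms
        self.states = OrderedDict()   # taxi_id -> (dropoff_ts, cell), oldest first
        self.per_cell = Counter()

    def on_trip(self, taxi_id, dropoff_ts, dropoff_cell):
        # A new trip ends the taxi's previous empty state, if it still has one.
        if taxi_id in self.states:
            _, old_cell = self.states.pop(taxi_id)
            self.per_cell[old_cell] -= 1
        # Events arrive in drop-off order, so appending keeps the dict sorted.
        self.states[taxi_id] = (dropoff_ts, dropoff_cell)
        self.per_cell[dropoff_cell] += 1
        # Expire from the front only; stop at the first state still in the window.
        while self.states:
            ts, cell = next(iter(self.states.values()))
            if ts > dropoff_ts - self.window_ms:
                break
            self.states.popitem(last=False)
            self.per_cell[cell] -= 1
```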
Fully Use the Computer
o So far, we have removed unnecessary operations
o Now we have to use all 4 cores of the VM
o How?
  o Data partition
  o Pipeline
  o Pipeline with a single buffer
Data Partition: Issues
o Need to reorder events and send timing updates
o But the savings from partitioning are small (e.g., frequentK is O(log n), while execution within a partition takes O(log(n/p)))
o All savings are lost when reordering
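The size of the saving follows from log(n/p) = log n - log p: each operation saves only log p, however large n is. With an illustrative n = 10^6 (not a figure from the slides) and p = 4 partitions:

```latex
\frac{\log_2 n - \log_2 (n/p)}{\log_2 n} = \frac{\log_2 p}{\log_2 n},
\qquad n = 10^6,\; p = 4:\quad \frac{2}{19.93} \approx 10\%
```

A per-operation saving of about 10% leaves little margin to absorb the cost of reordering the partitioned outputs.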
Execution Pipeline
o Break the different stages into a pipeline
o Now we can use 6 threads (threads 1 and 6 do I/O, so using more threads than cores is OK)
o 125K events/sec now, but 50 ms latency
o Bottleneck is moving events between queues
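A minimal Python sketch of a queue-based pipeline: one thread per stage, connected by bounded queue.Queue instances, with separate input and output threads. Every event is handed from queue to queue, which is the per-event cost the slide identifies as the bottleneck. The stage functions are placeholders, and Python threads only illustrate the structure, not the throughput reported for WSO2 CEP:

```python
import queue
import threading

_STOP = object()   # sentinel marking the end of the stream

def _stage(fn, inbox, outbox):
    """Apply fn to each event from inbox and hand the result to outbox."""
    while True:
        event = inbox.get()
        if event is _STOP:
            outbox.put(_STOP)
            return
        outbox.put(fn(event))

def run_pipeline(events, stage_fns, queue_size=1024):
    """Run stage_fns as a thread-per-stage pipeline and return outputs in order."""
    queues = [queue.Queue(maxsize=queue_size) for _ in range(len(stage_fns) + 1)]
    workers = [threading.Thread(target=_stage, args=(fn, queues[i], queues[i + 1]))
               for i, fn in enumerate(stage_fns)]

    def feed():   # input side runs in its own thread so bounded queues cannot deadlock
        for e in events:
            queues[0].put(e)
        queues[0].put(_STOP)

    threads = [threading.Thread(target=feed)] + workers
    for t in threads:
        t.start()
    results = []
    while (item := queues[-1].get()) is not _STOP:   # output side
        results.append(item)
    for t in threads:
        t.join()
    return results
```

Usage: run_pipeline(range(5), [lambda x: x + 1, lambda x: x * 10]) passes each number through both stages and returns [10, 20, 30, 40, 50].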
Circular Buffer Based Pipeline
o One circular buffer with sequence barriers, using the LMAX Disruptor
o Avoids the cost of moving events, reduces GC, and works well with the CPU cache
o 2X more throughput and 0 ms latency
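A single-threaded sketch of the Disruptor idea, written from scratch in Python rather than with the LMAX library: events stay in preallocated slots, each stage tracks its own sequence number, and a sequence barrier lets stage i advance only up to what stage i-1 (or the producer, for the first stage) has finished. The producer is gated by the last stage so it never overwrites an unprocessed slot. RingBuffer, try_publish and run_stage are hypothetical names:

```python
class RingBuffer:
    """Disruptor-style ring: one shared buffer, one sequence counter per stage."""

    def __init__(self, size, stage_fns):
        if size <= 0 or size & (size - 1):
            raise ValueError("size must be a power of two")
        self.mask = size - 1
        self.slots = [{} for _ in range(size)]    # preallocated and reused in place
        self.published = -1                        # last sequence written by the producer
        self.stage_fns = stage_fns
        self.stage_seq = [-1] * len(stage_fns)     # last sequence done by each stage

    def try_publish(self, event):
        nxt = self.published + 1
        # Gating: slot nxt & mask last held sequence nxt - size; the final
        # stage must have finished it before the producer may overwrite it.
        if nxt - self.stage_seq[-1] > len(self.slots):
            return False
        slot = self.slots[nxt & self.mask]
        slot.clear()
        slot.update(event)
        self.published = nxt
        return True

    def run_stage(self, i):
        # Sequence barrier: advance only up to what the upstream stage has done.
        limit = self.published if i == 0 else self.stage_seq[i - 1]
        while self.stage_seq[i] < limit:
            self.stage_seq[i] += 1
            self.stage_fns[i](self.slots[self.stage_seq[i] & self.mask])
```

Because stages only advance counters and modify slots in place, no event is copied between queues, which is where the slides attribute the throughput and GC gains.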
Results
o Good results on real hardware (8 cores) and AWS (4 cores), but not as good on VirtualBox (4 cores)
o Can run with a 512 MB heap at only a 10% slowdown
Results: Speedup vs. Concurrency
o Compared against the single-node version
o Real hardware scaled well, AWS less so, and the VirtualBox VM showed very little speedup
Results: Latency vs. Throughput
o Each point is labeled (environment, thread count, buffer size)
Conclusion
o All changes except the final circular buffer are in WSO2 CEP 4.0 (released Q3 2015)
o WSO2 CEP is free and available under the Apache open source license
o Fast and flexible, and already used in many critical use cases
