平田です。
先日のBRMS 5.3のハンズオンではComplex Event Processing (CEP: 複合イベント処理) の説明や演習が無かったので、調査がてらDrools Fusionのbrokerサンプルを眺めてみました。
Drools Fusion
Drools Fusionは、DroolsエンジンにおけるCEPモジュールです。
初めてDroolsを調べたとき、「Complex Event Processing」という字面だけ見て「なぜにイベントがファーストクラス扱いなの?」と思って掘り下げなかったのですが、CEPという技術用語らしいです。検索すると色々と出てきますが、不正検知やリソース最適化などに使われているようです。
brokerサンプル概要
Drools Fusionのサンプルは、githubかDrools Downloadsページの「Drools and jBPM integration」から取得できます。サンプルプログラムが三つ入っており、brokerがDrools Fusionのサンプルになります。
brokerは、いわゆるアルゴリズムトレードのプログラムです。runExamples.batを実行すると、株価ボードが起動し、値動きに応じて自動的に株式売買を行い、そのログが画面右側に出力されます。
brokerは以下の様な構成要素から成ります。
- org.drools.examples.broker.BrokerExample
- mainメソッドを持つブートストラップクラス
- org.drools.examples.broker.Broker
- このプログラムのコントローラ
- receiveメソッドが値動きの度に呼ばれます。
- org.drools.examples.broker.ui パッケージ
- Swing GUIコンポーネント
- org.drools.examples.broker.events, misc, model パッケージ
- 値動きのイベントやルール用のファクト、ユーティリティなど
- src/main/resources
- stocktickstream.dat
- 値動きのテストデータ
- 各列はそれぞれ「開始からのミリ秒」「銘柄」「株価」
- broker.drl, notify.drl, position.drl
- ルール定義
- position.rf
- 値動きを検知した際のルールフロー
Javaプログラム部分の動き
brokerプログラムは、stocktickstream.datが発生させる値動きイベントに反応して動きます。EventFeederやStockTickPersisterが、この辺りの処理を実装しています。
値動きの度にBrokerコントローラのreceiveが呼ばれ、ルールが起動されます。
public void receive(Event<?> event) { try { StockTick tick = ((Event<StockTick>) event).getObject(); //値動きイベントデータ Company company = this.companies.getCompany( tick.getSymbol() ); this.tickStream.insert( tick ); this.session.getAgenda().getAgendaGroup( "evaluation" ).setFocus(); this.session.fireAllRules(); //ルール起動 window.updateCompany( company.getSymbol() ); window.updateTick( tick ); } catch ( Exception e ) { System.err.println("============================================================="); System.err.println("Unexpected exception caught: "+e.getMessage() ); e.printStackTrace(); } }
値動きイベント処理
値動きイベントに関する処理は、broker.drlに定義されています。”Update stock price”ルールでは、値動きのたびにワーキングメモリ内のファクトデータ(Company: 銘柄、StockTick: 値動き)を更新しています。
# a simple rule to show that it is possible to join # events from an entry-point (stream) with facts # present in the working memory rule "Update stock price" agenda-group "evaluation" lock-on-active when $cp : Company( $sb : symbol ) $st : StockTick( symbol == $sb, $pr : price ) from entry-point "StockTick stream" then // This shows an update on working memory facts with data from joined events modify( $cp ) { currentPrice = $pr } // Although events are considered immutable, a common pattern is to use a class // to represent an event and enrich that event instance with data derived from other facts/events. // Below we "enrich" the event instance with the percentual change in the price, // based on the previous price modify( $st ) { delta = $cp.delta } end
また、”sudden drop”で、下げ幅が5%を超えた場合にSuddenDropEventを発生させています。
# this rule shows a trick to get the last available event as well as # how to call global services from the consequence rule "sudden drop" enabled true agenda-group "report" when $st : StockTick( $sb : symbol, $ts : timestamp, $pr : price, $dt : delta < -0.05 ) from entry-point "StockTick stream" not( StockTick( symbol == $sb, timestamp > $ts ) from entry-point "StockTick stream" ) then services.log( "Drop >5%: "+$sb+" delta: "+ Utils.percent($dt)+" price: $"+$pr ); # we also want to create an event and forward it into the engine to a predefined entry point # that is being listened by other rules with( sde = new SuddenDropEvent() ) { symbol = $sb, percent = $dt, timestamp = $ts } entryPoints["Analysis Events"].insert( sde ); end
値下がりイベント処理
SuddenDropEventが発生すると、position.drlの”Start adjust position process”がマッチし、position.rfで定義したルールフローを開始します。
# here we have an example of a rule that controls a process rule "Start adjust position process" when $sde : SuddenDropEvent( ) from entry-point "Analysis Events" then variables = [ "symbol" : $sde.symbol ]; drools.getKnowledgeRuntime().startProcess( "adjust position", variables ); end
position.rfでは、まず”Evaluate Position”ノードでposition.drlの”If the drop is…”のルールを実行し、下げ幅に応じたアクション(買い、売り、スルー)を決定します。
# below we have rules controlled by the process, # i.e., the process will fire these rules when necessary # to re-evaluate the position rule "If the drop is between 6% and 8%, buy more shares" ruleflow-group "evaluate position" when $sde : SuddenDropEvent( percent >= -0.08 && < -0.06 ) from entry-point "Analysis Events" then with( pa = new PortfolioAction() ) { action = Action.BUY, symbol = $sde.symbol, quant = 100 } insert( pa ); end rule "If the drop is on more than 8%, sell shares" ruleflow-group "evaluate position" when $sde : SuddenDropEvent( percent < -0.08 ) from entry-point "Analysis Events" then with( pa = new PortfolioAction() ) { action = Action.SELL, symbol = $sde.symbol, quant = 100 } insert( pa ); end rule "If the drop is between 5% and 6%, do nothing" ruleflow-group "evaluate position" when $sde : SuddenDropEvent( percent >= -0.06 ) from entry-point "Analysis Events" then with( pa = new PortfolioAction() ) { action = Action.NOACTION, symbol = $sde.symbol } insert( pa ); end
ワーキングメモリに投入されたPortfolioActionをもとにルールフローの分岐を遷移し、アクションを実行(ログ出力のみ)します。
アクション実行後、Notifyノードからnotify.drlに定義されたファクトの削除処理が呼ばれます。
rule "Portfolio action no longer needed" dialect "mvel" ruleflow-group "notify" when $pa : PortfolioAction() then retract( $pa ); end
まとめ
「全部Javaで書けば良いじゃん」と思うかもしれませんが、ルール部分は差し替え可能であり、プログラムの修正無しに下げ幅の閾値を調整したり、アクションを追加できるのがポイントになります。