NHN Cloud Meetup 編集部
Springからの要求に応じた付加応答を追加する(2) – reactor適用
2018.05.21
2,204
はじめに
前回開発したソースには、2つの問題がありました。
- Network I / Oを順次実行
- O(n)時間がかかる:timeout * attachment改修
- AsyncでO(1)で終わるようにチューニングが必要
- Failover
- attachmentは単純付加情報にもかかわらずattachmentServiceでexceptionが発生した場合、何の情報も取得できない
- attachは失敗してもBoard情報と残りの成功したattachmentは表示する必要がある
今回は、このような課題をreactorを使って解決してみよう。
Reactor
まず、なぜreactorを使用するのか、簡単に整理してみよう。
- Rx(Reactive Extension)を実装し、簡単に非同期プログラミングが可能
- 別のRx実装チェーンRxJavaと比較したとき、次のような利点がある
- Spring5に統合しやすい
- Java8対応
- rxJavaは1.6バージョンから使え、独自のFunction
を実装して使用
- ReactorはJava8から使え、Java8 ApiとOptionalなどをサポートしている
- rxJavaは1.6バージョンから使え、独自のFunction
ここではReactor APIの基本的な内容は言及しません。ちなみにreactorはJavaのバージョンに影響を受けます。本文のソースはSpring 5で作成しているが、Java8を使用する場合は、以下のソースをSpring4に適用しても、正常に動作することを確認する必要があります。
AttachmentWrapperItem
問題を解決する前にまずリファクタリングが必要です。以前の内容を振り返ってみよう。BoardDtoは以下のようにAttachmentWrapperを持っています。
@Data @JsonInclude(JsonInclude.Include.NON_NULL) public class BoardDto implements Attachable { private Long id; private String title; private String content; @Setter(AccessLevel.PRIVATE) @JsonIgnore private AttachmentWrapper attachmentWrapper = new AttachmentWrapper(); }
AttachmentWrapperはAttachmentTypeとAttachmentを別々に受け取ります。
@ToString @EqualsAndHashCode public class AttachmentWrapper { //... void put(AttachmentType type, Attachment attachment); }
reactorを使うと、Mono<T>, Flux<T>のようにGenericで表現できるように、AttachmentTypeとAttachment
を1つにまとめるAttachmentWrapperItem
クラスを作成し、これをAttachmentWrapper
に反映しなければなりません。
AttachmentWrapperItem
@Value public class AttachmentWrapperItem { // 例外発生時に返却するインスタンス public static final AttachmentWrapperItem ON_ERROR = new AttachmentWrapperItem(null, null); private AttachmentType type; private Attachment attachment; }
AttachmentWrapper適用
@ToString @EqualsAndHashCode public class AttachmentWrapper { interface AttachmentMap { boolean isEmpty(); Set<Map.Entry<AttachmentType, Attachment>> entrySet(); } @Delegate(types = AttachmentMap.class) private Map<AttachmentType, Attachment> value = new EnumMap<>(AttachmentType.class); public void putAll(Collection<AttachmentWrapperItem> items) { this.value.putAll(items.stream().collect(Collectors.toMap(AttachmentWrapperItem::getType, AttachmentWrapperItem::getAttachment))); } }
Attachable interface変更
既存の2つのパラメータで受け取っていたものを、1つのパラメータで受け取るように変更します。
// 変更前 interface Attachable { //... default void attach(Map<? extends AttachmentType, ? extends Attachment> attachment) { getAttachmentWrapper().putAll(attachment); } } // 変更後 interface Attachable { //... default void attach(Collection<AttachmentWrapperItem> items) { getAttachmentWrapper().putAll(items); } }
AttachService変更
getAttachmentの戻り値をAttachmentWrapperItem
に変えます。
AttachmentWrapperItem getAttachment(Attachable attachable);
AttachWriterToBoardService変更
AttachServiceの変更されたロジックを反映します。
// 変更前 @Override public Attachment getAttachment(Attachable attachment) { BoardDto boardDto = supportType.cast(attachment); return writerClient.getWriter(boardDto.getWriterId()); } // 変更後 @Override public AttachmentWrapperItem getAttachment(Attachable attachable) { BoardDto boardDto = supportType.cast(attachable); Attachment attachment = writerClient.getWriter(boardDto.getWriterId()); return new AttachmentWrapperItem(supportAttachmentType, attachment); }
AttachmentAspect変更
// 変更前 private void executeAttach(Attachable attachable) { Set<AttachmentType> types = attachmentTypeHolder.getTypes(); Map<AttachmentType, Attachment> attachmentMap = types.stream() .flatMap(type -> typeToServiceMap.get(type).stream()) .filter(service -> service.getSupportType().isAssignableFrom(attachable.getClass())) .collect(Collectors.toMap(AttachService::getSupportAttachmentType, service -> service.getAttachment(attachable))); attachable.attach(attachmentMap); } // 変更後 private void executeAttach(Attachable attachable) { Set<AttachmentType> types = attachmentTypeHolder.getTypes(); List<AttachmentWrapperItem> items = types.stream() .flatMap(type -> typeToServiceMap.get(type).stream()) .filter(service -> service.getSupportType().isAssignableFrom(attachable.getClass())) .map(service -> service.getAttachment(attachable)) .collect(Collectors.toList()); attachable.attach(items); }
reactorで非同期プログラミングを適用
attachService.getAttachment()を呼び出すときにNetwork I/Oが発生しています。問題は、このメソッドがattachmentの件数分、実行されるという点です。非同期プログラミングを適用してこれを解決してみよう。
依存性の設定
compile('io.projectreactor:reactor-core:3.1.5.RELEASE')
AttachService修正
getAttachmentの戻り値の型をMono<AttachmentWrapperInfo>に変更します。
public interface AttachService<T extends Attachable> { AttachmentType getSupportAttachmentType(); Class<T> getSupportType(); Mono<AttachmentWrapperItem> getAttachment(Attachable attachable); }
AttachWriterToBoardService修正
修正したAttachServiceの実装体であるAttachWriterToBoardServiceに変更内容を反映してみよう。
@Override public Mono<AttachmentWrapperItem> getAttachment(Attachable attachable) { return Mono.defer(() -> executeGetAttachment(attachable)) // Network I/Oを使用、elastic()で生成されたthreadで実行されるように宣言 .subscribeOn(Schedulers.elastic()); } // getAttachmentが実行していた部分をもってくる // 戻り値にMono.just()をかぶせる private Mono<AttachmentWrapperItem> executeGetAttachment(Attachable attachable) { BoardDto boardDto = supportType.cast(attachable); Attachment attachment = writerClient.getWriter(boardDto.getWriterId()); return Mono.just(new AttachmentWrapperItem(supportAttachmentType, attachment)); }
AttachmetAspect修正
Attachableの実装体のタイプに合わせてserviceを実行します。取得したMonoのListをそれぞれ非同期で実行させ、block()
を呼び出して同期します。
private void executeAttach(Attachable attachable) { List<Mono<AttachmentWrapperItem>> monoItems = createMonoList(attachable); List<AttachmentWrapperItem> items = executeMonoAndCollectList(monoItems); attachable.attach(items); } // Attachableのタイプに合わせてサービス実行後、List<Mono>を生成する private List<Mono<AttachmentWrapperItem>> createMonoList(Attachable attachable) { Set<AttachmentType> types = attachmentTypeHolder.getTypes(); return types.stream() .flatMap(type -> typeToServiceMap.get(type).stream()) .filter(service -> service.getSupportType().isAssignableFrom(attachable.getClass())) .map(service -> service.getAttachment(attachable)) .collect(Collectors.toList()); } // List<Mono>をzip()でそれぞれ実行しながら、List<attachmentWrapperItem>を作成して返却 // それぞれのMonoは内部でelastic()により非同期で実行され // block()を介して最終的に同期する private List<AttachmentWrapperItem> executeMonoAndCollectList(List<Mono<AttachmentWrapperItem>> monoItems) { return Mono.zip(monoItems, this::filterItems) .block(); } private List<AttachmentWrapperItem> filterItems(Object[] itemArray) { return Stream.of(itemArray) .map(AttachmentWrapperItem.class::cast) .collect(Collectors.toList()); }
実行
非同期に戻ることを確認しよう。
テストコードを組んで確認するのが最良ですが、ここでは簡単にThread.sleep(3000)で確認してみましょう。
// 書き込みサービスに3秒スリープ private Mono<AttachmentWrapperItem> executeGetAttachment(Attachable attachable) { try { Thread.sleep(3000); } catch (InterruptedException e) { } BoardDto boardDto = supportType.cast(attachable); Attachment attachment = new SimpleAttachmentCollection<>(commentClient.getComments(boardDto.getId())); return Mono.just(new AttachmentWrapperItem(supportAttachmentType, attachment)); } // 作成者情報サービスに3秒スリープ private Mono<AttachmentWrapperItem> executeGetAttachment(Attachable attachable) { try { Thread.sleep(3000); } catch (InterruptedException e) { } BoardDto boardDto = supportType.cast(attachable); Attachment attachment = writerClient.getWriter(boardDto.getWriterId()); return Mono.just(new AttachmentWrapperItem(supportAttachmentType, attachment)); }
3秒以上、6秒以内にリクエストがきたら成功です。
reactorでエラー克服
reactorでエラーを克服する方法は簡単です。エラーが発生した場合、以前作成したAttachmentWrapperItem.ON_ERRORを返すようにすればよいでしょう。Rxはこのような状況のためのAPIが、すべて定義しています。
AttachServiceで例外発生時の処理
AttachWriterToBoardServiceでWriterをインポートしている間に、Exceptionが発生した場合、AttachmentWrapperItem.ON_ERROR
を送信するように変更します。
@Slf4j @Component public class AttachWriterToBoardService implements AttachService<BoardDto> { //... private final WriterClient writerClient; private final Duration timeout; @Autowired public AttachWriterToBoardService(@NonNull WriterClient writerClient, @Value("${attach.writer.timeoutMillis:5000}") long timeout) { this.writerClient = writerClient; this.timeout = Duration.ofMillis(timeout); } //... @Override public Mono<AttachmentWrapperItem> getAttachment(Attachable attachable) { return Mono.defer(() -> executeGetAttachment(attachable)) .subscribeOn(Schedulers.elastic()) .timeout(timeout) // reactorにtimeoutをかける。 .doOnError(e -> log.warn(e.getMessage(), e)) // エラーが発生したらlogを残す。代替値を返すのでwarnに指定。 .onErrorReturn(AttachmentWrapperItem.ON_ERROR); // エラー発生でON_ERRORを返す。 } }
AttachmentAspectでON_ERRORをろ過するようにロジックを変更
前述のAttachmentAspectからList<Mono>
を非同期で実行し、結果値をList<AttachmentWrapperItem>にまとめてAttachable
に入れました。簡単に非同時実行結果がON_ERROR
ある場合をフィルタリングすれば、成功結果だけを集めてList<AttachmentWrapperItem>
を作ることができます。
@Slf4j @Component @Aspect public class AttachmentAspect { //... private List<AttachmentWrapperItem> executeMonoAndCollectList(List<Mono<AttachmentWrapperItem>> monoItems) { // timeoutでMonoが実行される最大時間を指定することもできる return Mono.zip(monoItems, this::filterItems) .doOnError(e -> log.warn(e.getMessage(), e)) .onErrorReturn(Collections.emptyList()) // すべてのMonoを実行して集合する過程でエラーが発生した場合、emptyList()を返す .block(); } private List<AttachmentWrapperItem> filterItems(Object[] itemArray) { return Stream.of(itemArray) .map(AttachmentWrapperItem.class::cast) .filter(item -> item != AttachmentWrapperItem.ON_ERROR) // 例外処理によって失敗した要請は除く .collect(Collectors.toList()); } }
実行
以前、100番の掲示板を呼び出すとき、作成者情報を取得しようとするとFeignClientで404を投げ、下記のようにAPI自体が失敗しました。
GET /boards/100?attachment=comments,writer
{ "timestamp": "2018-03-08T07:55:22.127+0000", "status": 500, "error": "Internal Server Error", "message": "status 404 reading WriterClient#getWriter(long); content: {}", "path": "/boards/100" } feign.FeignException: status 404 reading WriterClient#getWriter(long); content: {} at feign.FeignException.errorStatus(FeignException.java:62) ~[feign-core-9.5.1.jar:na] ... at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]
failover適用後、例外はwarnでログを残し、成功した部分までは応答できるようになりました。
{ { "id": 100, "title": "title100", "content": "content100", "comments": [ { "id": 496, "email": "Zola@lizzie.com", "body": "neque unde voluptatem iure\nodio excepturi ipsam ad id\nipsa sed expedita error quam\nvoluptatem tempora necessitatibus suscipit culpa veniam porro iste vel" } ] } }
2018-03-08 19:59:12.056 WARN 64890 --- [ elastic-5] c.p.s.s.a.s.w.AttachWriterToBoardService : status 404 reading WriterClient#getWriter(long); content: {} feign.FeignException: status 404 reading WriterClient#getWriter(long); content: {} at feign.FeignException.errorStatus(FeignException.java:62) ~[feign-core-9.5.1.jar:na] ... at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]
まとめ
Reactorを使って非同期プログラミングを行い、障害に対処して克服できるように試みました。途中、reactorにtimeout()を使用したので、この部分はclientからFeignClientを使用することで、application.ymlに引き出して別途管理することができます。以前共有したHystrix
とも連携してfallbackを実装することもでき、強力な障害対応ができます。
依然として現在のコードは大きな欠点があります。AttachmentAspectでreactorのblock()
を呼び出すという点です。これについては、reactor learnページから参照した画像で説明できそうです。
| 出典_:https://projectreactor.io/learn_ |
つまりNon-Blockingでリソースを効率的に使用しなかったということです。そのためSpringFramework 5ではwebfluxを使ってnetty基盤(基本設定)でNon-Blocking + Asyncを使用できるようにしました。