NHN Cloud Meetup 編集部
Springからの要求に応じた付加応答を追加する(2) – reactor適用
2018.05.21
2,282
はじめに
前回開発したソースには、2つの問題がありました。
- Network I / Oを順次実行
- O(n)時間がかかる:timeout * attachment改修
- AsyncでO(1)で終わるようにチューニングが必要
- Failover
- attachmentは単純付加情報にもかかわらずattachmentServiceでexceptionが発生した場合、何の情報も取得できない
- attachは失敗してもBoard情報と残りの成功したattachmentは表示する必要がある
今回は、このような課題をreactorを使って解決してみよう。

Reactor
まず、なぜreactorを使用するのか、簡単に整理してみよう。
- Rx(Reactive Extension)を実装し、簡単に非同期プログラミングが可能
- 別のRx実装チェーンRxJavaと比較したとき、次のような利点がある
- Spring5に統合しやすい
- Java8対応
- rxJavaは1.6バージョンから使え、独自のFunction
を実装して使用 - ReactorはJava8から使え、Java8 ApiとOptionalなどをサポートしている
- rxJavaは1.6バージョンから使え、独自のFunction
ここではReactor APIの基本的な内容は言及しません。ちなみにreactorはJavaのバージョンに影響を受けます。本文のソースはSpring 5で作成しているが、Java8を使用する場合は、以下のソースをSpring4に適用しても、正常に動作することを確認する必要があります。
AttachmentWrapperItem
問題を解決する前にまずリファクタリングが必要です。以前の内容を振り返ってみよう。BoardDtoは以下のようにAttachmentWrapperを持っています。
@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public class BoardDto implements Attachable {
private Long id;
private String title;
private String content;
@Setter(AccessLevel.PRIVATE)
@JsonIgnore
private AttachmentWrapper attachmentWrapper = new AttachmentWrapper();
}
AttachmentWrapperはAttachmentTypeとAttachmentを別々に受け取ります。
@ToString
@EqualsAndHashCode
public class AttachmentWrapper {
//...
void put(AttachmentType type, Attachment attachment);
}
reactorを使うと、Mono<T>, Flux<T>のようにGenericで表現できるように、AttachmentTypeとAttachmentを1つにまとめるAttachmentWrapperItemクラスを作成し、これをAttachmentWrapperに反映しなければなりません。
AttachmentWrapperItem
@Value
public class AttachmentWrapperItem {
// 例外発生時に返却するインスタンス
public static final AttachmentWrapperItem ON_ERROR = new AttachmentWrapperItem(null, null);
private AttachmentType type;
private Attachment attachment;
}
AttachmentWrapper適用
@ToString
@EqualsAndHashCode
public class AttachmentWrapper {
interface AttachmentMap {
boolean isEmpty();
Set<Map.Entry<AttachmentType, Attachment>> entrySet();
}
@Delegate(types = AttachmentMap.class)
private Map<AttachmentType, Attachment> value = new EnumMap<>(AttachmentType.class);
public void putAll(Collection<AttachmentWrapperItem> items) {
this.value.putAll(items.stream().collect(Collectors.toMap(AttachmentWrapperItem::getType, AttachmentWrapperItem::getAttachment)));
}
}
Attachable interface変更
既存の2つのパラメータで受け取っていたものを、1つのパラメータで受け取るように変更します。
// 変更前
interface Attachable {
//...
default void attach(Map<? extends AttachmentType, ? extends Attachment> attachment) {
getAttachmentWrapper().putAll(attachment);
}
}
// 変更後
interface Attachable {
//...
default void attach(Collection<AttachmentWrapperItem> items) {
getAttachmentWrapper().putAll(items);
}
}
AttachService変更
getAttachmentの戻り値をAttachmentWrapperItemに変えます。
AttachmentWrapperItem getAttachment(Attachable attachable);
AttachWriterToBoardService変更
AttachServiceの変更されたロジックを反映します。
// 変更前
@Override
public Attachment getAttachment(Attachable attachment) {
BoardDto boardDto = supportType.cast(attachment);
return writerClient.getWriter(boardDto.getWriterId());
}
// 変更後
@Override
public AttachmentWrapperItem getAttachment(Attachable attachable) {
BoardDto boardDto = supportType.cast(attachable);
Attachment attachment = writerClient.getWriter(boardDto.getWriterId());
return new AttachmentWrapperItem(supportAttachmentType, attachment);
}
AttachmentAspect変更
// 変更前
private void executeAttach(Attachable attachable) {
Set<AttachmentType> types = attachmentTypeHolder.getTypes();
Map<AttachmentType, Attachment> attachmentMap =
types.stream()
.flatMap(type -> typeToServiceMap.get(type).stream())
.filter(service -> service.getSupportType().isAssignableFrom(attachable.getClass()))
.collect(Collectors.toMap(AttachService::getSupportAttachmentType, service -> service.getAttachment(attachable)));
attachable.attach(attachmentMap);
}
// 変更後
private void executeAttach(Attachable attachable) {
Set<AttachmentType> types = attachmentTypeHolder.getTypes();
List<AttachmentWrapperItem> items =
types.stream()
.flatMap(type -> typeToServiceMap.get(type).stream())
.filter(service -> service.getSupportType().isAssignableFrom(attachable.getClass()))
.map(service -> service.getAttachment(attachable))
.collect(Collectors.toList());
attachable.attach(items);
}
reactorで非同期プログラミングを適用
attachService.getAttachment()を呼び出すときにNetwork I/Oが発生しています。問題は、このメソッドがattachmentの件数分、実行されるという点です。非同期プログラミングを適用してこれを解決してみよう。
依存性の設定
compile('io.projectreactor:reactor-core:3.1.5.RELEASE')
AttachService修正
getAttachmentの戻り値の型をMono<AttachmentWrapperInfo>に変更します。
public interface AttachService<T extends Attachable> {
AttachmentType getSupportAttachmentType();
Class<T> getSupportType();
Mono<AttachmentWrapperItem> getAttachment(Attachable attachable);
}
AttachWriterToBoardService修正
修正したAttachServiceの実装体であるAttachWriterToBoardServiceに変更内容を反映してみよう。
@Override
public Mono<AttachmentWrapperItem> getAttachment(Attachable attachable) {
return Mono.defer(() -> executeGetAttachment(attachable))
// Network I/Oを使用、elastic()で生成されたthreadで実行されるように宣言
.subscribeOn(Schedulers.elastic());
}
// getAttachmentが実行していた部分をもってくる
// 戻り値にMono.just()をかぶせる
private Mono<AttachmentWrapperItem> executeGetAttachment(Attachable attachable) {
BoardDto boardDto = supportType.cast(attachable);
Attachment attachment = writerClient.getWriter(boardDto.getWriterId());
return Mono.just(new AttachmentWrapperItem(supportAttachmentType, attachment));
}
AttachmetAspect修正
Attachableの実装体のタイプに合わせてserviceを実行します。取得したMonoのListをそれぞれ非同期で実行させ、block()を呼び出して同期します。
private void executeAttach(Attachable attachable) {
List<Mono<AttachmentWrapperItem>> monoItems = createMonoList(attachable);
List<AttachmentWrapperItem> items = executeMonoAndCollectList(monoItems);
attachable.attach(items);
}
// Attachableのタイプに合わせてサービス実行後、List<Mono>を生成する
private List<Mono<AttachmentWrapperItem>> createMonoList(Attachable attachable) {
Set<AttachmentType> types = attachmentTypeHolder.getTypes();
return types.stream()
.flatMap(type -> typeToServiceMap.get(type).stream())
.filter(service -> service.getSupportType().isAssignableFrom(attachable.getClass()))
.map(service -> service.getAttachment(attachable))
.collect(Collectors.toList());
}
// List<Mono>をzip()でそれぞれ実行しながら、List<attachmentWrapperItem>を作成して返却
// それぞれのMonoは内部でelastic()により非同期で実行され
// block()を介して最終的に同期する
private List<AttachmentWrapperItem> executeMonoAndCollectList(List<Mono<AttachmentWrapperItem>> monoItems) {
return Mono.zip(monoItems, this::filterItems)
.block();
}
private List<AttachmentWrapperItem> filterItems(Object[] itemArray) {
return Stream.of(itemArray)
.map(AttachmentWrapperItem.class::cast)
.collect(Collectors.toList());
}
実行
非同期に戻ることを確認しよう。
テストコードを組んで確認するのが最良ですが、ここでは簡単にThread.sleep(3000)で確認してみましょう。
// 書き込みサービスに3秒スリープ
private Mono<AttachmentWrapperItem> executeGetAttachment(Attachable attachable) {
try { Thread.sleep(3000); } catch (InterruptedException e) { }
BoardDto boardDto = supportType.cast(attachable);
Attachment attachment = new SimpleAttachmentCollection<>(commentClient.getComments(boardDto.getId()));
return Mono.just(new AttachmentWrapperItem(supportAttachmentType, attachment));
}
// 作成者情報サービスに3秒スリープ
private Mono<AttachmentWrapperItem> executeGetAttachment(Attachable attachable) {
try { Thread.sleep(3000); } catch (InterruptedException e) { }
BoardDto boardDto = supportType.cast(attachable);
Attachment attachment = writerClient.getWriter(boardDto.getWriterId());
return Mono.just(new AttachmentWrapperItem(supportAttachmentType, attachment));
}
3秒以上、6秒以内にリクエストがきたら成功です。

reactorでエラー克服
reactorでエラーを克服する方法は簡単です。エラーが発生した場合、以前作成したAttachmentWrapperItem.ON_ERRORを返すようにすればよいでしょう。Rxはこのような状況のためのAPIが、すべて定義しています。
AttachServiceで例外発生時の処理
AttachWriterToBoardServiceでWriterをインポートしている間に、Exceptionが発生した場合、AttachmentWrapperItem.ON_ERRORを送信するように変更します。
@Slf4j
@Component
public class AttachWriterToBoardService implements AttachService<BoardDto> {
//...
private final WriterClient writerClient;
private final Duration timeout;
@Autowired
public AttachWriterToBoardService(@NonNull WriterClient writerClient,
@Value("${attach.writer.timeoutMillis:5000}") long timeout) {
this.writerClient = writerClient;
this.timeout = Duration.ofMillis(timeout);
}
//...
@Override
public Mono<AttachmentWrapperItem> getAttachment(Attachable attachable) {
return Mono.defer(() -> executeGetAttachment(attachable))
.subscribeOn(Schedulers.elastic())
.timeout(timeout) // reactorにtimeoutをかける。
.doOnError(e -> log.warn(e.getMessage(), e)) // エラーが発生したらlogを残す。代替値を返すのでwarnに指定。
.onErrorReturn(AttachmentWrapperItem.ON_ERROR); // エラー発生でON_ERRORを返す。
}
}
AttachmentAspectでON_ERRORをろ過するようにロジックを変更
前述のAttachmentAspectからList<Mono>を非同期で実行し、結果値をList<AttachmentWrapperItem>にまとめてAttachableに入れました。簡単に非同時実行結果がON_ERRORある場合をフィルタリングすれば、成功結果だけを集めてList<AttachmentWrapperItem>を作ることができます。
@Slf4j
@Component
@Aspect
public class AttachmentAspect {
//...
private List<AttachmentWrapperItem> executeMonoAndCollectList(List<Mono<AttachmentWrapperItem>> monoItems) {
// timeoutでMonoが実行される最大時間を指定することもできる
return Mono.zip(monoItems, this::filterItems)
.doOnError(e -> log.warn(e.getMessage(), e))
.onErrorReturn(Collections.emptyList()) // すべてのMonoを実行して集合する過程でエラーが発生した場合、emptyList()を返す
.block();
}
private List<AttachmentWrapperItem> filterItems(Object[] itemArray) {
return Stream.of(itemArray)
.map(AttachmentWrapperItem.class::cast)
.filter(item -> item != AttachmentWrapperItem.ON_ERROR) // 例外処理によって失敗した要請は除く
.collect(Collectors.toList());
}
}
実行
以前、100番の掲示板を呼び出すとき、作成者情報を取得しようとするとFeignClientで404を投げ、下記のようにAPI自体が失敗しました。
GET /boards/100?attachment=comments,writer
{
"timestamp": "2018-03-08T07:55:22.127+0000",
"status": 500,
"error": "Internal Server Error",
"message": "status 404 reading WriterClient#getWriter(long); content: {}",
"path": "/boards/100"
}
feign.FeignException: status 404 reading WriterClient#getWriter(long); content: {}
at feign.FeignException.errorStatus(FeignException.java:62) ~[feign-core-9.5.1.jar:na]
...
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]
failover適用後、例外はwarnでログを残し、成功した部分までは応答できるようになりました。
{
{
"id": 100,
"title": "title100",
"content": "content100",
"comments": [
{
"id": 496,
"email": "Zola@lizzie.com",
"body": "neque unde voluptatem iure\nodio excepturi ipsam ad id\nipsa sed expedita error quam\nvoluptatem tempora necessitatibus suscipit culpa veniam porro iste vel"
}
]
}
}
2018-03-08 19:59:12.056 WARN 64890 --- [ elastic-5] c.p.s.s.a.s.w.AttachWriterToBoardService : status 404 reading WriterClient#getWriter(long); content:
{}
feign.FeignException: status 404 reading WriterClient#getWriter(long); content: {}
at feign.FeignException.errorStatus(FeignException.java:62) ~[feign-core-9.5.1.jar:na]
...
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]
まとめ
Reactorを使って非同期プログラミングを行い、障害に対処して克服できるように試みました。途中、reactorにtimeout()を使用したので、この部分はclientからFeignClientを使用することで、application.ymlに引き出して別途管理することができます。以前共有したHystrixとも連携してfallbackを実装することもでき、強力な障害対応ができます。
依然として現在のコードは大きな欠点があります。AttachmentAspectでreactorのblock()を呼び出すという点です。これについては、reactor learnページから参照した画像で説明できそうです。
| 出典_:https://projectreactor.io/learn_ |
つまりNon-Blockingでリソースを効率的に使用しなかったということです。そのためSpringFramework 5ではwebfluxを使ってnetty基盤(基本設定)でNon-Blocking + Asyncを使用できるようにしました。