/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import rx.functions.Func1;
/**
* Benchmark the cost of subscription and initial request management.
*
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*SubscribingPerf.*"
*
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*SubscribingPerf.*"
*/
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
public class SubscribingPerf {
Observable just = Observable.just(1);
Observable range = Observable.range(1, 2);
@Benchmark
public void justDirect(Blackhole bh) {
DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.subscribe(subscriber);
}
@Benchmark
public void justStarted(Blackhole bh) {
StartedSubscriber subscriber = new StartedSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.subscribe(subscriber);
}
@Benchmark
public void justUsual(Blackhole bh) {
UsualSubscriber subscriber = new UsualSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.subscribe(subscriber);
}
@Benchmark
public void rangeDirect(Blackhole bh) {
DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.subscribe(subscriber);
}
@Benchmark
public void rangeStarted(Blackhole bh) {
StartedSubscriber subscriber = new StartedSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.subscribe(subscriber);
}
@Benchmark
public void rangeUsual(Blackhole bh) {
UsualSubscriber subscriber = new UsualSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.subscribe(subscriber);
}
@Benchmark
public void justDirectUnsafe(Blackhole bh) {
DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.unsafeSubscribe(subscriber);
}
@Benchmark
public void justStartedUnsafe(Blackhole bh) {
StartedSubscriber subscriber = new StartedSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.unsafeSubscribe(subscriber);
}
@Benchmark
public void justUsualUnsafe(Blackhole bh) {
UsualSubscriber subscriber = new UsualSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
just.unsafeSubscribe(subscriber);
}
@Benchmark
public void rangeDirectUnsafe(Blackhole bh) {
DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.unsafeSubscribe(subscriber);
}
@Benchmark
public void rangeStartedUnsafe(Blackhole bh) {
StartedSubscriber subscriber = new StartedSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.unsafeSubscribe(subscriber);
}
@Benchmark
public void rangeUsualUnsafe(Blackhole bh) {
UsualSubscriber subscriber = new UsualSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
range.unsafeSubscribe(subscriber);
}
@State(Scope.Thread)
public static class Chain {
@Param({"10", "1000", "1000000"})
public int times;
@Param({"1", "2", "3", "4", "5"})
public int maps;
Observable source;
@Setup
public void setup() {
Observable o = Observable.range(1, times);
for (int i = 0; i < maps; i++) {
o = o.map(new Func1() {
@Override
public Integer call(Integer v) {
return v + 1;
}
});
}
source = o;
}
@Benchmark
public void mapped(Chain c, Blackhole bh) {
DirectSubscriber subscriber = new DirectSubscriber(Long.MAX_VALUE, bh);
bh.consume(subscriber);
c.source.subscribe(subscriber);
}
}
static final class DirectSubscriber extends Subscriber {
final long r;
final Blackhole bh;
public DirectSubscriber(long r, Blackhole bh) {
this.r = r;
this.bh = bh;
}
@Override
public void onNext(T t) {
bh.consume(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onCompleted() {
}
@Override
public void setProducer(Producer p) {
p.request(r);
}
}
static final class StartedSubscriber extends Subscriber {
final long r;
final Blackhole bh;
public StartedSubscriber(long r, Blackhole bh) {
this.r = r;
this.bh = bh;
}
@Override
public void onStart() {
request(r);
}
@Override
public void onNext(T t) {
bh.consume(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onCompleted() {
}
}
/**
* This requests in the constructor.
* @param the value type
*/
static final class UsualSubscriber extends Subscriber {
final Blackhole bh;
public UsualSubscriber(long r, Blackhole bh) {
this.bh = bh;
request(r);
}
@Override
public void onNext(T t) {
bh.consume(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onCompleted() {
}
}
}