响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono

响应式 Web 第三节

  • 服务调用中的三种耦合
  • 响应式流规范与接口
  • 响应式流中的流量控制
  • Web中的响应式与请求/响应式的区别
  • 流式处理中的Source/Sink模型
  • RXJava2 观察者模式同步与异步实现
  • Project Reactor 中的 Flux、Mono
  • Flux、Mono 同步静态创建与异步动态创建
  • WebFlux

服务当中的耦合

在调用服务的时候,总会有耦合,基于rmi的

1、技术耦合:dubbo,典型的基于rpc的远程服务调用,两边都是java才能调用。
《响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono》
2、空间耦合:两台机器的依赖

《响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono》

3、时间耦合:服务的可用、不可用

微服务 就巧妙地解决了这三个维度上的耦合,但是所有调用几乎都是同步调用。
异步调用能提升整体的性能吗?不能,但是它能够提高整体的吞吐量,防止雪崩

对于传统编程模型的web服务:

  • 访问量过大,web服务可能会oom,浏览器/app一次接这么多数据可能也会扛不住
  • 而且前端的展示要等待传输的过程

解决方法:分页。
分页缺点:只能追加,不能在中间插入,否则会在分页取数据的时候发生混乱。也可以通过编码解决,但是会增加整体业务的复杂度。如果使用私有数据的话,你会和别人看到的数据不一样。
分页缺点解决方法:响应式编程,基于发布/订阅模型

发布/订阅模型

  • mq:做数据缓冲、通知,不做持久化,数据可以推过去,或者主动去拉也可以
  • zk
  • sse:server sent push

《响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono》

List底层是数组,是固定长度的;Flux底层是流,是可变长度的,流的大小取决于缓冲区的大小。

《响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono》
响应式数据库:例如,某个用户发送短信超过100条之后,会反过来去回调服务的接口。
《响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono》

设置边界:到达边界之后,就流向server/service,要考虑一次流多少:如果流多了,会造成流量过大,解决方法:加缓冲区,可以在服务端加缓冲区,也可以在客户端加缓冲区。
推送数据:超过客户端的临界值怎么办?丢弃策略
拉取数据:rocketmq是拉数据
推数据和拉数据,都是流式计算的概念

流式计算

Flume,Flink都是处理流的。
大数据技术栈中,引入了很多先进的概念,web架构中没有的。
Flume用来做大数据中对于日志的拉取。
Flink
source,channel,sink
source:数据源
channel:缓冲区
sink:目的地

处理数据:同步/异步
Flux<T>:可以装0~n个数据
Mono:只能装一个数据

背压处理,慢消费,同一线程,好控制

响应式流的规范:Reactive规范

  • Reactive是响应式,jdk9引入了响应式的接口。
  • Project Reactor,RXJava是响应式的框架。RXJava在安卓领域用的比较多
  • webflux也是响应式框架,将servlet换成了netty或servlet3

《响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono》

代码示例

Project Reactor

官网
https://projectreactor.io/

Reactor 是Spring5中构建各个响应式组件的基础框架,内部提供了Flux和Mono两个代表异步数据序列的核心组件。

Flux

静态方法生成

// 静态方法生成Flux
	String[] s = new String[] { "xx","oo"};
	// just 已知元素数量和内容 使用
	// 
	Flux<String> flux1 = Flux.just(s);
// flux1.subscribe(System.out::println);

	
	Flux<String> flux2 = Flux.just("xx","xxx");
// flux2.subscribe(System.out::println);
	
	
	
    //fromArray方法
    List<String> list = Arrays.asList("hello", "world");
    Flux<String> flux3 = Flux.fromIterable(list);
  // flux3.subscribe(System.out::println);
	
    
    //fromStream方法
    Stream<String> stream = Stream.of("hi", "hello");
    Flux<String> flux4 = Flux.fromStream(stream);
 // flux4.subscribe(System.out::println);
    
    
    //range方法
    Flux<Integer> range = Flux.range(0, 5);
    
 // range.subscribe(System.out::println);
    
  //interval方法, take方法限制个数为5个
    Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
    longFlux.subscribe(System.out::println);
    
    //链式
    Flux.range(1, 5).subscribe(System.out::println);
}

    //链式
   Flux.range(1, 5).subscribe(System.out::println);
   
   
   // 合并
   Flux<String> mergeWith = flux3.mergeWith(flux4);
   mergeWith.subscribe(System.out::println);
   System.out.println("---");
   
   // 结合为元祖
   Flux<String> source1 = Flux.just("111", "world","333");
   Flux<String> source2 = Flux.just("2111", "xxx");

   Flux<Tuple2<String, String>> zip = source1.zipWith(source2);
   zip.subscribe(tuple -> { 
       System.out.println(tuple.getT1() + " -> " + tuple.getT2());
   });
	// 跳过两个
    Flux<String> flux = Flux.just("1111", "222", "333");

    Flux<String> skip = flux.skip(2);
    skip.subscribe(System.out::println);
    
    // 拿前几个
    Flux<String> flux2 = Flux.just("1111", "222", "333");
    Flux<String> skip2 = flux2.take(2);
    skip2.subscribe(System.out::println);
   

	// 过滤
    Flux<String> flux = Flux.just("xx", "oo", "x1x");

    Flux<String> filter = flux.filter(s -> s.startsWith("x"));
    filter.subscribe(System.out::println);

	// 去重
    Flux<String> flux = Flux.just("xx", "oo", "x1x","x2x");

    Flux<String> filter = flux.filter(s -> s.startsWith("x")).distinct();
    filter.subscribe(System.out::println);
    // 转 Mono
    Flux<String> flux = Flux.just("xx", "oo", "x1x","x2x");
    Mono<List<String>> mono = flux.collectList();
    
    mono.subscribe(System.out::println);


    // 逻辑运算 all 与 any
    Flux<String> flux = Flux.just("xx", "oox", "x1x","x2x");

    Mono<Boolean> mono = flux.all(s -> s.contains("x"));
    mono.subscribe(System.out::println);

Mono 连接

		Flux<String> concatWith = Mono.just("100").concatWith(Mono.just("100"));
		concatWith.subscribe(System.out::println);

异常处理

		Mono.just("100")
				.concatWith(Mono.error(new Exception("xx")))
				
				.onErrorReturn("xxx")
				.subscribe(System.out::println)

动态创建

		// 同步动态创建,next 只能被调用一次
		Flux.generate(sink -> { 

			sink.next("xx");
			sink.complete();

		}).subscribe(System.out::print);
	}
		Flux.create(sink -> { 
			
			for (int i = 0; i < 10; i++) { 
				sink.next("xxoo:" + i);
			}
			
			sink.complete();
			
			
		}).subscribe(System.out::println);
	}

WebFlux

《响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono》

RXJava2

http://reactivex.io/#

Reactive Extensions

同步

哪个线程产生就在哪个线程消费

maven依赖

<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
</dependency>

main

	public static void main(String[] args) { 
		
		Observable<String> girl = Observable.create(new ObservableOnSubscribe<String>() { 

			@Override
			public void subscribe(ObservableEmitter<String> emitter) throws Exception { 
				emitter.onNext("1");
				emitter.onNext("2");
				emitter.onNext("3");
				emitter.onNext("4");
				emitter.onNext("5");
				emitter.onComplete();
			}
		});
	
	// 观察者
		Observer<String> man = new Observer<String>() { 
			@Override
			public void onSubscribe(Disposable d) { 
				// TODO Auto-generated method stub
				System.out.println("onSubscribe" + d);
			}

			@Override
			public void onNext(String t) { 
				// TODO Auto-generated method stub
				System.out.println("onNext " + t);
			}

			@Override
			public void onError(Throwable e) { 
				// TODO Auto-generated method stub
				System.out.println("onError " + e.getMessage());
			}

			@Override
			public void onComplete() { 
				// TODO Auto-generated method stub
				System.out.println("onComplete");
			}
		};
		
		girl.subscribe(man);
	}

异步

方法说明
Schedulers.computation()适用于计算密集型任务
Schedulers.io()适用于 IO 密集型任务
Schedulers.trampoline()在某个调用 schedule 的线程执行
Schedulers.newThread()每个 Worker 对应一个新线程
Schedulers.single()所有 Worker 使用同一个线程执行任务
Schedulers.from(Executor)使用 Executor 作为任务执行的线程
	public static void main(String[] args) throws InterruptedException { 
		Observable.create(new ObservableOnSubscribe<String>() { 

			@Override
			public void subscribe(ObservableEmitter<String> emitter) throws Exception { 
				emitter.onNext("1");
				emitter.onNext("2");
				emitter.onNext("3");
				emitter.onNext("4");
				emitter.onNext("5");
				emitter.onComplete();				
			}
		})
		.observeOn(
				Schedulers.computation()
				)
		.subscribeOn( Schedulers.computation())
		.subscribe(new Observer<String>() { 

			@Override
			public void onSubscribe(Disposable d) { 
				// TODO Auto-generated method stub
				System.out.println("onSubscribe");
			}

			@Override
			public void onNext(String t) { 
				// TODO Auto-generated method stub
				System.out.println("onNext");
			}

			@Override
			public void onError(Throwable e) { 
				// TODO Auto-generated method stub
				System.out.println("onError");
			}

			@Override
			public void onComplete() { 
				// TODO Auto-generated method stub
				System.out.println("onComplete");
			}

		})
		;
		Thread.sleep(10000);	
	}

下节课,我们讲WebFlux的应用~

    原文作者:寒泉Hq
    原文地址: https://blog.csdn.net/sinat_42483341/article/details/107265264
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞