- 作者:老汪软件技巧
- 发表时间:2024-11-13 17:01
- 浏览量:
在文章开始之前,我们先介绍一下生产消费者模型。
生产者-消费者模型是一种常见的并发设计模式,广泛应用于多线程或异步编程中。它主要用于解决生产和消费之间的协调问题,以提高系统的效率和响应性。以下是该模型的主要概念和特点:
基本组成应用场景一个经典的生产消费者模型
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class Producer implements Runnable {
private final BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 1; i <= 20; i++) {
System.out.println("produce: " + i);
queue.put(i);
System.out.println("produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private final BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer item = queue.take();
System.out.println("consume: " + (item * 2));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue<>(10);
Thread producer = new Thread(new Producer(queue));
Thread consumer = new Thread(new Consumer(queue));
producer.start();
consumer.start();
producer.join();
consumer.interrupt();
}
}
一般来说每个 Java/Kotlin/Android 开发者都需要掌握这种最基本的生产消费者模型,才能够对多线程开发游刃有余,不过这种是最简单的模型,一般我们在实际开发中我们还需要借助线程池来管理。相信我们中的大多数人或多或少都被多线程任务折腾过,例如批量的上传任务,聊天消息队列处理。我相信被折磨的应该是属于大多数,能够真正掌握多线程框架并且熟练运用的实属凤毛麟角。多线程的里面的概念非常多,线程池,阻塞队列,Runnale,Future 等等。
Kotlin Flow 实现的生产消费者模型
上面我们介绍了生产消费者模型,以及基础实现,相信很多人都很熟悉,似曾相识。不过这种方式显得有点重,在资源管理,参数配置,以及任务调度上等想要每个环节都处理好不是一件轻松的事情。
下面我们来使用 kotlin flow 来实现一个生产消费者模型:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.flow
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.flow.*
suspend fun processItem(item: Int): Int {
// 模拟处理时间
delay(10) // 处理每个 item 需要 10ms
return item * 2 // 示例处理逻辑
}
fun main() = runBlocking {
// 创建一个 Flow,发射 20 条数据
val flow = flow {
for (i in 1..20) {
println("produce: $i")
emit(i)
println("produced: $i")
}
}
// 正常 collect
val timeNormal = measureTimeMillis {
flow
.buffer(10)
.collect { value ->
val res = processItem(value)
println("consume: $res")
}
}
println("Time taken with normal collect: $timeNormal ms")
}
这是一个普通的不能再普通的 kotlin flow 了,如果你仔细看看 print 的内容你就会发现了,其实 flow 与 blockingqueue 的作用其实差不多,满了会 suspend 类似于 blockingqueue 的 block。
produce: 1
produced: 1
produce: 2
produced: 2
produce: 3
produced: 3
produce: 4
produced: 4
produce: 5
produced: 5
produce: 6
produced: 6
produce: 7
produced: 7
produce: 8
produced: 8
produce: 9
produced: 9
produce: 10
produced: 10
produce: 11
produced: 11
produce: 12
consume: 2
produced: 12
produce: 13
... Time taken with normal collect: 270 ms
分析一下上面的打印内容我们可以模拟这个任务队列的执行过程,首先 emit 来模仿生产端,collect 模拟下游消费端,flow buffer 来模仿一个有界队列,首先 flow 的 capacity 是 10,但是我们看 print 内容,实际到了生产 12 才挂起,其实也很好理解,因为下游消费端消费了一个 item,所以实际上 flow 里面有 10 个元素,所以当前挂在了 12.
上面的代码其实一个最基础的模型,在这个模型里面其实是不能执行多线程任务的,
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.flow
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.flow.*
suspend fun processItem(item: Int): Int {
// 模拟处理时间
delay(10) // 处理每个 item 需要 10ms
return item * 2 // 示例处理逻辑
}
fun main() = runBlocking {
// 创建一个 Flow,发射 20 条数据
val flow = flow {
for (i in 1..20) {
println("produce: $i")
emit(i)
println("produced: $i")
}
}
// 使用 async 并行处理
val timeAsync = measureTimeMillis {
flow
.buffer(10)
.chunked(2)
.collect { chunk ->
val deferred1 = async {
println("consume: ${chunk[0]}")
processItem(chunk[0])
}
val deferred2 = async {
println("consume: ${chunk[1]}")
processItem(chunk[1])
}
// 等待两个协程完成
awaitAll(deferred1, deferred2)
}
}
println("Time taken with async collect: $timeAsync ms")
}
下面我们看一下打印内容,验证一下我们的代码
produce: 1
produced: 1
produce: 2
produced: 2
produce: 3
produced: 3
produce: 4
produced: 4
produce: 5
produced: 5
produce: 6
produced: 6
produce: 7
produced: 7
produce: 8
produced: 8
produce: 9
produced: 9
produce: 10
produced: 10
produce: 11
produced: 11
produce: 12
produced: 12
produce: 13
consume: 1
consume: 2
produced: 13
... Time taken with async collect: 145 ms
我们可以看到处理时间大约减少了一半,跟我们预期一致。上面展示的其实是一个最基础的模型,在实际生产中,大家只需要掌握了他的模型基本原理,然后组合利用各种操作符,相信处理好一般的开发任务应当是手拿把掐了。我建议一般的多线程任务,图片上传了,消息处理,大家尽量使用 kotlin flow 来处理。
当然 kotlin flow 还有一些其他的优点,首先的就是 函数式编程 Functional program,这与我们之前使用 Java 编写代码时候,使用面向对象编程 OOP 是有很大区别的,其次作用域的引入使得协程的操作变得简单起来,尤其对 Android jetpack 的支持,这使得我们很多时候不必手动取消,避免了很多不必要的异常。还有就是各种操作符的引入使得 flow 操作变得更加强大,应对更多的场景。
最后希望我的文章能够帮助到你,对你的工作有用。
Thanks for watching.