多线程执行

Runnable 不可返回值

Callable 可以返回值,返回Future(Java8可用CompletableFuture)

Callable.call()必须有返回时,FutureTask.get()方法才会执行,否则一直阻塞

Java8新提供CompletableFuture,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 串行执行
* @throws IOException
* @throws InterruptedException
*/
@Test
void CompletableFuture() throws IOException, InterruptedException {
// 第一个任务:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> "今天");
// cfQuery成功后继续执行下一个任务:
CompletableFuture<String> cfFetch = cfQuery.thenApplyAsync((code) -> code + "星期");
CompletableFuture<String> ThFetch = cfFetch.thenApplyAsync((code) -> code + "五");
// 并行 ----多个条件只需要满足一个
CompletableFuture<Object> future = CompletableFuture.anyOf(cfFetch, cfQuery, ThFetch);

// cfFetch成功后打印结果:
ThFetch.thenAccept((result) -> {
System.out.println("消息: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(2000);
}

也可以并行执行,逻辑流程为:
其中CompletableFuture.anyOf()含义为:只要任意一个成功,即可执行下面的逻辑,CompletableFuture.allOf()含义为:全部成功之后执行下面逻辑


三个线程轮流输出1-100,不能重复

可以用JUC中LockSupport类中park()方法进行阻塞,unpark()方法进行解除阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static Thread t1 = null;
static Thread t2 = null;
static Thread t3 = null;

static int countt = 1;

public static void main(String[] args) {
t1 = new Thread(() -> {
while (true) {
LockSupport.park();
System.out.println(Thread.currentThread().getName() + ":" + countt++);
if (countt > 100) {
LockSupport.unpark(t2);
break;
}
LockSupport.unpark(t2);
}
});
t2 = new Thread(() -> {
while (true) {
LockSupport.park();
if (countt > 100) {
LockSupport.unpark(t3);
break;
}
System.out.println(Thread.currentThread().getName() + ":" + countt++);
LockSupport.unpark(t3);
}
});
t3 = new Thread(() -> {
while (true) {
LockSupport.park();
if (countt > 100) {
LockSupport.unpark(t1);
break;
}
System.out.println(Thread.currentThread().getName() + ":" + countt++);
LockSupport.unpark(t1);
}
});
t1.start();
t2.start();
t3.start();
LockSupport.unpark(t1);
}

测试接口并发承受量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 测试并发
*/
@Test
void testConcurrency() {
// 设置300并发量
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 300; i++) {
new Thread(new seed(countDownLatch)).start();
}
countDownLatch.countDown();
}

private class seed implements Runnable {
private final CountDownLatch countDownLatch;

private seed(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

线程可见性

volatile 关键字

  • CPU指令重排序

当cpu写缓存时发现缓存区块被其他cpu占用,可能将后面的读缓存命令优先执行,但是不管怎么重排序,(单线程)程序的执行结果不能被改变,runtime和处理器需遵守as-if-serial语义

告诉JVM被修饰的变量不要缓存,直接写入内存,所以也可以防止程序被JVM重排序,防止另外的线程拿到错误的数据

比如A线程将i变量加1,B线程紧接着要去对i这个共享资源进行操作。

synchronized 关键字

被synchronized修饰的代码,在获取锁前,在释放锁后都能保证线程可见性

Integer类型多线程值的变更

推荐使用AtomicInteger工具类

1
2
3
4
5
6
AtomicInteger atomicInteger = new AtomicInteger();
@Test
void testAto(){
// 如果值为当前所期望的0,那么将它变为1
atomicInteger.compareAndSet(0,1);
}

火车票超卖案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 测试100张火车票
private AtomicInteger ato = new AtomicInteger(100);

private Lock lock = new ReentrantLock();

CountDownLatch countDownLatch = new CountDownLatch(1);

Runnable buy = () -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
while (ato.get() > 0) {
lock.lock();
if (ato.get() > 0) {
try {
System.out.println(Thread.currentThread().getName() + "号窗口开始抢票,票数剩余==》" + ato.decrementAndGet());
} finally {
lock.unlock();
}
}
}
};

@Test
void buy() throws IOException {
// 模拟4个线程进行购票
new Thread(buy, Integer.toString(1)).start();
new Thread(buy, Integer.toString(2)).start();
new Thread(buy, Integer.toString(3)).start();
new Thread(buy, Integer.toString(4)).start();
countDownLatch.countDown();
System.in.read();
}

读多写少时推荐使用读写锁

读写锁

ReentrantLock虽然可以解决高并发的问题,但是效率不高,关于读取其实我们没有必要上锁,我们只需要保证任意时刻,只有一个线程进行修改即可

这时候推荐使用ReadWriteLock/StampedLock(Java8新增)读写锁

  • 只允许一个线程写入
  • 没有写入时,多个线程允许同时读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];
public void inc(int index) {
wlock.lock(); // 加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock(); // 释放写锁
}
}
public int[] get() {
rlock.lock(); // 加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); // 释放读锁
}
}
}

ReadWriteLock和StampedLock区别

ReadWriteLock :如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。

StampedLock和ReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

定时执行线程

1
2
3
4
5
6
7
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
  • 注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间
  • 而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务

并发限流测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
RateLimiter limiter = RateLimiter.create(50);
CountDownLatch down = new CountDownLatch(60);

@Test
public void token() {
for (int i = 0; i < 60; i++) {
new Thread(() -> {
try {
down.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (limiter.tryAcquire()) {
System.out.println("抢到了");
} else {
System.out.println("没抢到");
}
}).start();
down.countDown();
}
}
赏个🍗吧
0%