0%

多多线程进阶-JUC并发编程

多多线程进阶-JUC并发编程

1.什么是JUC?

java.util工具包、包、类

业务普通的业务代码 Thread

Runnable:没有返回值,效率相对Callable低

2.线程和进程

线程、进程,如果不能使用一句话说出来的技术,不扎实

进程:一个程序,QQ.exe、Music.exe程序的集合

一个进程往往包含多个线程,至少包含一个

java默认有几个线程?2个,1.main 2.GC

线程:开一个进程玩一下联盟,操作英雄,语音通话(线程负责)

对java而言开启线程: Thread、Runnable、Callable

java真的能开启线程吗? 开不了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();

/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);

boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//本地方法,底层C++,java运行在虚拟机上,无法直接操作硬件
private native void start0();

并发和并行

并发:多个线程操作同一个资源

  • Cpu一核,模拟出多条线程,天下武功,唯快不破,快速交替

并行:多个人一起走(多个线程一起执行)

  • Cpu多核,多个线程可以同时执行
1
2
3
4
5
public static void main(String[] args) {
//获取Cpu核心数
int i = Runtime.getRuntime().availableProcessors();
System.out.println("i = " + i);//i = 4
}

并发编程的本质:充分利用CPU的资源

线程有几个状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
//新生
NEW,

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
//运行
RUNNABLE,

/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
//阻塞
BLOCKED,

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
//等待,,死死等待
WAITING,

/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
//超时等待
TIMED_WAITING,

/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
//终止
TERMINATED;
}

wait/sleep区别

1.来自不同的类

wait=>Object

sleep=>Thread

2.关于锁的释放

wait会释放锁,sleep睡觉了,抱着锁睡觉,不会释放锁

3.使用的范围不同

wait必须在同步代码块中

sleep可以在任何地方睡

4.是否需要捕获异常

wait不需要捕获异常

sleep必须捕获异常

3.Lock(重点)

公平锁:十分公平,先来后到(如果先到的线程等待时间较长,就需要一直等待)

非公平锁:十分不公平,可以插队,无参构造默认非公平锁(充分利用cup资源)

1
synchronized和Lock的区别

1.synchronized是内置关键字,lock是JUC并发包下的类

2.synchronized无法判断锁的状态,lock可以判断锁的状态

3.synchronized会自动释放锁,Lock必须手动释放,如果不释放锁,死锁

4.synchronized线程1(获得锁,阻塞)、线程2(等待,傻傻得等),Lock不一定会一直等待

5.synchronized可重入锁,不可用中断,非公平,lock可重入锁,不可用中断,非公平(有参构造可设置)

6.synchronized适合锁少量代码同步问题,lock适合锁大量同步代码

总结:synchronized类似自动挡汽车,lock类似手动挡汽车,更灵活

锁是什么?如何判断锁的是谁?

4.生产者消费者问题

面试的:单例模式、排序算法、生产者消费者、死锁

生产者消费者synchronized版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//多线程通信问题:生产者和消费者
//判断等待,业务,通知
class Data {
int number = 0;
//+1
public synchronized void increment() throws InterruptedException {
if (number != 0) {
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();
}

public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}

问题存在,A、B、C、D四个线程 虚假唤醒

解决办法,if判断改为while判断

就是用if判断的话,唤醒后线程会从wait之后的代码开始运行,但是不会重新判断if条件,直接继续运行if代码块之后的代码,而如果使用while的话,也会从wait之后的代码运行,但是唤醒后会重新判断循环条件,如果不成立再执行while代码块之后的代码块,成立的话继续wait。

这也就是为什么用while而不用if的原因了,因为线程被唤醒后,执行开始的地方是wait之后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//多线程通信问题:生产者和消费者
//判断等待,业务,通知
class Data {
int number = 0;

//+1
public synchronized void increment() throws InterruptedException {
while (number != 0) {
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();
}

//-1
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();
}

public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}

JUC 版生产者消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//多线程通信问题:生产者和消费者
//判断等待,业务,通知
//使用JUC版 Lock
class Data1 {
int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

//+1
public void increment() throws InterruptedException {
lock.lock();
try {
while (number != 0) {
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

//-1
public synchronized void decrement() throws InterruptedException {
lock.lock();
try {
while (number == 0) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
Data1 data = new Data1();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}

为什么使用JUC版Condition?Condition可以实现精准通知唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//面试题,使用多线程,依次打印A、B、C
class Data3 {
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private int number = 1;

public void printA() {
lock.lock();
try {
//业务代码 业务判断,执行,通知
while (number != 1) {
condition1.await();
}
System.out.println(Thread.currentThread().getName() + "打印:AAAAA");
//唤醒指定的线程
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}

public void printB() {
lock.lock();
try {
//业务代码
while (number != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "打印:BBBBB");
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}

public void printC() {
lock.lock();
try {
//业务代码
while (number != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "打印:CCCCC");
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}

public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printC();
}
}, "C").start();
}
}

5.8锁问题

小结

new 锁的this ,具体的实例

static 锁的Class唯一的模板

6.集合类不安全

1.List

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class CollectoinTest {
//java.util.ConcurrentModificationException 并发修改异常
public static void main(String[] args) {
//并发下ArrayList不安全? synchronized
/**
*解决方案
* 1.使用Vector,List<String> list = new Vector<>();(不推荐),Vector是JDK1.0的产物,使用的synchronized,效率低
* 2.List<String> list = Collections.synchronizedList(new LinkedList<>());
* 3.List<String> list = new CopyOnWriteArrayList<>();推荐使用,底层使用的是新版JUC下的lock方式,效率相对要快
*/
//CopyOnWrite 写入时复制,读写分离 COW 计算机程序设计领域的一种优化策略
//CopyOnWriteArrayList比Vector好在哪里?
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(Thread.currentThread().getName() + list);
}, "" + i).start();

}
}
}

Vector

1
2
3
4
5
6
7
//使用synchronized同步代码块,效率低
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}

CopyOnWriteArrayList

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//使用JUC下的lock方式加锁,效率快
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

使用CopyOnWriteMap需要注意两件事情:

    1. 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
    1. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。

CopyOnWrite的缺点

CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。

内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。

针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap

数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。

2.Set

set和List同理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class SetTest {
// 多线程下会抛出java.util.ConcurrentModificationException 并发修改异常
public static void main(String[] args) {
/**
* 解决方案
* 1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
* 2.Set<String> set = new CopyOnWriteArraySet<>();(写时复制,用在读多写少的场景)
*/
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(set);
}).start();

}
}
}

hashSet底层是什么?

1
2
3
4
5
6
7
8
9
10
//HashSet底层就是HashMap!
public HashSet() {
map = new HashMap<>();
}
//add set本质就是map的key,map的key不能重复!
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
//不变的值
private static final Object PRESENT = new Object();

3.Map

ConcurrentHashMap

7.Callable

1.可以有返回值

2.可以抛出异常

3.方法不同 run()/call()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread myThread = new MyThread();
//适配类,Thread无法直接接受Callable,使用FutureTask适配,使用了适配器模式
FutureTask task = new FutureTask(myThread);
new Thread(task, "A").start();//怎么启动Callable?
new Thread(task, "B").start();//结果会被缓存,效率高
String s = (String) task.get();//这个get方法可能会发送阻塞,放在最后
//或者使用异步通信来处理
System.out.println(s);
}
}

class MyThread implements Callable<String> {

@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + "----call()");
return "str";
}
}

细节:

1.有缓存

2.结果可能需要等待,会阻塞

8.常用的辅助类

1.CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//计数器
class CountTest {
public static void main(String[] args) throws InterruptedException {
//总数是6,必须在执行任务的时候使用
CountDownLatch count = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " GO OUT");
count.countDown();//-1
}, "" + i).start();
}
count.await();//等待计数器归零,再向下执行
System.out.println("Close Door");
}
}

原理:

count.countDown();//数量-1

count.await();//等待计数器归零,再向下执行

每次有线程调用count.countDown(),数量-1,假设计数器变为0, count.await()就会被唤醒,继续向下执行

2.CyclicBarrier

加法计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class CyclicBarrierTest {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
CyclicBarrier barrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙");
});
for (int i = 0; i < 7; i++) {
int finalI = i;//lamba能直接操作i吗?
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "收集第" + finalI + "颗龙珠");
barrier.await();//等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}

3.Semaphore

信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class SemaphoreTest {


public static void main(String[] args) {
//3线程数量,停车位,限流!
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
//acquire 得到
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();//释放
}
}, "" + i).start();

}
}
}

原理:

semaphore.acquire(); 获得,假如已经满了,等待,等待被释放为止

semaphore.release();释放,会将当前的信号量释放+1,然后唤醒等待的线程

作用:多个共享资源互斥使用,并发限流,控制最大线程数!

9.ReadWriteLock

读写锁,写的时候只能一个线程写,读的时候可以多个线程共享读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ReadWriteLockTest {
//如果不使用读写锁,在写入的时候,多个线程会同时写入
public static void main(String[] args) {
MapCache cache = new MapCache();
for (int i = 1; i <= 10; i++) {
int finalI = i;
new Thread(() -> {
cache.put("" + finalI, finalI);
}, "写线程" + i).start();
}
for (int i = 1; i <= 10; i++) {
int finalI = i;
new Thread(() -> {
cache.get("" + finalI);
}, "读线程" + i).start();
}
}
}

//自定义缓存
class MapCache {
private volatile Map<String, Object> mapCache = new HashMap<>();

//存,写
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + "写入" + key);
mapCache.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入ok");

}

//取,读
public Object get(String key) {
System.out.println(Thread.currentThread().getName() + "读取" + key);
Object o = mapCache.get(key);
System.out.println(Thread.currentThread().getName() + "读取ok");
return o;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class ReadWriteLockTest {
//如果不使用读写锁,在写入的时候,多个线程会同时写入
public static void main(String[] args) {
MapCache cache = new MapCache();
for (int i = 1; i <= 10; i++) {
int finalI = i;
new Thread(() -> {
cache.put("" + finalI, finalI);
}, "写线程" + i).start();
}
for (int i = 1; i <= 10; i++) {
int finalI = i;
new Thread(() -> {
cache.get("" + finalI);
}, "读线程" + i).start();
}
}
}

/**
* ReadWriteLock
* 独占锁(写锁) 一次只能被一个线程占有
* 共享锁(读锁) 多个线程可以同时占有
* 读-读 能共享
* 读-写 不能共享
* 写-读 不能共享
*/
//自定义缓存
class MapCache {
private volatile Map<String, Object> mapCache = new HashMap<>();
//读写锁,更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

//存,写
public void put(String key, Object value) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入" + key);
mapCache.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}


}

//取,读
public Object get(String key) {
readWriteLock.readLock().lock();
Object o = null;
try {
System.out.println(Thread.currentThread().getName() + "读取" + key);
o = mapCache.get(key);
System.out.println(Thread.currentThread().getName() + "读取ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}

return o;
}
}

10.阻塞队列

什么情况下会使用阻塞队列?多线程并发处理,线程池!

四组api

方式 抛出异常 不抛出异常 超时等待 阻塞等待
添加 add offer offer(“d”, 2, TimeUnit.SECONDS) put
移除 remove poll poll(2,TimeUnit.SECONDS) take
检查首元素 element peek
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 抛出异常
*/
class BlockingQueueTest {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(3);
test1(queue);
test2(queue);
}

private static void test1(BlockingQueue queue) {
System.out.println(queue.add("a"));
System.out.println(queue.add("b"));
System.out.println(queue.add("c"));
// System.out.println(queue.add("d"));//队列满,抛出异常Queue full
System.out.println(queue.element());//获取首元素,无元素,抛出异常NoSuchElementException
}

private static void test2(BlockingQueue queue) {
System.out.println(queue.remove());
System.out.println(queue.remove());
System.out.println(queue.remove());
System.out.println(queue.remove());//队列空了,抛出异常java.util.NoSuchElementException
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 不抛出异常
*/
class BlockingQueueTest {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(3);
// test1(queue);
// test2(queue);
test3(queue);
test4(queue);
}


private static void test3(BlockingQueue queue) {
System.out.println(queue.offer("a"));
System.out.println(queue.offer("b"));
System.out.println(queue.offer("c"));
System.out.println(queue.offer("d"));//队列满,不抛出异常,返回false
System.out.println(queue.peek());//获取首元素,队列空,不抛出异常,返回null
}
private static void test4(BlockingQueue queue) {
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());//队列空了,不抛出异常,返回null
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 阻塞等待
*/
class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(3);
// test1(queue);
// test2(queue);
// test3(queue);
// test4(queue);
test5(queue);
test6(queue);
}


private static void test5(BlockingQueue queue) throws InterruptedException {
queue.put("a");
queue.put("b");
queue.put("c");
// queue.put("d");//队列满,不抛出异常,阻塞等待
}
private static void test6(BlockingQueue queue) throws InterruptedException {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());//队列空了,不抛出异常,阻塞等待
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 超时等待
*/
class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(3);
// test1(queue);
// test2(queue);
// test3(queue);
// test4(queue);
// test5(queue);
// test6(queue);
test7(queue);
test8(queue);
}



private static void test7(BlockingQueue queue) throws InterruptedException {
System.out.println(queue.offer("a"));
System.out.println(queue.offer("b"));
System.out.println(queue.offer("c"));
System.out.println(queue.offer("d", 2, TimeUnit.SECONDS));//队列满,不抛出异常,超时等待,返回false
}
private static void test8(BlockingQueue queue) throws InterruptedException {
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll(2,TimeUnit.SECONDS));//队列空了,不抛出异常,超时等待,返回null
}
}

SynchronousQueue 同步队列

没有容量,

进去一个元素后,只能等到取出元素后才能再放入元素

put、take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 同步队列
* 和其它的阻塞队列BlockingQueue不一样,
* put进去一个元素后,必须先从里面take出一个元素后,才能再put放入元素!
*/
class SynchronousQueueTest {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue();//同步队列
new Thread(() -> {
try {
blockingQueue.put("a");
System.out.println(Thread.currentThread().getName() + " put a");
blockingQueue.put("b");
System.out.println(Thread.currentThread().getName() + " put b");
blockingQueue.put("c");
System.out.println(Thread.currentThread().getName() + " put c");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " get=>" + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " get=>" + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " get=>" + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
}

11.线程池(重点)

线程池:三大方法,7大参数,4种拒绝策略

池化技术

程序的运行,本质:占用系统资源!优化资源的使用=》池化技术

线程池、连接池(JDBC)、内存池、对象池(JVM)……创建和销毁。十分浪费资源

池化技术:事先准备好一些资源,有人要用,就从我这里拿,用完之后还给我

默认大小:2

max

线程池的好处:

1.降低资源的消耗

2.提高响应速度

3.方便管理

线程可以复用了,可以控制最大并发数,管理线程

线程池三大方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//Executors工具类,3大方法
class ExecutorsTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
// ExecutorService threadPool = Executors.newFixedThreadPool(3);//创建一个固定大小的线程池
// ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩,遇强则强,遇弱则弱
try {
for (int i = 0; i < 10; i++) {
//使用线程池之后,使用线程池来创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "=>ok");
});

}
} catch (Exception e) {
e.printStackTrace();
}
finally {
threadPool.shutdown();//用完关闭线程池
}
}

}

7大参数

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//约21亿
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//本质是ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
int maximumPoolSize,//最大核心线程池大小
long keepAliveTime,//超时等待时间,超时了没有人调用就会释放
TimeUnit unit,//超时单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂,创建线程的,一般不动
RejectedExecutionHandler handler//拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

手动创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//Executors工具类,3大方法
class ExecutorsTest {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
// ExecutorService threadPool = Executors.newFixedThreadPool(3);//创建一个固定大小的线程池
// ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩,遇强则强,遇弱则弱
//工作中使用自定义线程池ThreadPoolExecutor
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
2,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人进来,不处理这个人,抛出异常RejectedExecutionException
);
try {
//最大承载:队列Queue+maxnumPoolSize
for (int i = 0; i < 9; i++) {
//使用线程池之后,使用线程池来创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "=>ok");
});

}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();//用完关闭线程池
}
}

}
1
new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人进来,不处理这个人,抛出异常RejectedExecutionException
1
new ThreadPoolExecutor.CallerRunsPolicy()//哪里来的去哪里,main线程过来的,就返回main线程执行
1
new ThreadPoolExecutor.DiscardPolicy()//队列满了,丢掉任务,不会抛出异常
1
new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了,尝试去和队列首位竞争,如果竞争失败不执行也不抛出异常,如果成功就执行

小结和扩展

最大核心线程池数量怎么设置?

​ 1.cpu密集型,几核就设置几,可以保持CPU的效率最高
​ 2.io密集型 判断你的程序中十分耗费io的线程有多少,一般是线程数量的2倍

了解:cpu密集型、io密集型(调优)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class ExecutorsTest {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
// ExecutorService threadPool = Executors.newFixedThreadPool(3);//创建一个固定大小的线程池
// ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩,遇强则强,遇弱则弱
//最大核心线程数量怎么去设置?
//1.cpu密集型,几核就设置几,可以保持CPU的效率最高
//2.io密集型 判断你的程序中十分耗费io的线程有多少,一般是线程数量的2倍
//15个大型任务,io会十分占用资源

//获取系统核心数
System.out.println(Runtime.getRuntime().availableProcessors());
ExecutorService threadPool = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),
2,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了,尝试去和队列首位竞争,如果竞争失败不执行也不抛出异常,如果成功就执行
);
try {
//最大承载:队列Queue+maxnumPoolSize
for (int i = 0; i < 9; i++) {
//使用线程池之后,使用线程池来创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "=>ok");
});

}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();//用完关闭线程池
}
}

}

12.ForkJoin

什么是ForkJoin?

ForkJoin在Jdk1.7出现,可以并行执行任务,提供效率! 用于大数据量

大数据:map reduce,核心思想把大任务拆分为小任务,分而治之(分治法的算法思想)

ForkJoin的特点:工作窃取

内部维护的都是双端队列,B线程任务执行完了,A线程任务还没执行完,B线程不会干等着A,B线程会从A线程的另一端窃取任务,从而提升效率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class ForkJoinTest extends RecursiveTask<Long> {
private Long start;
private Long end;
//临界值
private Long temp = 10000L;

public ForkJoinTest(Long start, Long end) {
this.start = start;
this.end = end;
}


/**
* 计算求和方法
*
* @return
*/
@Override
protected Long compute() {
if ((end - start) < temp) {
long sum = 0;
for (long i = start; start <= end; i++) {
sum += i;
}
return sum;
} else {
//超过临界值,走分支合并计算
Long middle = (start + end) / 2;//中间值
ForkJoinTest task1 = new ForkJoinTest(start, middle);
task1.fork();//拆分任务,把任务压入线程队列
ForkJoinTest task2 = new ForkJoinTest(middle + 1, end);
task2.fork();//拆分任务,把任务压入线程队列
return task1.join() + task2.join();
}
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();//耗时1411
// test2();
// test3();//耗时1129
}

/**
* 普通做法
*/
public static void test1() {
long start = System.currentTimeMillis();
long sum = 0;
for (long i = 1; i <= 10_0000_0000L; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + " 耗时" + (end - start));
}

/**
* ForkJoin分治法方式
*
* @throws ExecutionException
* @throws InterruptedException
*/
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinTest(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = pool.submit(task);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + " 耗时" + (end - start));
}

/**
* 并行流
*/
public static void test3() {
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + " 耗时" + (end - start));
}

}

13.异步回调

Future 设计的初衷:对将来的某个结果进行建模

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 异步调用CompletableFuture
* 异步执行
* 成功回调
* 失败回调
*/
class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// voidTest();
supplyAsyncTest();
}

private static void supplyAsyncTest() throws InterruptedException, ExecutionException {
//有返回值的异步回调supplyAsync
CompletableFuture<Integer> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " supplyAsync=>Integer");
int i = 10 / 0;
return 1000;
});
System.out.println("业务代码");
supplyAsyncFuture.whenComplete((integer, throwable) ->{
System.out.println(integer);//正常的返回结果
System.out.println(throwable);//异常信息java.util.concurrent.CompletionException
}).exceptionally(throwable -> {
System.out.println(throwable.getMessage());
return 1005;
});
System.out.println("supplyAsyncFuture.get() = " + supplyAsyncFuture.get());
}

private static void voidTest() throws InterruptedException, ExecutionException {
//没有返回值的异步回调runAsync
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " runAsync=>void");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

System.out.println("业务代码");
future.get();//获取阻塞执行结果
}
}
-------------本文结束感谢您的阅读-------------