多多线程进阶-JUC并发编程
1.什么是JUC?
java.util工具包、包、类
业务 :普通的业务代码 Thread
Runnable :没有返回值,效率相对Callable低
2.线程和进程
线程、进程,如果不能使用一句话说出来的技术,不扎实
进程:一个程序,QQ.exe、Music.exe程序的集合
一个进程往往包含多个线程,至少包含一个
java默认有几个线程?2个,1.main 2.GC
线程:开一个进程玩一下联盟,操作英雄,语音通话(线程负责)
对java而言开启线程: Thread、Runnable、Callable
java真的能开启线程吗? 开不了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public synchronized void start () { if (threadStatus != 0 ) throw new IllegalThreadStateException (); group.add(this ); boolean started = false ; try { start0(); started = true ; } finally { try { if (!started) { group.threadStartFailed(this ); } } catch (Throwable ignore) { } } } private native void start0 () ;
并发和并行
并发:多个线程操作同一个资源
Cpu一核,模拟出多条线程,天下武功,唯快不破,快速交替
并行:多个人一起走(多个线程一起执行)
1 2 3 4 5 public static void main (String[] args) { int i = Runtime.getRuntime().availableProcessors(); System.out.println("i = " + i); }
并发编程的本质:充分利用CPU的资源
线程有几个状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
wait/sleep区别
1.来自不同的类
wait=>Object
sleep=>Thread
2.关于锁的释放
wait会释放锁,sleep睡觉了,抱着锁睡觉,不会释放锁
3.使用的范围不同
wait必须在同步代码块中
sleep可以在任何地方睡
4.是否需要捕获异常
wait不需要捕获异常
sleep必须捕获异常
3.Lock(重点)
公平锁:十分公平,先来后到(如果先到的线程等待时间较长,就需要一直等待)
非公平锁:十分不公平,可以插队,无参构造默认非公平锁(充分利用cup资源)
1.synchronized是内置关键字,lock是JUC并发包下的类
2.synchronized无法判断锁的状态,lock可以判断锁的状态
3.synchronized会自动释放锁,Lock必须手动释放,如果不释放锁,死锁
4.synchronized线程1(获得锁,阻塞)、线程2(等待,傻傻得等),Lock不一定会一直等待
5.synchronized可重入锁,不可用中断,非公平,lock可重入锁,不可用中断,非公平(有参构造可设置)
6.synchronized适合锁少量代码同步问题,lock适合锁大量同步代码
总结:synchronized类似自动挡汽车,lock类似手动挡汽车,更灵活
锁是什么?如何判断锁的是谁?
4.生产者消费者问题 面试的:单例模式、排序算法、生产者消费者、死锁
生产者消费者synchronized版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 class Data { int number = 0 ; public synchronized void increment () throws InterruptedException { if (number != 0 ) { this .wait(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); this .notifyAll(); } public synchronized void decrement () throws InterruptedException { if (number == 0 ) { this .wait(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); this .notifyAll(); } public static void main (String[] args) { Data data = new Data (); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B" ).start(); } }
问题存在,A、B、C、D四个线程 虚假唤醒
解决办法,if判断改为while判断
就是用if判断的话,唤醒后线程会从wait之后的代码开始运行,但是不会重新判断if条件,直接继续运行if代码块之后的代码,而如果使用while的话,也会从wait之后的代码运行,但是唤醒后会重新判断循环条件,如果不成立再执行while代码块之后的代码块,成立的话继续wait。
这也就是为什么用while而不用if的原因了,因为线程被唤醒后,执行开始的地方是wait之后
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 class Data { int number = 0 ; public synchronized void increment () throws InterruptedException { while (number != 0 ) { this .wait(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); this .notifyAll(); } public synchronized void decrement () throws InterruptedException { while (number == 0 ) { this .wait(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); this .notifyAll(); } public static void main (String[] args) { Data data = new Data (); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D" ).start(); } }
JUC 版生产者消费者问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 class Data1 { int number = 0 ; Lock lock = new ReentrantLock (); Condition condition = lock.newCondition(); public void increment () throws InterruptedException { lock.lock(); try { while (number != 0 ) { condition.await(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public synchronized void decrement () throws InterruptedException { lock.lock(); try { while (number == 0 ) { condition.await(); } number--; System.out.println(Thread.currentThread().getName() + "=>" + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main (String[] args) { Data1 data = new Data1 (); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D" ).start(); } }
为什么使用JUC版Condition?Condition可以实现精准通知唤醒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 class Data3 { private Lock lock = new ReentrantLock (); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private int number = 1 ; public void printA () { lock.lock(); try { while (number != 1 ) { condition1.await(); } System.out.println(Thread.currentThread().getName() + "打印:AAAAA" ); number = 2 ; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB () { lock.lock(); try { while (number != 2 ) { condition2.await(); } System.out.println(Thread.currentThread().getName() + "打印:BBBBB" ); number = 3 ; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC () { lock.lock(); try { while (number != 3 ) { condition3.await(); } System.out.println(Thread.currentThread().getName() + "打印:CCCCC" ); number = 1 ; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main (String[] args) { Data3 data3 = new Data3 (); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { data3.printA(); } }, "A" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { data3.printB(); } }, "B" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { data3.printC(); } }, "C" ).start(); } }
5.8锁问题
小结
new 锁的this ,具体的实例
static 锁的Class唯一的模板
6.集合类不安全 1.List 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class CollectoinTest { public static void main (String[] args) { List<String> list = new CopyOnWriteArrayList <>(); for (int i = 0 ; i < 20 ; i++) { new Thread (() -> { list.add(UUID.randomUUID().toString().substring(0 , 5 )); System.out.println(Thread.currentThread().getName() + list); }, "" + i).start(); } } }
Vector
1 2 3 4 5 6 7 public synchronized boolean add (E e) { modCount++; ensureCapacityHelper(elementCount + 1 ); elementData[elementCount++] = e; return true ; }
CopyOnWriteArrayList
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
使用CopyOnWriteMap需要注意两件事情:
减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。
CopyOnWrite的缺点 CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。
内存占用问题 。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。
针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap 。
数据一致性问题 。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。
2.Set set和List同理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class SetTest { public static void main (String[] args) { Set<String> set = new CopyOnWriteArraySet <>(); for (int i = 0 ; i < 30 ; i++) { new Thread (() -> { set.add(UUID.randomUUID().toString().substring(0 , 5 )); System.out.println(set); }).start(); } } }
hashSet底层是什么?
1 2 3 4 5 6 7 8 9 10 public HashSet () { map = new HashMap <>(); } public boolean add (E e) { return map.put(e, PRESENT)==null ; } private static final Object PRESENT = new Object ();
3.Map ConcurrentHashMap
7.Callable
1.可以有返回值
2.可以抛出异常
3.方法不同 run()/call()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class CallableTest { public static void main (String[] args) throws ExecutionException, InterruptedException { MyThread myThread = new MyThread (); FutureTask task = new FutureTask (myThread); new Thread (task, "A" ).start(); new Thread (task, "B" ).start(); String s = (String) task.get(); System.out.println(s); } } class MyThread implements Callable <String> { @Override public String call () throws Exception { System.out.println(Thread.currentThread().getName() + "----call()" ); return "str" ; } }
细节:
1.有缓存
2.结果可能需要等待,会阻塞
8.常用的辅助类 1.CountDownLatch 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class CountTest { public static void main (String[] args) throws InterruptedException { CountDownLatch count = new CountDownLatch (6 ); for (int i = 0 ; i < 6 ; i++) { new Thread (() -> { System.out.println(Thread.currentThread().getName() + " GO OUT" ); count.countDown(); }, "" + i).start(); } count.await(); System.out.println("Close Door" ); } }
原理:
count.countDown();//数量-1
count.await();//等待计数器归零,再向下执行
每次有线程调用count.countDown(),数量-1,假设计数器变为0, count.await()就会被唤醒,继续向下执行
2.CyclicBarrier 加法计数器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class CyclicBarrierTest { public static void main (String[] args) { CyclicBarrier barrier = new CyclicBarrier (7 , () -> { System.out.println("召唤神龙" ); }); for (int i = 0 ; i < 7 ; i++) { int finalI = i; new Thread (() -> { try { System.out.println(Thread.currentThread().getName() + "收集第" + finalI + "颗龙珠" ); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
3.Semaphore 信号量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class SemaphoreTest { public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); for (int i = 1 ; i <= 6 ; i++) { new Thread (() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "抢到车位" ); TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName() + "离开车位" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }, "" + i).start(); } } }
原理:
semaphore.acquire();
获得,假如已经满了,等待,等待被释放为止
semaphore.release();
释放,会将当前的信号量释放+1,然后唤醒等待的线程
作用:多个共享资源互斥使用,并发限流,控制最大线程数!
9.ReadWriteLock 读写锁,写的时候只能一个线程写,读的时候可以多个线程共享读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class ReadWriteLockTest { public static void main (String[] args) { MapCache cache = new MapCache (); for (int i = 1 ; i <= 10 ; i++) { int finalI = i; new Thread (() -> { cache.put("" + finalI, finalI); }, "写线程" + i).start(); } for (int i = 1 ; i <= 10 ; i++) { int finalI = i; new Thread (() -> { cache.get("" + finalI); }, "读线程" + i).start(); } } } class MapCache { private volatile Map<String, Object> mapCache = new HashMap <>(); public void put (String key, Object value) { System.out.println(Thread.currentThread().getName() + "写入" + key); mapCache.put(key, value); System.out.println(Thread.currentThread().getName() + "写入ok" ); } public Object get (String key) { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = mapCache.get(key); System.out.println(Thread.currentThread().getName() + "读取ok" ); return o; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 class ReadWriteLockTest { public static void main (String[] args) { MapCache cache = new MapCache (); for (int i = 1 ; i <= 10 ; i++) { int finalI = i; new Thread (() -> { cache.put("" + finalI, finalI); }, "写线程" + i).start(); } for (int i = 1 ; i <= 10 ; i++) { int finalI = i; new Thread (() -> { cache.get("" + finalI); }, "读线程" + i).start(); } } } class MapCache { private volatile Map<String, Object> mapCache = new HashMap <>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock (); public void put (String key, Object value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "写入" + key); mapCache.put(key, value); System.out.println(Thread.currentThread().getName() + "写入ok" ); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } public Object get (String key) { readWriteLock.readLock().lock(); Object o = null ; try { System.out.println(Thread.currentThread().getName() + "读取" + key); o = mapCache.get(key); System.out.println(Thread.currentThread().getName() + "读取ok" ); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } return o; } }
10.阻塞队列
什么情况下会使用阻塞队列?多线程并发处理,线程池!
四组api
方式
抛出异常
不抛出异常
超时等待
阻塞等待
添加
add
offer
offer(“d”, 2, TimeUnit.SECONDS)
put
移除
remove
poll
poll(2,TimeUnit.SECONDS)
take
检查首元素
element
peek
—
—
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class BlockingQueueTest { public static void main (String[] args) { BlockingQueue queue = new ArrayBlockingQueue (3 ); test1(queue); test2(queue); } private static void test1 (BlockingQueue queue) { System.out.println(queue.add("a" )); System.out.println(queue.add("b" )); System.out.println(queue.add("c" )); System.out.println(queue.element()); } private static void test2 (BlockingQueue queue) { System.out.println(queue.remove()); System.out.println(queue.remove()); System.out.println(queue.remove()); System.out.println(queue.remove()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 class BlockingQueueTest { public static void main (String[] args) { BlockingQueue queue = new ArrayBlockingQueue (3 ); test3(queue); test4(queue); } private static void test3 (BlockingQueue queue) { System.out.println(queue.offer("a" )); System.out.println(queue.offer("b" )); System.out.println(queue.offer("c" )); System.out.println(queue.offer("d" )); System.out.println(queue.peek()); } private static void test4 (BlockingQueue queue) { System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class BlockingQueueTest { public static void main (String[] args) throws InterruptedException { BlockingQueue queue = new ArrayBlockingQueue (3 ); test5(queue); test6(queue); } private static void test5 (BlockingQueue queue) throws InterruptedException { queue.put("a" ); queue.put("b" ); queue.put("c" ); } private static void test6 (BlockingQueue queue) throws InterruptedException { System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class BlockingQueueTest { public static void main (String[] args) throws InterruptedException { BlockingQueue queue = new ArrayBlockingQueue (3 ); test7(queue); test8(queue); } private static void test7 (BlockingQueue queue) throws InterruptedException { System.out.println(queue.offer("a" )); System.out.println(queue.offer("b" )); System.out.println(queue.offer("c" )); System.out.println(queue.offer("d" , 2 , TimeUnit.SECONDS)); } private static void test8 (BlockingQueue queue) throws InterruptedException { System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll(2 ,TimeUnit.SECONDS)); } }
SynchronousQueue 同步队列
没有容量,
进去一个元素后,只能等到取出元素后才能再放入元素
put、take
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class SynchronousQueueTest { public static void main (String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue (); new Thread (() -> { try { blockingQueue.put("a" ); System.out.println(Thread.currentThread().getName() + " put a" ); blockingQueue.put("b" ); System.out.println(Thread.currentThread().getName() + " put b" ); blockingQueue.put("c" ); System.out.println(Thread.currentThread().getName() + " put c" ); } catch (InterruptedException e) { e.printStackTrace(); } }, "T1" ).start(); new Thread (() -> { try { TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName() + " get=>" + blockingQueue.take()); TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName() + " get=>" + blockingQueue.take()); TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName() + " get=>" + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "T2" ).start(); } }
11.线程池(重点) 线程池:三大方法,7大参数,4种拒绝策略
池化技术
程序的运行,本质:占用系统资源!优化资源的使用=》池化技术
线程池、连接池(JDBC)、内存池、对象池(JVM)……创建和销毁。十分浪费资源
池化技术:事先准备好一些资源,有人要用,就从我这里拿,用完之后还给我
默认大小:2
max
线程池的好处:
1.降低资源的消耗
2.提高响应速度
3.方便管理
线程可以复用了,可以控制最大并发数,管理线程
线程池三大方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class ExecutorsTest { public static void main (String[] args) { ExecutorService threadPool = Executors.newSingleThreadExecutor(); try { for (int i = 0 ; i < 10 ; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "=>ok" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
7大参数
源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); } public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); } public ThreadPoolExecutor (int corePoolSize,//核心线程池大小 int maximumPoolSize,//最大核心线程池大小 long keepAliveTime,//超时等待时间,超时了没有人调用就会释放 TimeUnit unit,//超时单位 BlockingQueue<Runnable> workQueue,//阻塞队列 ThreadFactory threadFactory,//线程工厂,创建线程的,一般不动 RejectedExecutionHandler handler//拒绝策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
手动创建线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class ExecutorsTest { public static void main (String[] args) { ExecutorService threadPool = new ThreadPoolExecutor ( 2 , 5 , 2 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .AbortPolicy() ); try { for (int i = 0 ; i < 9 ; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "=>ok" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
1 new ThreadPoolExecutor .AbortPolicy()
1 new ThreadPoolExecutor .CallerRunsPolicy()
1 new ThreadPoolExecutor .DiscardPolicy()
1 new ThreadPoolExecutor .DiscardOldestPolicy()
小结和扩展
最大核心线程池数量怎么设置?
1.cpu密集型,几核就设置几,可以保持CPU的效率最高 2.io密集型 判断你的程序中十分耗费io的线程有多少,一般是线程数量的2倍
了解:cpu密集型、io密集型(调优)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class ExecutorsTest { public static void main (String[] args) { System.out.println(Runtime.getRuntime().availableProcessors()); ExecutorService threadPool = new ThreadPoolExecutor ( 2 , Runtime.getRuntime().availableProcessors(), 2 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .DiscardOldestPolicy() ); try { for (int i = 0 ; i < 9 ; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "=>ok" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
12.ForkJoin
什么是ForkJoin?
ForkJoin在Jdk1.7出现,可以并行执行任务,提供效率! 用于大数据量
大数据:map reduce,核心思想把大任务拆分为小任务,分而治之(分治法的算法思想)
ForkJoin的特点:工作窃取
内部维护的都是双端队列,B线程任务执行完了,A线程任务还没执行完,B线程不会干等着A,B线程会从A线程的另一端窃取任务,从而提升效率
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 class ForkJoinTest extends RecursiveTask <Long> { private Long start; private Long end; private Long temp = 10000L ; public ForkJoinTest (Long start, Long end) { this .start = start; this .end = end; } @Override protected Long compute () { if ((end - start) < temp) { long sum = 0 ; for (long i = start; start <= end; i++) { sum += i; } return sum; } else { Long middle = (start + end) / 2 ; ForkJoinTest task1 = new ForkJoinTest (start, middle); task1.fork(); ForkJoinTest task2 = new ForkJoinTest (middle + 1 , end); task2.fork(); return task1.join() + task2.join(); } } public static void main (String[] args) throws ExecutionException, InterruptedException { test1(); } public static void test1 () { long start = System.currentTimeMillis(); long sum = 0 ; for (long i = 1 ; i <= 10_0000_0000L ; i++) { sum += i; } long end = System.currentTimeMillis(); System.out.println("sum = " + sum + " 耗时" + (end - start)); } public static void test2 () throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool (); ForkJoinTask<Long> task = new ForkJoinTest (0L , 10_0000_0000L ); ForkJoinTask<Long> submit = pool.submit(task); Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum = " + sum + " 耗时" + (end - start)); } public static void test3 () { long start = System.currentTimeMillis(); long sum = LongStream.rangeClosed(0 , 10_0000_0000L ).parallel().reduce(0 , Long::sum); long end = System.currentTimeMillis(); System.out.println("sum = " + sum + " 耗时" + (end - start)); } }
13.异步回调
Future 设计的初衷:对将来的某个结果进行建模
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 class FutureTest { public static void main (String[] args) throws ExecutionException, InterruptedException { supplyAsyncTest(); } private static void supplyAsyncTest () throws InterruptedException, ExecutionException { CompletableFuture<Integer> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " supplyAsync=>Integer" ); int i = 10 / 0 ; return 1000 ; }); System.out.println("业务代码" ); supplyAsyncFuture.whenComplete((integer, throwable) ->{ System.out.println(integer); System.out.println(throwable); }).exceptionally(throwable -> { System.out.println(throwable.getMessage()); return 1005 ; }); System.out.println("supplyAsyncFuture.get() = " + supplyAsyncFuture.get()); } private static void voidTest () throws InterruptedException, ExecutionException { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName() + " runAsync=>void" ); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("业务代码" ); future.get(); } }