线程间通信-爱代码爱编程
1 wait/notify机制
1.1 机制原理
拥有相同锁的线程才可以实现wait/notify机制。它们都是Object类中的方法。wai使线程暂停运行,而notify通知暂停的线程继续运行。
wait是使执行该方法的线程进行等待,在wait()所在的代码处暂停执行,并释放锁,直到接到通知或被中断为止。
notify()也要在同步方法或同步块中调用,在调用前线程必须获得锁。执行notify()方法后,当前线程不会马上释放该锁,且等待状态的线程也并不能马上获取该对象锁。
1.2 wait(long)
等待某一时间内是否有线程对锁进行唤醒,如果超过这一个时间则自动唤醒。能继续向下运行的前提是再次持有锁。
public class WaitLong {
private static final String lock = "lock";
public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(() -> {
synchronized (lock) {
System.out.println("waitThread begin");
long begin = System.currentTimeMillis();
try {
lock.wait(2000); // 等待两秒继续运行
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("waitThread end。耗时:" + (System.currentTimeMillis() - begin) / 1000 + "s");
}
});
Thread thread = new Thread(() -> {
synchronized (lock) {
System.out.println("其他线程运行中");
for (int i = 0; i < 100000000; i++) {
String s = new String("消化时间,不用sleep");
System.out.print(s.replace("消化时间,不用sleep",""));
}
System.out.println("其他线程运行结束 lock锁释放");
}
});
waitThread.start();
TimeUnit.SECONDS.sleep(1);
thread.start();
}
}
图 wait(long) 在超过设定时间后,并不一定能继续运行,还需重新获得锁
1.3 生产者/消费者
1.3.1 if 与 while
public class IfAndWhile {
private static String lock = "lock";
private static AtomicBoolean ready = new AtomicBoolean(false); //是否生产好
private static AtomicLong count = new AtomicLong();
private static class ProductThread extends Thread {
@Override
public void run() {
while (true) {
synchronized (lock) {
if (ready.get()) {
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
lock.notifyAll();
count.getAndIncrement();
ready.set(true);
}
}
}
}
private static class ConsumerThread extends Thread {
@Override
public void run() {
while (true) {
synchronized (lock) {
if (!ready.get()) {
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
lock.notifyAll();
System.out.println("消费:" + count);
ready.set(false);
}
}
}
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++) {
new ProductThread().start();
}
new ConsumerThread().start();
}
}
图 多生产者单消费者运行结果部分截图
消费数字非递增,出现生产多次消费一次的错误情况。 这是因为在判断是否生产或消费的时候,用的是if。
图 上段代码产生错误的原因
分析:代码在wait()处被唤醒后,会继续向下执行,并不会再判断if里面的条件。如果,将if判断改成while,则wait()处被唤醒后,会再判断一下while里面的条件,如果条件正确,才会跳出循环。
图 修改后的代码
1.4 管道流中传递字节/字符流
JDK提供了4个类来使线程间可以进行通信:PipedInputStream、PipedOutputStream、PipedReader和PipedWriter。
public class PipedCommunication {
public static void main(String[] args) throws IOException, InterruptedException {
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
inputStream.connect(outputStream);
Thread writeThread = new Thread(() -> {
String content = "hello Piped!";
System.out.println("write:");
try {
outputStream.write(content.getBytes());
outputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Thread readThread = new Thread(() -> {
byte bytes[] = new byte[20];
System.out.println("read:");
int readLength = 0;
try {
while ((readLength = inputStream.read(bytes)) != -1) {
System.out.print(new String(bytes,0,readLength));
}
inputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
writeThread.start();
TimeUnit.SECONDS.sleep(2);
readThread.start();
}
}
图 管道流运行结果
2 join的使用
使所属的线程对象x正常执行run()方法中的任务,而使当前线程z进行无限期阻塞,等待线程x销毁后再继续执行线程z后面的代码。
图 Thread类join关键源码
Thread里的join方法是一个同步方法(锁是当前thread对象),当thread这个对象还存活的时候,join方法使当前这个线程进入wait状态。当线程执行完run方法后,一定会自动执行notifyAll()方法。
2.1 join(long)
直到获得锁后才会继续向下执行。
public class JoinLong {
public static void main(String[] args) throws InterruptedException {
Thread lockThread = new Thread(() -> {
System.out.println("lockThread start");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("lockThread end");
});
Thread thread = new Thread(() -> {
synchronized (lockThread) {
System.out.println("其他线程启动");
try {
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("其他线程终止");
}
});
lockThread.start();
TimeUnit.SECONDS.sleep(1);
thread.start();
long begin = System.currentTimeMillis();
lockThread.join(4000); //预期最多过4秒,就会继续执行
System.out.println("main 进程结束。 join耗时:" + (System.currentTimeMillis() - begin) / 1000 + "s");
}
}
图 join(long)需要获得锁后才会继续向下执行运行结果
3 ThreadLocal的使用
将数据放入当前线程对象的Map中。
3.1 源码分析
图 ThreadLocal set方法
图 ThreadLocal createMap方法
Thread 中包含:ThreadLocal.ThreadLocalMap threadLocals = null;
图 ThreadLocaly与Thread的关系
3.2 使用remove()方法的必要性
ThreadLocalMap中的静态内置类Entry是弱引用类型。
图 ThreadLocalMap部分源码
弱引用的特点是,垃圾回收器扫描发现弱引用对象,不管内存是否足够,都会回收弱引用的对象。也就是只要执行gc操作,ThreadLocal对象就立即销毁,代表key的ThreadLocal对象会随着gc操作而消耗,释放内存空间,但是value值却不会随着gc操作而销毁。
public class ThreadLocalRemove {
private static class MyThreadLocal extends ThreadLocal {
private static AtomicInteger count = new AtomicInteger(0);
@Override
protected void finalize() throws Throwable {
System.out.println("MyThreadLocal finalize() " + count.addAndGet(1));
}
}
private static class UserInfo {
private static AtomicInteger count = new AtomicInteger(0);
@Override
protected void finalize() throws Throwable {
System.out.println("-------Userinfo protected void finalize() " + count.addAndGet(1));
}
}
public static void main(String[] args) {
for (int i = 0; i < 9000; i++) {
MyThreadLocal threadLocal = new MyThreadLocal();
UserInfo userInfo = new UserInfo();
threadLocal.set(userInfo);
threadLocal.remove();
}
MyThreadLocal threadLocal = new MyThreadLocal();
for (int i = 0; i < 90000000; i++) {
String newStr = new String("" + (i + 1));
Thread.yield();
Thread.yield();
Thread.yield();
Thread.yield();
}
System.out.println("main 结束:" + threadLocal);
}
}
图 程序运行结果部分截图
图 程序运行过程的内存监控
需要及时使用remove方法删除不用的值,否则value不会随着gc操作而销毁,最后可能会内存溢出。
图 ThreadLocalRemove修改后的代码
3.3 InheritableThreadLocal
可以在子线程中取得父线程继承下来的值。
3.3.1 源码分析
图 InheritableThreadLocal源码
Thread中包含:ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
在创建线程时,子线程主动引用父线程的inheritableThreadLocals的值。
图 Thread init方法部份源码
其中 ThreadLocal.createInheritedMap源码为:
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
3.3.2子线程继承的是父线程旧值,父线程新值不可继承。
public class InheritableThreadLocalTest {
private static InheritableThreadLocal<Object> inheritableThreadLocal = new InheritableThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
inheritableThreadLocal.set("main线程存储的值");
Thread thread1 = new Thread(() -> {
System.out.println("thread1 读取:" + inheritableThreadLocal.get());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("thread1 2s后读取:" + inheritableThreadLocal.get());
});
thread1.start();
TimeUnit.SECONDS.sleep(2);
inheritableThreadLocal.set("新值由main线程设置");
System.out.println("main 设置新值:" + inheritableThreadLocal.get());
}
}
图 子线程无法继承父线程新值
3.3.3 重写childValue方法
public class InheritableThreadLocalTest2 {
private static class MyInheritableThreadLocal extends InheritableThreadLocal {
@Override
protected Object childValue(Object parentValue) {
return Thread.currentThread().getName() + "存储的:" + super.childValue(parentValue);
}
}
private static MyInheritableThreadLocal local = new MyInheritableThreadLocal();
public static void main(String[] args) {
local.set("我是main");
new Thread(() -> {
System.out.println(local.get());
}).start();
}
}
图 重写childValue的运行结果