并发问题,总是一个比较值得探究和有趣的话题, 有时候并发问题看起来会很简单,有时又让人迷惑,不出问题则已,一出问题则难揪诱因, 只能靠个人小心为慎,来尽量降低这种威胁,并发中, 锁则起到了至关重要的作用, 用得好谢天谢地,用得不好只能等待问题出现,所以觉得,还是有必要对锁进行一番整理记忆, 可能其中,我也会头脑发晕,感谢邮件指出。
并发问题总结即为,在多线程环境下, 程序运行出现预料之外的错误,如 状态不一致, 死锁, 甚至程序崩溃等等。
1. 多个线程: 当然得在多线程环境下。
2. 共享变量: 若不存在共享变量(即存在本地变量),多个线程只能访问本地线程栈的数据。
3. 可变变量: 即使存在共享变量,但是该变量不可变(即初始化后,就不会更新状态)。
4. 写操作: 即使共享变量可变,但是多个线程不存在更新操作(仅读操作),此时依然是线程安全的。
类似这种情形,跑在CPU上的一个线程将这个共享对象读到CPU缓存中,然后修改了这个对象。 只要CPU缓存没有被刷新会主存,对象修改后的版本对跑在其它CPU上的线程都是不可见的。 这种方式可能导致每个线程拥有这个共享对象的私有拷贝,每个拷贝停留在不同的CPU缓存中。
synchronized (sharedObj){
// 临界区
}
public class Lock {
private Boolean locked = Boolean.FALSE;
public void lock() throws InterruptedException {
synchronized (this){
while (locked){ // 自旋转,防止假唤醒
this.wait();
}
locked = Boolean.TRUE;
}
}
public void unlock(){
synchronized (this){
if (!Thread.holdsLock(this)){
throw new IllegalMonitorStateException("current thread not locked me.");
}
locked = Boolean.FALSE;
this.notify();
}
}
}
lock.lock();
try{
// do sth.
} finally {
lock.unlock();
}
lock.lock();
lock.lock();
lock.unlock();
lock.unlock();
public class ReentrantLock {
/**
* 是否已被加锁
*/
private Boolean locked = Boolean.FALSE;
/**
* 当前加锁线程
*/
private Thread locking = null;
/**
* 记录被同一线程加锁几次
*/
private Integer locks = 0;
public void lock() throws InterruptedException {
synchronized (this){
while (locked && (Thread.currentThread() != locking)){
this.wait();
}
locked = Boolean.TRUE;
locking = Thread.currentThread();
++ locks;
}
}
public void unlock(){
synchronized (this){
if (Thread.currentThread() != locking){
throw new IllegalMonitorStateException("current thread not locked me.");
}
if (--locks == 0){
locked = Boolean.FALSE;
locking = null;
this.notify();
}
}
}
}
public class FairLock {
/**
* 是否已被加锁
*/
private Boolean locked = Boolean.FALSE;
/**
* 当前加锁线程
*/
private Thread locking = null;
/**
* 记录被同一线程加锁几次
*/
private Integer locks = 0;
/**
* 等待线程队列
*/
private List<Object> waitings = new ArrayList<>();
public void lock() throws InterruptedException {
// 使用占位符来进行线程等待和唤醒
Object placeholder = new Object();
synchronized (this){
// 占位(优先进入等待队列的线程,优先被执行)
waitings.add(placeholder);
}
synchronized (this) {
while ((locked && Thread.currentThread() != locking) // 是否已经被锁且可重入
|| waitings.get(0) != placeholder){ // 是否为最先请求锁的线程
System.out.println(Thread.currentThread().getName() + " is waiting.");
this.wait();
}
// 线程不需要等待
System.out.println(Thread.currentThread().getName() + " is locking.");
locked = Boolean.TRUE;
locking = Thread.currentThread();
++ locks;
// 加锁成功后,移除占位符
waitings.remove(placeholder);
}
}
public void unlock(){
synchronized (this){
if (Thread.currentThread() != locking){
throw new IllegalMonitorStateException("current thread not locked me.");
}
if (--locks == 0){
System.out.println(Thread.currentThread().getName() + " is unlocking.");
locked = Boolean.FALSE;
locking = null;
// 通知所有等待的线程
this.notifyAll();
}
}
}
}
public class FairLock {
/**
* 是否已被加锁
*/
private Boolean locked = Boolean.FALSE;
/**
* 当前加锁线程
*/
private Thread locking = null;
/**
* 记录被同一线程加锁几次
*/
private Integer locks = 0;
/**
* 等待线程队列
*/
private List<PlaceHolder> waitings = new ArrayList<>();
public void lock() throws InterruptedException {
// 使用占位符来进行线程等待和唤醒
PlaceHolder placeholder = new PlaceHolder();
synchronized (this){
// 占位(优先进入等待队列的线程,优先被执行)
waitings.add(placeholder);
}
Boolean needWait = Boolean.TRUE;
while (needWait){
synchronized (this) {
needWait = (locked && Thread.currentThread() != locking) // 是否已经被锁且可重入
|| waitings.get(0) != placeholder; // 是否为最先请求锁的线程
if (!needWait){
// 线程不需要等待
System.out.println(Thread.currentThread().getName() + " is locking.");
locked = Boolean.TRUE;
locking = Thread.currentThread();
++ locks;
// 加锁成功后,移除占位符
waitings.remove(placeholder);
return;
}
}
System.out.println(Thread.currentThread().getName() + " is waiting.");
// 线程等待在自己占位符上, 不是this
placeholder.doWait();
}
}
public void unlock(){
synchronized (this){
if (Thread.currentThread() != locking){
throw new IllegalMonitorStateException("current thread not locked me.");
}
if (--locks == 0){
System.out.println(Thread.currentThread().getName() + " is unlocking.");
locked = Boolean.FALSE;
locking = null;
if (waitings.size() > 0){
// 通知优先等待的线程
waitings.get(0).doNotify();
}
}
}
}
class PlaceHolder {
private boolean isNotified = false;
public synchronized void doWait() throws InterruptedException {
while (!isNotified) {
this.wait();
}
this.isNotified = false;
}
public synchronized void doNotify() {
this.isNotified = true;
this.notify();
}
public boolean equals(Object o) {
return this == o;
}
}
}
public class ReadWriteLock {
/**
* 读线程数
*/
private Integer readCount = 0;
/**
* 写线程数
*/
private Integer writeCount = 0;
/**
* 写请求数
*/
private Integer writeRequests = 0;
public void lockRead() throws InterruptedException {
synchronized (this){
while (writeRequests > 0 || writeCount > 0){
// 有写线程或写请求都应该等待
System.out.println(Thread.currentThread().getName() + " waiting read");
wait();
}
System.out.println(Thread.currentThread().getName() + " locking read");
++ readCount;
}
}
public void unlockRead(){
synchronized (this){
System.out.println(Thread.currentThread().getName() + " unlock read");
-- readCount;
notifyAll();
}
}
public void lockWrite() throws InterruptedException {
synchronized (this){
++ writeRequests; // 记录写请求, 保证写请求优先
while (writeCount > 0 || readCount > 0){
System.out.println(Thread.currentThread().getName() + " waiting write");
wait();
}
System.out.println(Thread.currentThread().getName() + " locking write");
-- writeRequests;
++ writeCount;
}
}
public void unlockWrite(){
synchronized (this){
System.out.println(Thread.currentThread().getName() + " unlock write");
-- writeCount;
notifyAll();
}
}
}
/**
* <读线程, 读线程请求数>
*/
private final Map<Thread, Integer> readings = new HashMap<>();
public void lockRead() throws InterruptedException {
synchronized (this){
Thread current = Thread.currentThread();
while (needWaitRead()){
// 有写线程或写请求都应该等待
System.out.println(current.getName() + " waiting read");
wait();
}
System.out.println(current.getName() + " locking read");
if (readings.get(current) != null){
readings.put(current, readings.get(current) + 1);
} else {
readings.put(current, 1);
}
}
}
/**
* 是否需要等待读锁
*/
private Boolean needWaitRead(){
if(writeCount > 0) {
// 有写线程已加锁
return Boolean.TRUE;
}
if(readings.get(Thread.currentThread()) != null) {
// 当前线程已经获取过读锁
return Boolean.FALSE;
}
if(writeRequests > 0) {
// 有写请求
return Boolean.TRUE;
}
return Boolean.FALSE;
}
public void unlockRead(){
synchronized (this){
Thread current = Thread.currentThread();
System.out.println(current.getName() + " unlock read");
Integer readCount = readings.get(current);
if (readCount == 1){
readings.remove(current);
notifyAll();
} else {
readings.put(current, --readCount);
}
}
}
public void lockWrite() throws InterruptedException {
synchronized (this){
Thread current = Thread.currentThread();
++ writeRequests; // 记录写请求, 保证写请求优先
while (needWaitWrite()){
System.out.println(current.getName() + " waiting write");
wait();
}
System.out.println(current.getName() + " locking write");
-- writeRequests;
++ writeCount;
writing = current;
}
}
private Boolean needWaitWrite(){
if (readings.size() > 0){
// 已经有读线程
return Boolean.TRUE;
}
if (writing == null){
// 还没有写线程
return Boolean.FALSE;
}
if (Thread.currentThread() == writing){
// 当前线程已经获得写锁
return Boolean.FALSE;
} else {
// 当前线程没有获得写锁
return Boolean.TRUE;
}
}
public void unlockWrite(){
synchronized (this){
System.out.println(Thread.currentThread().getName() + " unlock write");
if(-- writeCount == 0){
writing = null;
notifyAll();
}
}
}
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
该方法虽然没有进行加锁操作,但通过compareAndSet()方法(该方法比较current与变量内存中的值,若相等,将变量值设置为next,虽然是先比较后设置"两步"操作, 但该方法利用CPU的compare-and-swap指令达到了原子操作,所以不会有线程安全问题),这样就能在不加锁的情况下实现线程安全,也叫非阻塞算法。 还有一种叫复制后再更新的非阻塞线程安全的实现方式,如CopyOnWriteArrayList, 利用volatile关键字和更新前拷贝副本的方式实现线程安全。
而悲观锁则相反,每次进行访问都会有加锁请求,直到获取到对象锁才进行后续操作,如上面实现的各种锁。