利用redis實現分佈式鎖,快速解決高並發時的線程安全問題

實際工作中,經常會遇到多線程並發時的類似搶購的功能,本篇描述一個簡單的redis分佈式鎖實現的多線程搶票功能。

直接上代碼。首先按照慣例,給出一個錯誤的示范:

我們可以看看,當20個線程一起來搶10張票的時候,會發生什麼事。

package com.tiger.utils; 
public class TestMutilThread {
 
	// 總票量
	public static int count = 10; 
	public static void main(String[] args) {
		statrtMulti();
	}
 
	public static void statrtMulti() {
		for (int i = 1; i <= 20; i++) {
			TicketRunnable tickrunner = new TicketRunnable();
			Thread thread = new Thread(tickrunner, "Thread No: " + i);
			thread.start();
		} 
	}
 
	public static class TicketRunnable implements Runnable {
 
		@Override
		public void run() {
			System.out.println(Thread.currentThread().getName() + " start "
					+ count);
			// TODO Auto-generated method stub
			// logger.info(Thread.currentThread().getName()
			// + " really start" + count);
			if (count <= 0) {
				System.out.println(Thread.currentThread().getName()
						+ " ticket sold out ! No tickets remained!" + count);
				return;
			} else {
				count = count - 1;
				System.out.println(Thread.currentThread().getName()
						+ " bought a ticket,now remaining :" + (count));
			}
		}
	}
}

測試結果,從結果可以看到,票數在不同的線程中已經出現混亂。

Thread No: 2 start 10
Thread No: 6 start 10
Thread No: 4 start 10
Thread No: 5 start 10
Thread No: 3 start 10
Thread No: 9 start 6
Thread No: 1 start 10
Thread No: 1 bought a ticket,now remaining :3
Thread No: 9 bought a ticket,now remaining :4
Thread No: 3 bought a ticket,now remaining :5
Thread No: 12 start 3
Thread No: 5 bought a ticket,now remaining :6
Thread No: 4 bought a ticket,now remaining :7
Thread No: 8 start 7
Thread No: 7 start 8
Thread No: 12 bought a ticket,now remaining :1
Thread No: 14 start 0
Thread No: 6 bought a ticket,now remaining :8
Thread No: 16 start 0
Thread No: 2 bought a ticket,now remaining :9
Thread No: 16 ticket sold out ! No tickets remained!0
Thread No: 14 ticket sold out ! No tickets remained!0
Thread No: 18 start 0
Thread No: 18 ticket sold out ! No tickets remained!0
Thread No: 7 bought a ticket,now remaining :0
Thread No: 15 start 0
Thread No: 8 bought a ticket,now remaining :1
Thread No: 13 start 2
Thread No: 19 start 0
Thread No: 11 start 3
Thread No: 11 ticket sold out ! No tickets remained!0
Thread No: 10 start 3
Thread No: 10 ticket sold out ! No tickets remained!0
Thread No: 19 ticket sold out ! No tickets remained!0
Thread No: 13 ticket sold out ! No tickets remained!0
Thread No: 20 start 0
Thread No: 20 ticket sold out ! No tickets remained!0
Thread No: 15 ticket sold out ! No tickets remained!0
Thread No: 17 start 0
Thread No: 17 ticket sold out ! No tickets remained!0

為瞭解決多線程時出現的混亂問題,這裡給出真正的測試類!!!

真正的測試類,這裡啟動20個線程,來搶10張票。

RedisTemplate 是用來實現redis操作的,由spring進行集成。這裡是使用到瞭RedisTemplate,所以我以構造器的形式在外部將RedisTemplate傳入到測試類中。

MultiTestLock 是用來實現加鎖的工具類。

總票數使用volatile關鍵字,實現多線程時變量在系統內存中的可見性,這點可以去瞭解下volatile關鍵字的作用。

TicketRunnable用於模擬搶票功能。

其中由於lock與unlock之間存在if判斷,為保證線程安全,這裡使用synchronized來保證。

測試類:

package com.tiger.utils; 
import java.io.Serializable; 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate; 
public class MultiConsumer {
	Logger logger=LoggerFactory.getLogger(MultiTestLock.class);	
	private RedisTemplate<Serializable, Serializable> redisTemplate;	
	public MultiTestLock lock;
	//總票量
	public volatile static int count = 10;
 
	public void statrtMulti() {
		lock = new MultiTestLock(redisTemplate);
		for (int i = 1; i <= 20; i++) {
			TicketRunnable tickrunner = new TicketRunnable();
			Thread thread = new Thread(tickrunner, "Thread No: " + i);
			thread.start();
			} 
	}
 
	public class TicketRunnable implements Runnable {
 
		@Override
		public void run() {
			logger.info(Thread.currentThread().getName() + " start "
					+ count);
			// TODO Auto-generated method stub
			if (count > 0) {
//				logger.info(Thread.currentThread().getName()
//						+ " really start" + count);
				lock.lock();
				synchronized (this) {
					if(count<=0){
						logger.info(Thread.currentThread().getName()
								+ " ticket sold out ! No tickets remained!" + count);
						lock.unlock();
						return;
					}else{
						count=count-1;
						logger.info(Thread.currentThread().getName()
								+ " bought a ticket,now remaining :" + (count));
					}
				}
				lock.unlock();
			}else{
				logger.info(Thread.currentThread().getName()
						+ " ticket sold out !" + count);
			}
		}
	}
 
	public RedisTemplate<Serializable, Serializable> getRedisTemplate() {
		return redisTemplate;
	}
 
	public void setRedisTemplate(
			RedisTemplate<Serializable, Serializable> redisTemplate) {
		this.redisTemplate = redisTemplate;
	}
 
	public MultiConsumer(RedisTemplate<Serializable, Serializable> redisTemplate) {
		super();
		this.redisTemplate = redisTemplate;
	}
}

Lock工具類:

我們知道為保證線程安全,程序中執行的操作必須時原子的。redis後續的版本中可以使用set key同時設置expire超時時間。

想起上次去 電信翼支付 面試時,面試官問過一個問題:分佈式鎖如何防止死鎖,問題關鍵在於我們在分佈式中進行加鎖操作時成功瞭,但是後續業務操作完畢執行解鎖時出現失敗。導致分佈式鎖無法釋放。出現死鎖,後續的加鎖無法正常進行。所以這裡設置expire超時時間的目的就是防止出現解鎖失敗的情況,這樣,即使解鎖失敗瞭,分佈式鎖依然會在超時時間過瞭之後自動釋放。

具體在代碼中也有註釋,也可以作為參考。

package com.tiger.utils; 
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; 
import javax.sound.midi.MidiDevice.Info; 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.script.RedisScript; 
 
public class MultiTestLock implements Lock {	
	Logger logger=LoggerFactory.getLogger(MultiTestLock.class);	
	private RedisTemplate<Serializable, Serializable> redisTemplate;	
	public MultiTestLock(RedisTemplate<Serializable, Serializable> redisTemplate) {
		super();
		this.redisTemplate = redisTemplate;
	}
 
	@Override
	public void lock() {
		//這裡使用while循環強制線程進來之後先進行搶鎖操作。隻有搶到鎖才能進行後續操作
		while(true){
			if(tryLock()){
				try {
					//這裡讓線程睡500毫秒的目的是為瞭模擬業務耗時,確保業務結束時之前設置的值正好打到超時時間,
					//實際生產中可能有偏差,這裡需要經驗
					Thread.sleep(500l);
//					logger.info(Thread.currentThread().getName()+" time to awake");
					return;
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}else{
				try {
					//這裡設置一個隨機毫秒的sleep目的時降低while循環的頻率 
					Thread.sleep(new Random().nextInt(200)+100);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
 
	@Override
	public boolean tryLock() {
		//這裡也可以選用transactionSupport支持事務操作
		SessionCallback<Object> sessionCallback=new SessionCallback<Object>() {
			@Override
			public Object execute(RedisOperations operations)
					throws DataAccessException {
				operations.multi();
				operations.opsForValue().setIfAbsent("secret", "answer");
				//設置超時時間要根據業務實際的可能處理時間來,是一個經驗值
				operations.expire("secret", 500l, TimeUnit.MILLISECONDS);
				Object object=operations.exec();
				return object;
			}
		};
		//執行兩部操作,這裡會拿到一個數組值 [true,true],分別對應上述兩部操作的結果,如果中途出現第一次為false則表明第一步set值出錯
		List<Boolean> result=(List) redisTemplate.execute(sessionCallback);
//		logger.info(Thread.currentThread().getName()+" try lock "+ result);
		if(true==result.get(0)||"true".equals(result.get(0)+"")){
			logger.info(Thread.currentThread().getName()+" try lock success");
			return true;
		}else{
			return false;
		}
	}
 
	@Override
	public boolean tryLock(long arg0, TimeUnit arg1)
			throws InterruptedException {
		// TODO Auto-generated method stub
		return false;
	}
 
	@Override
	public void unlock() {
		//unlock操作直接刪除鎖,如果執行完還沒有達到超時時間則直接刪除,讓後續的線程進行繼續操作。起到補刀的作用,確保鎖已經超時或被刪除
		SessionCallback<Object> sessionCallback=new SessionCallback<Object>() {
			@Override
			public Object execute(RedisOperations operations)
					throws DataAccessException {
				operations.multi();
				operations.delete("secret");
				Object object=operations.exec();
				return object;
			}
		};
		Object result=redisTemplate.execute(sessionCallback);
	} 
 
	@Override
	public void lockInterruptibly() throws InterruptedException {
		// TODO Auto-generated method stub
	}
 
	@Override
	public Condition newCondition() {
		// TODO Auto-generated method stub
		return null;
	}
	
	public RedisTemplate<Serializable, Serializable> getRedisTemplate() {
		return redisTemplate;
	}
 
	public void setRedisTemplate(
			RedisTemplate<Serializable, Serializable> redisTemplate) {
		this.redisTemplate = redisTemplate;
	}
}

執行結果

可以看到,票數穩步減少,後續沒有搶到鎖的線程餘票為0,無票可搶。

tips:

這其中也出現瞭一個問題,redis進行多部封裝操作時,系統報錯:ERR EXEC without MULTI

後經過查閱發現問題出在:

在spring中,多次執行MULTI命令不會報錯,因為第一次執行時,會將其內部的一個isInMulti變量設為true,後續每次執行命令是都會檢查這個變量,如果為true,則不執行命令。

而多次執行EXEC命令則會報開頭說的”ERR EXEC without MULTI”錯誤。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。

推薦閱讀: