分散式Web應用----基於Socket+動態代理實現簡單RPC 生產者消費者模型
阿新 • • 發佈:2019-01-03
寫在前面
前面一文主要簡單介紹了JAVA動態代理基本原理,這也是實現RPC的基本知識,這裡我們運用Socket簡單實現一個遠端過程呼叫,方便大家理解RPC的基本原理,希望對大家有所幫助。
新建People介面類與Man實現類
介面類
public interface People {
public String sayHello(String name);
}
實現類
public class Man implements People {
public String sayHello(String name) {
System.out.println("call Man sayHello " );
return "Hello "+name;
}
}
這兩個類都比較簡單,沒什麼好說的了。
新建生產者執行緒類ProducerTask 和處理Socket服務類RPCProducer
生產者處理連線執行緒類
public class ProducerTask implements Runnable {
Socket client=null;
public ProducerTask(Socket client){
this.client=client;
}
public void run() {
ObjectInputStream input=null ;
ObjectOutputStream output=null;
try {
input=new ObjectInputStream(client.getInputStream());
String interFaceName=input.readUTF();
Class<?> service=Class.forName(interFaceName);
String methodName=input.readUTF();
Class<?> [] paramterTypes= (Class<?>[]) input.readObject();
Object [] arguments= (Object[]) input.readObject();
Method method=service.getMethod(methodName,paramterTypes);
Object result=method.invoke(service.newInstance(),arguments);
System.out.println("遠端呼叫物件名為:" +service.getName());
System.out.println("遠端呼叫方法名為:"+methodName);
for (Object obj:arguments){
System.out.println("遠端呼叫引數為:"+obj);
}
output=new ObjectOutputStream(client.getOutputStream());
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
output.close();
input.close();
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
ProducerTask 類主要是處理請求子執行緒,在該類中通過Socket連接獲取消費者傳送過來的 物件名、方法名、方法引數、引數列表,再利用反射機制,完成方法的呼叫,並向消費者返回呼叫的結果。
處理Socket服務類
public class RPCProducer {
static Executor executor= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void exproter(String hostName,int port) throws Exception {
ServerSocket server=new ServerSocket();
server.bind(new InetSocketAddress(hostName,port));
try {
while (true){
System.out.println("get client call");
executor.execute(new ProducerTask(server.accept()));
}
}catch (Exception e){
e.printStackTrace();
}finally {
server.close();
}
}
}
該類是生產者監聽連線類,這裡面主要使用了執行緒池,每當一個連線過來時,將執行緒池中增加一個執行緒,並讓子執行緒去處理連線請求。
新建消費者動態代理類和消費者類
消費者動態代理類
public class ConsumerHandler<T> implements InvocationHandler {
private Class<?> object;
private InetSocketAddress address;
public ConsumerHandler(Class<?> object, InetSocketAddress address){
this.object=object;
this.address=address;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println(" call peopleProxyHandler invoke ");
System.out.println("呼叫物件為:"+object.getClass().getName());
System.out.println("呼叫方法為:"+method.getName());
for (Object obj:args){
System.out.println("引數為:"+obj.toString());
}
Class cl=Class.forName(object.getClass().getName());
Socket socket=null;
ObjectOutputStream output=null;
ObjectInputStream input=null;
socket=new Socket();
socket.connect(address);
output=new ObjectOutputStream(socket.getOutputStream());
output.writeUTF(object.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
input =new ObjectInputStream(socket.getInputStream());
return input.readObject();
}
}
該類實現了InvocationHandler 介面,主要將對本地方法的呼叫,代理到此類中,在invoke方法中,獲取呼叫方法的物件名、方法名、引數型別及引數列表,再通過連線生產者的Socket服務,將這些引數傳送給生產者,並返回生產者返回的結果。
消費者類
public class RPCConsumer<T> {
public T improter(Class<?> serviceClass, InetSocketAddress address){
ConsumerHandler<T> consumerHandler=new ConsumerHandler<T>(serviceClass,address);
Class<?> intf=serviceClass.getInterfaces()[0];
Class<?> [] interfaces=new Class<?>[]{intf};
return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),interfaces,consumerHandler);
}
}
該類主要是返回一個代理物件,完成動態代理功能
新建生產者和服務者測試類
生產者測試類
public class ProducerTest {
public static void main(String [] rags){
new Thread(new Runnable() {
public void run() {
try {
RPCProducer.exproter("localhost",8080);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
這是生產者測試類,主要在開啟生產者服務,監聽埠為8080
消費者測試類
public static void main(String [] rags){
RPCConsumer<People> peopleRPCConsumer=new RPCConsumer<People>();
People people=peopleRPCConsumer.improter(Man.class,new InetSocketAddress("localhost",8080));
System.out.println(people.sayHello("zhaochao"));
}
消費者測試類,通過peopleRPCConsumer 生成代理物件people,呼叫people的方法,完成遠端呼叫
結果
生產者輸出
get client call
get client call
call Man sayHello
遠端呼叫物件名為:com.zhaocaho.proxy.Man
遠端呼叫方法名為:sayHello
遠端呼叫引數為:zhaochao
消費者輸出
call peopleProxyHandler invoke
呼叫物件為:java.lang.Class
呼叫方法為:sayHello
引數為:zhaochao
Hello zhaochao