Fix client connection with the proxy

This commit is contained in:
Yohan Simard 2021-01-27 00:32:36 +01:00
parent ebd292e6eb
commit f41c7baa8b
20 changed files with 425 additions and 202 deletions

View file

@ -5,7 +5,10 @@ import fr.insa.clavardator.lib.util.Log;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import static fr.insa.clavardator.client.network.NetUtil.isLocalAddress; import static fr.insa.clavardator.client.network.NetUtil.isLocalAddress;
@ -104,16 +107,22 @@ public class NetDiscoverer {
@Override @Override
public void run() { public void run() {
byte[] buf = broadcastMessage.getBytes(); byte[] buf = broadcastMessage.getBytes();
DatagramSocket broadcastSocket = null;
try { try {
for (InetAddress broadcastAddr : NetUtil.listAllBroadcastAddresses()) { for (InetAddress broadcastAddr : NetUtil.listAllBroadcastAddresses()) {
DatagramSocket broadcastSocket = new DatagramSocket(); broadcastSocket = new DatagramSocket();
broadcastSocket.setBroadcast(true); broadcastSocket.setBroadcast(true);
broadcastSocket.send(new DatagramPacket(buf, buf.length, broadcastAddr, DISCOVERY_PORT)); broadcastSocket.send(new DatagramPacket(buf, buf.length, broadcastAddr, DISCOVERY_PORT));
Log.v(this.getClass().getSimpleName(), "Broadcast sent on address " + broadcastAddr.toString()); Log.v(this.getClass().getSimpleName(), "Broadcast sent on address " + broadcastAddr.toString());
broadcastSocket.close();
} }
} catch (IOException e) { } catch (IOException e) {
Log.e(this.getClass().getSimpleName(), "Error sending broadcast", e); Log.e(this.getClass().getSimpleName(), "Error sending broadcast", e);
errorCallback.onError(e); errorCallback.onError(e);
} finally {
if (broadcastSocket != null) {
broadcastSocket.close();
}
} }
} }
} }
@ -138,15 +147,11 @@ public class NetDiscoverer {
@Override @Override
public void run() { public void run() {
try { try {
socket = new DatagramSocket(DISCOVERY_PORT);
byte[] buffer = new byte[BROADCAST_BUFFER_SIZE];
while (!shouldStop) { while (!shouldStop) {
socket = new DatagramSocket(null);
socket.setOption(StandardSocketOptions.SO_REUSEPORT, true);
socket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socket.bind(new InetSocketAddress((InetAddress) null, DISCOVERY_PORT));
byte[] buffer = new byte[BROADCAST_BUFFER_SIZE];
DatagramPacket receivedPacket = new DatagramPacket(buffer, BROADCAST_BUFFER_SIZE); DatagramPacket receivedPacket = new DatagramPacket(buffer, BROADCAST_BUFFER_SIZE);
socket.receive(receivedPacket); socket.receive(receivedPacket);
Log.v(this.getClass().getSimpleName(), "Broadcast received from ip " + receivedPacket.getAddress().toString()); Log.v(this.getClass().getSimpleName(), "Broadcast received from ip " + receivedPacket.getAddress().toString());
callback.onBroadcastReceived(receivedPacket.getAddress(), new String(receivedPacket.getData())); callback.onBroadcastReceived(receivedPacket.getAddress(), new String(receivedPacket.getData()));
@ -156,6 +161,9 @@ public class NetDiscoverer {
Log.e(this.getClass().getSimpleName(), "Error receiving broadcast message", e); Log.e(this.getClass().getSimpleName(), "Error receiving broadcast message", e);
errorCallback.onError(e); errorCallback.onError(e);
} }
} finally {
if (socket != null)
socket.close();
} }
} }
@ -186,8 +194,7 @@ public class NetDiscoverer {
@Override @Override
public void run() { public void run() {
byte[] buf = message.getBytes(); byte[] buf = message.getBytes();
try { try (DatagramSocket responseSocket = new DatagramSocket()) {
DatagramSocket responseSocket = new DatagramSocket();
responseSocket.send(new DatagramPacket(buf, buf.length, address, RESPONSE_PORT)); responseSocket.send(new DatagramPacket(buf, buf.length, address, RESPONSE_PORT));
Log.v(this.getClass().getSimpleName(), "Broadcast response sent to ip " + address.toString()); Log.v(this.getClass().getSimpleName(), "Broadcast response sent to ip " + address.toString());
} catch (IOException e) { } catch (IOException e) {
@ -217,12 +224,8 @@ public class NetDiscoverer {
@Override @Override
public void run() { public void run() {
try { try {
socket = new DatagramSocket(RESPONSE_PORT);
while (!shouldStop) { while (!shouldStop) {
socket = new DatagramSocket(null);
socket.setOption(StandardSocketOptions.SO_REUSEPORT, true);
socket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socket.bind(new InetSocketAddress((InetAddress) null, RESPONSE_PORT));
byte[] buffer = new byte[RESPONSE_BUFFER_SIZE]; byte[] buffer = new byte[RESPONSE_BUFFER_SIZE];
DatagramPacket receivedPacket = new DatagramPacket(buffer, RESPONSE_BUFFER_SIZE); DatagramPacket receivedPacket = new DatagramPacket(buffer, RESPONSE_BUFFER_SIZE);
socket.receive(receivedPacket); socket.receive(receivedPacket);

View file

@ -71,7 +71,9 @@ public class PeerHandshake {
private void sendUsernameTaken(TcpConnection thisConnection) { private void sendUsernameTaken(TcpConnection thisConnection) {
Log.v(this.getClass().getSimpleName(), "Received username request using current username"); Log.v(this.getClass().getSimpleName(), "Received username request using current username");
thisConnection.send(new UsernameTakenException("Username taken", userInformation.id), this::closeConnection, null); UsernameTakenException exception = new UsernameTakenException("Username taken",
CurrentUser.getInstance().getId(), userInformation.id);
thisConnection.send(exception, this::closeConnection, null);
} }

View file

@ -34,23 +34,20 @@ public class InsaPresence implements Presence {
private final String path; private final String path;
private final int presencePort; private final int presencePort;
private final int proxyPort;
private TcpConnection presenceConnection; private TcpConnection presenceConnection;
private TcpConnection proxyConnection; private final Proxy proxy;
public InsaPresence(String path, int presencePort, int proxyPort) { public InsaPresence(String path, int presencePort, int proxyPort) {
this.path = path; this.path = path;
this.presencePort = presencePort; this.presencePort = presencePort;
this.proxyPort = proxyPort; this.proxy = new Proxy(path, proxyPort);
} }
@Override @Override
public void subscribe(ParametrizedCallback<ArrayList<UserInformation>> callback, ErrorCallback errorCallback) { public void subscribe(ParametrizedCallback<ArrayList<UserInformation>> callback, ErrorCallback errorCallback) {
if (!isConnected()) { if (!isConnected()) {
connectToPresence( connectToPresence(
() -> connectToProxy(() -> { () -> proxy.connect(() -> {
sendSubscribeMessage(errorCallback); sendSubscribeMessage(errorCallback);
receiveSubscribeNotifications(callback, errorCallback); receiveSubscribeNotifications(callback, errorCallback);
}, errorCallback), }, errorCallback),
@ -130,26 +127,6 @@ public class InsaPresence implements Presence {
} }
} }
/**
* Connects to the presence proxy by TCP
*
* @param callback Called when connection is successful
* @param errorCallback Called on connection error
*/
private void connectToProxy(SimpleCallback callback, ErrorCallback errorCallback) {
try {
proxyConnection = new TcpConnection(InetAddress.getByName(path), proxyPort,
(newConnection) -> newConnection.send(new UserInformation(CurrentUser.getInstance()), () -> {
if (callback != null) {
callback.call();
}
}, errorCallback),
errorCallback);
} catch (UnknownHostException e) {
Log.e(getClass().getSimpleName(), "Could not connect to presence proxy", e);
}
}
/** /**
* Closes the given connection * Closes the given connection
*/ */
@ -167,8 +144,7 @@ public class InsaPresence implements Presence {
private boolean isConnected() { private boolean isConnected() {
return presenceConnection != null && return presenceConnection != null &&
presenceConnection.isOpen() && presenceConnection.isOpen() &&
proxyConnection != null && proxy.isConnected();
proxyConnection.isOpen();
} }
/** /**
@ -177,13 +153,12 @@ public class InsaPresence implements Presence {
private void disconnect() { private void disconnect() {
Log.v(this.getClass().getSimpleName(), "Disconnecting presence server"); Log.v(this.getClass().getSimpleName(), "Disconnecting presence server");
closeConnection(presenceConnection); closeConnection(presenceConnection);
closeConnection(proxyConnection);
presenceConnection = null; presenceConnection = null;
proxyConnection = null; proxy.close();
} }
@Override @Override
public TcpConnection getProxyConnection() { public Proxy getProxy() {
return proxyConnection; return proxy;
} }
} }

View file

@ -1,6 +1,5 @@
package fr.insa.clavardator.client.server; package fr.insa.clavardator.client.server;
import fr.insa.clavardator.lib.network.TcpConnection;
import fr.insa.clavardator.lib.users.UserInformation; import fr.insa.clavardator.lib.users.UserInformation;
import fr.insa.clavardator.lib.util.ErrorCallback; import fr.insa.clavardator.lib.util.ErrorCallback;
import fr.insa.clavardator.lib.util.ParametrizedCallback; import fr.insa.clavardator.lib.util.ParametrizedCallback;
@ -36,12 +35,12 @@ public interface Presence {
void unsubscribe(SimpleCallback callback, @Nullable ErrorCallback errorCallback); void unsubscribe(SimpleCallback callback, @Nullable ErrorCallback errorCallback);
/** /**
* Gets a connection to the proxy. * Gets the proxy.
* This can be used to initialize a * This can be used to initialize a
* {@link fr.insa.clavardator.client.users.PeerUser Peeruser} * {@link fr.insa.clavardator.client.users.PeerUser Peeruser}
* and send messages like on a local network. * and send messages like on a local network.
* *
* @return The server address * @return The proxy
*/ */
TcpConnection getProxyConnection(); Proxy getProxy();
} }

View file

@ -0,0 +1,86 @@
package fr.insa.clavardator.client.server;
import fr.insa.clavardator.client.users.CurrentUser;
import fr.insa.clavardator.lib.message.SenderInfo;
import fr.insa.clavardator.lib.network.TcpConnection;
import fr.insa.clavardator.lib.users.UserInformation;
import fr.insa.clavardator.lib.util.ErrorCallback;
import fr.insa.clavardator.lib.util.Log;
import fr.insa.clavardator.lib.util.SimpleCallback;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
public class Proxy {
private final String path;
private final int proxyPort;
TcpConnection proxyConnection;
Map<String, TcpConnection.MessageReceivedCallback> callbackMap = new HashMap<>();
private boolean receiving = false;
public Proxy(String path, int proxyPort) {
this.path = path;
this.proxyPort = proxyPort;
}
public void connect(SimpleCallback callback, ErrorCallback errorCallback) {
try {
proxyConnection = new TcpConnection(InetAddress.getByName(path), proxyPort,
(newConnection) -> newConnection.send(new UserInformation(CurrentUser.getInstance()), () -> {
if (callback != null) {
callback.call();
}
}, errorCallback),
errorCallback);
} catch (UnknownHostException e) {
Log.e(getClass().getSimpleName(), "Could not connect to the proxy", e);
}
}
public void send(Serializable message, @Nullable SimpleCallback callback, @Nullable ErrorCallback errorCallback) {
proxyConnection.send(message, callback, errorCallback);
}
public void receive(String userId, TcpConnection.MessageReceivedCallback callback, ErrorCallback errorCallback) {
Log.v(getClass().getSimpleName(), "Registering a new receive request from " + userId);
callbackMap.put(userId, callback);
if (!receiving) {
proxyConnection.receive(this::onMessageReceived, errorCallback);
receiving = true;
}
}
private void onMessageReceived(Object msg) throws IOException {
if (msg instanceof SenderInfo) {
TcpConnection.MessageReceivedCallback callback = callbackMap.get(((SenderInfo) msg).getSenderId());
if (callback != null) {
callback.onMessageReceived(msg);
} else {
Log.w(getClass().getSimpleName(),
"Nobody is waiting for messages from " + ((SenderInfo) msg).getSenderId());
}
} else {
Log.e(getClass().getSimpleName(),
"Proxy sent a message that does not contain sender information: " +
msg.getClass().getSimpleName());
}
}
public boolean isConnected() {
return proxyConnection != null &&
proxyConnection.isOpen();
}
public void close() {
proxyConnection.close();
}
public void disconnectUser(String id) {
callbackMap.remove(id);
}
}

View file

@ -287,12 +287,12 @@ public class MainController implements Initializable {
if (presenceServer != null && online) { if (presenceServer != null && online) {
presenceServer.subscribe( presenceServer.subscribe(
param -> userList.onReceivePresenceNotification( param -> userList.onReceivePresenceNotification(
param, param,
presenceServer.getProxyConnection()), presenceServer.getProxy()),
(e) -> Log.v( (e) -> Log.v(
getClass().getSimpleName(), getClass().getSimpleName(),
"Error subscribing to presence server", "Error subscribing to presence server",
e)); e));
} }
} }

View file

@ -0,0 +1,52 @@
package fr.insa.clavardator.client.users;
import fr.insa.clavardator.lib.network.TcpConnection;
import fr.insa.clavardator.lib.util.ErrorCallback;
import fr.insa.clavardator.lib.util.Log;
import fr.insa.clavardator.lib.util.SimpleCallback;
import org.jetbrains.annotations.Nullable;
import java.io.EOFException;
import java.io.Serializable;
public class DirectPeerConnection extends PeerConnection {
private final TcpConnection connection;
public DirectPeerConnection(TcpConnection connection, PeerUser user) {
super(user);
this.connection = connection;
}
@Override
protected void send(Serializable message, SimpleCallback callback, @Nullable ErrorCallback errorCallback) {
connection.send(message, callback, errorCallback);
}
@Override
protected void receive(TcpConnection.MessageReceivedCallback callback, ErrorCallback errorCallback) {
connection.receive(callback,
e -> {
disconnect();
if (!(e instanceof EOFException)) {
Log.e(this.getClass().getSimpleName(), "Error receiving message from " + user.getId(), e);
errorCallback.onError(e);
}
});
}
/**
* Close the connection to this user
*/
private void closeConnection() {
if (connection != null && connection.isOpen()) {
connection.close();
}
}
@Override
public void disconnect() {
closeConnection();
}
}

View file

@ -0,0 +1,34 @@
package fr.insa.clavardator.client.users;
import fr.insa.clavardator.lib.network.TcpConnection;
import fr.insa.clavardator.lib.util.ErrorCallback;
import fr.insa.clavardator.lib.util.SimpleCallback;
import org.jetbrains.annotations.Nullable;
import java.io.Serializable;
public abstract class PeerConnection {
protected final PeerUser user;
protected PeerConnection(PeerUser user) {
this.user = user;
}
protected abstract void send(Serializable message, SimpleCallback calback, @Nullable ErrorCallback errorCallback);
/**
* Subscribe to this user messages.
* If receiving new user info, update this user.
* If receiving text message, store it in the history.
*
* @param errorCallback Callback on error
*/
protected abstract void receive(TcpConnection.MessageReceivedCallback callback, ErrorCallback errorCallback);
/**
* Close the connection and set state to disconnected
*/
public abstract void disconnect();
}

View file

@ -2,6 +2,7 @@ package fr.insa.clavardator.client.users;
import fr.insa.clavardator.client.chat.ChatHistory; import fr.insa.clavardator.client.chat.ChatHistory;
import fr.insa.clavardator.client.db.DatabaseController; import fr.insa.clavardator.client.db.DatabaseController;
import fr.insa.clavardator.client.server.Proxy;
import fr.insa.clavardator.lib.errors.UsernameTakenException; import fr.insa.clavardator.lib.errors.UsernameTakenException;
import fr.insa.clavardator.lib.message.FileMessage; import fr.insa.clavardator.lib.message.FileMessage;
import fr.insa.clavardator.lib.message.Message; import fr.insa.clavardator.lib.message.Message;
@ -14,15 +15,14 @@ import fr.insa.clavardator.lib.util.Log;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Date; import java.util.Date;
public class PeerUser extends User implements Comparable<PeerUser> { public class PeerUser extends User implements Comparable<PeerUser> {
protected transient ChatHistory history; private final transient ChatHistory history;
private UserState state = UserState.DISCONNECTED; private UserState state = UserState.DISCONNECTED;
private transient TcpConnection connection; private PeerConnection connection;
public PeerUser(String id, String username) { public PeerUser(String id, String username) {
super(id, username); super(id, username);
@ -34,6 +34,23 @@ public class PeerUser extends User implements Comparable<PeerUser> {
history = new ChatHistory(this); history = new ChatHistory(this);
} }
private void init(String id, String username, ErrorCallback errorCallback) {
setId(id);
setUsername(username);
setState(UserState.CONNECTED);
connection.receive(msg -> onMessageReceived(msg, errorCallback), errorCallback);
}
public void init(TcpConnection tcpConnection, String id, String username, ErrorCallback errorCallback) {
connection = new DirectPeerConnection(tcpConnection, this);
init(id, username, errorCallback);
}
public void init(Proxy proxy, String id, String username, ErrorCallback errorCallback) {
connection = new ProxyPeerConnection(proxy, this);
init(id, username, errorCallback);
}
/** /**
* Sends a basic text message to this user * Sends a basic text message to this user
@ -44,14 +61,15 @@ public class PeerUser extends User implements Comparable<PeerUser> {
public void sendTextMessage(String msg, @Nullable ErrorCallback errorCallback) { public void sendTextMessage(String msg, @Nullable ErrorCallback errorCallback) {
if (connection != null) { if (connection != null) {
Log.v(this.getClass().getSimpleName(), Log.v(this.getClass().getSimpleName(),
"Sending message to " + this.getUsername() + " / " + this.getId() + ": " + msg); "Sending message to " + getUsername() + " / " + getId() + ": " + msg);
final Message message = new Message(CurrentUser.getInstance(), this, new Date(), msg); final Message message = new Message(CurrentUser.getInstance(), this, new Date(), msg);
connection.send(message, () -> history.addMessage(message, errorCallback), errorCallback); connection.send(message, () -> addMessageToHistory(message, errorCallback), errorCallback);
} else { } else {
Log.e(this.getClass().getSimpleName(), "Could not send message: connection is not initialized"); Log.e(this.getClass().getSimpleName(), "Could not send message: connection is not initialized");
} }
} }
/** /**
* Sends a message containing a file to this user * Sends a message containing a file to this user
* *
@ -65,7 +83,7 @@ public class PeerUser extends User implements Comparable<PeerUser> {
try { try {
final FileMessage message = new FileMessage(CurrentUser.getInstance(), this, new Date(), msg, file.getPath()); final FileMessage message = new FileMessage(CurrentUser.getInstance(), this, new Date(), msg, file.getPath());
message.storeFile(); message.storeFile();
connection.send(message, () -> history.addMessage(message, errorCallback), errorCallback); connection.send(message, () -> addMessageToHistory(message, errorCallback), errorCallback);
} catch (IOException e) { } catch (IOException e) {
Log.e(this.getClass().getSimpleName(), "Could not send message: error while opening file", e); Log.e(this.getClass().getSimpleName(), "Could not send message: error while opening file", e);
if (errorCallback != null) { if (errorCallback != null) {
@ -87,102 +105,61 @@ public class PeerUser extends User implements Comparable<PeerUser> {
final String username = CurrentUser.getInstance().getUsername(); final String username = CurrentUser.getInstance().getUsername();
Log.v(this.getClass().getSimpleName(), Log.v(this.getClass().getSimpleName(),
"Sending current user information to " + this.getUsername() + " / " + this.getId() + ": " + username); "Sending current user information to " + this.getUsername() + " / " + this.getId() + ": " + username);
connection.send( connection.send(new UserInformation(CurrentUser.getInstance()), null, errorCallback);
new UserInformation(CurrentUser.getInstance()),
null,
errorCallback);
} else { } else {
Log.e(this.getClass().getSimpleName(), "Could not send new username: connection is not initialized"); Log.e(this.getClass().getSimpleName(), "Could not send new username: connection is not initialized");
} }
} }
private void sendUsernameTaken(TcpConnection thisConnection) { protected void sendUsernameTaken() {
Log.v(this.getClass().getSimpleName(), "Received username request using current username"); if (connection != null) {
thisConnection.send(new UsernameTakenException("Username taken", getId()), this::disconnect, null); Log.v(this.getClass().getSimpleName(), "Received username request using current username");
} UsernameTakenException exception = new UsernameTakenException("Username taken",
CurrentUser.getInstance().getId(), getId());
public void init(TcpConnection connection, String id, String username, ErrorCallback errorCallback) { connection.send(exception, connection::disconnect, null);
this.connection = connection; } else {
this.setId(id); Log.e(this.getClass().getSimpleName(), "Could not send UsernameTaken: connection is not initialized");
setUsername(username);
setState(UserState.CONNECTED);
subscribeToMessages((e) -> {
disconnect();
errorCallback.onError(e);
});
}
/**
* Subscribe to this user messages.
* If receiving new user info, update this user.
* If receiving text message, store it in the history.
*
* @param errorCallback Callback on error
*/
private void subscribeToMessages(ErrorCallback errorCallback) {
connection.receive(
msg -> {
Log.v(this.getClass().getSimpleName(), "Received message from " + getId());
if (msg instanceof UserInformation) {
assert ((UserInformation) msg).id.equals(getId());
final String receivedUsername = ((UserInformation) msg).getUsername();
Log.v(this.getClass().getSimpleName(), "Message username: " + receivedUsername);
if (CurrentUser.getInstance().getUsername().equals(receivedUsername)) {
sendUsernameTaken(connection);
} else {
setUsername(receivedUsername);
}
} else if (msg instanceof Message) {
assert !((Message) msg).getRecipient().id.equals(getId());
Log.v(this.getClass().getSimpleName(), "Message text: " + ((Message) msg).getText());
if (msg instanceof FileMessage) {
((FileMessage) msg).storeFile();
}
history.addMessage((Message) msg, errorCallback);
} else if (msg instanceof UsernameTakenException) {
disconnect();
errorCallback.onError(new Exception("Received username already taken message"));
}
},
e -> {
if (e instanceof EOFException) {
disconnect();
} else {
Log.e(this.getClass().getSimpleName(), "Error receiving message from " + getId(), e);
errorCallback.onError(e);
}
});
}
/**
* Close the connection and set state to disconnected
*/
public void disconnect() {
Log.v(this.getClass().getSimpleName(), "Disconnecting from user: " + getId());
closeConnection();
setState(UserState.DISCONNECTED);
}
@Override
protected void setUsername(String newUsername) {
super.setUsername(newUsername);
final DatabaseController db = new DatabaseController();
db.updateUsername(new UserInformation(this),
null,
e -> Log.e(getClass().getSimpleName(), "Unable to update the username", e));
}
/**
* Close the connection to this user
*/
private void closeConnection() {
if (connection != null && connection.isOpen()) {
connection.close();
connection = null;
} }
} }
protected void addMessageToHistory(Message message, @Nullable ErrorCallback errorCallback) {
history.addMessage(message, errorCallback);
}
protected void onMessageReceived(Object msg, ErrorCallback errorCallback) throws IOException {
Log.v(this.getClass().getSimpleName(), "Received message from " + getId());
if (msg instanceof UserInformation) {
assert ((UserInformation) msg).id.equals(getId());
final String receivedUsername = ((UserInformation) msg).getUsername();
Log.v(this.getClass().getSimpleName(), "Message username: " + receivedUsername);
if (CurrentUser.getInstance().getUsername().equals(receivedUsername)) {
sendUsernameTaken();
} else {
setUsername(receivedUsername);
}
} else if (msg instanceof Message) {
assert !((Message) msg).getRecipient().id.equals(getId());
Log.v(this.getClass().getSimpleName(), "Message text: " + ((Message) msg).getText());
if (msg instanceof FileMessage) {
((FileMessage) msg).storeFile();
}
history.addMessage((Message) msg, errorCallback);
} else if (msg instanceof UsernameTakenException) {
disconnect();
errorCallback.onError(new Exception("Received username already taken message"));
}
}
public void disconnect() {
if (connection != null) {
Log.v(this.getClass().getSimpleName(), "Disconnecting from user: " + getId());
connection.disconnect();
connection = null;
}
setState(UserState.DISCONNECTED);
}
/** /**
* Gets the value of history * Gets the value of history
@ -203,6 +180,16 @@ public class PeerUser extends User implements Comparable<PeerUser> {
this.state = state; this.state = state;
} }
@Override
protected void setUsername(String newUsername) {
super.setUsername(newUsername);
final DatabaseController db = new DatabaseController();
db.updateUsername(new UserInformation(this),
null,
e -> Log.e(getClass().getSimpleName(), "Unable to update the username", e));
}
/** /**
* Check if this user is active. * Check if this user is active.
* *

View file

@ -0,0 +1,43 @@
package fr.insa.clavardator.client.users;
import fr.insa.clavardator.client.server.Proxy;
import fr.insa.clavardator.lib.network.TcpConnection;
import fr.insa.clavardator.lib.util.ErrorCallback;
import fr.insa.clavardator.lib.util.Log;
import fr.insa.clavardator.lib.util.SimpleCallback;
import org.jetbrains.annotations.Nullable;
import java.io.EOFException;
import java.io.Serializable;
public class ProxyPeerConnection extends PeerConnection {
private final Proxy proxy;
public ProxyPeerConnection(Proxy proxy, PeerUser user) {
super(user);
this.proxy = proxy;
}
@Override
protected void send(Serializable message, SimpleCallback callback, @Nullable ErrorCallback errorCallback) {
proxy.send(message, callback, errorCallback);
}
@Override
protected void receive(TcpConnection.MessageReceivedCallback callback, ErrorCallback errorCallback) {
proxy.receive(user.getId(), callback,
e -> {
disconnect();
if (!(e instanceof EOFException)) {
Log.e(this.getClass().getSimpleName(), "Error receiving message from " + user.getId(), e);
errorCallback.onError(e);
}
});
}
@Override
public void disconnect() {
proxy.disconnectUser(user.getId());
}
}

View file

@ -3,7 +3,7 @@ package fr.insa.clavardator.client.users;
import fr.insa.clavardator.client.db.DatabaseController; import fr.insa.clavardator.client.db.DatabaseController;
import fr.insa.clavardator.client.network.NetDiscoverer; import fr.insa.clavardator.client.network.NetDiscoverer;
import fr.insa.clavardator.client.network.PeerHandshake; import fr.insa.clavardator.client.network.PeerHandshake;
import fr.insa.clavardator.lib.network.TcpConnection; import fr.insa.clavardator.client.server.Proxy;
import fr.insa.clavardator.lib.network.TcpListener; import fr.insa.clavardator.lib.network.TcpListener;
import fr.insa.clavardator.lib.users.User; import fr.insa.clavardator.lib.users.User;
import fr.insa.clavardator.lib.users.UserInformation; import fr.insa.clavardator.lib.users.UserInformation;
@ -47,7 +47,7 @@ public class UserList {
}, errorCallback); }, errorCallback);
} }
public void onReceivePresenceNotification(ArrayList<UserInformation> newPresenceUsers, TcpConnection proxyConnection) { public void onReceivePresenceNotification(ArrayList<UserInformation> newPresenceUsers, Proxy proxy) {
newPresenceUsers.forEach((userInfo -> { newPresenceUsers.forEach((userInfo -> {
final PeerUser savedUser = userHashmap.get(userInfo.id); final PeerUser savedUser = userHashmap.get(userInfo.id);
if (savedUser != null) { if (savedUser != null) {
@ -55,12 +55,12 @@ public class UserList {
Log.v(getClass().getSimpleName(), "Received user from presence server already known and connected"); Log.v(getClass().getSimpleName(), "Received user from presence server already known and connected");
} else { } else {
Log.v(getClass().getSimpleName(), "Received user from presence server already known but not connected, connecting..."); Log.v(getClass().getSimpleName(), "Received user from presence server already known but not connected, connecting...");
savedUser.init(proxyConnection, userInfo.id, userInfo.getUsername(), null); savedUser.init(proxy, userInfo.id, userInfo.getUsername(), null);
} }
} else { } else {
Log.v(getClass().getSimpleName(), "Received new user from presence server"); Log.v(getClass().getSimpleName(), "Received new user from presence server");
final PeerUser user = new PeerUser(); final PeerUser user = new PeerUser();
user.init(proxyConnection, userInfo.id, userInfo.getUsername(), null); user.init(proxy, userInfo.id, userInfo.getUsername(), null);
userHashmap.put(user.getId(), user); userHashmap.put(user.getId(), user);
Platform.runLater(() -> userObservableList.add(user)); Platform.runLater(() -> userObservableList.add(user));
} }

View file

@ -1,12 +1,27 @@
package fr.insa.clavardator.lib.errors; package fr.insa.clavardator.lib.errors;
import fr.insa.clavardator.lib.message.RecipientInfo;
import fr.insa.clavardator.lib.message.SenderInfo;
import java.io.Serializable; import java.io.Serializable;
public class UsernameTakenException extends Exception implements Serializable { public class UsernameTakenException extends Exception implements Serializable, RecipientInfo, SenderInfo {
public final String recipient; public final String recipient;
public final String sender;
public UsernameTakenException(String message, String recipient) { public UsernameTakenException(String message, String sender, String recipient) {
super(message); super(message);
this.sender = sender;
this.recipient = recipient; this.recipient = recipient;
} }
@Override
public String getRecipientId() {
return recipient;
}
@Override
public String getSenderId() {
return sender;
}
} }

View file

@ -86,8 +86,9 @@ public class FileMessage extends Message {
// write to the file // write to the file
if (rawFile == null) { if (rawFile == null) {
FileInputStream stream = new FileInputStream(fileName); FileInputStream istream = new FileInputStream(fileName);
rawFile = stream.readAllBytes(); rawFile = istream.readAllBytes();
istream.close();
} }
FileOutputStream ostream = new FileOutputStream(file); FileOutputStream ostream = new FileOutputStream(file);

View file

@ -6,7 +6,7 @@ import fr.insa.clavardator.lib.users.UserInformation;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
public class Message implements Serializable { public class Message implements Serializable, SenderInfo, RecipientInfo {
private final String text; private final String text;
private final Date date; private final Date date;
private final UserInformation sender; private final UserInformation sender;
@ -59,4 +59,14 @@ public class Message implements Serializable {
", recipient=" + recipient + ", recipient=" + recipient +
'}'; '}';
} }
@Override
public String getSenderId() {
return sender.id;
}
@Override
public String getRecipientId() {
return recipient.id;
}
} }

View file

@ -0,0 +1,5 @@
package fr.insa.clavardator.lib.message;
public interface RecipientInfo {
String getRecipientId();
}

View file

@ -0,0 +1,5 @@
package fr.insa.clavardator.lib.message;
public interface SenderInfo {
String getSenderId();
}

View file

@ -1,6 +1,7 @@
package fr.insa.clavardator.lib.network; package fr.insa.clavardator.lib.network;
import fr.insa.clavardator.lib.util.ErrorCallback; import fr.insa.clavardator.lib.util.ErrorCallback;
import fr.insa.clavardator.lib.util.SimpleCallback;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -19,6 +20,7 @@ public class TcpConnection {
private ObjectInputStream inputStream; private ObjectInputStream inputStream;
private boolean shouldStop = false; private boolean shouldStop = false;
private final int port; private final int port;
private final Object outputStreamGuard = new Object();
/** /**
* Creates a new connection, and connects to the peer * Creates a new connection, and connects to the peer
@ -61,7 +63,7 @@ public class TcpConnection {
* @param callback The function to call on success * @param callback The function to call on success
* @param errorCallback The function to call on error * @param errorCallback The function to call on error
*/ */
public void send(Serializable message, @Nullable MessageSentCallback callback, @Nullable ErrorCallback errorCallback) { public void send(Serializable message, @Nullable SimpleCallback callback, @Nullable ErrorCallback errorCallback) {
Sender sender = new Sender(message, callback, errorCallback); Sender sender = new Sender(message, callback, errorCallback);
sender.start(); sender.start();
} }
@ -93,8 +95,10 @@ public class TcpConnection {
*/ */
public void close() { public void close() {
shouldStop = true; shouldStop = true;
if (socket != null) { if (outputStream != null) {
try { try {
outputStream.close();
inputStream.close();
socket.close(); socket.close();
} catch (IOException ignored) { } catch (IOException ignored) {
} }
@ -103,7 +107,8 @@ public class TcpConnection {
/** /**
* Checks if the current connection is open * Checks if the current connection is open
* @return True if the socket is still open *
* @return True if the socket is still open
*/ */
public boolean isOpen() { public boolean isOpen() {
return socket != null && socket.isConnected() && !socket.isClosed(); return socket != null && socket.isConnected() && !socket.isClosed();
@ -138,31 +143,33 @@ public class TcpConnection {
private class Sender extends Thread { private class Sender extends Thread {
private final Serializable message; private final Serializable message;
private final MessageSentCallback callback; private final SimpleCallback callback;
private final ErrorCallback errorCallback; private final ErrorCallback errorCallback;
/** /**
* Constructs a thread that sends a message using the socket of the outer class * Constructs a thread that sends a message using the socket of the outer class
* *
* @param message The message to send * @param message The message to send
* @param callback The function to call on success * @param callback The function to call on success
* @param errorCallback The function to call on error * @param errorCallback The function to call on error
*/ */
public Sender(Serializable message, @Nullable MessageSentCallback callback, @Nullable ErrorCallback errorCallback) { public Sender(Serializable message, @Nullable SimpleCallback callback, @Nullable ErrorCallback errorCallback) {
this.message = message; this.message = message;
this.callback = callback; this.callback = callback;
this.errorCallback = errorCallback; this.errorCallback = errorCallback;
} }
@Override @Override
synchronized public void run() { public void run() {
try { try {
if (outputStream == null) { synchronized (outputStreamGuard) {
outputStream = new ObjectOutputStream(socket.getOutputStream()); if (outputStream == null) {
outputStream = new ObjectOutputStream(socket.getOutputStream());
}
outputStream.writeObject(message);
} }
outputStream.writeObject(message);
if (callback != null) if (callback != null)
callback.onMessageSent(); callback.call();
} catch (IOException e) { } catch (IOException e) {
if (errorCallback != null && !shouldStop) if (errorCallback != null && !shouldStop)
errorCallback.onError(e); errorCallback.onError(e);
@ -211,9 +218,4 @@ public class TcpConnection {
public interface MessageReceivedCallback { public interface MessageReceivedCallback {
void onMessageReceived(Object msg) throws IOException; void onMessageReceived(Object msg) throws IOException;
} }
public interface MessageSentCallback {
void onMessageSent();
}
} }

View file

@ -49,7 +49,7 @@ public class TcpListener {
private final NewConnectionCallback callback; private final NewConnectionCallback callback;
private final ErrorCallback errorCallback; private final ErrorCallback errorCallback;
private ServerSocket server; private ServerSocket server;
private int port; private final int port;
public Acceptor(int port, NewConnectionCallback callback, ErrorCallback errorCallback) { public Acceptor(int port, NewConnectionCallback callback, ErrorCallback errorCallback) {
this.port = port; this.port = port;
@ -69,11 +69,11 @@ public class TcpListener {
if (!shouldStop) if (!shouldStop)
errorCallback.onError(e); errorCallback.onError(e);
} }
} }
public void stopAccepting() { public void stopAccepting() {
shouldStop = true; shouldStop = true;
// TODO: call interrupt() with ChannelServerSocket instead of closing the socket?
try { try {
server.close(); server.close();
} catch (IOException ignored) { } catch (IOException ignored) {

View file

@ -1,11 +1,13 @@
package fr.insa.clavardator.lib.users; package fr.insa.clavardator.lib.users;
import fr.insa.clavardator.lib.message.SenderInfo;
import java.io.Serializable; import java.io.Serializable;
/** /**
* Class used to serialize useful user information * Class used to serialize useful user information
*/ */
public class UserInformation implements Serializable { public class UserInformation implements Serializable, SenderInfo {
public final String id; public final String id;
private final String username; private final String username;
private final UserState state; private final UserState state;
@ -46,4 +48,9 @@ public class UserInformation implements Serializable {
public String toString() { public String toString() {
return "UserInfo " + id + '(' + username + ')'; return "UserInfo " + id + '(' + username + ')';
} }
@Override
public String getSenderId() {
return id;
}
} }

View file

@ -1,7 +1,6 @@
package fr.insa.clavardator.server; package fr.insa.clavardator.server;
import fr.insa.clavardator.lib.errors.UsernameTakenException; import fr.insa.clavardator.lib.message.RecipientInfo;
import fr.insa.clavardator.lib.message.Message;
import fr.insa.clavardator.lib.network.TcpConnection; import fr.insa.clavardator.lib.network.TcpConnection;
import fr.insa.clavardator.lib.network.TcpListener; import fr.insa.clavardator.lib.network.TcpListener;
import fr.insa.clavardator.lib.users.UserInformation; import fr.insa.clavardator.lib.users.UserInformation;
@ -24,17 +23,9 @@ public class Proxy {
Log.v(getClass().getSimpleName(), "Accepting a new user"); Log.v(getClass().getSimpleName(), "Accepting a new user");
TcpConnection connection = new TcpConnection(clientSocket); TcpConnection connection = new TcpConnection(clientSocket);
connection.receive(msg -> { connection.receive(msg -> {
if (msg instanceof UserInformation) {
if (msg instanceof Message) { // Send UserInformation to all other connected users
Log.v(getClass().getSimpleName(), "Transmitting message: " + msg); Log.v(getClass().getSimpleName(), "Received UserInformation: " + msg);
transmitMessage((Serializable) msg, ((Message) msg).getRecipient().id);
} else if (msg instanceof UsernameTakenException) {
UsernameTakenException unameTaken = ((UsernameTakenException) msg);
transmitMessage(unameTaken, unameTaken.recipient);
} else if (msg instanceof UserInformation) {
Log.v(getClass().getSimpleName(), "Registering new user: " + msg);
users.put(((UserInformation) msg).id, connection); users.put(((UserInformation) msg).id, connection);
for (String userId : users.keySet()) { for (String userId : users.keySet()) {
UserInformation userInfo = ((UserInformation) msg); UserInformation userInfo = ((UserInformation) msg);
@ -42,20 +33,25 @@ public class Proxy {
transmitMessage((Serializable) msg, userId); transmitMessage((Serializable) msg, userId);
} }
} }
} else if (msg instanceof RecipientInfo) {
// Send other messages only to the recipient
transmitMessage((Serializable) msg, ((RecipientInfo) msg).getRecipientId());
} else {
Log.e(getClass().getSimpleName(), "Unexpected message received: " + msg.toString());
} }
}, e -> { }, e -> {
if (e instanceof EOFException) { if (!(e instanceof EOFException)) {
for (Map.Entry<String, TcpConnection> user : users.entrySet()) {
if (user.getValue().equals(connection)) {
Log.v(getClass().getSimpleName(), "Disconnecting user " + user.getKey());
users.remove(user.getKey());
break;
}
}
} else {
Log.e(getClass().getSimpleName(), "Error while receiving message to transmit", e); Log.e(getClass().getSimpleName(), "Error while receiving message to transmit", e);
} }
for (Map.Entry<String, TcpConnection> user : users.entrySet()) {
if (user.getValue().equals(connection)) {
Log.v(getClass().getSimpleName(), "Disconnecting user " + user.getKey());
users.remove(user.getKey());
break;
}
}
}); });
}, },
e -> Log.e(getClass().getSimpleName(), "Error while accepting a user", e)); e -> Log.e(getClass().getSimpleName(), "Error while accepting a user", e));
@ -67,6 +63,7 @@ public class Proxy {
if (user == null) { if (user == null) {
Log.e(getClass().getSimpleName(), "Cannot find the recipient in the connected users"); Log.e(getClass().getSimpleName(), "Cannot find the recipient in the connected users");
} else { } else {
Log.v(getClass().getSimpleName(), "Transmitting message: " + msg);
user.send(msg, null, e -> Log.e(getClass().getSimpleName(), "Error while sending message", e)); user.send(msg, null, e -> Log.e(getClass().getSimpleName(), "Error while sending message", e));
} }
} }