Simplify user connection

Extracted handshake phase into separate class.
This commit is contained in:
Arnaud Vergnet 2021-01-06 08:28:21 +01:00
parent fcb678f4fe
commit 63cb83916a
6 changed files with 145 additions and 268 deletions

View file

@ -63,8 +63,10 @@ public class ChatHistory {
*/
private void onLoaded(ArrayList<Message> newHistory) {
historyLoaded = true;
history.addAll(newHistory);
history.sort((message, t1) -> (int) (message.getDate().getTime() - t1.getDate().getTime()));
Platform.runLater(() -> {
history.addAll(newHistory);
history.sort((message, t1) -> (int) (message.getDate().getTime() - t1.getDate().getTime()));
});
Log.v(getClass().getSimpleName(), "Message history loaded");
notifyHistoryLoaded();
}
@ -79,9 +81,7 @@ public class ChatHistory {
* @param message The message to add
*/
public void addMessage(Message message, ErrorCallback errorCallback) {
db.addMessage(message, () -> {
Platform.runLater(() -> history.add(message));
}, errorCallback);
db.addMessage(message, () -> Platform.runLater(() -> history.add(message)), errorCallback);
}
public interface HistoryLoadedCallback {

View file

@ -0,0 +1,98 @@
package fr.insa.clavardator.network;
import fr.insa.clavardator.errors.UsernameTakenException;
import fr.insa.clavardator.users.CurrentUser;
import fr.insa.clavardator.users.UserInformation;
import fr.insa.clavardator.util.ErrorCallback;
import fr.insa.clavardator.util.Log;
import java.net.InetAddress;
import java.net.Socket;
public class PeerHandshake {
private PeerConnection connection;
private UserInformation userInformation;
public void createConnection(InetAddress ipAddr, UserConnectedCallback callback, ErrorCallback errorCallback) {
closeConnection();
Log.v(this.getClass().getSimpleName(), "Creating new TCP connection ");
connection = new PeerConnection(
ipAddr,
(thisConnection) -> init(thisConnection,
false,
callback,
errorCallback),
e -> {
Log.e(this.getClass().getSimpleName(), "Could not create TCP connection", e);
closeConnection();
errorCallback.onError(e);
});
}
public void acceptConnection(Socket socket, UserConnectedCallback callback, ErrorCallback errorCallback) {
closeConnection();
connection = new PeerConnection(socket);
init(connection, true, callback, errorCallback);
}
private void init(PeerConnection thisConnection, boolean isReceiving, UserConnectedCallback callback, ErrorCallback errorCallback) {
// Send our username
thisConnection.send(new UserInformation(CurrentUser.getInstance()), null, e -> {
closeConnection();
errorCallback.onError(e);
});
// Receive peer's username
thisConnection.receiveOne(msg -> {
if (msg instanceof UserInformation) {
UserInformation userInfo = (UserInformation) msg;
final String receivedUsername = userInfo.getUsername();
if (!receivedUsername.equals(CurrentUser.getInstance().getUsername())) {
this.userInformation = userInfo;
callback.onUserConnected(this);
} else if (isReceiving) {
sendUsernameTaken(thisConnection);
} else {
closeConnection();
errorCallback.onError(new Exception("Tried to use same username as remote"));
}
} else {
closeConnection();
errorCallback.onError(new Exception("Did not receive remote username"));
}
}, e -> {
closeConnection();
errorCallback.onError(e);
});
}
private void sendUsernameTaken(PeerConnection thisConnection) {
Log.v(this.getClass().getSimpleName(), "Received username request using current username");
thisConnection.send(new UsernameTakenException("Username taken"), this::closeConnection, null);
}
/**
* Close the connection to this user
*/
private void closeConnection() {
if (connection != null && connection.isOpen()) {
connection.close();
connection = null;
}
}
public PeerConnection getConnection() {
return connection;
}
public UserInformation getUserInformation() {
return userInformation;
}
public interface UserConnectedCallback {
void onUserConnected(PeerHandshake handshake);
}
}

View file

@ -30,10 +30,8 @@ public class MessageListItemCell extends ListCell<Message> {
if (item == null || empty) {
setGraphic(null);
} else {
Platform.runLater(() -> {
setGraphic(view);
messageListItemController.setMessage(item);
});
setGraphic(view);
messageListItemController.setMessage(item);
}
}
}

View file

@ -6,8 +6,6 @@ import fr.insa.clavardator.ui.UserSelectedEvent;
import fr.insa.clavardator.users.PeerUser;
import fr.insa.clavardator.users.User;
import fr.insa.clavardator.users.UserList;
import fr.insa.clavardator.util.Log;
import javafx.application.Platform;
import javafx.fxml.FXML;
import javafx.fxml.Initializable;
import javafx.scene.control.ListView;
@ -63,26 +61,11 @@ public class UserListController implements Initializable {
});
}
/**
* Add user to UI if not already present
* @param user The new user to display
*/
private void onUserConnected(PeerUser user) {
Platform.runLater(() -> {
if (!peerUserListView.getItems().contains(user)) {
Log.v(this.getClass().getSimpleName(), "Add user to UI");
peerUserListView.getItems().add(user);
} else {
Log.w(this.getClass().getSimpleName(), "User already added to ui, skipping...");
}
});
}
/**
* Sets the user list to subscribe to
* @param userList The user list to use
*/
public void setUserList(UserList userList) {
userList.setNewUserObserver(this::onUserConnected);
peerUserListView.setItems(userList.getUserObservableList());
}
}

View file

@ -11,8 +11,6 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.io.EOFException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Date;
public class PeerUser extends User implements Comparable<PeerUser> {
@ -35,41 +33,6 @@ public class PeerUser extends User implements Comparable<PeerUser> {
history = new ChatHistory(this);
}
/**
* Asynchronously connects to the user and receives its information (id, username)
*
* @param ipAddr The IP address of this user
* @param callback The function to call on success
* @param errorCallback The function to call on socket error
*/
public void createConnection(InetAddress ipAddr, UserConnectedCallback callback, ErrorCallback errorCallback) {
closeConnection();
Log.v(this.getClass().getSimpleName(), "Creating new TCP connection with " + id);
// Connect to the peer
setState(State.CONNECTING);
connection = new PeerConnection(
ipAddr,
(thisConnection) -> init(thisConnection, false, callback, errorCallback),
e -> {
Log.e(this.getClass().getSimpleName(), "Could not create TCP connection with " + id, e);
disconnect();
errorCallback.onError(e);
});
}
/**
* Asynchronously connects to the user using the socket and receives its information (id, username)
*
* @param socket A socket already connected to the user
* @param callback The function to call on success, with the new ActiveUser as parameter
* @param errorCallback The function to call on socket error
*/
public void acceptConnection(Socket socket, UserConnectedCallback callback, ErrorCallback errorCallback) {
closeConnection();
setState(State.CONNECTING);
connection = new PeerConnection(socket);
init(connection, true, callback, errorCallback);
}
/**
* Sends a basic text message to this user
@ -112,46 +75,12 @@ public class PeerUser extends User implements Comparable<PeerUser> {
thisConnection.send(new UsernameTakenException("Username taken"), this::disconnect, null);
}
/**
* Initializes the connection with this user.
* Both user exchange their information
*
* @param thisConnection The peer connection to use
* @param callback Callback on success
* @param errorCallback Callback on error
*/
private void init(PeerConnection thisConnection, boolean isReceiving, UserConnectedCallback callback, ErrorCallback errorCallback) {
// Send our username
thisConnection.send(new UserInformation(CurrentUser.getInstance()), null, e -> {
disconnect();
errorCallback.onError(e);
});
// Receive peer's username
thisConnection.receiveOne(msg -> {
if (msg instanceof UserInformation) {
UserInformation userInfo = (UserInformation) msg;
final String receivedUsername = userInfo.getUsername();
if (!receivedUsername.equals(CurrentUser.getInstance().getUsername())) {
setId(userInfo.id);
setUsername(receivedUsername);
callback.onUserConnected();
subscribeToMessages(e -> {
disconnect();
errorCallback.onError(e);
});
setState(State.CONNECTED);
} else if (isReceiving) {
sendUsernameTaken(thisConnection);
} else {
disconnect();
errorCallback.onError(new Exception("Tried to use same username as remote"));
}
} else {
disconnect();
errorCallback.onError(new Exception("Did not receive remote username"));
}
}, e -> {
public void init(PeerConnection connection, String id, String username, ErrorCallback errorCallback) {
this.connection = connection;
this.id = id;
setUsername(username);
setState(State.CONNECTED);
subscribeToMessages((e) -> {
disconnect();
errorCallback.onError(e);
});
@ -240,7 +169,7 @@ public class PeerUser extends User implements Comparable<PeerUser> {
*
* @param state The new state
*/
private void setState(State state) {
protected void setState(State state) {
pcs.firePropertyChange("state", this.state, state);
this.state = state;
}
@ -268,15 +197,7 @@ public class PeerUser extends User implements Comparable<PeerUser> {
* The user connection state
*/
public enum State {
CONNECTING,
CONNECTED,
DISCONNECTED,
}
/**
* Callback when this user successfully connects
*/
public interface UserConnectedCallback {
void onUserConnected();
}
}

View file

@ -3,24 +3,23 @@ package fr.insa.clavardator.users;
import fr.insa.clavardator.db.DatabaseController;
import fr.insa.clavardator.network.ConnectionListener;
import fr.insa.clavardator.network.NetDiscoverer;
import fr.insa.clavardator.network.PeerHandshake;
import fr.insa.clavardator.util.ErrorCallback;
import fr.insa.clavardator.util.Log;
import javafx.application.Platform;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import java.beans.PropertyChangeEvent;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
public class UserList {
private final Map<String, PeerUser> inactiveUsers = new HashMap<>();
private final Map<String, PeerUser> activeUsers = new HashMap<>();
private final ArrayList<PeerUser> pendingUsers = new ArrayList<>();
private UserConnectionCallback newUsersObservers = null;
// List kept in sync with the running thread
private final Map<String, PeerUser> userHashmap = new HashMap<>();
// List kept in sync with ui thread, may be out out date with rapid interactions
// Use only for display, not for duplicate checking
private final ObservableList<PeerUser> userObservableList = FXCollections.observableArrayList();
private final DatabaseController db = new DatabaseController();
private final NetDiscoverer netDiscoverer = new NetDiscoverer();
private final ConnectionListener connectionListener = new ConnectionListener();
@ -28,41 +27,19 @@ public class UserList {
public UserList() {
}
/**
* Adds an observer for new user connection events
*
* @param connectionCallback The function to add as listener
*/
public void setNewUserObserver(UserConnectionCallback connectionCallback) {
newUsersObservers = connectionCallback;
}
/**
* Notifies observers the given user is a new connection
*
* @param user The newly connected user
*/
private void notifyNewUserObservers(PeerUser user) {
if (newUsersObservers != null) {
newUsersObservers.onUserConnected(user);
}
}
/**
* Discovers all active users over the network
* and create a connection for each.
* Observers are notified for each new successful connection.
*
* @param errorCallback The function to call on error
* @see UserList#setNewUserObserver(UserConnectionCallback)
*/
public void discoverActiveUsers(ErrorCallback errorCallback) {
netDiscoverer.discoverActiveUsers("CLAVARDATOR_BROADCAST", (ipAddr, data) -> {
Log.v(this.getClass().getSimpleName(), "Discovered new user: " + data);
final PeerUser user = createNewPendingUser(data.trim());
if (user != null) {
user.createConnection(ipAddr, () -> onUserConnectionSuccess(user), errorCallback);
}
new PeerHandshake().createConnection(
ipAddr,
this::onUserConnectionSuccess,
errorCallback);
}, errorCallback);
}
@ -94,56 +71,26 @@ public class UserList {
(clientSocket) -> {
Log.v(this.getClass().getSimpleName(),
"new connection from user at address: " + clientSocket.getInetAddress().toString());
final PeerUser user = createNewPendingUser();
if (user != null) {
user.acceptConnection(clientSocket, () ->
onUserConnectionSuccess(user), errorCallback);
}
new PeerHandshake().acceptConnection(clientSocket, this::onUserConnectionSuccess, errorCallback);
},
errorCallback);
}
/**
* Notify observers and subscribe to user changes.
*
* @param user The newly connected user
*/
private void onUserConnectionSuccess(PeerUser user) {
notifyNewUserObservers(user);
user.addObserver(evt -> userChangeObserver(user, evt));
}
private PeerUser createNewPendingUser() {
PeerUser user = new PeerUser();
addUserToPendingList(user);
return user;
}
/**
* Creates a new user from its id and puts it in the inactive user list.
* We first try to fetch it from active and inactive users to prevent duplicates.
* We do not notify observers yet as the connection may be canceled on username checks.
*
* @param id The new user's id.
* @return A new PeerUser, or null if the user is already connected
*/
private PeerUser createNewPendingUser(String id) {
// If already connected, warn and return
if (activeUsers.containsKey(id)) {
Log.w(getClass().getSimpleName(),
"An already connected user tried to initiate a new connection: user id " + id);
return null;
private void onUserConnectionSuccess(PeerHandshake handshake) {
// Check if we already know this user
final UserInformation userInfo = handshake.getUserInformation();
final PeerUser savedUser = userHashmap.get(userInfo.id);
if (savedUser != null) {
savedUser.init(handshake.getConnection(), userInfo.id, userInfo.getUsername(), null);
} else {
final PeerUser user = new PeerUser();
user.init(handshake.getConnection(), userInfo.id, userInfo.getUsername(), null);
userHashmap.put(user.id, user);
Platform.runLater(() -> userObservableList.add(user));
}
// Get the user if already existing
PeerUser user = inactiveUsers.get(id);
// else create it
if (user == null) {
// Username is set on TCP connection start or db fetch
user = new PeerUser(id);
addUserToPendingList(user);
}
return user;
}
/**
@ -156,75 +103,8 @@ public class UserList {
*/
private void createNewInactiveUser(String id, String username) {
final PeerUser user = new PeerUser(id, username);
inactiveUsers.put(id, user);
notifyNewUserObservers(user);
}
private void addUserToPendingList(PeerUser user) {
if (!pendingUsers.contains(user)) {
pendingUsers.add(user);
}
}
/**
* Tries to move the given user from inactive list to active list
*
* @param user The user to move
*/
private void moveUserToActiveList(PeerUser user) {
final String id = user.id;
if (!inactiveUsers.containsKey(id)) {
if (activeUsers.containsKey(id)) {
Log.w(getClass().getSimpleName(), "Tried to set state " + PeerUser.State.CONNECTED + " on an already connected user: user id " + id);
} else {
Log.w(getClass().getSimpleName(), "Tried to set state " + PeerUser.State.CONNECTED + " on an unknown user: user id " + id);
}
return;
}
inactiveUsers.remove(user.id);
activeUsers.put(user.id, user);
}
/**
* Tries to move the given user from active list to inactive list
*
* @param user The user to move
*/
private void moveUserToInactiveList(PeerUser user) {
final String id = user.id;
if (!activeUsers.containsKey(id)) {
if (inactiveUsers.containsKey(id)) {
Log.w(getClass().getSimpleName(), "Tried to set state " + PeerUser.State.DISCONNECTED + " on an already disconnected user: user id " + id);
} else {
Log.w(getClass().getSimpleName(), "Tried to set state " + PeerUser.State.DISCONNECTED + " on an unknown user: user id " + id);
}
return;
}
activeUsers.remove(id);
inactiveUsers.put(id, user);
}
/**
* Move user from active or inactive list when its state changes.
*
* @param user The user which changed
* @param evt The change event
*/
private void userChangeObserver(PeerUser user, PropertyChangeEvent evt) {
if (evt.getPropertyName().equals("state")) {
PeerUser.State oldState = (PeerUser.State) evt.getOldValue();
PeerUser.State newState = (PeerUser.State) evt.getNewValue();
if ((oldState == PeerUser.State.DISCONNECTED ||
(oldState == PeerUser.State.CONNECTING) && newState == PeerUser.State.CONNECTED)) {
moveUserToActiveList(user);
} else if (oldState == PeerUser.State.CONNECTED &&
(newState == PeerUser.State.DISCONNECTED || newState == PeerUser.State.CONNECTING)) {
moveUserToInactiveList(user);
}
Log.v(getClass().getSimpleName(), "State of user " + user.id + " updated from " +
oldState.toString() + " to " + newState.toString());
}
userHashmap.put(id, user);
Platform.runLater(() -> userObservableList.add(user));
}
/**
@ -235,17 +115,14 @@ public class UserList {
*/
public boolean isUsernameAvailable(String username) {
Predicate<User> usernameEqual = user -> user.getUsername() != null && user.getUsername().equals(username);
return activeUsers.values().stream().noneMatch(usernameEqual) &&
inactiveUsers.values().stream().noneMatch(usernameEqual);
return userHashmap.values().stream().noneMatch(usernameEqual);
}
/**
* Tells all active users that our username changed
*/
public void propagateUsernameChange() {
activeUsers.forEach((id, user) -> {
user.sendCurrentUser(Throwable::printStackTrace);
});
userHashmap.forEach((id, user) -> user.sendCurrentUser(Throwable::printStackTrace));
}
/**
@ -256,13 +133,13 @@ public class UserList {
netDiscoverer.stopDiscovery();
connectionListener.stopAccepting();
db.close();
for (PeerUser user : activeUsers.values()) {
for (PeerUser user : userHashmap.values()) {
user.disconnect();
}
}
public interface UserConnectionCallback {
void onUserConnected(PeerUser user);
public ObservableList<PeerUser> getUserObservableList() {
return userObservableList;
}
public interface UserListLoadedCallback {