Convert ActiveUser to PeerUser

This commit is contained in:
Yohan Simard 2020-12-09 11:38:33 +01:00
parent 037e16b97b
commit 1d979c2dab
8 changed files with 214 additions and 49 deletions

View file

@ -3,8 +3,7 @@ package fr.insa.clavardator;
import fr.insa.clavardator.network.ConnectionListener;
import fr.insa.clavardator.network.NetDiscoverer;
import fr.insa.clavardator.ui.MainController;
import fr.insa.clavardator.users.ActiveUser;
import fr.insa.clavardator.users.CurrentUser;
import fr.insa.clavardator.users.PeerUser;
import javafx.application.Application;
import javafx.fxml.FXMLLoader;
import javafx.scene.Parent;
@ -19,6 +18,8 @@ public class MainApp extends Application {
NetDiscoverer netDiscoverer;
private ConnectionListener connectionListener;
private PeerUser user1;
private PeerUser user2;
public static void main(String[] args) {
launch(args);
@ -50,7 +51,10 @@ public class MainApp extends Application {
// TCP communication tests
connectionListener = new ConnectionListener();
connectionListener.acceptConnection(user -> System.out.println("New connection request : " + user.getUsername()), e -> {
connectionListener.acceptConnection(socket -> user1.connect(socket,
() -> System.out.println("Connexion établie avec " + user2.getUsername()),
Throwable::printStackTrace),
e -> {
if (e instanceof java.io.EOFException) {
System.out.println("Connexion terminée");
} else {
@ -58,14 +62,19 @@ public class MainApp extends Application {
}
});
ActiveUser.create(InetAddress.getByName("192.168.43.14"), user -> System.out.println("Connexion établie avec " + user.getUsername()),
user2 = new PeerUser(InetAddress.getLocalHost().hashCode(), "Yohan");
user2.connect(InetAddress.getByName("ADDRESSE_IP"),
() -> System.out.println("Connexion établie avec " + user2.getUsername()),
Throwable::printStackTrace);
}
@Override
public void stop() throws Exception {
netDiscoverer.stopDiscovery();
connectionListener.stopAccepting();
user1.disconnect();
user2.disconnect();
super.stop();
}
}

View file

@ -6,8 +6,8 @@ import java.io.Serializable;
public class Message implements Serializable {
private final String text;
private final User sender;
private final User recipient;
private final User sender; // Send only ids ?
private final User recipient; // Send only ids ?
public Message(User sender, User recipient) {
this(sender, recipient, "");

View file

@ -1,6 +1,5 @@
package fr.insa.clavardator.network;
import fr.insa.clavardator.users.ActiveUser;
import fr.insa.clavardator.util.ErrorCallback;
import java.io.IOException;
@ -58,7 +57,7 @@ public class ConnectionListener {
System.out.println("Accepting...");
Socket clientSocket = server.accept();
System.out.println("New connection from " + clientSocket.getRemoteSocketAddress());
ActiveUser.create(clientSocket, callback::onNewConnection, errorCallback);
callback.onNewConnection(clientSocket);
}
} catch (IOException e) {
if (!shouldStop)
@ -78,6 +77,6 @@ public class ConnectionListener {
public interface NewConnectionCallback {
void onNewConnection(ActiveUser user);
void onNewConnection(Socket clientSocket);
}
}

View file

@ -13,7 +13,7 @@ public class NetDiscoverer {
private static final int RESPONSE_BUFFER_SIZE = 50;
private BroadcastSender broadcastSender;
private BroadcastListener socket;
private BroadcastListener broadcastListener;
private ResponseSender responseSender;
private ResponseListener responseListener;
@ -44,10 +44,10 @@ public class NetDiscoverer {
* @param errorCallback The function to call on error
*/
public void startDiscoveryListening(String responseMessage, @Nullable BroadcastReceivedCallback onBroadcastReceived, @Nullable ErrorCallback errorCallback) {
if (socket != null)
socket.stopListening();
if (broadcastListener != null)
broadcastListener.stopListening();
socket = new BroadcastListener((ipAddr, data) -> {
broadcastListener = new BroadcastListener((ipAddr, data) -> {
if (onBroadcastReceived != null)
onBroadcastReceived.onBroadcastReceived(ipAddr, data);
responseSender = new ResponseSender(ipAddr, responseMessage, errorCallback);
@ -57,15 +57,15 @@ public class NetDiscoverer {
errorCallback.onError(e);
});
socket.start();
broadcastListener.start();
}
/**
* Stop network discovery and listening
*/
public void stopDiscovery() {
if (socket != null)
socket.stopListening();
if (broadcastListener != null)
broadcastListener.stopListening();
if (responseListener != null)
responseListener.stopListening();
}

View file

@ -87,6 +87,10 @@ public class PeerConnection {
}
}
public boolean isOpen() {
return socket != null && socket.isConnected() && !socket.isClosed();
}
private class Connector extends Thread {
private final InetAddress ipAddr;
private final SocketConnectedCallback callback;

View file

@ -4,10 +4,11 @@ import fr.insa.clavardator.chat.Message;
import fr.insa.clavardator.network.PeerConnection;
import fr.insa.clavardator.util.ErrorCallback;
import java.io.EOFException;
import java.net.InetAddress;
import java.net.Socket;
@Deprecated
public class ActiveUser extends PeerUser {
private final transient PeerConnection connection;
@ -20,7 +21,7 @@ public class ActiveUser extends PeerUser {
* @param callback The function to call on success, with the new ActiveUser as parameter
* @param errorCallback The function to call on socket error
*/
public static void create(InetAddress ipAddr, UserCreatedCallback callback, ErrorCallback errorCallback) {
public static void create(InetAddress ipAddr, UserConnectedCallback callback, ErrorCallback errorCallback) {
// Connect to the peer
new PeerConnection(ipAddr, (thisConnection) -> {
@ -34,7 +35,7 @@ public class ActiveUser extends PeerUser {
assert msg instanceof UserInformation;
UserInformation userInfo = (UserInformation) msg;
ActiveUser user = new ActiveUser(userInfo.getId(), userInfo.getUsername(), thisConnection, errorCallback);
callback.onUserCreated(user);
callback.onUserConnected();
}, errorCallback);
}, errorCallback);
@ -47,7 +48,7 @@ public class ActiveUser extends PeerUser {
* @param callback The function to call on success, with the new ActiveUser as parameter
* @param errorCallback The function to call on socket error
*/
public static void create(Socket socket, UserCreatedCallback callback, ErrorCallback errorCallback) {
public static void create(Socket socket, UserConnectedCallback callback, ErrorCallback errorCallback) {
PeerConnection connection = new PeerConnection(socket);
// Send our username
@ -60,7 +61,7 @@ public class ActiveUser extends PeerUser {
assert msg instanceof UserInformation;
UserInformation userInfo = (UserInformation) msg;
ActiveUser user = new ActiveUser(userInfo.getId(), userInfo.getUsername(), connection, errorCallback);
callback.onUserCreated(user);
callback.onUserConnected();
}, errorCallback);
}
@ -102,11 +103,14 @@ public class ActiveUser extends PeerUser {
history.addMessage((Message) msg);
}
},
errorCallback);
e -> {
if (e.getClass().isInstance(EOFException.class)) {
// TODO: notify disconnection
} else {
errorCallback.onError(e);
}
});
}
public interface UserCreatedCallback {
void onUserCreated(ActiveUser user);
}
}

View file

@ -1,8 +1,17 @@
package fr.insa.clavardator.users;
import fr.insa.clavardator.chat.ChatHistory;
import fr.insa.clavardator.chat.Message;
import fr.insa.clavardator.network.PeerConnection;
import fr.insa.clavardator.util.ErrorCallback;
import java.io.EOFException;
import java.net.InetAddress;
import java.net.Socket;
public class PeerUser extends User {
private State state = State.DISCONNECTED;
private transient PeerConnection connection;
protected transient ChatHistory history;
@ -11,6 +20,107 @@ public class PeerUser extends User {
history = new ChatHistory(this);
}
/**
* Asynchronously connects to the user and receives its information (id, username)
*
* @param ipAddr The IP address of this user
* @param callback The function to call on success
* @param errorCallback The function to call on socket error
*/
public void connect(InetAddress ipAddr, UserConnectedCallback callback, ErrorCallback errorCallback) {
// Connect to the peer
pcs.firePropertyChange("state", state, State.CONNECTING);
state = State.CONNECTING;
connection = new PeerConnection(ipAddr, (thisConnection) -> {
init(thisConnection, callback, errorCallback);
}, e -> {
disconnect();
errorCallback.onError(e);
});
}
/**
* Asynchronously connects to the user using the socket and receives its information (id, username)
*
* @param socket A socket already connected to the user
* @param callback The function to call on success, with the new ActiveUser as parameter
* @param errorCallback The function to call on socket error
*/
public void connect(Socket socket, UserConnectedCallback callback, ErrorCallback errorCallback) {
pcs.firePropertyChange("state", state, State.CONNECTING);
state = State.CONNECTING;
connection = new PeerConnection(socket);
init(connection, callback, errorCallback);
}
private void init(PeerConnection thisConnection, UserConnectedCallback callback, ErrorCallback errorCallback) {
// Send our username
String currentUserUsername = CurrentUser.getInstance().getUsername();
int currentUserId = CurrentUser.getInstance().getId();
thisConnection.send(new UserInformation(currentUserId, currentUserUsername), null, e -> {
disconnect();
errorCallback.onError(e);
});
// Receive peer's username
thisConnection.receiveOne(msg -> {
assert msg instanceof UserInformation;
UserInformation userInfo = (UserInformation) msg;
// TODO : Check username unique
assert id == userInfo.getId();
setUsername(userInfo.getUsername());
callback.onUserConnected();
// Subscribe to incoming messages
subscribeToMessages(e -> {
disconnect();
errorCallback.onError(e);
});
// Update state to CONNECTED
pcs.firePropertyChange("state", state, State.CONNECTED);
state = State.CONNECTED;
}, e -> {
disconnect();
errorCallback.onError(e);
});
}
private void subscribeToMessages(ErrorCallback errorCallback) {
connection.receive(
msg -> {
if (msg.getClass().isInstance(UserInformation.class)) {
assert ((UserInformation) msg).getId() == getId();
setUsername(((UserInformation) msg).getUsername());
} else if (msg.getClass().isInstance(Message.class)) {
assert ((Message) msg).getRecipient() != this;
history.addMessage((Message) msg);
}
},
e -> {
if (e.getClass().isInstance(EOFException.class)) {
disconnect();
} else {
errorCallback.onError(e);
}
});
}
public void disconnect() {
if (connection != null && connection.isOpen()) {
connection.close();
connection = null;
}
pcs.firePropertyChange("state", state, State.DISCONNECTED);
state = State.DISCONNECTED;
}
/**
* Get the value of history
@ -21,4 +131,14 @@ public class PeerUser extends User {
return history;
}
enum State {
CONNECTING,
CONNECTED,
DISCONNECTED,
}
public interface UserConnectedCallback {
void onUserConnected();
}
}

View file

@ -4,15 +4,18 @@ import fr.insa.clavardator.db.DatabaseController;
import fr.insa.clavardator.network.NetDiscoverer;
import fr.insa.clavardator.util.ErrorCallback;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.function.Predicate;
public class UserList {
private ArrayList<ActiveUser> activeUsers;
private ArrayList<PeerUser> inactiveUsers;
private Map<Integer, PeerUser> inactiveUsers;
private Map<Integer, PeerUser> activeUsers;
private final ArrayList<UserConnectionCallback> activeUsersObservers = new ArrayList<>();
private final ArrayList<UserConnectionCallback> userConnectionObservers = new ArrayList<>();
private final ArrayList<UserDisconnectionCallback> userDisconnectionObservers = new ArrayList<>();
private final DatabaseController db = new DatabaseController();
private final NetDiscoverer netDiscoverer = new NetDiscoverer();
@ -24,23 +27,45 @@ public class UserList {
* Discover all active users over the network. Observers are notified for each user discovered.
*
* @param errorCallback The function to call on error
* @see UserList#addActiveUserConnectedObserver(UserConnectionCallback)
* @see UserList#addActiveUsersObserver(UserConnectionCallback, UserDisconnectionCallback)
*/
public void discoverActiveUsers(ErrorCallback errorCallback) {
netDiscoverer.discoverActiveUsers("CLAVARDATOR_BROADCAST", (ipAddr, data) -> {
ActiveUser.create(ipAddr, user -> {
activeUsers.add(user);
notifyObservers(user);
int id = getIdFromIp(ipAddr);
PeerUser user = inactiveUsers.get(id);
if (user == null) {
user = new PeerUser(id, "");
}
PeerUser finalUser = user;
user.connect(ipAddr, () -> {
notifyConnectionObservers(finalUser);
}, errorCallback);
}, errorCallback);
}
public void addActiveUserConnectedObserver(UserConnectionCallback callback) {
activeUsersObservers.add(callback);
private int getIdFromIp(InetAddress ipAddr) {
return ipAddr.hashCode();
}
private void notifyObservers(ActiveUser user) {
activeUsersObservers.forEach(callback -> callback.onUserConnected(user));
public void addActiveUser(ActiveUser user) {
}
public void addActiveUsersObserver(UserConnectionCallback connectionCallback, UserDisconnectionCallback disconnectionCallback) {
userConnectionObservers.add(connectionCallback);
userDisconnectionObservers.add(disconnectionCallback);
}
private void notifyConnectionObservers(PeerUser user) {
userConnectionObservers.forEach(callback -> callback.onUserConnected(user));
}
private void notifyDisconnectionObservers(PeerUser user) {
userDisconnectionObservers.forEach(callback -> callback.onUserDisconnected(user));
}
/**
@ -51,8 +76,8 @@ public class UserList {
*/
public boolean isUsernameAvailable(String username) {
Predicate<User> usernameEqual = user -> user.getUsername().equals(username);
return activeUsers.stream().noneMatch(usernameEqual) &&
inactiveUsers.stream().noneMatch(usernameEqual);
return activeUsers.values().stream().noneMatch(usernameEqual) &&
inactiveUsers.values().stream().noneMatch(usernameEqual);
}
/**
@ -71,21 +96,25 @@ public class UserList {
public void destroy() {
netDiscoverer.stopDiscovery();
// db.close();
for (ActiveUser user : activeUsers) {
user.destroy();
for (PeerUser user : activeUsers.values()) {
user.disconnect();
}
}
public ArrayList<ActiveUser> getActiveUsers() {
public Map<Integer, PeerUser> getActiveUsers() {
return activeUsers;
}
public ArrayList<PeerUser> getInactiveUsers() {
public Map<Integer, PeerUser> getInactiveUsers() {
return inactiveUsers;
}
interface UserConnectionCallback {
void onUserConnected(ActiveUser user);
void onUserConnected(PeerUser user);
}
interface UserDisconnectionCallback {
void onUserDisconnected(PeerUser user);
}
}