Reference

Federated Models

Boilerplate code for creating a Federated Model

FederatedModel

This class encapsulates the (PyTorch) federated model that we will train. It accepts only PyTorch models and provides utility functions to initialize the model, retrieve the weights, or perform an indicated number of training epochs.
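
For orientation, a minimal usage sketch follows (the network, optimizer, and hyperparameters are illustrative assumptions; only the FederatedModel API itself comes from this reference):

from functools import partial

import torch
from torchvision import models

# The optimizer is passed as a functools.partial template; FederatedModel
# fills in the trainable parameter list itself during __init__.
optimizer_template = partial(torch.optim.SGD, lr=0.01, momentum=0.9)

model = FederatedModel(
    net=models.resnet18(num_classes=10),
    optimizer_template=optimizer_template,
    loader_batch_size=32,
    force_cpu=True,  # keep the sketch on the CPU
)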

Source code in FedJust\model\federated_model.py
class FederatedModel:
    """This class is used to encapsulate the (PyTorch) federated model that
    we will train. It accepts only the PyTorch models and 
    provides a utility functions to initialize the model, 
    retrieve the weights or perform an indicated number of traning
    epochs.
    """
    def __init__(
        self,
        net: torch.nn.Module,
        optimizer_template: partial,
        loader_batch_size: int,
        force_cpu: bool = False) -> None:
        """Initialize the Federated Model. This model will be attached to a 
        specific client and will wait for further instructionss

        Parameters
        ----------
        net: nn.Module 
            The Neural Network architecture that we want to use.
        optimizer_template: functools.partial
            The partial function of the optimizer that will be used as a template.
        loader_batch_size: int
            Batch size of the trainloader and testloader.
        force_cpu: bool = False
            Option to force the calculations on the CPU even if a GPU is available.

        Returns
        -------
        None
        """
        # Primary computation device: GPU or CPU
        if force_cpu:
            self.device = torch.device('cpu')
        else:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Secondary computation device to offload parameters: CPU
        self.cpu = torch.device("cpu")

        self.initial_model = None
        self.optimizer = None  
        self.net = copy.deepcopy(net)
        self.node_name = None
        self.batch_size = loader_batch_size
        # List containing all the parameters to update
        params_to_update = []
        for _, param in self.net.named_parameters():
            if param.requires_grad is True:
                params_to_update.append(param)
        self.optimizer = optimizer_template(params_to_update)


    def attach_dataset_id(
        self,
        local_dataset: list[datasets.arrow_dataset.Dataset, datasets.arrow_dataset.Dataset] | list[datasets.arrow_dataset.Dataset],
        node_name: int | str,
        only_test: bool = False,
        hugging_face_map: bool = True
        ) -> None:
        """Attaches huggingface dataset to the model by firstly converting it into a pytorch-appropiate standard.

        Parameters
        ----------
        local_dataset: list[datasets.arrow_dataset.Dataset] 
            A local dataset that should be loaded into the DataLoader
        node_name: int | str
            The name of the node attributed to the particular dataset
        only_test: bool [default: False]
            If True, only a test loader will be created
        hugging_face_map: bool [default: True]
            If set to True, uses the Hugging Face map function,
            which takes more time to process but results in a more
            stable and reversible transformation of the data.
        Returns
        -------------
        None
        """
        self.node_name = node_name
        if not only_test:
            if hugging_face_map:
                convert_tensor = transforms.ToTensor()
                local_dataset[0] = local_dataset[0].map(lambda sample: {"image": convert_tensor(sample['image'])})
                local_dataset[0].set_format("pt", columns=["image"], output_all_columns=True)

                local_dataset[1] = local_dataset[1].map(lambda sample: {"image": convert_tensor(sample['image'])})
                local_dataset[1].set_format("pt", columns=["image"], output_all_columns=True)
            else:
                local_dataset[0] = local_dataset[0].with_transform(self.transform_func)
                local_dataset[1] = local_dataset[1].with_transform(self.transform_func)
            self.trainloader = torch.utils.data.DataLoader(
                local_dataset[0],
                batch_size=self.batch_size,
                shuffle=True,
                num_workers=0,
            )
            self.testloader = torch.utils.data.DataLoader(
                local_dataset[1],
                batch_size=self.batch_size,
                shuffle=False,
                num_workers=0,
            )
        else:
            if hugging_face_map:
                convert_tensor = transforms.ToTensor()
                local_dataset[0] = local_dataset[0].map(lambda sample: {"image": convert_tensor(sample['image'])})
                local_dataset[0].set_format("pt", columns=["image"], output_all_columns=True)
            else:
                local_dataset[0] = local_dataset[0].with_transform(self.transform_func)
            self.testloader = torch.utils.data.DataLoader(
                local_dataset[0],
                batch_size=self.batch_size,
                shuffle=False,
                num_workers=0,
            )


    # def print_model_footprint(self) -> None:
    #     """Prints all the information about the model..
    #     Args:
    #     """
    #     unique_hash = hash(next(iter(self.trainloader))['image'] + self.node_name)

    #     string = f"""
    #     model id: {self.node_name}
    #     device: {self.device},
    #     optimizer: {self.optimizer},
    #     unique hash: {unique_hash}
    #     """
    #     return (self.node_name, self.device, self.optimizer, unique_hash)

    #     # num_examples = {
    #     #     "trainset": len(self.training_set),
    #     #     "testset": len(self.test_set),
    #     # }
    #     # targets = []
    #     # for _, data in enumerate(trainloader, 0):
    #     #     targets.append(data[1])
    #     # targets = [item.item() for sublist in targets for item in sublist]
    #     # model_logger.info(f"{self.node_name}, {Counter(targets)}")
    #     # model_logger.info(f"{self.node_name}: Training set size: {num_examples['trainset']}")
    #     # model_logger.info(f"{self.node_name}: Test set size: {num_examples['testset']}")


    # def get_weights_list(self) -> list[float]:
    #     """Get the parameters of the network.

    #     Parameters
    #     ----------

    #     Returns
    #     -------
    #     List[float]: parameters of the network
    #     """
    #     return [val.cpu().numpy() for _, val in self.net.state_dict().items()]


    def get_weights(self) -> OrderedDict:
        """Get the weights of the network.

        Parameters
        ----------

        Raises
        -------------
            Exception: if the model is not initialized.

        Returns
        -------------
            OrderedDict: weights of the network
        """
        self.net.to(self.cpu) # Dumping weights on the CPU.
        return copy.deepcopy(self.net.state_dict())


    def get_gradients(self) -> OrderedDict:
        """Get the gradients of the network (differences between the received and the trained model).

        Parameters
        ----------

        Raises
        -------------
            Exception: if the original model was not preserved.

        Returns
        -------------
            OrderedDict: Gradients of the network.
        """
        assert self.initial_model is not None, "Computing gradients requires saving the initial model first!"
        self.net.to(self.cpu) # Dumping weights on the CPU.
        self.initial_model.to(self.cpu)
        weights_trained = self.net.state_dict()
        weights_initial = self.initial_model.state_dict()

        self.gradients = OrderedDict.fromkeys(weights_trained.keys(), 0)
        for key in weights_trained:
            self.gradients[key] = weights_trained[key] - weights_initial[key]

        return self.gradients # Try: to provide original weights, no copies


    def update_weights(
        self, 
        avg_tensors
        ) -> None:
        """Updates the weights of the network stored on client with passed tensors.

        Parameters
        ----------
        avg_tensors: Ordered_Dict
            An Ordered Dictionary containing a averaged tensors

        Raises
        ------
        Exception: _description_

        Returns
        -------
        None
        """
        self.net.load_state_dict(copy.deepcopy(avg_tensors), strict=True)


    def store_model_on_disk(
        self,
        iteration: int,
        path: str
        ) -> None:
        """Saves local model in a .pt format.
        Parameters
        ----------
        Iteration: int
            Current iteration
        Path: str
            Path to the saved repository

        Returns: 
        -------
        None

        Raises
        -------
            Exception if the model is not initialized it raises an exception
        """
        name = f"node_{self.node_name}_iteration_{iteration}.pt"
        save_path = os.path.join(path, name)
        torch.save(
            self.net.state_dict(),
            save_path,
        )


    def preserve_initial_model(self) -> None:
        """Preserve the initial model provided at the
        end of the turn (necessary for computing gradients,
        when using aggregating methods such as FedOpt).

        Parameters
        ----------

        Returns
        -------
            Tuple[float, float]: Loss and accuracy on the training set.
        """
        self.initial_model = copy.deepcopy(self.net)


    def train(
        self,
        iteration: int,
        epoch: int
        ) -> tuple[float, float]:
        """Trains the network and computes the loss and accuracy.

        Parameters
        ----------
        iteration: int 
            Current iteration
        epoch: int
            Current (local) epoch

        Returns
        -------
        tuple[float, float]: Loss and accuracy on the training set.
        """
        criterion = torch.nn.CrossEntropyLoss()
        train_loss = 0
        correct = 0
        total = 0
        # Try: to place a net on the device during the training stage
        self.net.to(self.device)
        self.net.train()
        for _, dic in enumerate(self.trainloader):
            inputs = dic['image']
            targets = dic['label']
            inputs, targets = inputs.to(self.device), targets.to(self.device)
            self.net.zero_grad() # Zeroing the gradients of the network
            # forward pass, backward pass and optimization
            outputs = self.net(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            self.optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

            # Emptying the cuda_cache
            # if torch.cuda.is_available():
            #     torch.cuda.empty_cache()

        loss = train_loss / len(self.trainloader)
        accuracy = correct / total
        model_logger.info(f"[ITERATION {iteration} | EPOCH {epoch} | NODE {self.node_name}] Training on {self.node_name} results: loss: {loss}, accuracy: {accuracy}")

        return (loss, 
                accuracy)


    def evaluate_model(self) -> dict:
        """Validate the network on the local test set.

        Parameters
        ----------

        Returns
        -------
            dict: Evaluation metrics on the test set (loss, accuracy, macro F1,
            precision, recall, and per-class accuracy).
        """
        # Try: to place net on device directly during the evaluation stage.
        self.net.to(self.device)
        self.net.eval()
        evaluation_results = {}
        criterion = torch.nn.CrossEntropyLoss()
        test_loss = 0
        correct = 0
        total = 0
        y_pred = []
        y_true = []
        losses = []

        with torch.no_grad():
            for _, dic in enumerate(self.testloader):
                inputs = dic['image']
                targets = dic['label']
                inputs, targets = inputs.to(self.device), targets.to(self.device)
                output = self.net(inputs)

                total += targets.size(0)
                test_loss = criterion(output, targets).item()
                losses.append(test_loss)
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(targets.view_as(pred)).sum().item()
                y_pred.append(pred)
                y_true.append(targets)

        evaluation_results['test_loss'] = np.mean(losses)
        evaluation_results['accuracy'] = correct / total

        y_true = [item.item() for sublist in y_true for item in sublist]
        y_pred = [item.item() for sublist in y_pred for item in sublist]

        evaluation_results['f1score'] = f1_score(y_true, y_pred, average="macro")
        evaluation_results['precision'] = precision_score(y_true, y_pred, average="macro")
        evaluation_results['recall'] = recall_score(y_true, y_pred, average="macro")

        cm = confusion_matrix(y_true, y_pred, labels=[i for i in range(10)])
        cm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis]
        accuracy_per_class = cm.diagonal()
        accuracy_per_class_expanded = {
            f'accuracy_per_{class_id}': value for class_id, value in enumerate(accuracy_per_class)
        }
        evaluation_results.update(accuracy_per_class_expanded)

        # true_positives = np.diag(cm)
        # num_classes = len(list(set(y_true)))

        # false_positives = []
        # for i in range(num_classes):
        #     false_positives.append(sum(cm[:,i]) - cm[i,i])

        # false_negatives = []
        # for i in range(num_classes):
        #     false_negatives.append(sum(cm[i,:]) - cm[i,i])

        # true_negatives = []
        # for i in range(num_classes):
        #     temp = np.delete(cm, i, 0)   # delete ith row
        #     temp = np.delete(temp, i, 1)  # delete ith column
        #     true_negatives.append(sum(sum(temp)))

        # denominator = [sum(x) for x in zip(false_positives, true_negatives)]
        # false_positive_rate = [num/den for num, den in zip(false_positives, denominator)]

        # denominator = [sum(x) for x in zip(true_positives, false_negatives)]
        # true_positive_rate = [num/den for num, den in zip(true_positives, denominator)]

        # # # Emptying the cuda_cache
        # # if torch.cuda.is_available():
        # #     torch.cuda.empty_cache()

        return evaluation_results


    # def quick_evaluate(self) -> tuple[float, float]:
    #     """Quicker version of the evaluate_model(function) 
    #     Validate the network on the local test set returning only the loss and accuracy.

    #     Parameters
    #     ----------

    #     Returns
    #     -------
    #         Tuple[float, float]: loss and accuracy on the test set.
    #     """
    #     # Try: to place net on device directly during the evaluation stage.
    #     self.net.to(self.device)
    #     criterion = nn.CrossEntropyLoss()
    #     test_loss = 0
    #     correct = 0
    #     total = 0

    #     with torch.no_grad():
    #         for _, dic in enumerate(self.testloader):
    #             inputs = dic['image']
    #             targets = dic['label']
    #             inputs, targets = inputs.to(self.device), targets.to(self.device)
    #             outputs = self.net(inputs)
    #             loss = criterion(outputs, targets)

    #             test_loss += loss.item()
    #             _, predicted = outputs.max(1)
    #             total += targets.size(0)
    #             correct += predicted.eq(targets).sum().item()

    #     test_loss = test_loss / len(self.testloader)
    #     accuracy = correct / total

    #     # # Emptying the cuda_cache
    #     # if torch.cuda.is_available():
    #     #     torch.cuda.empty_cache()

    #     return (
    #         test_loss,
    #         accuracy
    #         )


    def transform_func(
        self,
        data: datasets.arrow_dataset.Dataset
        ) -> datasets.arrow_dataset.Dataset:
        """Converts the images of a datasets.arrow_dataset.Dataset batch into PyTorch tensors.

        Parameters
        ----------
        data: datasets.arrow_dataset.Dataset
            A batch of samples whose 'image' column should be converted

        Returns
        -------------
        datasets.arrow_dataset.Dataset: the batch with its 'image' column converted to tensors."""
        convert_tensor = transforms.ToTensor()
        data['image'] = [convert_tensor(img) for img in data['image']]
        return data

__init__(net, optimizer_template, loader_batch_size, force_cpu=False)

Initialize the Federated Model. This model will be attached to a specific client and will wait for further instructions.

Parameters

net: nn.Module
    The Neural Network architecture that we want to use.
optimizer_template: functools.partial
    The partial function of the optimizer that will be used as a template.
loader_batch_size: int
    Batch size of the trainloader and testloader.
force_cpu: bool = False
    Option to force the calculations on the CPU even if a GPU is available.

Returns

None

Source code in FedJust\model\federated_model.py
def __init__(
    self,
    net: torch.nn.Module,
    optimizer_template: partial,
    loader_batch_size: int,
    force_cpu: bool = False) -> None:
    """Initialize the Federated Model. This model will be attached to a 
    specific client and will wait for further instructionss

    Parameters
    ----------
    net: nn.Module 
        The Neural Network architecture that we want to use.
    optimizer_template: functools.partial
        The partial function of the optimizer that will be used as a template.
    loader_batch_size: int
        Batch size of the trainloader and testloader.
    force_cpu: bool = False
        Option to force the calculations on the CPU even if a GPU is available.

    Returns
    -------
    None
    """
    # Primary computation device: GPU or CPU
    if force_cpu:
        self.device = torch.device('cpu')
    else:
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Secondary computation device to offload parameters: CPU
    self.cpu = torch.device("cpu")

    self.initial_model = None
    self.optimizer = None  
    self.net = copy.deepcopy(net)
    self.node_name = None
    self.batch_size = loader_batch_size
    # List containing all the parameters to update
    params_to_update = []
    for _, param in self.net.named_parameters():
        if param.requires_grad is True:
            params_to_update.append(param)
    self.optimizer = optimizer_template(params_to_update)

attach_dataset_id(local_dataset, node_name, only_test=False, hugging_face_map=True)

Attaches a Hugging Face dataset to the model, first converting it into a PyTorch-compatible format.

Parameters

local_dataset: list[datasets.arrow_dataset.Dataset]
    A local dataset that should be loaded into the DataLoader.
node_name: int | str
    The name of the node attributed to the particular dataset.
only_test: bool [default: False]
    If True, only a test loader will be created.
hugging_face_map: bool [default: True]
    If set to True, uses the Hugging Face map function, which takes more time
    to process but results in a more stable and reversible transformation of the data.

Returns


None

Source code in FedJust\model\federated_model.py
def attach_dataset_id(
    self,
    local_dataset: list[datasets.arrow_dataset.Dataset, datasets.arrow_dataset.Dataset] | list[datasets.arrow_dataset.Dataset],
    node_name: int | str,
    only_test: bool = False,
    hugging_face_map: bool = True
    ) -> None:
    """Attaches huggingface dataset to the model by firstly converting it into a pytorch-appropiate standard.

    Parameters
    ----------
    local_dataset: list[datasets.arrow_dataset.Dataset] 
        A local dataset that should be loaded into the DataLoader
    node_name: int | str
        The name of the node attributed to the particular dataset
    only_test: bool [default: False]
        If True, only a test loader will be created
    hugging_face_map: bool [default: True]
        If set to True, uses the Hugging Face map function,
        which takes more time to process but results in a more
        stable and reversible transformation of the data.
    Returns
    -------------
    None
    """
    self.node_name = node_name
    if not only_test:
        if hugging_face_map:
            convert_tensor = transforms.ToTensor()
            local_dataset[0] = local_dataset[0].map(lambda sample: {"image": convert_tensor(sample['image'])})
            local_dataset[0].set_format("pt", columns=["image"], output_all_columns=True)

            local_dataset[1] = local_dataset[1].map(lambda sample: {"image": convert_tensor(sample['image'])})
            local_dataset[1].set_format("pt", columns=["image"], output_all_columns=True)
        else:
            local_dataset[0] = local_dataset[0].with_transform(self.transform_func)
            local_dataset[1] = local_dataset[1].with_transform(self.transform_func)
        self.trainloader = torch.utils.data.DataLoader(
            local_dataset[0],
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=0,
        )
        self.testloader = torch.utils.data.DataLoader(
            local_dataset[1],
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=0,
        )
    else:
        if hugging_face_map:
            convert_tensor = transforms.ToTensor()
            local_dataset[0] = local_dataset[0].map(lambda sample: {"image": convert_tensor(sample['image'])})
            local_dataset[0].set_format("pt", columns=["image"], output_all_columns=True)
        else:
            local_dataset[0] = local_dataset[0].with_transform(self.transform_func)
        self.testloader = torch.utils.data.DataLoader(
            local_dataset[0],
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=0,
        )
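
A hedged usage sketch (the MNIST dataset is an assumption; any Hugging Face dataset with 'image' and 'label' columns works the same way):

from datasets import load_dataset

mnist = load_dataset("mnist")  # splits with 'image' and 'label' columns
model.attach_dataset_id(
    local_dataset=[mnist["train"], mnist["test"]],
    node_name=0,
    only_test=False,
    hugging_face_map=True,
)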

evaluate_model()

Validate the network on the local test set.

Returns

dict: Evaluation metrics on the test set (loss, accuracy, macro F1, precision, recall, and per-class accuracy).
Source code in FedJust\model\federated_model.py
def evaluate_model(self) -> dict:
    """Validate the network on the local test set.

    Parameters
    ----------

    Returns
    -------
        dict: Evaluation metrics on the test set (loss, accuracy, macro F1,
        precision, recall, and per-class accuracy).
    """
    # Try: to place net on device directly during the evaluation stage.
    self.net.to(self.device)
    self.net.eval()
    evaluation_results = {}
    criterion = torch.nn.CrossEntropyLoss()
    test_loss = 0
    correct = 0
    total = 0
    y_pred = []
    y_true = []
    losses = []

    with torch.no_grad():
        for _, dic in enumerate(self.testloader):
            inputs = dic['image']
            targets = dic['label']
            inputs, targets = inputs.to(self.device), targets.to(self.device)
            output = self.net(inputs)

            total += targets.size(0)
            test_loss = criterion(output, targets).item()
            losses.append(test_loss)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(targets.view_as(pred)).sum().item()
            y_pred.append(pred)
            y_true.append(targets)

    evaluation_results['test_loss'] = np.mean(losses)
    evaluation_results['accuracy'] = correct / total

    y_true = [item.item() for sublist in y_true for item in sublist]
    y_pred = [item.item() for sublist in y_pred for item in sublist]

    evaluation_results['f1score'] = f1_score(y_true, y_pred, average="macro")
    evaluation_results['precision'] = precision_score(y_true, y_pred, average="macro")
    evaluation_results['recall'] = recall_score(y_true, y_pred, average="macro")

    cm = confusion_matrix(y_true, y_pred, labels=[i for i in range(10)])
    cm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis]
    accuracy_per_class = cm.diagonal()
    accuracy_per_class_expanded = {
        f'accuracy_per_{class_id}': value for class_id, value in enumerate(accuracy_per_class)
    }
    evaluation_results.update(accuracy_per_class_expanded)

    # true_positives = np.diag(cm)
    # num_classes = len(list(set(y_true)))

    # false_positives = []
    # for i in range(num_classes):
    #     false_positives.append(sum(cm[:,i]) - cm[i,i])

    # false_negatives = []
    # for i in range(num_classes):
    #     false_negatives.append(sum(cm[i,:]) - cm[i,i])

    # true_negatives = []
    # for i in range(num_classes):
    #     temp = np.delete(cm, i, 0)   # delete ith row
    #     temp = np.delete(temp, i, 1)  # delete ith column
    #     true_negatives.append(sum(sum(temp)))

    # denominator = [sum(x) for x in zip(false_positives, true_negatives)]
    # false_positive_rate = [num/den for num, den in zip(false_positives, denominator)]

    # denominator = [sum(x) for x in zip(true_positives, false_negatives)]
    # true_positive_rate = [num/den for num, den in zip(true_positives, denominator)]

    # # # Emptying the cuda_cache
    # # if torch.cuda.is_available():
    # #     torch.cuda.empty_cache()

    return evaluation_results
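
A minimal sketch of reading the returned metrics (assuming a test loader was attached via attach_dataset_id):

results = model.evaluate_model()
print(results["test_loss"], results["accuracy"], results["f1score"])
# Per-class accuracies are stored under 'accuracy_per_0' ... 'accuracy_per_9'.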

get_gradients()

Get the gradients of the network (differences between the received and the trained model).

Raises

Exception: if the original model was not preserved.

Returns

OrderedDict: Gradients of the network.
Source code in FedJust\model\federated_model.py
def get_gradients(self) -> OrderedDict:
    """Get the gradients of the network (differences between the received and the trained model).

    Parameters
    ----------

    Raises
    -------------
        Exception: if the original model was not preserved.

    Returns
    -------------
        OrderedDict: Gradients of the network.
    """
    assert self.initial_model is not None, "Computing gradients requires saving the initial model first!"
    self.net.to(self.cpu) # Dumping weights on the CPU.
    self.initial_model.to(self.cpu)
    weights_trained = self.net.state_dict()
    weights_initial = self.initial_model.state_dict()

    self.gradients = OrderedDict.fromkeys(weights_trained.keys(), 0)
    for key in weights_trained:
        self.gradients[key] = weights_trained[key] - weights_initial[key]

    return self.gradients # Try: to provide original weights, no copies
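
A sketch of the intended gradient workflow (the method calls come from this reference; the iteration and epoch values are placeholders):

model.preserve_initial_model()       # snapshot the dispatched model first
model.train(iteration=0, epoch=0)    # one local training epoch
gradients = model.get_gradients()    # OrderedDict of (trained - initial) tensors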

get_weights()

Get the weights of the network.

Raises

Exception: if the model is not initialized.

Returns

OrderedDict: weights of the network.
Source code in FedJust\model\federated_model.py
def get_weights(self) -> OrderedDict:
    """Get the weights of the network.

    Parameters
    ----------

    Raises
    -------------
        Exception: if the model is not initialized.

    Returns
    -------------
        OrderedDict: weights of the network
    """
    self.net.to(self.cpu) # Dumping weights on the CPU.
    return copy.deepcopy(self.net.state_dict())
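
Usage sketch:

weights = model.get_weights()  # deep copy of the state_dict, moved to the CPU
# Keys follow the usual state_dict naming, e.g. 'conv1.weight'.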

preserve_initial_model()

Preserve the model received at the beginning of the training round (necessary for computing gradients when using aggregation methods such as FedOpt).

Returns

None
Source code in FedJust\model\federated_model.py
def preserve_initial_model(self) -> None:
    """Preserve the initial model provided at the
    end of the turn (necessary for computing gradients,
    when using aggregating methods such as FedOpt).

    Parameters
    ----------

    Returns
    -------
        Tuple[float, float]: Loss and accuracy on the training set.
    """
    self.initial_model = copy.deepcopy(self.net)

store_model_on_disk(iteration, path)

Saves the local model in the .pt format.

Parameters


iteration: int
    Current iteration.
path: str
    Path to the directory in which the model should be saved.

Returns

None

Raises
Exception: if the model is not initialized.
Source code in FedJust\model\federated_model.py
def store_model_on_disk(
    self,
    iteration: int,
    path: str
    ) -> None:
    """Saves local model in a .pt format.
    Parameters
    ----------
    Iteration: int
        Current iteration
    Path: str
        Path to the saved repository

    Returns: 
    -------
    None

    Raises
    -------
        Exception if the model is not initialized it raises an exception
    """
    name = f"node_{self.node_name}_iteration_{iteration}.pt"
    save_path = os.path.join(path, name)
    torch.save(
        self.net.state_dict(),
        save_path,
    )
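
Usage sketch (the directory name is an assumption and must already exist, since the method does not create it):

model.store_model_on_disk(iteration=5, path="checkpoints")
# writes checkpoints/node_<node_name>_iteration_5.pt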

train(iteration, epoch)

Trains the network and computes the loss and accuracy.

Parameters

iteration: int
    Current iteration.
epoch: int
    Current (local) epoch.

Returns

tuple[float, float]: Loss and accuracy on the training set.

Source code in FedJust\model\federated_model.py
def train(
    self,
    iteration: int,
    epoch: int
    ) -> tuple[float, float]:
    """Trains the network and computes the loss and accuracy.

    Parameters
    ----------
    iteration: int 
        Current iteration
    epoch: int
        Current (local) epoch

    Returns
    -------
    tuple[float, float]: Loss and accuracy on the training set.
    """
    criterion = torch.nn.CrossEntropyLoss()
    train_loss = 0
    correct = 0
    total = 0
    # Try: to place a net on the device during the training stage
    self.net.to(self.device)
    self.net.train()
    for _, dic in enumerate(self.trainloader):
        inputs = dic['image']
        targets = dic['label']
        inputs, targets = inputs.to(self.device), targets.to(self.device)
        self.net.zero_grad() # Zeroing the gradients of the network
        # forward pass, backward pass and optimization
        outputs = self.net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        self.optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        # Emptying the cuda_cache
        # if torch.cuda.is_available():
        #     torch.cuda.empty_cache()

    loss = train_loss / len(self.trainloader)
    accuracy = correct / total
    model_logger.info(f"[ITERATION {iteration} | EPOCH {epoch} | NODE {self.node_name}] Training on {self.node_name} results: loss: {loss}, accuracy: {accuracy}")

    return (loss, 
            accuracy)
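
A sketch of a local training loop (the epoch count is an assumption):

for epoch in range(3):
    loss, accuracy = model.train(iteration=0, epoch=epoch)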

transform_func(data)

Converts the images of a datasets.arrow_dataset.Dataset batch into PyTorch tensors.

Parameters


data: datasets.arrow_dataset.Dataset
    A batch of samples whose 'image' column should be converted.

Returns

datasets.arrow_dataset.Dataset: the batch with its 'image' column converted to tensors.

Source code in FedJust\model\federated_model.py
def transform_func(
    self,
    data: datasets.arrow_dataset.Dataset
    ) -> datasets.arrow_dataset.Dataset:
    """Converts the images of a datasets.arrow_dataset.Dataset batch into PyTorch tensors.

    Parameters
    ----------
    data: datasets.arrow_dataset.Dataset
        A batch of samples whose 'image' column should be converted

    Returns
    -------------
    datasets.arrow_dataset.Dataset: the batch with its 'image' column converted to tensors."""
    convert_tensor = transforms.ToTensor()
    data['image'] = [convert_tensor(img) for img in data['image']]
    return data
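
This is the lazy alternative to the Hugging Face map path: when hugging_face_map=False, attach_dataset_id applies it via with_transform, so the conversion runs per batch at loading time, e.g. (dataset being any Hugging Face dataset with an 'image' column):

dataset = dataset.with_transform(model.transform_func)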

update_weights(avg_tensors)

Updates the weights of the network stored on the client with the passed tensors.

Parameters

avg_tensors: OrderedDict
    An OrderedDict containing the averaged tensors.

Raises

RuntimeError: if the passed state dict does not match the architecture of the network (strict=True).

Returns

None

Source code in FedJust\model\federated_model.py
def update_weights(
    self, 
    avg_tensors
    ) -> None:
    """Updates the weights of the network stored on client with passed tensors.

    Parameters
    ----------
    avg_tensors: Ordered_Dict
        An Ordered Dictionary containing a averaged tensors

    Raises
    ------
    Exception: _description_

    Returns
    -------
    None
    """
    self.net.load_state_dict(copy.deepcopy(avg_tensors), strict=True)
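
Usage sketch, tying the client back to an aggregation round (received_weights is an assumed dict of per-node weights; Aggregator is documented below):

averaged = Aggregator().aggregate_weights(received_weights)
model.update_weights(averaged)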

Federated Nodes

Boilerplate code for creating a Federated Node

FederatedNode

Source code in FedJust\node\federated_node.py
class FederatedNode:
    def __init__(self) -> None:
        """An abstract object representing a single node in the federated training.

        Parameters
        ----------
        None
        Returns
        -------
        None
        """
        self.node_id = None
        self.model = None


    def connect_data_id(
        self,
        node_id: int | str,
        model: FederatedModel,
        data: Any,
        orchestrator: bool = False
    ) -> None:
        """Attaches dataset and id to a node, creating an individualised version.

        Parameters
        ----------
        node_id: int | str
            ID of the node
        model: FederatedModel
            FederatedModel template that should be copied and attached to the node
        data: Any
            Data to be attached to a FederatedModel.
        orchestrator: bool (default to False)
            Boolean flag indicating whether the node belongs to the orchestrator
        Returns
        -------
        None
        """
        self.node_id = node_id
        self.model = copy.deepcopy(model)
        self.model.attach_dataset_id(
            local_dataset=data,
            node_name=self.node_id,
            only_test=orchestrator)


    def get_weights(self) -> OrderedDict:
        """Extended API call to recover model weights. Causes the same effect as calling
        node.model.get_weigts()
        Parameters
        ----------
        None
        Returns
        OrderedDict
        None
        """
        return self.model.get_weights()


    def update_weights(self,
                       weights: OrderedDict) -> None:
        """Extended API call to update model. Causes the same effect as calling
        node.model.update_weights()
        Parameters
        ----------
        weights:OrderedDict
            OrderedDict to be uploaded onto the model.
        Returns
        None
        """
        self.model.update_weights(weights)


    def train_local_model(self,
                          iteration: int,
                          local_epochs: int,
                          mode: str = 'weights',
                          save_model: bool = False,
                          save_path: str = None) -> tuple[int, OrderedDict, list[float], list[float]]:
        """This function starts the server phase of the federated learning.
        In particular, it trains the model locally and then sends the weights.
        Then the updated weights are received and used to update
        the global model.

        Parameters
        ----------
        node: FederatedNode 
            Node that we want to train.
        iteration: int
            Current global iteration
        local_epochs: int
            Number of local epochs for which the node should be training.
        mode: str (default to 'weights')
            Mode = 'weights': Node will return model's weights.
            Mode = 'gradients': Node will return model's gradients.
        save_model: bool (default to False)
            Boolean flag to save a model.
        save_path: str (defualt to None)
            Path object used to save a model

        Returns
        -------
            Tuple[int, OrderedDict, List[float], List[float]]:
        """
        node_logger.info(f"[ITERATION {iteration} | NODE {self.node_id}] Starting training on node {self.node_id}")
        loss_list: list[float] = []
        accuracy_list: list[float] = []

        # If mode is set to "gradients" -> preserve a local model to calculate gradients
        if mode == 'gradients':
            self.model.preserve_initial_model()

        for epoch in range(local_epochs):
            metrics = self.local_training(
                iteration=iteration, 
                epoch=epoch
                )
            loss_list.append(metrics["loss"])
            accuracy_list.append(metrics["accuracy"])
        if save_model:
            self.model.store_model_on_disk(
                iteration=iteration, 
                path=save_path
                )
        node_logger.debug(f"[ITERATION {iteration} | NODE {self.node_id}] Results of training on node {self.node_id}: {accuracy_list}")

        if mode == 'weights':
            return (
                self.node_id,
                self.model.get_weights(),
                loss_list,
                accuracy_list
                )
        elif mode == 'gradients':
            return (
                self.node_id,
                self.model.get_gradients(),
                loss_list,
                accuracy_list
                )
        else:
            raise NameError(f"Unknown mode: {mode}")


    def local_training(self,
                       iteration: int,
                       epoch: int
                       ) -> dict[str, float]:
        """Helper method for performing one epoch of local training.
        Performs one round of Federated Training and packs the
        results (metrics) into the appropriate data structure.

        Parameters
        ----------

        Returns
        -------
            dict[str, float]: metrics from the training.
        """
        loss, accuracy = self.model.train(iteration=iteration, epoch=epoch)
        return {"loss": loss, "accuracy": accuracy}

__init__()

An abstract object representing a single node in the federated training.

Parameters

None

Returns


None

Source code in FedJust\node\federated_node.py
def __init__(self) -> None:
    """An abstract object representing a single node in the federated training.

    Parameters
    ----------
    None
    Returns
    -------
    None
    """
    self.node_id = None
    self.model = None

connect_data_id(node_id, model, data, orchestrator=False)

Attaches dataset and id to a node, creating an individualised version.

Parameters

node_id: int | str
    ID of the node.
model: FederatedModel
    FederatedModel template that should be copied and attached to the node.
data: Any
    Data to be attached to a FederatedModel.
orchestrator: bool (default to False)
    Boolean flag indicating whether the node belongs to the orchestrator.

Returns


None

Source code in FedJust\node\federated_node.py
def connect_data_id(
    self,
    node_id: int | str,
    model: FederatedModel,
    data: Any,
    orchestrator: bool = False
) -> None:
    """Attaches dataset and id to a node, creating an individualised version.

    Parameters
    ----------
    node_id: int | str
        ID of the node
    model: FederatedModel
        FederatedModel template that should be copied and attached to the node
    data: Any
        Data to be attached to a FederatedModel.
    orchestrator: bool (default to False)
        Boolean flag indicating whether the node belongs to the orchestrator
    Returns
    -------
    None
    """
    self.node_id = node_id
    self.model = copy.deepcopy(model)
    self.model.attach_dataset_id(
        local_dataset=data,
        node_name=self.node_id,
        only_test=orchestrator)
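
Usage sketch (model_template and the dataset shards are assumptions):

node = FederatedNode()
node.connect_data_id(
    node_id=0,
    model=model_template,             # a FederatedModel instance
    data=[train_shard, test_shard],   # Hugging Face dataset shards
)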

get_weights()

Extended API call to recover the model weights. Causes the same effect as calling node.model.get_weights().

Parameters


None

Returns

OrderedDict

Source code in FedJust\node\federated_node.py
def get_weights(self) -> OrderedDict:
    """Extended API call to recover model weights. Causes the same effect as calling
    node.model.get_weigts()
    Parameters
    ----------
    None
    Returns
    OrderedDict
    None
    """
    return self.model.get_weights()

local_training(iteration, epoch)

Helper method for performing one epoch of local training. Performs one round of Federated Training and packs the results (metrics) into the appropriate data structure.

Returns

dict[str, float]: metrics from the training.
Source code in FedJust\node\federated_node.py
def local_training(self,
                   iteration: int,
                   epoch: int
                   ) -> dict[str, float]:
    """Helper method for performing one epoch of local training.
    Performs one round of Federated Training and packs the
    results (metrics) into the appropriate data structure.

    Parameters
    ----------

    Returns
    -------
        dict[str, float]: metrics from the training.
    """
    loss, accuracy = self.model.train(iteration=iteration, epoch=epoch)
    return {"loss": loss, "accuracy": accuracy}

train_local_model(iteration, local_epochs, mode='weights', save_model=False, save_path=None)

This function starts the client phase of the federated learning. In particular, it trains the model locally and then returns the weights (or gradients), which can be used to update the global model.

Parameters

iteration: int
    Current global iteration.
local_epochs: int
    Number of local epochs for which the node should be training.
mode: str (default to 'weights')
    Mode = 'weights': Node will return the model's weights.
    Mode = 'gradients': Node will return the model's gradients.
save_model: bool (default to False)
    Boolean flag to save the model.
save_path: str (default to None)
    Path used to save the model.

Returns

Tuple[int, OrderedDict, List[float], List[float]]: Node id, weights (or gradients), list of losses, list of accuracies.
Source code in FedJust\node\federated_node.py
def train_local_model(self,
                      iteration: int,
                      local_epochs: int,
                      mode: str = 'weights',
                      save_model: bool = False,
                      save_path: str = None) -> tuple[int, OrderedDict, list[float], list[float]]:
    """This function starts the server phase of the federated learning.
    In particular, it trains the model locally and then sends the weights.
    Then the updated weights are received and used to update
    the global model.

    Parameters
    ----------
    node: FederatedNode 
        Node that we want to train.
    iteration: int
        Current global iteration
    local_epochs: int
        Number of local epochs for which the node should be training.
    mode: str (default to 'weights')
        Mode = 'weights': Node will return model's weights.
        Mode = 'gradients': Node will return model's gradients.
    save_model: bool (default to False)
        Boolean flag to save a model.
    save_path: str (defualt to None)
        Path object used to save a model

    Returns
    -------
        Tuple[int, OrderedDict, List[float], List[float]]:
    """
    node_logger.info(f"[ITERATION {iteration} | NODE {self.node_id}] Starting training on node {self.node_id}")
    loss_list: list[float] = []
    accuracy_list: list[float] = []

    # If mode is set to "gradients" -> preserve a local model to calculate gradients
    if mode == 'gradients':
        self.model.preserve_initial_model()

    for epoch in range(local_epochs):
        metrics = self.local_training(
            iteration=iteration, 
            epoch=epoch
            )
        loss_list.append(metrics["loss"])
        accuracy_list.append(metrics["accuracy"])
    if save_model:
        self.model.store_model_on_disk(
            iteration=iteration, 
            path=save_path
            )
    node_logger.debug(f"[ITERATION {iteration} | NODE {self.node_id}] Results of training on node {self.node_id}: {accuracy_list}")

    if mode == 'weights':
        return (
            self.node_id,
            self.model.get_weights(),
            loss_list,
            accuracy_list
            )
    elif mode == 'gradients':
        return (
            self.node_id,
            self.model.get_gradients(),
            loss_list,
            accuracy_list
            )
    else:
        raise NameError(f"Unknown mode: {mode}")
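
Usage sketch (assuming a node prepared with connect_data_id):

node_id, weights, losses, accuracies = node.train_local_model(
    iteration=0,
    local_epochs=2,
    mode='weights',
)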

update_weights(weights)

Extended API call to update the model. Causes the same effect as calling node.model.update_weights().

Parameters


weights: OrderedDict
    OrderedDict to be uploaded onto the model.

Returns

None

Source code in FedJust\node\federated_node.py
def update_weights(self,
                   weights: OrderedDict) -> None:
    """Extended API call to update model. Causes the same effect as calling
    node.model.update_weights()
    Parameters
    ----------
    weights:OrderedDict
        OrderedDict to be uploaded onto the model.
    Returns
    None
    """
    self.model.update_weights(weights)

Aggregators

Aggregators serve as boilerplate for merging the weights derived from local models.

Aggregator

Basic class for all Federated Aggregators

Source code in FedJust\aggregators\aggregator.py
class Aggregator():
    """Basic class for all Federated Aggregators"""
    def __init__(self) -> None:
        pass


    def aggregate_weights(
        self,
        weights: dict[int: OrderedDict]) -> OrderedDict:
        """Basic aggregate function (equal to FedAvg) that returns the aggregate version of the
        weights. Perform deepcopy on the passed parameters.

        Parameters
        ----------
        weights: dict[int: OrderedDict]
            Dictionary mapping node ids to their local model weights.

        Returns
        -------
        OrderedDict"""

        results = OrderedDict()
        for params in weights.values():
            for key in params:
                if results.get(key) is None:
                    results[key] = copy.deepcopy(params[key])
                else:
                    results[key] += copy.deepcopy(params[key])

        for key in results:
            results[key] = torch.div(results[key], len(weights))
        return results

aggregate_weights(weights)

Basic aggregation function (equal to FedAvg) that returns the aggregated version of the weights. Performs a deepcopy of the passed parameters.

Parameters

weights: dict[int: OrderedDict]
    Dictionary mapping node ids to their local model weights.

Returns

OrderedDict

Source code in FedJust\aggregators\aggregator.py
def aggregate_weights(
    self,
    weights: dict[int: OrderedDict]) -> OrderedDict:
    """Basic aggregate function (equal to FedAvg) that returns the aggregate version of the
    weights. Perform deepcopy on the passed parameters.

    Parameters
    ----------
    weights: dict[int: OrderedDict]
        Dictionary mapping node ids to their local model weights.

    Returns
    -------
    OrderedDict"""

    results = OrderedDict()
    for params in weights.values():
        for key in params:
            if results.get(key) is None:
                results[key] = copy.deepcopy(params[key])
            else:
                results[key] += copy.deepcopy(params[key])

    for key in results:
        results[key] = torch.div(results[key], len(weights))
    return results
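
Usage sketch (the per-node OrderedDicts are assumed to come from FederatedModel.get_weights):

averaged = Aggregator().aggregate_weights({
    0: node_0_weights,
    1: node_1_weights,
})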

Fedopt_Optimizer

Bases: Aggregator

Fedopt Optimizer that performs a generalized version of Federated Averaging. Suitable for performing Federated Optimization based on gradients, with varying learning rates.

Attributes

None

Source code in FedJust\aggregators\fedopt_aggregator.py
class Fedopt_Optimizer(Aggregator):
    """Fedopt Optimizer that performs a generalized 
    version of Federated Averaging. Suitable for performing
    Federated Optimization based on gradients, with 
    verying learning rates.

    Attributes
    ----------
    None
    """

    def __init__(self) -> None:
        super().__init__()


    def optimize_weights(
        self,
        weights: OrderedDict,
        gradients: OrderedDict,
        learning_rate: float,
        ) -> OrderedDict:
        """FedOpt Aggregation Function (equal to FedAvg when lr=1.0) that returns the
        updated version of the weights.

        Parameters
        ----------
        weights: OrderedDict
            Weights of the previous (central) model.
        gradients: OrderedDict
            Aggregated gradients (defined as trained model - dispatched model)
        learning_rate: float
            Learning rate used to scale the gradient update.

        Returns
        -------
        OrderedDict"""
        updated_weights = OrderedDict((key, zeros(weights[key].size())) for key in weights.keys())
        for key in weights:
            updated_weights[key] = weights[key] + (learning_rate * (gradients[key]))
        return updated_weights

optimize_weights(weights, gradients, learning_rate)

FedOpt Aggregation Function (equal to FedAvg when lr=1.0) that returns the updated version of the weights.

Parameters

weights: OrderedDict
    Weights of the previous (central) model.
gradients: OrderedDict
    Aggregated gradients (defined as trained model - dispatched model).
learning_rate: float
    Learning rate used to scale the gradient update.

Returns

OrderedDict

Source code in FedJust\aggregators\fedopt_aggregator.py
def optimize_weights(
    self,
    weights: OrderedDict,
    gradients: OrderedDict,
    learning_rate: float,
    ) -> OrderedDict:
    """FedOpt Aggregation Function (equal to FedAvg when lr=1.0) that returns the
    updated version of the weights.

    Parameters
    ----------
    weights: OrderedDict
        Weights of the previous (central) model.
    gradients: OrderedDict
        Aggregated gradients (defined as trained model - dispatched model)
    learning_rate: float
        Learning rate used to scale the gradient update.

    Returns
    -------
    OrderedDict"""
    updated_weights = OrderedDict((key, zeros(weights[key].size())) for key in weights.keys())
    for key in weights:
        updated_weights[key] = weights[key] + (learning_rate * (gradients[key]))
    return updated_weights
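
Usage sketch (central_weights and averaged_gradients are assumed OrderedDicts; with learning_rate=1.0 this reproduces plain FedAvg):

new_weights = Fedopt_Optimizer().optimize_weights(
    weights=central_weights,
    gradients=averaged_gradients,
    learning_rate=1.0,
)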

Simulations

The Simulation class serves as boilerplate code for simulating all the actions happening in the decentralised environment.

Simulation

Simulation class representing a generic simulation type.

Attributes

model_template : FederatedModel
    Initialized instance of a Federated Model class that is uploaded to every client.
node_template : FederatedNode
    Initialized instance of a Federated Node class that is used to simulate nodes.
data : dict
    Local data used for the training in a dictionary format, mapping each client to its respective dataset.
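
A hedged end-to-end sketch (all data variables are assumptions; the calls follow the source below):

simulation = Simulation(
    model_template=model_template,   # an initialized FederatedModel
    node_template=FederatedNode(),
    seed=42,
)
simulation.attach_orchestrator_model(orchestrator_data=central_test_set)
simulation.attach_node_model({0: node_0_data, 1: node_1_data})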

Source code in FedJust\simulation\simulation.py
class Simulation():
    """Simulation class representing a generic simulation type.

        Attributes
        ----------
        model_template : FederatedModel
            Initialized instance of a Federated Model class that is uploaded to every client.
        node_template : FederatedNode
            Initialized instance of a Federated Node class that is used to simulate nodes.
        data : dict
            Local data used for the training in a dictionary format, mapping each client to its respective dataset.
        """


    def __init__(self, 
                 model_template: FederatedModel,
                 node_template: FederatedNode,
                 seed: int = 42,
                 **kwargs
                 ) -> None:
        """Creating simulation instant requires providing an already created instance of model template
        and node template. Those instances then will be copied n-times to create n different nodes, each
        with a different dataset. Additionally, a data for local nodes should be passed in form of a dictionary,
        maping dataset to each respective client.

        Parameters
        ----------
        model_template : FederatedModel
            Initialized instance of a Federated Model class that will be uploaded to every client.
        node_template : FederatedNode
            Initialized instance of a Federated Node class that will be used to simulate nodes.
        seed : int (default to 42)
            Seed for the simulation.
        **kwargs : dict, optional
            Extra arguments to enable selected features of the Orchestrator.
            Passing full_debug in **kwargs allows entering a full debug mode.

        Returns
        -------
        None
        """
        self.model_template = model_template
        self.node_template = node_template
        self.network = {}
        self.orchestrator_model = None
        self.generator = np.random.default_rng(seed=seed)

    def attach_orchestrator_model(self,
                                  orchestrator_data: Any):
        """Attaches model of the orchestrator that is saved as an instance attribute.

        Parameters
        ----------
        orchestrator_data: Any 
            Orchestrator data that should be attached to the orchestrator model.

        Returns
        -------
        None
        """
        self.orchestrator_model = copy.deepcopy(self.model_template)
        self.orchestrator_model.attach_dataset_id(local_dataset=[orchestrator_data],
                                                  node_name='orchestrator',
                                                  only_test=True)


    def attach_node_model(self,
                          nodes_data: dict):
        """Attaches models of the nodes to the simulation instance.

        Parameters
        ----------
        nodes_data: dict
            Dictionary mapping each node id to the local data that should be attached to that node's model.

        Returns
        -------
        None
        """
        for node_id, data in nodes_data.items():
            self.network[node_id] = copy.deepcopy(self.node_template)
            self.network[node_id].connect_data_id(node_id = node_id,
                                                  model = copy.deepcopy(self.model_template),
                                                  data=data)


    def train_epoch(self,
                    sampled_nodes: dict[int, FederatedNode],
                    iteration: int,
                    local_epochs: int, 
                    mode: str = 'weights',
                    save_model: bool = False,
                    save_path: str = None) -> tuple[dict[int, int, float, float, list, list], dict[int, OrderedDict]]:
        """Performs one training round of a federated learning. Returns training
        results upon completion.

        Parameters
        ----------
        sampled_nodes: dict[int, FederatedNode]
            Dictionary containing sampled Federated Nodes
        iteration: int
            Current global iteration of the training process
        local_epochs: int
            Number of local epochs for which the local model should
            be trained.
        mode: str (default to 'weights')
            Mode = 'weights': Node will return model's weights.
            Mode = 'gradients': Node will return model's gradients.
        save_model: bool (default to False)
            Boolean flag to save a model.
        save_path: str (default to None)
            Path object used to save a model

        Returns
        -------
        tuple[dict[int, int, float, float, list, list], dict[int, OrderedDict]]
        """
        training_results = {}
        weights = {}
        with Pool(len(sampled_nodes)) as pool:
            results = [pool.apply_async(train_nodes, (node, iteration, local_epochs, mode, save_model, save_path)) for node in list(sampled_nodes.values())]
            for result in results:
                node_id, model_weights, loss_list, accuracy_list = result.get()
                weights[node_id] = model_weights
                training_results[node_id] = {
                    "iteration": iteration,
                    "node_id": node_id,
                    "loss": loss_list[-1], 
                    "accuracy": accuracy_list[-1],
                    "full_loss": loss_list,
                    "full_accuracy": accuracy_list}
        return (training_results, weights)


    def training_protocol(self,
                          iterations: int,
                          sample_size: int,
                          local_epochs: int,
                          aggrgator: Aggregator,
                          metrics_savepath: str,
                          nodes_models_savepath: str,
                          orchestrator_models_savepath: str) -> None:
        """Performs a full federated training according to the initialized
        settings. The train_protocol of the generic_orchestrator.Orchestrator
        follows a classic FedAvg algorithm - it averages the local weights 
        and aggregates them taking a weighted average.
        SOURCE: Communication-Efficient Learning of
        Deep Networks from Decentralized Data, H.B. McMahan et al.

        Parameters
        ----------
        iterations: int
            Number of (global) iterations // epochs to train the models for.
        sample_size: int
            Size of the sample
        local_epochs: int
            Number of local epochs for which the local model should
            be trained.
        aggregator: Aggregator
            Instance of the Aggregator object that will be used to aggregate the results of each round.
        metrics_savepath: str
            Path for saving the metrics
        nodes_models_savepath: str
            Path for saving the models in the .pt format.
        orchestrator_models_savepath: str
            Path for saving the orchestrator models.

        Returns
        -------
        None
        """
        for iteration in range(iterations):
            orchestrator_logger.info(f"Iteration {iteration}")

            # # Updating weights for every node
            # for node in self.network.values():
            #     node.update_weights(copy.deepcopy(self.central_model.get_weights()))

            # Sampling nodes
            sampled_nodes = sample_nodes(
                nodes = self.network,
                sample_size = sample_size,
                generator = self.generator
            )

            # Training nodes
            training_results, weights = self.train_epoch(
                sampled_nodes=sampled_nodes,
                iteration=iteration,
                local_epochs=local_epochs,
                mode='weights',
                save_model=True,
                save_path=nodes_models_savepath
            )

            # Preserving metrics of the training
            save_nested_dict_ascsv(
                data=training_results,
                save_path=os.path.join(metrics_savepath, 'training_metrics.csv'))

            # Testing nodes on the local dataset before the model update (only sampled nodes).
            automatic_node_evaluation(
                iteration=iteration,
                nodes=sampled_nodes,
                save_path=os.path.join(metrics_savepath, "before_update_metrics.csv"))

            # Updating weights
            new_weights = aggrgator.aggregate_weights(weights)

            # Updating weights for each node in the network
            for node in self.network.values():
                node.update_weights(new_weights)
            # Updating the weights for the central model
            self.orchestrator_model.update_weights(new_weights)

            # Preserving the orchestrator's model
            self.orchestrator_model.store_model_on_disk(iteration=iteration,
                                                        path=orchestrator_models_savepath)

            # Evaluating the new set of weights on local datasets.
            automatic_node_evaluation(
                iteration=iteration,
                nodes=self.network,
                save_path=os.path.join(metrics_savepath, "after_update_metrics.csv"))
            # Evaluating the new set of weights on orchestrator's dataset.
            evaluate_model(
                iteration=iteration,
                model=self.orchestrator_model,
                save_path=os.path.join(metrics_savepath, "orchestrator_metrics.csv"))

__init__(model_template, node_template, seed=42, **kwargs)

Creating a simulation instance requires providing an already created instance of a model template and a node template. These instances will then be copied n times to create n different nodes, each with a different dataset. Additionally, data for the local nodes should be passed in the form of a dictionary, mapping each dataset to its respective client.

Parameters

model_template : FederatedModel
    Initialized instance of a Federated Model class that will be uploaded to every client.
node_template : FederatedNode
    Initialized instance of a Federated Node class that will be used to simulate nodes.
seed : int (default to 42)
    Seed for the simulation.
**kwargs : dict, optional
    Extra arguments to enable selected features of the Orchestrator. Passing full_debug in **kwargs allows entering a full debug mode.

Returns

None

Source code in FedJust\simulation\simulation.py
def __init__(self, 
             model_template: FederatedModel,
             node_template: FederatedNode,
             seed: int = 42,
             **kwargs
             ) -> None:
    """Creating simulation instant requires providing an already created instance of model template
    and node template. Those instances then will be copied n-times to create n different nodes, each
    with a different dataset. Additionally, a data for local nodes should be passed in form of a dictionary,
    maping dataset to each respective client.

    Parameters
    ----------
    model_template : FederatedModel
        Initialized instance of a Federated Model class that will be uploaded to every client.
    node_template : FederatedNode
        Initialized instance of a Federated Node class that will be used to simulate nodes.
    seed : int (default to 42)
        Seed for the simulation.
    **kwargs : dict, optional
        Extra arguments to enable selected features of the Orchestrator.
        Passing full_debug in **kwargs allows entering a full debug mode.

    Returns
    -------
    None
    """
    self.model_template = model_template
    self.node_template = node_template
    self.network = {}
    self.orchestrator_model = None
    self.generator = np.random.default_rng(seed=seed)

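A minimal construction sketch. The FederatedModel arguments follow its reference above; the FederatedNode module path and constructor call are assumptions made for illustration:

from functools import partial

import torch

from FedJust.model.federated_model import FederatedModel
from FedJust.node.federated_node import FederatedNode  # module path assumed
from FedJust.simulation.simulation import Simulation

# Any torch.nn.Module can serve as the template network.
model_template = FederatedModel(
    net=torch.nn.Linear(10, 2),
    optimizer_template=partial(torch.optim.SGD, lr=0.01),
    loader_batch_size=32,
)
node_template = FederatedNode()  # constructor arguments omitted here; see the FederatedNode reference
simulation = Simulation(model_template=model_template, node_template=node_template, seed=42)
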
attach_node_model(nodes_data)

Attaches models of the nodes to the simulation instance.

Parameters

nodes_data: dict
    Dictionary mapping each node id to the local data that should be attached to that node's model.

Returns

None

Source code in FedJust\simulation\simulation.py
def attach_node_model(self,
                      nodes_data: dict):
    """Attaches models of the nodes to the simulation instance.

    Parameters
    ----------
    nodes_data: dict
        Dictionary mapping each node id to the local data that should be attached to that node's model.

    Returns
    -------
    None
    """
    for node_id, data in nodes_data.items():
        self.network[node_id] = copy.deepcopy(self.node_template)
        self.network[node_id].connect_data_id(node_id = node_id,
                                              model = copy.deepcopy(self.model_template),
                                              data=data)

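Continuing the sketch above, a hypothetical call with two clients; TensorDataset stands in for whatever dataset type the nodes actually expect:

import torch
from torch.utils.data import TensorDataset

def toy_split(n: int) -> TensorDataset:
    # Illustrative local data: 10 features, binary labels.
    return TensorDataset(torch.randn(n, 10), torch.randint(0, 2, (n,)))

simulation.attach_node_model(nodes_data={0: toy_split(64), 1: toy_split(64)})
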
attach_orchestrator_model(orchestrator_data)

Attaches the orchestrator's model, which is saved as an instance attribute.

Parameters

orchestrator_data: Any
    Orchestrator data that should be attached to the orchestrator model.

Returns

None

Source code in FedJust\simulation\simulation.py
def attach_orchestrator_model(self,
                              orchestrator_data: Any):
    """Attaches model of the orchestrator that is saved as an instance attribute.

    Parameters
    ----------
    orchestrator_data: Any 
        Orchestrator data that should be attached to the orchestrator model.

    Returns
    -------
    None
    """
    self.orchestrator_model = copy.deepcopy(self.model_template)
    self.orchestrator_model.attach_dataset_id(local_dataset=[orchestrator_data],
                                              node_name='orchestrator',
                                              only_test=True)

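Continuing the sketch, the orchestrator receives a held-out test split (internally attached with only_test=True):

# A hypothetical central test set, in the same toy format as the node splits above.
simulation.attach_orchestrator_model(orchestrator_data=toy_split(128))
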
train_epoch(sampled_nodes, iteration, local_epochs, mode='weights', save_model=False, save_path=None)

Performs one training round of federated learning. Returns training results upon completion.

Parameters

sampled_nodes: dict[int, FederatedNode]
    Dictionary containing sampled Federated Nodes
iteration: int
    Current global iteration of the training process
local_epochs: int
    Number of local epochs for which the local model should be trained.
mode: str (default to 'weights')
    Mode = 'weights': Node will return model's weights.
    Mode = 'gradients': Node will return model's gradients.
save_model: bool (default to False)
    Boolean flag to save a model.
save_path: str (default to None)
    Path object used to save a model

Returns

tuple[dict[int, int, float, float, list, list], dict[int, OrderedDict]]

Source code in FedJust\simulation\simulation.py
def train_epoch(self,
                sampled_nodes: dict[int, FederatedNode],
                iteration: int,
                local_epochs: int, 
                mode: str = 'weights',
                save_model: bool = False,
                save_path: str = None) -> tuple[dict[int, int, float, float, list, list], dict[int, OrderedDict]]:
    """Performs one training round of a federated learning. Returns training
    results upon completion.

    Parameters
    ----------
    sampled_nodes: dict[int, FederatedNode]
        Dictionary containing sampled Federated Nodes
    iteration: int
        Current global iteration of the training process
    local_epochs: int
        Number of local epochs for which the local model should
        be trained.
    mode: str (default to 'weights')
        Mode = 'weights': Node will return model's weights.
        Mode = 'gradients': Node will return model's gradients.
    save_model: bool (default to False)
        Boolean flag to save a model.
    save_path: str (default to None)
        Path object used to save a model

    Returns
    -------
    tuple[dict[int, int, float, float, list, list], dict[int, OrderedDict]]
    """
    training_results = {}
    weights = {}
    with Pool(len(sampled_nodes)) as pool:
        results = [pool.apply_async(train_nodes, (node, iteration, local_epochs, mode, save_model, save_path)) for node in list(sampled_nodes.values())]
        for result in results:
            node_id, model_weights, loss_list, accuracy_list = result.get()
            weights[node_id] = model_weights
            training_results[node_id] = {
                "iteration": iteration,
                "node_id": node_id,
                "loss": loss_list[-1], 
                "accuracy": accuracy_list[-1],
                "full_loss": loss_list,
                "full_accuracy": accuracy_list}
    return (training_results, weights)

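Continuing the sketch, a single manually driven round over every attached node; the keys of both returned dictionaries are the node ids:

training_results, weights = simulation.train_epoch(
    sampled_nodes=simulation.network,  # here: every node, no sub-sampling
    iteration=0,
    local_epochs=2,
    mode='weights',
)
print(training_results[0]['loss'], training_results[0]['accuracy'])
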
training_protocol(iterations, sample_size, local_epochs, aggrgator, metrics_savepath, nodes_models_savepath, orchestrator_models_savepath)

Performs a full federated training according to the initialized settings. The train_protocol of the generic_orchestrator.Orchestrator follows the classic FedAvg algorithm - it collects the local weights and aggregates them by averaging. SOURCE: Communication-Efficient Learning of Deep Networks from Decentralized Data, H.B. McMahan et al.

Parameters

iterations: int
    Number of (global) iterations // epochs to train the models for.
sample_size: int
    Size of the sample
local_epochs: int
    Number of local epochs for which the local model should be trained.
aggregator: Aggregator
    Instance of the Aggregator object that will be used to aggregate the results of each round.
metrics_savepath: str
    Path for saving the metrics
nodes_models_savepath: str
    Path for saving the models in the .pt format.
orchestrator_models_savepath: str
    Path for saving the orchestrator models.

Returns

None

Source code in FedJust\simulation\simulation.py
def training_protocol(self,
                      iterations: int,
                      sample_size: int,
                      local_epochs: int,
                      aggrgator: Aggregator,
                      metrics_savepath: str,
                      nodes_models_savepath: str,
                      orchestrator_models_savepath: str) -> None:
    """Performs a full federated training according to the initialized
    settings. The train_protocol of the generic_orchestrator.Orchestrator
    follows a classic FedAvg algorithm - it averages the local weights 
    and aggregates them taking a weighted average.
    SOURCE: Communication-Efficient Learning of
    Deep Networks from Decentralized Data, H.B. McMahan et al.

    Parameters
    ----------
    iterations: int
        Number of (global) iterations // epochs to train the models for.
    sample_size: int
        Size of the sample
    local_epochs: int
        Number of local epochs for which the local model should
        be trained.
    aggregator: Aggregator
        Instance of the Aggregator object that will be used to aggregate the results of each round.
    metrics_savepath: str
        Path for saving the metrics
    nodes_models_savepath: str
        Path for saving the models in the .pt format.
    orchestrator_models_savepath: str
        Path for saving the orchestrator models.

    Returns
    -------
    None
    """
    for iteration in range(iterations):
        orchestrator_logger.info(f"Iteration {iteration}")

        # # Updating weights for every node
        # for node in self.network.values():
        #     node.update_weights(copy.deepcopy(self.central_model.get_weights()))

        # Sampling nodes
        sampled_nodes = sample_nodes(
            nodes = self.network,
            sample_size = sample_size,
            generator = self.generator
        )

        # Training nodes
        training_results, weights = self.train_epoch(
            sampled_nodes=sampled_nodes,
            iteration=iteration,
            local_epochs=local_epochs,
            mode='weights',
            save_model=True,
            save_path=nodes_models_savepath
        )

        # Preserving metrics of the training
        save_nested_dict_ascsv(
            data=training_results,
            save_path=os.path.join(metrics_savepath, 'training_metrics.csv'))

        # Testing nodes on the local dataset before the model update (only sampled nodes).
        automatic_node_evaluation(
            iteration=iteration,
            nodes=sampled_nodes,
            save_path=os.path.join(metrics_savepath, "before_update_metrics.csv"))

        # Updating weights
        new_weights = aggrgator.aggregate_weights(weights)

        # Updating weights for each node in the network
        for node in self.network.values():
            node.update_weights(new_weights)
        # Updating the weights for the central model
        self.orchestrator_model.update_weights(new_weights)

        # Preserving the orchestrator's model
        self.orchestrator_model.store_model_on_disk(iteration=iteration,
                                                    path=orchestrator_models_savepath)

        # Evaluating the new set of weights on local datasets.
        automatic_node_evaluation(
            iteration=iteration,
            nodes=self.network,
            save_path=os.path.join(metrics_savepath, "after_update_metrics.csv"))
        # Evaluating the new set of weights on orchestrator's dataset.
        evaluate_model(
            iteration=iteration,
            model=self.orchestrator_model,
            save_path=os.path.join(metrics_savepath, "orchestrator_metrics.csv"))

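Putting the pieces together, a hypothetical end-to-end run with the base Aggregator; note that the keyword must be spelled aggrgator, exactly as in the source:

import os

from FedJust.aggregators.aggregator import Aggregator  # module path assumed

os.makedirs('results', exist_ok=True)
simulation.training_protocol(
    iterations=10,
    sample_size=2,
    local_epochs=2,
    aggrgator=Aggregator(),  # parameter name as spelled in the source
    metrics_savepath='results',
    nodes_models_savepath='results',
    orchestrator_models_savepath='results',
)
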
Adaptive_Optimizer_Simulation

Bases: Simulation

Simulation class representing a generic simulation type.

Attributes

model_template : FederatedModel
    Initialized instance of a Federated Model class that is uploaded to every client.
node_template : FederatedNode
    Initialized instance of a Federated Node class that is used to simulate nodes.
data : dict
    Local data used for the training in a dictionary format, mapping each client to its respective dataset.

Source code in FedJust\simulation\adaptive_optimizer_simulation.py
class Adaptive_Optimizer_Simulation(Simulation):
    """Simulation class representing a generic simulation type.

        Attributes
        ----------
        model_template : FederatedModel
            Initialized instance of a Federated Model class that is uploaded to every client.
        node_template : FederatedNode
            Initialized instance of a Federated Node class that is used to simulate nodes.
        data : dict
            Local data used for the training in a dictionary format, mapping each client to its respective dataset."""


    def __init__(
        self, 
        model_template: FederatedModel, 
        node_template: FederatedNode, 
        seed: int = 42, 
        **kwargs
        ) -> None:
        super().__init__(model_template, node_template, seed, **kwargs)


    def training_protocol(
        self,
        iterations: int,
        sample_size: int,
        local_epochs: int,
        aggrgator: Aggregator,
        learning_rate: float,
        metrics_savepath: str,
        nodes_models_savepath: str,
        orchestrator_models_savepath: str
        ) -> None:
        """Performs a full federated training according to the initialized
        settings. The train_protocol of the generic_orchestrator.Orchestrator
        follows a classic FedOpt algorithm - it averages the local gradients 
        and aggregates them using a selected optimizer.
        SOURCE: 

        Parameters
        ----------
        iterations: int
            Number of (global) iterations // epochs to train the models for.
        sample_size: int
            Size of the sample
        local_epochs: int
            Number of local epochs for which the local model should
            be trained.
        aggregator: Aggregator
            Instance of the Aggregator object that will be used to aggregate the results of each round.
        learning_rate: float
            Learning rate to be used for optimization.
        metrics_savepath: str
            Path for saving the metrics
        nodes_models_savepath: str
            Path for saving the models in the .pt format.
        orchestrator_models_savepath: str
            Path for saving the orchestrator models.

        Returns
        -------
        None
        """
        for iteration in range(iterations):
            orchestrator_logger.info(f"Iteration {iteration}")

            # # Updating weights for every node
            # for node in self.network.values():
            #     node.update_weights(copy.deepcopy(self.central_model.get_weights()))

            # Sampling nodes
            sampled_nodes = sample_nodes(
                nodes = self.network,
                sample_size = sample_size,
                generator = self.generator
            )

            # Training nodes
            training_results, gradients = self.train_epoch(
                sampled_nodes=sampled_nodes,
                iteration=iteration,
                local_epochs=local_epochs,
                mode='gradients',
                save_model=True,
                save_path=nodes_models_savepath
            )

            # Preserving metrics of the training
            save_nested_dict_ascsv(
                data=training_results,
                save_path=os.path.join(metrics_savepath, 'training_metrics.csv'))

            # Testing nodes on the local dataset before the model update (only sampled nodes).
            automatic_node_evaluation(
                iteration=iteration,
                nodes=sampled_nodes,
                save_path=os.path.join(metrics_savepath, "before_update_metrics.csv"))

            avg_gradients = average_of_weigts(gradients)
            # Updating weights
            new_weights = aggrgator.optimize_weights(
                weights=self.orchestrator_model.get_weights(),
                gradients = avg_gradients,
                learning_rate = learning_rate,
                )

            # Updating weights for each node in the network
            for node in self.network.values():
                node.update_weights(new_weights)
            # Updating the weights for the central model
            self.orchestrator_model.update_weights(new_weights)

            # Preserving the orchestrator's model
            self.orchestrator_model.store_model_on_disk(iteration=iteration,
                                                        path=orchestrator_models_savepath)

            # Evaluating the new set of weights on local datasets.
            automatic_node_evaluation(
                iteration=iteration,
                nodes=self.network,
                save_path=os.path.join(metrics_savepath, "after_update_metrics.csv"))
            # Evaluating the new set of weights on orchestrator's dataset.
            evaluate_model(
                iteration=iteration,
                model=self.orchestrator_model,
                save_path=os.path.join(metrics_savepath, "orchestrator_metrics.csv"))

training_protocol(iterations, sample_size, local_epochs, aggrgator, learning_rate, metrics_savepath, nodes_models_savepath, orchestrator_models_savepath)

Performs a full federated training according to the initialized settings. The train_protocol of the generic_orchestrator.Orchestrator follows a classic FedOpt algorithm - it averages the local gradients and aggregates them using a selected optimizer. SOURCE:

Parameters

iterations: int
    Number of (global) iterations // epochs to train the models for.
sample_size: int
    Size of the sample
local_epochs: int
    Number of local epochs for which the local model should be trained.
aggregator: Aggregator
    Instance of the Aggregator object that will be used to aggregate the results of each round.
learning_rate: float
    Learning rate to be used for optimization.
metrics_savepath: str
    Path for saving the metrics
nodes_models_savepath: str
    Path for saving the models in the .pt format.
orchestrator_models_savepath: str
    Path for saving the orchestrator models.

Returns

None

Source code in FedJust\simulation\adaptive_optimizer_simulation.py
def training_protocol(
    self,
    iterations: int,
    sample_size: int,
    local_epochs: int,
    aggrgator: Aggregator,
    learning_rate: float,
    metrics_savepath: str,
    nodes_models_savepath: str,
    orchestrator_models_savepath: str
    ) -> None:
    """Performs a full federated training according to the initialized
    settings. The train_protocol of the generic_orchestrator.Orchestrator
    follows a classic FedOpt algorithm - it averages the local gradients 
    and aggregates them using a selected optimizer.
    SOURCE: 

    Parameters
    ----------
    iterations: int
        Number of (global) iterations // epochs to train the models for.
    sample_size: int
        Size of the sample
    local_epochs: int
        Number of local epochs for which the local model should
        be trained.
    aggregator: Aggregator
        Instance of the Aggregator object that will be used to aggregate the results of each round.
    learning_rate: float
        Learning rate to be used for optimization.
    metrics_savepath: str
        Path for saving the metrics
    nodes_models_savepath: str
        Path for saving the models in the .pt format.
    orchestrator_models_savepath: str
        Path for saving the orchestrator models.

    Returns
    -------
    None
    """
    for iteration in range(iterations):
        orchestrator_logger.info(f"Iteration {iteration}")

        # # Updating weights for every node
        # for node in self.network.values():
        #     node.update_weights(copy.deepcopy(self.central_model.get_weights()))

        # Sampling nodes
        sampled_nodes = sample_nodes(
            nodes = self.network,
            sample_size = sample_size,
            generator = self.generator
        )

        # Training nodes
        training_results, gradients = self.train_epoch(
            sampled_nodes=sampled_nodes,
            iteration=iteration,
            local_epochs=local_epochs,
            mode='gradients',
            save_model=True,
            save_path=nodes_models_savepath
        )

        # Preserving metrics of the training
        save_nested_dict_ascsv(
            data=training_results,
            save_path=os.path.join(metrics_savepath, 'training_metrics.csv'))

        # Testing nodes on the local dataset before the model update (only sampled nodes).
        automatic_node_evaluation(
            iteration=iteration,
            nodes=sampled_nodes,
            save_path=os.path.join(metrics_savepath, "before_update_metrics.csv"))

        avg_gradients = average_of_weigts(gradients)
        # Updating weights
        new_weights = aggrgator.optimize_weights(
            weights=self.orchestrator_model.get_weights(),
            gradients = avg_gradients,
            learning_rate = learning_rate,
            )

        # Updating weights for each node in the network
        for node in self.network.values():
            node.update_weights(new_weights)
        # Updating the weights for the central model
        self.orchestrator_model.update_weights(new_weights)

        # Preserving the orchestrator's model
        self.orchestrator_model.store_model_on_disk(iteration=iteration,
                                                    path=orchestrator_models_savepath)

        # Evaluating the new set of weights on local datasets.
        automatic_node_evaluation(
            iteration=iteration,
            nodes=self.network,
            save_path=os.path.join(metrics_savepath, "after_update_metrics.csv"))
        # Evaluating the new set of weights on orchestrator's dataset.
        evaluate_model(
            iteration=iteration,
            model=self.orchestrator_model,
            save_path=os.path.join(metrics_savepath, "orchestrator_metrics.csv"))

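A hypothetical FedOpt-style run, mirroring the plain Simulation example above but passing a Fedopt_Optimizer together with a server-side learning rate:

from FedJust.aggregators.fedopt_aggregator import Fedopt_Optimizer  # module paths assumed
from FedJust.simulation.adaptive_optimizer_simulation import Adaptive_Optimizer_Simulation

adaptive_simulation = Adaptive_Optimizer_Simulation(
    model_template=model_template,  # templates as in the Simulation example
    node_template=node_template,
)
# ... attach the orchestrator model and the node models as shown for Simulation ...
adaptive_simulation.training_protocol(
    iterations=10,
    sample_size=2,
    local_epochs=2,
    aggrgator=Fedopt_Optimizer(),  # parameter name as spelled in the source
    learning_rate=0.1,
    metrics_savepath='results',
    nodes_models_savepath='results',
    orchestrator_models_savepath='results',
)
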
Operations

A boilerplate code for some additional operations performed in a Federated scenario.

automatic_node_evaluation(iteration, nodes, save_path, logger=None, log_to_screen=False)

Used to automatically evaluate a set of provided nodes and preserve metrics in the indicated directory.

Parameters

iteration: int
    Current iteration of the training.
nodes: dict[int, FederatedNode]
    Dictionary containing nodes to be evaluated.
save_path: str (default to None)
    The saving path of the csv file, must end with the .csv extension.
logger: Logger (default to None)
    Logger object that we want to use to handle the logs.
log_to_screen: bool (default to False)
    Boolean flag whether we want to log the results to the screen.

Returns

None
Source code in FedJust\operations\evaluations.py
def automatic_node_evaluation(
    iteration: int, 
    nodes: dict[int, FederatedNode],
    save_path: str,
    logger = None,
    log_to_screen: bool = False
    ) -> None:
    """Used to automatically evaluate a set of provided node and preserve metrics in the indicated 
    directory.

    Parameters
    ----------
    iteration: int 
        Current iteration of the training.
    nodes: dict[int, FederatedNode]
        Dictionary containing nodes to be evaluated.
    save_path: str (default to None)
        The saving path of the csv file, must end with the .csv extension.
    logger: Logger (default to None)
        Logger object that we want to use to handle the logs.
    log_to_screen: bool (default to False)
        Boolean flag whether we want to log the results to the screen.

    Returns
    -------
        None"""
    for node in nodes.values():
        evaluate_model(
            iteration=iteration,
            model=node.model,
            save_path=save_path,
            logger=logger,
            log_to_screen=log_to_screen
        )

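Continuing the simulation sketch, evaluating every attached node after a round; each node appends one row to the indicated csv file:

from FedJust.operations.evaluations import automatic_node_evaluation  # module path assumed

automatic_node_evaluation(
    iteration=0,
    nodes=simulation.network,
    save_path='results/after_update_metrics.csv',
)
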
evaluate_model(iteration, model, save_path, logger=None, log_to_screen=False)

Used to save the model metrics.

Parameters

iteration: int
    Current iteration of the training.
model: FederatedModel
    FederatedModel to be evaluated.
save_path: str (default to None)
    The saving path of the csv file, must end with the .csv extension.
logger: Logger (default to None)
    Logger object that we want to use to handle the logs.
log_to_screen: bool (default to False)
    Boolean flag whether we want to log the results to the screen.

Returns

None
Source code in FedJust\operations\evaluations.py
def evaluate_model(
    iteration: int, 
    model: FederatedModel,
    save_path: str,
    logger = None,
    log_to_screen: bool = False
    ) -> None:
    """Used to save the model metrics.

    Parameters
    ----------
    iteration: int 
        Current iteration of the training.
    model: FederatedModel
        FederatedModel to be evaluated
    save_path: str (default to None)
        The saving path of the csv file, must end with the .csv extension.
    logger: Logger (default to None)
        Logger object that we want to use to handle the logs.
    log_to_screen: bool (default to False)
        Boolean flag whether we want to log the results to the screen.

    Returns
    -------
        None"""
    try:
        evaluation_results = model.evaluate_model()
        evaluation_results['node_id'] = model.node_name
        evaluation_results['epoch'] = iteration
        if log_to_screen == True:
            pass
            #logger.info(f"Evaluating model after iteration {iteration} on node {model.node_name}. Results: {metrics}")
    except Exception as e:
        if logger is not None:
            logger.warning(f"Unable to compute metrics. {e}")
        return # Without evaluation results there is nothing to write to the csv file.
    path = os.path.join(save_path)
    with open(path, 'a+', newline='') as saved_file:
        writer = csv.DictWriter(saved_file, list(evaluation_results.keys()))
        # If the file does not exist, it will create it and write the header.
        if os.path.getsize(path) == 0:
            writer.writeheader()
        writer.writerow(evaluation_results)

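A hypothetical direct use on the orchestrator's model from the simulation sketch; the header row is written only while the file is empty, so repeated calls append one row per iteration:

from FedJust.operations.evaluations import evaluate_model  # module path assumed

for iteration in range(3):
    evaluate_model(
        iteration=iteration,
        model=simulation.orchestrator_model,
        save_path='results/orchestrator_metrics.csv',
    )
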
sample_nodes(nodes, sample_size, generator)

Sample the nodes given the provided sample size. If sample_size equals the number of available nodes, the sampler returns all of them.

Parameters

nodes: dict[int, FederatedNode]
    Original dictionary of nodes to be sampled from.
sample_size: int
    Size of the sample
generator: np.random.Generator
    A numpy generator initialized on the server side.

Returns

dict[int, FederatedNode]
Source code in FedJust\operations\orchestrations.py
def sample_nodes(nodes: dict[int, FederatedNode], 
                 sample_size: int,
                 generator: np.random.Generator) -> dict[int, FederatedNode]:
    """Sample the nodes given the provided sample size. If sample_size equals
    the number of available nodes, the sampler returns all of them.

    Parameters
    ----------
        nodes: dict[int, FederatedNode]
            Original dictionary of nodes to be sampled from.
        sample_size: int
            Size of the sample
        generator: np.random.Generator
            A numpy generator initialized on the server side.

    Returns
    -------
        dict[int, FederatedNode]
    """
    sample = generator.choice(list(nodes.values()), size=sample_size, replace=False) # Conversion to array
    sample = {node.node_id: node for node in sample} # Back-conversion to dictionary
    return sample

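A minimal sampling sketch, reusing the network from the simulation examples above:

import numpy as np

from FedJust.operations.orchestrations import sample_nodes  # module path assumed

rng = np.random.default_rng(seed=42)
sampled = sample_nodes(nodes=simulation.network, sample_size=2, generator=rng)
print(list(sampled.keys()))  # ids of the two sampled nodes
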
sample_weighted_nodes(nodes, sample_size, generator, sampling_array)

Sample the nodes given the provided sample size. It requires passing a sampling array containing a list of weights associated with each node.

Parameters

nodes: dict[int, FederatedNode]
    Original dictionary of nodes to be sampled from.
sample_size: int
    Size of the sample
generator: np.random.Generator
    A numpy generator initialized on the server side.
sampling_array: np.ndarray
    Sampling array containing weights for the sampling

Returns

dict[int, FederatedNode]
Source code in FedJust\operations\orchestrations.py
def sample_weighted_nodes(nodes: dict[int, FederatedNode], 
                          sample_size: int,
                          generator: np.random.Generator,
                          sampling_array: np.ndarray) -> dict[int, FederatedNode]:
    """Sample the nodes given the provided sample size. It requires passing a sampling array
    containing a list of weights associated with each node.

    Parameters
    ----------
        nodes: dict[int, FederatedNode]
            Original dictionary of nodes to be sampled from.
        sample_size: int
            Size of the sample
        generator: np.random.Generator
            A numpy generator initialized on the server side.
        sampling_array: np.ndarray
            Sampling array containing weights for the sampling

    Returns
    -------
        dict[int, FederatedNode]
    """
    sample = generator.choice(list(nodes.values()), size=sample_size, p=sampling_array, replace=False) # Conversion to array; a dict cannot be passed to generator.choice directly
    sample = {node.node_id: node for node in sample} # Back-conversion to dictionary
    return sample

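As above, but weighted; the sampling_array must have one probability per node and sum to 1 (the values here are invented):

from FedJust.operations.orchestrations import sample_weighted_nodes  # module path assumed

probabilities = np.array([0.7, 0.3])  # favour node 0 over node 1
sampled = sample_weighted_nodes(
    nodes=simulation.network,
    sample_size=1,
    generator=rng,
    sampling_array=probabilities,
)
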
train_nodes(node, iteration, local_epochs, mode='weights', save_model=False, save_path=None)

Used to command the node to start the local training. Invokes .train_local_model method and returns the results.

Parameters

node: FederatedNode
    Node that we want to train.
iteration: int
    Current (global) iteration.
local_epochs: int
    Number of local epochs for which to train a node
mode: str (default to 'weights')
    Mode of the training.
    Mode = 'weights': Node will return model's weights.
    Mode = 'gradients': Node will return model's gradients.
save_model: bool (default to False)
    Boolean flag to enable model saving.
save_path: str (default to None)
    Save path for preserving a model (applicable only when save_model = True)

Returns

tuple[int, OrderedDict, List[float], List[float]]
Source code in FedJust\operations\orchestrations.py
def train_nodes(
    node: FederatedNode,
    iteration: int,
    local_epochs: int,
    mode: str = 'weights',
    save_model: bool = False,
    save_path: str = None) -> tuple[int, OrderedDict, List[float], List[float]]:
    """Used to command the node to start the local training.
    Invokes .train_local_model method and returns the results.

    Parameters
    ----------
    node: FederatedNode 
        Node that we want to train.
    iteration: int
        Current (global) iteration.
    local_epochs: int
        Number of local epochs for which to train a node
    mode: str (default to 'weights')
        Mode of the training. 
        Mode = 'weights': Node will return model's weights.
        Mode = 'gradients': Node will return model's gradients.
    save_model: bool (default to False)
        Boolean flag to enable model saving.
    save_path: str (default to None)
        Save path for preserving a model (applicable only when save_model = True)

    Returns
    -------
        tuple[int, OrderedDict, List[float], List[float]]
    """
    node_id, weights, loss_list, accuracy_list = node.train_local_model(
        iteration = iteration,
        local_epochs = local_epochs,
        mode = mode,
        save_model = save_model,
        save_path=save_path)
    return (node_id, weights, loss_list, accuracy_list)

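train_nodes is normally dispatched through a multiprocessing Pool (see train_epoch above), but a direct call on a single attached node looks like this sketch:

from FedJust.operations.orchestrations import train_nodes  # module path assumed

node_id, weights, losses, accuracies = train_nodes(
    simulation.network[0],  # an attached FederatedNode from the examples above
    iteration=0,
    local_epochs=1,
    mode='weights',
)
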
Aggregators

A boilerplate code for functions serving to merge the local weights.

Aggregator

Basic class for all Federated Aggregators

Source code in FedJust\aggregators\aggregator.py
class Aggregator():
    """Basic class for all Federated Aggregators"""
    def __init__(self) -> None:
        pass


    def aggregate_weights(
        self,
        weights: dict[int, OrderedDict]) -> OrderedDict:
        """Basic aggregation function (equal to FedAvg) that returns the aggregated version of the
        weights. Performs a deepcopy on the passed parameters.

        Parameters
        ----------
        weights: dict[int, OrderedDict]

        Returns
        -------
        OrderedDict"""

        results = OrderedDict()
        for params in weights.values():
            for key in params:
                if results.get(key) is None:
                    results[key] = copy.deepcopy(params[key]) # deepcopy so the clients' tensors are never mutated in place
                else:
                    results[key] += copy.deepcopy(params[key])

        for key in results:
            results[key] = torch.div(results[key], len(weights))
        return results

aggregate_weights(weights)

Basic aggregation function (equal to FedAvg) that returns the aggregated version of the weights. Performs a deepcopy on the passed parameters.

Parameters

weights: dict[int, OrderedDict]

Returns

OrderedDict

Source code in FedJust\aggregators\aggregator.py
def aggregate_weights(
    self,
    weights: dict[int, OrderedDict]) -> OrderedDict:
    """Basic aggregation function (equal to FedAvg) that returns the aggregated version of the
    weights. Performs a deepcopy on the passed parameters.

    Parameters
    ----------
    weights: dict[int, OrderedDict]

    Returns
    -------
    OrderedDict"""

    results = OrderedDict()
    for params in weights.values():
        for key in params:
            if results.get(key) is None:
                results[key] = copy.deepcopy(params[key]) # deepcopy so the clients' tensors are never mutated in place
            else:
                results[key] += copy.deepcopy(params[key])

    for key in results:
        results[key] = torch.div(results[key], len(weights))
    return results

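A minimal numeric check of the averaging, using toy state dicts for two clients:

from collections import OrderedDict

import torch

from FedJust.aggregators.aggregator import Aggregator  # module path assumed

client_weights = {
    0: OrderedDict({'fc.bias': torch.tensor([0.0, 2.0])}),
    1: OrderedDict({'fc.bias': torch.tensor([4.0, 2.0])}),
}
averaged = Aggregator().aggregate_weights(client_weights)
print(averaged['fc.bias'])  # tensor([2., 2.]) -- the element-wise mean
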
Fedopt_Optimizer

Bases: Aggregator

Fedopt Optimizer that performs a generalized version of Federated Averaging. Suitable for performing Federated Optimization based on gradients, with varying learning rates.

Attributes

None

Source code in FedJust\aggregators\fedopt_aggregator.py
class Fedopt_Optimizer(Aggregator):
    """Fedopt Optimizer that performs a generalized 
    version of Federated Averaging. Suitable for performing
    Federated Optimization based on gradients, with 
    varying learning rates.

    Attributes
    ----------
    None
    """

    def __init__(self) -> None:
        super().__init__()


    def optimize_weights(
        self,
        weights: dict[int, OrderedDict],
        gradients: dict[int, OrderedDict],
        learning_rate: float,
        ) -> OrderedDict:
        """FedOpt Aggregation Function (equal to FedAvg when lr=1.0) that returns the 
        updated version of the weights.

        Parameters
        ----------
        weights: dict[int, OrderedDict]
            Weights of the previous (central) model.
        gradients: dict[int, OrderedDict]
            Gradients (defined as trained model - dispatched model).
        learning_rate: float
            Learning rate used to scale the averaged gradients before they are added to the central weights.

        Returns
        -------
        OrderedDict"""
        updated_weights = OrderedDict((key, zeros(weights[key].size())) for key in weights.keys())
        for key in weights:
            updated_weights[key] = weights[key] + (learning_rate * (gradients[key]))
        return updated_weights

optimize_weights(weights, gradients, learning_rate)

FedOpt Aggregation Function (equal to FedAvg when lr=1.0) that returns the updated version of the weights.

Parameters

weights: dict[int, OrderedDict]
    Weights of the previous (central) model.
gradients: dict[int, OrderedDict]
    Gradients (defined as trained model - dispatched model).
learning_rate: float
    Learning rate used to scale the averaged gradients before they are added to the central weights.

Returns

OrderedDict

Source code in FedJust\aggregators\fedopt_aggregator.py
def optimize_weights(
    self,
    weights: dict[int, OrderedDict],
    gradients: dict[int, OrderedDict],
    learning_rate: float,
    ) -> OrderedDict:
    """FedOpt Aggregation Function (equal to FedAvg when lr=1.0) that returns the 
    updated version of the weights.

    Parameters
    ----------
    weights: dict[int, OrderedDict]
        Weights of the previous (central) model.
    gradients: dict[int, OrderedDict]
        Gradients (defined as trained model - dispatched model).
    learning_rate: float
        Learning rate used to scale the averaged gradients before they are added to the central weights.

    Returns
    -------
    OrderedDict"""
    updated_weights = OrderedDict((key, zeros(weights[key].size())) for key in weights.keys())
    for key in weights:
        updated_weights[key] = weights[key] + (learning_rate * (gradients[key]))
    return updated_weights
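
And the corresponding sketch for the FedOpt update, again on toy tensors:

from collections import OrderedDict

import torch

from FedJust.aggregators.fedopt_aggregator import Fedopt_Optimizer  # module path assumed

central_weights = OrderedDict({'fc.weight': torch.ones(2, 2)})
averaged_gradients = OrderedDict({'fc.weight': torch.full((2, 2), 0.1)})

new_weights = Fedopt_Optimizer().optimize_weights(
    weights=central_weights,
    gradients=averaged_gradients,
    learning_rate=0.5,
)
print(new_weights['fc.weight'])  # every entry: 1.0 + 0.5 * 0.1 = 1.05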