Source code for Modulators.ContSpecModulator

# This file is part of NFDMLab.
#
# NFDMLab is free software; you can redistribute it and/or
# modify it under the terms of the version 2 of the GNU General
# Public License as published by the Free Software Foundation.
#
# NFDMLab is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with NFDMLab; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# 02111-1307 USA
#
# Contributors:
# Sander Wahls (TU Delft) 2018-2019
# Shrinivas Chimmalgi (TU Delft) 2018

import numpy as np
import matplotlib.pyplot as plt
from FNFTpy.fnft_nsev_inverse_wrapper import nsev_inverse_wrapper, nsev_inverse_xi_wrapper
from FNFTpy.options_handling import fnft_nsev_inverse_default_options_wrapper
from FNFTpy.fnft_nsev_wrapper import nsev_wrapper
from FNFTpy.options_handling import fnft_nsev_default_options_wrapper
from Modulators import BaseModulator
from Helpers import NFSpectrum
from Helpers import next_pow2

[docs]class ContSpecModulator(BaseModulator): """This modulator embeds symbols in the continuous spectrum using a classic multi-carrier approach."""
[docs] def __init__(self, carrier_waveform_fun, carrier_spacing, n_symbols_per_block, normalized_distance, T, required_normalized_dt, required_dxi, contspec_type, percent_precompensation = 0.0, power_control_factor = 1.0, use_power_normalization_map = False): """Constructor. Parameters ---------- carrier_waveform_fun : function A function that maps numpy.array(float) to numpy.array(complex). The inputs of this function are vectors with nonlinear frequencies xi. The outputs are vectors with the values of the carrier waveforms at the corresponding xi. See Helpers.CarrierWaveforms for examples. carrier_spacing : float Spacing between consequtive carriers in the nonlinear frequency domain (xi). n_symbols_per_block : int Number of carriers. normalized_distance : float Length of the fiber link in normalized units. T : numpy.array(float) Vector of length two with T[0]<T[1]. Specifies a time interval, in normalized units, used during the generation of the fiber inputs. required_normalized_dt : float The modulator will choose the number of time domain samples such that the time step, in normalized units, does not exceed this value. required_dxi : float The modulator will choose the number of samples in the nonlinear frequency domain such that the nonlinear frequency step is not larger than this value. contspec_type : str Type of continuous spectrum used for encoding data. Choose "b/a" for reflection coefficients and "b" for b-coefficients. percent_precompensation : float Value between 0.0 and 100.0. A value of 50.0 means that transmitter and receiver both compensate 50% of the phase change in the continuous spectrum induced by the channel. A value of 0.0 means that the phase changes is completely reverted at the receiver. power_control_factor : float A positive real number. The carrier waveform is effectively scaled by this number. use_power_normalization_map : bool If True, the modulator applies a power normalized map as described in Yangzhang et. al (Proc. OFC'17) to the nonlinear Fourier spectrum it has generated before passing it on to the inverse NFT. Currently only implemented for reflection coefficients (contspec_type="b/a"). """ # Save some given parameters for later use. self._carrier_waveform_fun = carrier_waveform_fun self._carrier_spacing = carrier_spacing self._n_symbols_per_block = n_symbols_per_block self._normalized_distance = normalized_distance self._T = T self._contspec_type = contspec_type self._percent_precompensation = percent_precompensation self._power_control_factor = power_control_factor self._use_power_normalization_map = use_power_normalization_map # Choose number of time domain samples such that the user requirements # on the time step are fulfilled. self._n_samples = next_pow2((T[1] - T[0])/required_normalized_dt + 1) self._normalized_dt = (self._T[1] - self._T[0])/(self.n_samples - 1) assert(self.normalized_dt <= required_normalized_dt) # The time window T provided by the user is typically too short to fit # the complete signal that corresponds to the generated nonlinear # Fourier spectra. Normally, this causes numerical errors. # To avoid these errors, we internally extend the time domain and # increase the number of time domain samples such that the time step # does not change. The additional samples generated outside the interval # T[0] <= t <= T[1] will be discarded in the end. self._O = 4 self._Tnew = np.array([T[0]-self.normalized_dt*(self._O-1)*self.n_samples/2, T[1]+self.normalized_dt*(self._O-1)*self.n_samples/2]) self._n_samples_new = self._O*self.n_samples # We now determine the number nonlinear Fourier domain samples based on # the requirements on the nonlinear frequency step of the user self._M = self._n_samples_new self._dxi = np.Inf while self._dxi > required_dxi: self._M *= 2 rv, self._XI = nsev_inverse_xi_wrapper(self._n_samples_new, self._Tnew[0], self._Tnew[1], self._M) self._dxi = (self._XI[1] - self._XI[0])/(self._M - 1) # Generate some time and nonlinear frequency grids for later use. self._t = self._T[0] + np.arange(0, self.n_samples)*self.normalized_dt self._xi = self._XI[0] + np.arange(0, self._M)*self._dxi self._carrier_centers = (np.arange(0, n_symbols_per_block)-(n_symbols_per_block-1)/2.0)*carrier_spacing self._carrier_center_idx = np.zeros(n_symbols_per_block, dtype=int) for n in range(0, n_symbols_per_block): self._carrier_center_idx[n] = np.argmin(abs(self._xi - self._carrier_centers[n])) # Set the right type of continuos spectrum for the NFT routines. self._opts_inv = fnft_nsev_inverse_default_options_wrapper() self._opts_fwd = fnft_nsev_default_options_wrapper() #self._opts_inv.discretization = self._opts_fwd.discretization if self._contspec_type == "b": self._opts_fwd.contspec_type = 1 # b_of_xi self._opts_inv.contspec_type = 1 # ab elif self._contspec_type != "b/a": raise Exception("Unknown contspec_type '"+str(self._contspec_type)+"'") self._opts_fwd.discspec_type = 3 # skip computation of discrete spectrum # Determine interval [xi_min, xi_max] outside of which the carrier # waveform is effectively zero. This is essential to speed the # modulation process when the number of carriers is getting larger. vals = self._carrier_waveform_fun(self._xi) tol = 10 * np.finfo(vals[0]).eps * np.max(np.abs(vals)) idx = np.argwhere(np.abs(vals)>tol) i1 = idx[0][0] i2 = idx[-1][0] self._xi_min = self._xi[i1] self._xi_max = self._xi[i2]
def _new_nfspec(self): """Generates an empty NFSpectrum object with the right contspec_type and plotting hints.""" nfspec = NFSpectrum(self._contspec_type, "none") nfspec.xi = self._xi nfspec.xi_plot_range = np.array([-1.5, 1.5])*self.n_symbols_per_block/2*self._carrier_spacing return nfspec
[docs] def modulate(self, symbols): # Docstring is inherited from base class. # Generate a nonlinear Fourier spectrum in which the given symbols are # embedded. nc = np.size(symbols) assert nc == self.n_symbols_per_block nfspec = self._new_nfspec() nfspec.cont = np.zeros(self._M, dtype=complex) for n in range(0, nc): shifted_xi = self._xi - self._carrier_centers[n] idx = np.logical_and(shifted_xi>=self._xi_min, shifted_xi<=self._xi_max) nfspec.cont[idx] = nfspec.cont[idx] + symbols[n]*self._carrier_waveform_fun(shifted_xi[idx]) for n in range(0, nc): i = self._carrier_center_idx[n] # Apply the power control factor and, if requested, the power # normalization map nfspec.cont *= self._power_control_factor if self._use_power_normalization_map: if self._contspec_type == "b": raise Exception("Power normalization map currently only implemented for continuous spectra of reflection coefficient type (b/a)") nfspec.cont = np.sqrt(np.exp(np.abs(nfspec.cont)**2) - 1.0) * np.exp(1j*np.angle(nfspec.cont)) # Pre-equalize for the phase change induced by the channel nfspec.cont *= np.exp(-2.0j*self._xi**2*self._normalized_distance*(self._percent_precompensation/100)) # Generate corresponding time domain signal on the extended grid # specified construction rdict = nsev_inverse_wrapper(self._M, nfspec.cont, self._XI[0], self._XI[1], 0, [], [], self._n_samples_new, self._Tnew[0], self._Tnew[1], +1, self._opts_inv) if rdict['return_value'] != 0: raise Exception("FNFT failed") # Remove all samples that are not in the time range specified by the # user. q = rdict['q'][ int((self._O-1)*self.n_samples/2) :int((self._O+1)*self.n_samples/2)] return q, nfspec
[docs] def demodulate(self, q): # Docstring is inherited from base class. # Oversample the given time domain signal and compute the continous # spectrum. assert np.size(q) == self.n_samples from scipy import signal DO = self._O * self.n_samples qO, tO = signal.resample(q, DO, self._t) rdict = nsev_wrapper(DO, qO, tO[0], tO[-1], self._XI[0], self._XI[1], self._M, 0, +1, self._opts_fwd) if rdict['return_value'] != 0: raise Exception("FNFT failed") nfspec = self._new_nfspec() if self._contspec_type == "b/a": nfspec.cont = rdict['cont_ref'] elif self._contspec_type == "b": nfspec.cont = rdict['cont_b'] else: raise Exception("Unknown contspec_type '"+str(self._contspec_type)+"'") # Remove any remaining phase changes due to the channel that have not # already been removed at the transmitter nfspec.cont *= np.exp(-2.0j*self._xi**2*self._normalized_distance *(1 - self._percent_precompensation/100)) # Recover the symbols symbols = np.zeros(self.n_symbols_per_block, dtype=complex) scl = np.abs(self._carrier_waveform_fun(0.0)) * self._power_control_factor for n in range(0, self.n_symbols_per_block): symbols[n] = nfspec.cont[self._carrier_center_idx[n]] if self._use_power_normalization_map: symbols[n] = np.sqrt(np.log(np.abs(symbols[n])**2 + 1.0)) * np.exp(1j*np.angle(symbols[n])) symbols[n] /= scl return symbols, nfspec