{% blocktrans %}A dataset can support one or two time attributes. If a single
- attribute is used, the dataset is considered to contain data that is valid at single points in time. If two
- attributes are used, the second attribute represents the end of a valid period hence the dataset is considered
- to contain data that is valid at certain periods in time.{% endblocktrans %}
-
{% trans "Selecting an Attribute" %}
-
{% trans "A time attribute can be" %}:
-
-
{% trans "An existing date" %}
-
{% trans "Text that can be converted to a timestamp" %}
-
{% trans "A number representing a year" %}
-
-
-
-
-
-
-
diff --git a/geonode/layers/tests.py b/geonode/layers/tests.py
index 54c04e5f324..7a7e87c80e4 100644
--- a/geonode/layers/tests.py
+++ b/geonode/layers/tests.py
@@ -28,10 +28,8 @@
from django.urls import reverse
from django.test import TestCase
-from django.forms import ValidationError
from django.test.client import RequestFactory
from django.core.management import call_command
-from django.contrib.auth.models import Group
from django.contrib.gis.geos import Polygon
from django.db.models import Count
from django.contrib.auth import get_user_model
@@ -53,15 +51,13 @@
from geonode.layers.views import _resolve_dataset
from geonode import GeoNodeException, geoserver
from geonode.people.utils import get_valid_user
-from geonode.people import Roles
from guardian.shortcuts import get_anonymous_user
from geonode.tests.base import GeoNodeBaseTestSupport
from geonode.resource.manager import resource_manager
from geonode.tests.utils import NotificationsTestsHelper
from geonode.layers.models import Dataset, Style, Attribute
-from geonode.layers.forms import DatasetForm, DatasetTimeSerieForm, JSONField
from geonode.layers.populate_datasets_data import create_dataset_data
-from geonode.base.models import TopicCategory, License, Region, Link
+from geonode.base.models import TopicCategory, Link
from geonode.utils import check_ogc_backend, set_resource_default_links
from geonode.layers.metadata import convert_keyword, set_metadata, parse_metadata
from geonode.groups.models import GroupProfile
@@ -130,42 +126,6 @@ def test_default_sourcetype(self):
obj = Dataset.objects.first()
self.assertEqual(obj.sourcetype, enumerations.SOURCE_TYPE_LOCAL)
- # Data Tests
-
- def test_describe_data_2(self):
- """/data/geonode:CA/metadata -> Test accessing the description of a layer"""
- self.assertEqual(10, get_user_model().objects.all().count())
- response = self.client.get(reverse("dataset_metadata", args=("geonode:CA",)))
- # Since we are not authenticated, we should not be able to access it
- self.assertEqual(response.status_code, 302)
- # but if we log in ...
- self.client.login(username="admin", password="admin")
- # ... all should be good
- response = self.client.get(reverse("dataset_metadata", args=("geonode:CA",)))
- self.assertEqual(response.status_code, 200)
-
- def test_describe_data_3(self):
- """/data/geonode:CA/metadata_detail -> Test accessing the description of a layer"""
- self.client.login(username="admin", password="admin")
- # ... all should be good
- response = self.client.get(reverse("dataset_metadata_detail", args=("geonode:CA",)))
- self.assertEqual(response.status_code, 200)
- self.assertContains(response, "Approved", count=1, status_code=200, msg_prefix="", html=False)
- self.assertContains(response, "Published", count=1, status_code=200, msg_prefix="", html=False)
- self.assertContains(response, "Featured", count=1, status_code=200, msg_prefix="", html=False)
- self.assertContains(response, "
Group
", count=0, status_code=200, msg_prefix="", html=False)
-
- # ... now assigning a Group to the Dataset
- lyr = Dataset.objects.get(alternate="geonode:CA")
- group = Group.objects.first()
- lyr.group = group
- lyr.save()
- response = self.client.get(reverse("dataset_metadata_detail", args=("geonode:CA",)))
- self.assertEqual(response.status_code, 200)
- self.assertContains(response, "
Group
", count=1, status_code=200, msg_prefix="", html=False)
- lyr.group = None
- lyr.save()
-
# Dataset Tests
def test_dataset_name_clash(self):
@@ -194,18 +154,6 @@ def test_dataset_name_clash(self):
self.assertIsNotNone(_ll)
self.assertEqual(_ll.name, _ll_1.name)
- def test_describe_data(self):
- """/data/geonode:CA/metadata -> Test accessing the description of a layer"""
- self.assertEqual(10, get_user_model().objects.all().count())
- response = self.client.get(reverse("dataset_metadata", args=("geonode:CA",)))
- # Since we are not authenticated, we should not be able to access it
- self.assertEqual(response.status_code, 302)
- # but if we log in ...
- self.client.login(username="admin", password="admin")
- # ... all should be good
- response = self.client.get(reverse("dataset_metadata", args=("geonode:CA",)))
- self.assertEqual(response.status_code, 200)
-
def test_dataset_attributes(self):
lyr = Dataset.objects.all().first()
# There should be a total of 3 attributes
@@ -584,27 +532,6 @@ def test_get_valid_dataset_name(self):
# And this should be used instead to avoid that:
# SimpleUploadedFile('foo', ' '.encode("UTF-8"))
- def testJSONField(self):
- field = JSONField()
- # a valid JSON document should pass
- field.clean('{ "users": [] }')
-
- # text which is not JSON should fail
- self.assertRaises(ValidationError, lambda: field.clean(""))
-
- def test_sld_upload(self):
- """Test layer remove functionality"""
- layer = Dataset.objects.all().first()
- url = reverse("dataset_sld_upload", args=(layer.alternate,))
- # Now test with a valid user
- self.client.login(username="admin", password="admin")
-
- # test a method other than POST and GET
- response = self.client.put(url)
- content = response.content.decode("utf-8")
- self.assertEqual(response.status_code, 200)
- self.assertFalse("#modal_perms" in content)
-
def test_category_counts(self):
topics = TopicCategory.objects.all()
topics = topics.annotate(**{"dataset_count": Count("resourcebase__dataset__category")})
@@ -646,80 +573,6 @@ def test_assign_change_dataset_data_perm(self):
self.assertNotIn(user, perms["users"])
self.assertNotIn(user.username, perms["users"])
- def test_batch_edit(self):
- """
- Test batch editing of metadata fields.
- """
- Model = Dataset
- view = "dataset_batch_metadata"
- resources = Model.objects.all()[:3]
- ids = ",".join(str(element.pk) for element in resources)
- # test non-admin access
- self.client.login(username="bobby", password="bob")
- response = self.client.get(reverse(view))
- self.assertTrue(response.status_code in (401, 403))
- # test group change
- group = Group.objects.first()
- self.client.login(username="admin", password="admin")
- response = self.client.post(
- reverse(view),
- data={"group": group.pk, "ids": ids, "regions": 1},
- )
- self.assertEqual(response.status_code, 302)
- resources = Model.objects.filter(id__in=[r.pk for r in resources])
- for resource in resources:
- self.assertEqual(resource.group, group)
- # test owner change
- owner = get_user_model().objects.first()
- response = self.client.post(
- reverse(view),
- data={"owner": owner.pk, "ids": ids, "regions": 1},
- )
- self.assertEqual(response.status_code, 302)
- resources = Model.objects.filter(id__in=[r.pk for r in resources])
- for resource in resources:
- self.assertEqual(resource.owner, owner)
- # test license change
- license = License.objects.first()
- response = self.client.post(
- reverse(view),
- data={"license": license.pk, "ids": ids, "regions": 1},
- )
- self.assertEqual(response.status_code, 302)
- resources = Model.objects.filter(id__in=[r.pk for r in resources])
- for resource in resources:
- self.assertEqual(resource.license, license)
- # test regions change
- region = Region.objects.first()
- response = self.client.post(
- reverse(view),
- data={"region": region.pk, "ids": ids, "regions": 1},
- )
- self.assertEqual(response.status_code, 302)
- resources = Model.objects.filter(id__in=[r.pk for r in resources])
- for resource in resources:
- if resource.regions.all():
- self.assertTrue(region in resource.regions.all())
- # test language change
- language = "eng"
- response = self.client.post(
- reverse(view),
- data={"language": language, "ids": ids, "regions": 1},
- )
- resources = Model.objects.filter(id__in=[r.pk for r in resources])
- for resource in resources:
- self.assertEqual(resource.language, language)
- # test keywords change
- keywords = "some,thing,new"
- response = self.client.post(
- reverse(view),
- data={"keywords": keywords, "ids": ids, "regions": 1},
- )
- resources = Model.objects.filter(id__in=[r.pk for r in resources])
- for resource in resources:
- for word in resource.keywords.all():
- self.assertTrue(word.name in keywords.split(","))
-
def test_surrogate_escape_string(self):
surrogate_escape_raw = "Zo\udcc3\udcab"
surrogate_escape_expected = "Zoë"
@@ -1008,91 +861,6 @@ def setUp(self):
map=self.map,
)
- def test_that_keyword_multiselect_is_disabled_for_non_admin_users(self):
- """
- Test that keyword multiselect widget is disabled when the user is not an admin
- """
- self.test_dataset = resource_manager.create(
- None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
- )
-
- url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
- self.client.login(username=self.not_admin.username, password="very-secret")
- with self.settings(FREETEXT_KEYWORDS_READONLY=True):
- response = self.client.get(url)
- self.assertTrue(response.context["form"]["keywords"].field.disabled, self.test_dataset.alternate)
-
- def test_that_keyword_multiselect_is_not_disabled_for_admin_users(self):
- """
- Test that only admin users can create/edit keywords when FREETEXT_KEYWORDS_READONLY=True
- """
- admin = self.not_admin
- admin.is_superuser = True
- admin.save()
-
- self.test_dataset = resource_manager.create(
- None, resource_type=Dataset, defaults=dict(owner=admin, title="test", is_approved=True)
- )
-
- url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
-
- self.client.login(username=admin.username, password="very-secret")
- with self.settings(FREETEXT_KEYWORDS_READONLY=True):
- response = self.client.get(url)
- self.assertFalse(response.context["form"]["keywords"].field.disabled, self.test_dataset.alternate)
-
- def test_that_featured_enabling_and_disabling_for_users(self):
- self.test_dataset = resource_manager.create(
- None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
- )
-
- url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
- # Non Admins
- self.client.login(username=self.not_admin.username, password="very-secret")
- response = self.client.get(url)
- self.assertFalse(self.not_admin.is_superuser)
- self.assertEqual(response.status_code, 200)
- self.assertTrue(response.context["form"]["featured"].field.disabled)
- # Admin
- self.client.login(username="admin", password="admin")
- response = self.client.get(url)
- self.assertEqual(response.status_code, 200)
- self.assertFalse(response.context["form"]["featured"].field.disabled)
-
- def test_that_non_admin_user_cannot_create_edit_keyword(self):
- """
- Test that non admin users cannot edit/create keywords when FREETEXT_KEYWORDS_READONLY=True
- """
- self.test_dataset = resource_manager.create(
- None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
- )
-
- url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
- self.client.login(username=self.not_admin.username, password="very-secret")
- with self.settings(FREETEXT_KEYWORDS_READONLY=True):
- response = self.client.post(url, data={"resource-keywords": "wonderful-keyword"})
- self.assertEqual(response.status_code, 401)
- self.assertEqual(response.content, b"Unauthorized: Cannot edit/create Free-text Keywords")
-
- def test_that_keyword_multiselect_is_enabled_for_non_admin_users_when_freetext_keywords_readonly_istrue(self):
- """
- Test that keyword multiselect widget is not disabled when the user is not an admin
- and FREETEXT_KEYWORDS_READONLY=False
- """
- self.test_dataset = resource_manager.create(
- None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
- )
-
- url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
-
- self.client.login(username=self.not_admin.username, password="very-secret")
- with self.settings(FREETEXT_KEYWORDS_READONLY=False):
- response = self.client.get(url)
- self.assertFalse(response.context["form"]["keywords"].field.disabled, self.test_dataset.alternate)
-
- response = self.client.get(reverse("dataset_embed", args=(self.layer.alternate,)))
- self.assertIsNotNone(response.context["resource"])
-
def test_that_only_users_with_permissions_can_view_maps_in_dataset_view(self):
"""
Test only users with view permissions to a map can view them in layer detail view
@@ -1111,20 +879,16 @@ def test_update_with_a_comma_in_title_is_replaced_by_undescore(self):
self.test_dataset = resource_manager.create(
None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
)
+ from geonode.metadata.manager import metadata_manager
- data = {
- "resource-title": "test,comma,2021",
- "resource-owner": self.test_dataset.owner.id,
- "resource-date": str(self.test_dataset.date),
- "resource-date_type": self.test_dataset.date_type,
- "resource-language": self.test_dataset.language,
- "dataset_attribute_set-TOTAL_FORMS": 0,
- "dataset_attribute_set-INITIAL_FORMS": 0,
- }
+ payload = metadata_manager.build_schema_instance(self.test_dataset)
+ payload["title"] = "test,comma,2021"
- url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
self.client.login(username=self.not_admin.username, password="very-secret")
- response = self.client.post(url, data=data)
+
+ url = reverse("metadata-schema_instance", args=(self.test_dataset.id,))
+ response = self.client.put(url, data=payload, content_type="application/json")
+
self.test_dataset.refresh_from_db()
self.assertEqual(self.test_dataset.title, "test_comma_2021")
self.assertEqual(response.status_code, 200)
@@ -1423,293 +1187,6 @@ def dummy_metadata_parser(exml, uuid, vals, regions, keywords, custom):
return uuid, vals, regions, keywords, custom
-class TestDatasetForm(GeoNodeBaseTestSupport):
- def setUp(self) -> None:
- self.user = get_user_model().objects.get(username="admin")
- self.user2 = get_user_model().objects.get_or_create(username="svenzwei")
-
- self.dataset = create_single_dataset("my_single_layer", owner=self.user)
- self.sut = DatasetForm
- self.time_form = DatasetTimeSerieForm
-
- def test_resource_form_is_invalid_extra_metadata_not_json_format(self):
- self.client.login(username="admin", password="admin")
- url = reverse("dataset_metadata", args=(self.dataset.alternate,))
- response = self.client.post(
- url,
- data={
- "resource-owner": self.dataset.owner.id,
- "resource-title": "layer_title",
- "resource-date": "2022-01-24 16:38 pm",
- "resource-date_type": "creation",
- "resource-language": "eng",
- "resource-extra_metadata": "not-a-json",
- },
- )
- expected = {
- "success": False,
- "errors": ["extra_metadata: The value provided for the Extra metadata field is not a valid JSON"],
- }
- self.assertDictEqual(expected, response.json())
-
- def test_change_owner_in_metadata(self):
- try:
- test_user = get_user_model().objects.create_user(
- username="non_auth", email="non_auth@geonode.org", password="password"
- )
- norman = get_user_model().objects.get(username="norman")
- dataset = Dataset.objects.first()
- data = {
- "resource-title": "geoapp_title",
- "resource-date": "2022-01-24 16:38 pm",
- "resource-date_type": "creation",
- "resource-language": "eng",
- "dataset_attribute_set-TOTAL_FORMS": 0,
- "dataset_attribute_set-INITIAL_FORMS": 0,
- }
- perm_spec = {
- "users": {
- "non_auth": [
- "change_resourcebase_metadata",
- "change_resourcebase",
- ],
- "norman": ["change_resourcebase_metadata", "change_resourcebase_permissions"],
- }
- }
- self.assertTrue(dataset.set_permissions(perm_spec))
- self.assertFalse(test_user.has_perm("change_resourcebase_permissions", dataset.get_self_resource()))
-
- url = reverse("dataset_metadata", args=(dataset.alternate,))
- # post as non-authorised user
- self.client.login(username="non_auth", password="password")
- data["resource-owner"] = test_user.id
- response = self.client.post(url, data=data)
- self.assertEqual(response.status_code, 200)
- self.assertNotEqual(dataset.owner, test_user)
- # post as admin
- self.client.login(username="admin", password="admin")
- response = self.client.post(url, data=data)
- dataset.refresh_from_db()
- self.assertEqual(response.status_code, 200)
- self.assertEqual(dataset.owner, test_user)
- # post as an authorised user
- self.client.login(username="norman", password="norman")
- self.assertTrue(norman.has_perm("change_resourcebase_permissions", dataset.get_self_resource()))
- data["resource-owner"] = norman.id
- response = self.client.post(url, data=data)
- dataset.refresh_from_db()
- self.assertEqual(response.status_code, 200)
- self.assertEqual(dataset.owner, norman)
- finally:
- get_user_model().objects.filter(username="non_auth").delete
- Dataset.objects.filter(name="dataset_name").delete()
-
- @override_settings(EXTRA_METADATA_SCHEMA={"key": "value"})
- def test_resource_form_is_invalid_extra_metadata_not_schema_in_settings(self):
- self.client.login(username="admin", password="admin")
- url = reverse("dataset_metadata", args=(self.dataset.alternate,))
- response = self.client.post(
- url,
- data={
- "resource-owner": self.dataset.owner.id,
- "resource-title": "layer_title",
- "resource-date": "2022-01-24 16:38 pm",
- "resource-date_type": "creation",
- "resource-language": "eng",
- "resource-extra_metadata": "[{'key': 'value'}]",
- },
- )
- expected = {
- "success": False,
- "errors": ["extra_metadata: EXTRA_METADATA_SCHEMA validation schema is not available for resource dataset"],
- }
- self.assertDictEqual(expected, response.json())
-
- def test_resource_form_is_invalid_extra_metadata_invalids_schema_entry(self):
- self.client.login(username="admin", password="admin")
- url = reverse("dataset_metadata", args=(self.dataset.alternate,))
- response = self.client.post(
- url,
- data={
- "resource-owner": self.dataset.owner.id,
- "resource-title": "layer_title",
- "resource-date": "2022-01-24 16:38 pm",
- "resource-date_type": "creation",
- "resource-language": "eng",
- "resource-extra_metadata": '[{"key": "value"},{"id": "int", "filter_header": "object", "field_name": "object", "field_label": "object", "field_value": "object"}]',
- },
- )
- expected = (
- "extra_metadata: Missing keys: 'field_label', 'field_name', 'field_value', 'filter_header' at index 0 "
- )
- self.assertIn(expected, response.json()["errors"][0])
-
- def test_resource_form_is_valid_extra_metadata(self):
- form = self.sut(
- instance=self.dataset,
- data={
- "owner": self.dataset.owner.id,
- "title": "layer_title",
- "date": "2022-01-24 16:38 pm",
- "date_type": "creation",
- "language": "eng",
- "extra_metadata": '[{"id": 1, "filter_header": "object", "field_name": "object", "field_label": "object", "field_value": "object"}]',
- },
- )
- self.assertTrue(form.is_valid())
-
- def test_dataset_time_form_should_work(self):
- attr, _ = Attribute.objects.get_or_create(
- dataset=self.dataset, attribute="field_date", attribute_type="xsd:dateTime"
- )
- self.dataset.attribute_set.add(attr)
- self.dataset.save()
- form = self.time_form(
- instance=self.dataset,
- data={
- "attribute": self.dataset.attributes.first().id,
- "end_attribute": "",
- "presentation": "DISCRETE_INTERVAL",
- "precision_value": 12345,
- "precision_step": "seconds",
- },
- )
- self.assertTrue(form.is_valid())
- self.assertDictEqual({}, form.errors)
-
- def test_dataset_time_form_should_work_with_date_attribute(self):
- attr, _ = Attribute.objects.get_or_create(
- dataset=self.dataset, attribute="field_date", attribute_type="xsd:date"
- )
- self.dataset.attribute_set.add(attr)
- self.dataset.save()
- form = self.time_form(
- instance=self.dataset,
- data={
- "attribute": self.dataset.attributes.first().id,
- "end_attribute": "",
- "presentation": "DISCRETE_INTERVAL",
- "precision_value": 12345,
- "precision_step": "seconds",
- },
- )
- self.assertTrue(form.is_valid())
- self.assertDictEqual({}, form.errors)
- expected_choises = [(None, "-----"), (self.dataset.attributes.first().id, "field_date")]
- actual_choices = form.fields.get("attribute").choices
- self.assertListEqual(expected_choises, actual_choices)
-
- def test_timeserie_raise_error_if_not_valid_attribute(self):
- attr, _ = Attribute.objects.get_or_create(
- dataset=self.dataset, attribute="field_date", attribute_type="xsd:string"
- )
- self.dataset.attribute_set.add(attr)
- self.dataset.save()
- form = self.time_form(
- instance=self.dataset,
- data={
- "attribute": self.dataset.attributes.first().id,
- "end_attribute": "",
- "presentation": "DISCRETE_INTERVAL",
- "precision_value": 12345,
- "precision_step": "seconds",
- },
- )
- self.assertFalse(form.is_valid())
- self.assertEqual(
- f"Select a valid choice. {self.dataset.attributes.first().id} is not one of the available choices.",
- form.errors.get("attribute")[0],
- )
- expected_choises = [(None, "-----")]
- actual_choices = form.fields.get("attribute").choices
- self.assertListEqual(expected_choises, actual_choices)
-
- def test_dataset_time_form_should_raise_error_if_invalid_payload(self):
- attr, _ = Attribute.objects.get_or_create(
- dataset=self.dataset, attribute="field_date", attribute_type="xsd:dateTime"
- )
- self.dataset.attribute_set.add(attr)
- self.dataset.save()
- form = self.time_form(
- instance=self.dataset,
- data={
- "attribute": self.dataset.attributes.first().id,
- "end_attribute": "",
- "presentation": "INVALID_PRESENTATION_VALUE",
- "precision_value": 12345,
- "precision_step": "seconds",
- },
- )
- self.assertFalse(form.is_valid())
- self.assertTrue("presentation" in form.errors)
- self.assertEqual(
- "Select a valid choice. INVALID_PRESENTATION_VALUE is not one of the available choices.",
- form.errors["presentation"][0],
- )
-
- def test_resource_form_is_valid_single_user_contact_role(self):
- """test if passing a single user to a contact role form is working"""
- users = get_user_model().objects.filter(username="svenzwei")
- cr = Roles.get_multivalue_ones()[0]
- form = self.sut(
- instance=self.dataset,
- data={
- "owner": self.dataset.owner.id,
- cr.name: [u.username for u in users],
- "title": "layer_title",
- "date": "2022-01-24 16:38 pm",
- "date_type": "creation",
- "language": "eng",
- "extra_metadata": '[{"id": 1, "filter_header": "object", "field_name": "object", "field_label": "object", "field_value": "object"}]',
- },
- )
- self.assertTrue(form.is_valid())
- self.assertEqual(list(form.cleaned_data[cr.name]), list(users))
-
- def test_resource_form_is_valid_multiple_user_contact_role_as_queryset(self):
- """test if passing a multiple user to a contact role form is working"""
- users = get_user_model().objects.filter(username__in=["svenzwei", "admin"])
- for cr in Roles.get_multivalue_ones():
- form = self.sut(
- instance=self.dataset,
- data={
- "owner": self.dataset.owner.id,
- cr.name: [u.username for u in users],
- "title": "layer_title",
- "date": "2022-01-24 16:38 pm",
- "date_type": "creation",
- "language": "eng",
- "extra_metadata": '[{"id": 1, "filter_header": "object", "field_name": "object", "field_label": "object", "field_value": "object"}]',
- },
- )
- self.assertTrue(form.is_valid())
- self.assertEqual(list(form.cleaned_data[cr.name]), list(users))
-
- def test_resource_form_is_invalid_with_incompleted_timeserie_data(self):
- self.client.login(username="admin", password="admin")
- url = reverse("dataset_metadata", args=(self.dataset.alternate,))
- response = self.client.post(
- url,
- data={
- "resource-owner": self.dataset.owner.id,
- "resource-title": "layer_title",
- "resource-date": "2022-01-24 16:38 pm",
- "resource-date_type": "creation",
- "resource-language": "eng",
- "resource-has_time": True,
- "dataset_attribute_set-TOTAL_FORMS": 0,
- "dataset_attribute_set-INITIAL_FORMS": 0,
- },
- )
- expected = {
- "success": False,
- "errors": [
- "The Timeseries configuration is invalid. Please select at least one option between the `attribute` and `end_attribute`, otherwise remove the 'has_time' flag"
- ],
- }
- self.assertDictEqual(expected, response.json())
-
-
class SetLayersPermissionsCommand(GeoNodeBaseTestSupport):
"""
Unittest to ensure that the management command "set_layers_permissions"
diff --git a/geonode/layers/urls.py b/geonode/layers/urls.py
index 883d00d7f5d..937b74e3ae6 100644
--- a/geonode/layers/urls.py
+++ b/geonode/layers/urls.py
@@ -29,26 +29,17 @@
urlpatterns = [
# 'geonode.layers.views',
- re_path(r"^upload_metadata$", views.dataset_metadata_upload, name="dataset_metadata_upload"),
re_path(r"^load_dataset_data$", views.load_dataset_data, name="load_dataset_data"),
- re_path(r"^(?P[^/]*)/metadata$", views.dataset_metadata, name="dataset_metadata"),
- re_path(
- r"^(?P[^/]*)/metadata_advanced$", views.dataset_metadata_advanced, name="dataset_metadata_advanced"
- ),
re_path(
r"^(?P[^/]*)/(?P[^/]*)/granule_remove$",
views.dataset_granule_remove,
name="dataset_granule_remove",
),
re_path(r"^(?P[^/]*)/get$", views.get_dataset, name="get_dataset"),
- re_path(r"^(?P[^/]*)/metadata_detail$", views.dataset_metadata_detail, name="dataset_metadata_detail"),
- re_path(r"^(?P[^/]*)/metadata_upload$", views.dataset_metadata_upload, name="dataset_metadata_upload"),
re_path(r"^(?P[^/]+)/embed$", views.dataset_embed, name="dataset_embed"),
- re_path(r"^(?P[^/]*)/style_upload$", views.dataset_sld_upload, name="dataset_sld_upload"),
re_path(
r"^(?P[^/]*)/feature_catalogue$", views.dataset_feature_catalogue, name="dataset_feature_catalogue"
),
- re_path(r"^metadata/batch/$", views.dataset_batch_metadata, name="dataset_batch_metadata"),
re_path(r"^(?P[^/]*)/dataset_download$", views.dataset_download, name="dataset_download"),
re_path(r"^", include("geonode.layers.api.urls")),
]
diff --git a/geonode/layers/views.py b/geonode/layers/views.py
index 965176f6bf5..addb81d4084 100644
--- a/geonode/layers/views.py
+++ b/geonode/layers/views.py
@@ -16,25 +16,21 @@
# along with this program. If not, see .
#
#########################################################################
-import re
import json
import decimal
import logging
-import warnings
import traceback
from owslib.wfs import WebFeatureService
from django.conf import settings
-from django.db.models import F
from django.http import Http404
from django.contrib import messages
from django.shortcuts import render
from django.contrib.auth import get_user_model
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import PermissionDenied
-from django.forms.models import inlineformset_factory
from django.template.response import TemplateResponse
from django.contrib.auth.decorators import login_required
from django.views.decorators.csrf import csrf_exempt
@@ -42,29 +38,18 @@
from django.views.decorators.clickjacking import xframe_options_exempt
from geonode import geoserver
-from geonode.resource.manager import resource_manager
from geonode.base.auth import get_or_create_token
-from geonode.base.forms import CategoryForm, TKeywordForm, ThesaurusAvailableForm
-from geonode.base.views import batch_modify
-from geonode.base.models import Thesaurus, TopicCategory
-from geonode.decorators import check_keyword_write_perms
-from geonode.layers.forms import DatasetForm, DatasetTimeSerieForm, LayerAttributeForm
-from geonode.layers.models import Dataset, Attribute
+from geonode.layers.models import Dataset
from geonode.layers.utils import (
get_default_dataset_download_handler,
)
from geonode.services.models import Service
from geonode.base import register_event
-from geonode.monitoring.models import EventType
-from geonode.groups.models import GroupProfile
-from geonode.security.utils import get_user_visible_groups
-from geonode.people.forms import ProfileForm
-from geonode.utils import check_ogc_backend, llbbox_to_mercator, resolve_object
+from geonode.utils import check_ogc_backend, resolve_object
from geonode.geoserver.helpers import ogc_server_settings
-from geonode.security.registry import permissions_registry
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
- from geonode.geoserver.helpers import gs_catalog, get_time_info
+ from geonode.geoserver.helpers import gs_catalog
CONTEXT_LOG_FILE = ogc_server_settings.LOG_FILE
@@ -192,343 +177,6 @@ def dataset_feature_catalogue(request, layername, template="../../catalogue/temp
return render(request, template, context=context_dict, content_type="application/xml")
-@login_required
-@check_keyword_write_perms
-def dataset_metadata(
- request,
- layername,
- template="datasets/dataset_metadata.html",
- panel_template="layouts/panels.html",
- custom_metadata=None,
- ajax=True,
-):
- try:
- layer = _resolve_dataset(request, layername, "base.change_resourcebase_metadata", _PERMISSION_MSG_METADATA)
- except PermissionDenied as e:
- return HttpResponse(Exception(_("Not allowed"), e), status=403)
- except Exception as e:
- raise Http404(Exception(_("Not found"), e))
- if not layer:
- raise Http404(_("Not found"))
- dataset_attribute_set = inlineformset_factory(
- Dataset,
- Attribute,
- extra=0,
- form=LayerAttributeForm,
- )
- current_keywords = [keyword.name for keyword in layer.keywords.all()]
- topic_category = layer.category
-
- topic_thesaurus = layer.tkeywords.all()
-
- # Add metadata_author or poc if missing
- layer.add_missing_metadata_author_or_poc()
-
- # assert False, str(dataset_bbox)
- config = layer.attribute_config()
-
- # Add required parameters for GXP lazy-loading
- dataset_bbox = layer.bbox
- bbox = [float(coord) for coord in list(dataset_bbox[0:4])]
- if hasattr(layer, "srid"):
- config["crs"] = {"type": "name", "properties": layer.srid}
- config["srs"] = getattr(settings, "DEFAULT_MAP_CRS", "EPSG:3857")
- config["bbox"] = bbox if config["srs"] != "EPSG:3857" else llbbox_to_mercator([float(coord) for coord in bbox])
- config["title"] = layer.title
- config["queryable"] = True
-
- # Update count for popularity ranking,
- # but do not includes admins or resource owners
- if request.user != layer.owner and not request.user.is_superuser:
- Dataset.objects.filter(id=layer.id).update(popular_count=F("popular_count") + 1)
-
- if request.method == "POST":
- if layer.metadata_uploaded_preserve: # layer metadata cannot be edited
- out = {"success": False, "errors": METADATA_UPLOADED_PRESERVE_ERROR}
- return HttpResponse(json.dumps(out), content_type="application/json", status=400)
-
- thumbnail_url = layer.thumbnail_url
- dataset_form = DatasetForm(request.POST, instance=layer, prefix="resource", user=request.user)
-
- if not dataset_form.is_valid():
- logger.error(f"Dataset Metadata form is not valid: {dataset_form.errors}")
- out = {
- "success": False,
- "errors": [f"{x}: {y[0].messages[0]}" for x, y in dataset_form.errors.as_data().items()],
- }
- return HttpResponse(json.dumps(out), content_type="application/json", status=400)
- if not layer.thumbnail_url:
- layer.thumbnail_url = thumbnail_url
- attribute_form = dataset_attribute_set(
- request.POST,
- instance=layer,
- prefix="dataset_attribute_set",
- queryset=Attribute.objects.order_by("display_order"),
- )
- if not attribute_form.is_valid():
- logger.error(f"Dataset Attributes form is not valid: {attribute_form.errors}")
- out = {
- "success": False,
- "errors": [re.sub(re.compile("<.*?>"), "", str(err)) for err in attribute_form.errors],
- }
- return HttpResponse(json.dumps(out), content_type="application/json", status=400)
- category_form = CategoryForm(
- request.POST,
- prefix="category_choice_field",
- initial=(
- int(request.POST["category_choice_field"])
- if "category_choice_field" in request.POST and request.POST["category_choice_field"]
- else None
- ),
- )
- if not category_form.is_valid():
- logger.error(f"Dataset Category form is not valid: {category_form.errors}")
- out = {
- "success": False,
- "errors": [re.sub(re.compile("<.*?>"), "", str(err)) for err in category_form.errors],
- }
- return HttpResponse(json.dumps(out), content_type="application/json", status=400)
- if hasattr(settings, "THESAURUS"):
- tkeywords_form = TKeywordForm(request.POST)
- else:
- tkeywords_form = ThesaurusAvailableForm(request.POST, prefix="tkeywords")
- # set initial values for thesaurus form
- if not tkeywords_form.is_valid():
- logger.error(f"Dataset Thesauri Keywords form is not valid: {tkeywords_form.errors}")
- out = {
- "success": False,
- "errors": [re.sub(re.compile("<.*?>"), "", str(err)) for err in tkeywords_form.errors],
- }
- return HttpResponse(json.dumps(out), content_type="application/json", status=400)
-
- timeseries_form = DatasetTimeSerieForm(request.POST, instance=layer, prefix="timeseries")
- if not timeseries_form.is_valid():
- out = {
- "success": False,
- "errors": [f"{x}: {y[0].messages[0]}" for x, y in timeseries_form.errors.as_data().items()],
- }
- logger.error(f"{out.get('errors')}")
- return HttpResponse(json.dumps(out), content_type="application/json", status=400)
- elif (
- layer.has_time
- and timeseries_form.is_valid()
- and not timeseries_form.cleaned_data.get("attribute", "")
- and not timeseries_form.cleaned_data.get("end_attribute", "")
- ):
- out = {
- "success": False,
- "errors": [
- "The Timeseries configuration is invalid. Please select at least one option between the `attribute` and `end_attribute`, otherwise remove the 'has_time' flag"
- ],
- }
- logger.error(f"{out.get('errors')}")
- return HttpResponse(json.dumps(out), content_type="application/json", status=400)
- else:
- dataset_form = DatasetForm(instance=layer, prefix="resource", user=request.user)
- dataset_form.disable_keywords_widget_for_non_superuser(request.user)
- attribute_form = dataset_attribute_set(
- instance=layer, prefix="dataset_attribute_set", queryset=Attribute.objects.order_by("display_order")
- )
- category_form = CategoryForm(
- prefix="category_choice_field", initial=topic_category.id if topic_category else None
- )
-
- initial = {}
- if layer.supports_time and layer.has_time:
- initial = get_time_info(layer)
-
- timeseries_form = DatasetTimeSerieForm(instance=layer, prefix="timeseries", initial=initial)
-
- # Create THESAURUS widgets
- lang = settings.THESAURUS_DEFAULT_LANG if hasattr(settings, "THESAURUS_DEFAULT_LANG") else "en"
- if hasattr(settings, "THESAURUS") and settings.THESAURUS:
- warnings.warn(
- "The settings for Thesaurus has been moved to Model, \
- this feature will be removed in next releases",
- DeprecationWarning,
- )
- dataset_tkeywords = layer.tkeywords.all()
- tkeywords_list = ""
- if dataset_tkeywords and len(dataset_tkeywords) > 0:
- tkeywords_ids = dataset_tkeywords.values_list("id", flat=True)
- if hasattr(settings, "THESAURUS") and settings.THESAURUS:
- el = settings.THESAURUS
- thesaurus_name = el["name"]
- try:
- t = Thesaurus.objects.get(identifier=thesaurus_name)
- for tk in t.thesaurus.filter(pk__in=tkeywords_ids):
- tkl = tk.keyword.filter(lang=lang)
- if len(tkl) > 0:
- tkl_ids = ",".join(map(str, tkl.values_list("id", flat=True)))
- tkeywords_list += f",{tkl_ids}" if len(tkeywords_list) > 0 else tkl_ids
- except Exception:
- tb = traceback.format_exc()
- logger.error(tb)
- tkeywords_form = TKeywordForm(instance=layer)
- else:
- tkeywords_form = ThesaurusAvailableForm(prefix="tkeywords")
- # set initial values for thesaurus form
- for tid in tkeywords_form.fields:
- values = []
- values = [keyword.id for keyword in topic_thesaurus if int(tid) == keyword.thesaurus.id]
- tkeywords_form.fields[tid].initial = values
- if (
- request.method == "POST"
- and dataset_form.is_valid()
- and attribute_form.is_valid()
- and category_form.is_valid()
- and tkeywords_form.is_valid()
- and timeseries_form.is_valid()
- ):
- new_category = None
- if (
- category_form
- and "category_choice_field" in category_form.cleaned_data
- and category_form.cleaned_data["category_choice_field"]
- ):
- new_category = TopicCategory.objects.get(id=int(category_form.cleaned_data["category_choice_field"]))
-
- for form in attribute_form.cleaned_data:
- la = Attribute.objects.get(id=int(form["id"].id))
- la.description = form["description"]
- la.attribute_label = form["attribute_label"]
- la.visible = form["visible"]
- la.display_order = form["display_order"]
- la.featureinfo_type = form["featureinfo_type"]
- la.save()
-
- # update contact roles
- layer.set_contact_roles_from_metadata_edit(dataset_form)
- layer.save()
-
- new_keywords = current_keywords if request.keyword_readonly else dataset_form.cleaned_data["keywords"]
- new_regions = [x.strip() for x in dataset_form.cleaned_data["regions"]]
-
- layer.keywords.clear()
- if new_keywords:
- layer.keywords.add(*new_keywords)
- layer.regions.clear()
- if new_regions:
- layer.regions.add(*new_regions)
- layer.category = new_category
-
- dataset_form.save_linked_resources()
-
- register_event(request, EventType.EVENT_CHANGE_METADATA, layer)
- if not ajax:
- return HttpResponseRedirect(layer.get_absolute_url())
-
- message = layer.alternate
-
- try:
- if not tkeywords_form.is_valid():
- return HttpResponse(json.dumps({"message": "Invalid thesaurus keywords"}, status_code=400))
-
- thesaurus_setting = getattr(settings, "THESAURUS", None)
- if thesaurus_setting:
- tkeywords_data = tkeywords_form.cleaned_data["tkeywords"]
- tkeywords_data = tkeywords_data.filter(thesaurus__identifier=thesaurus_setting["name"])
- layer.tkeywords.set(tkeywords_data)
- elif Thesaurus.objects.all().exists():
- fields = tkeywords_form.cleaned_data
- layer.tkeywords.set(tkeywords_form.cleanx(fields))
-
- except Exception:
- tb = traceback.format_exc()
- logger.error(tb)
-
- vals = {}
- if "group" in dataset_form.changed_data:
- vals["group"] = dataset_form.cleaned_data.get("group")
- if any([x in dataset_form.changed_data for x in ["is_approved", "is_published"]]):
- vals["is_approved"] = dataset_form.cleaned_data.get("is_approved", layer.is_approved)
- vals["is_published"] = dataset_form.cleaned_data.get("is_published", layer.is_published)
-
- layer.has_time = dataset_form.cleaned_data.get("has_time", layer.has_time)
-
- if (
- layer.supports_time
- and timeseries_form.cleaned_data
- and ("has_time" in dataset_form.changed_data or timeseries_form.changed_data)
- ):
- ts = timeseries_form.cleaned_data
- end_attr = layer.attributes.get(pk=ts.get("end_attribute")).attribute if ts.get("end_attribute") else None
- start_attr = layer.attributes.get(pk=ts.get("attribute")).attribute if ts.get("attribute") else None
- resource_manager.exec(
- "set_time_info",
- None,
- instance=layer,
- time_info={
- "attribute": start_attr,
- "end_attribute": end_attr,
- "presentation": ts.get("presentation", None),
- "precision_value": ts.get("precision_value", None),
- "precision_step": ts.get("precision_step", None),
- "enabled": dataset_form.cleaned_data.get("has_time", False),
- },
- )
-
- resource_manager.update(
- layer.uuid,
- instance=layer,
- notify=True,
- vals=vals,
- extra_metadata=json.loads(dataset_form.cleaned_data["extra_metadata"]),
- )
-
- return HttpResponse(json.dumps({"message": message}))
-
- if not request.user.can_publish(layer):
- dataset_form.fields["is_published"].widget.attrs.update({"disabled": "true"})
- if not request.user.can_approve(layer):
- dataset_form.fields["is_approved"].widget.attrs.update({"disabled": "true"})
-
- # define contact role forms
- contact_role_forms_context = {}
- for role in layer.get_multivalue_role_property_names():
- dataset_form.fields[role].initial = [p.username for p in layer.__getattribute__(role)]
- role_form = ProfileForm(prefix=role)
- role_form.hidden = True
- contact_role_forms_context[f"{role}_form"] = role_form
-
- metadata_author_groups = get_user_visible_groups(request.user)
-
- register_event(request, "view_metadata", layer)
- return render(
- request,
- template,
- context={
- "resource": layer,
- "dataset": layer,
- "panel_template": panel_template,
- "custom_metadata": custom_metadata,
- "dataset_form": dataset_form,
- "attribute_form": attribute_form,
- "timeseries_form": timeseries_form,
- "category_form": category_form,
- "tkeywords_form": tkeywords_form,
- "preview": getattr(settings, "GEONODE_CLIENT_LAYER_PREVIEW_LIBRARY", "mapstore"),
- "crs": getattr(settings, "DEFAULT_MAP_CRS", "EPSG:3857"),
- "metadataxsl": getattr(settings, "GEONODE_CATALOGUE_METADATA_XSL", True),
- "freetext_readonly": getattr(settings, "FREETEXT_KEYWORDS_READONLY", False),
- "metadata_author_groups": metadata_author_groups,
- "TOPICCATEGORY_MANDATORY": getattr(settings, "TOPICCATEGORY_MANDATORY", False),
- "GROUP_MANDATORY_RESOURCES": getattr(settings, "GROUP_MANDATORY_RESOURCES", False),
- "UI_MANDATORY_FIELDS": list(
- set(getattr(settings, "UI_DEFAULT_MANDATORY_FIELDS", []))
- | set(getattr(settings, "UI_REQUIRED_FIELDS", []))
- ),
- **contact_role_forms_context,
- "UI_ROLES_IN_TOGGLE_VIEW": layer.get_ui_toggled_role_property_names(),
- },
- )
-
-
-@login_required
-def dataset_metadata_advanced(request, layername):
- return dataset_metadata(request, layername, template="datasets/dataset_metadata_advanced.html")
-
-
@csrf_exempt
def dataset_download(request, layername):
handler = get_default_dataset_download_handler()
@@ -609,68 +257,6 @@ def decimal_default(obj):
)
-def dataset_metadata_detail(request, layername, template="datasets/dataset_metadata_detail.html", custom_metadata=None):
- try:
- layer = _resolve_dataset(request, layername, "view_resourcebase", _PERMISSION_MSG_METADATA)
- except PermissionDenied:
- return HttpResponse(_("Not allowed"), status=403)
- except Exception:
- raise Http404(_("Not found"))
- if not layer:
- raise Http404(_("Not found"))
-
- group = None
- if layer.group:
- try:
- group = GroupProfile.objects.get(slug=layer.group.name)
- except GroupProfile.DoesNotExist:
- group = None
- site_url = settings.SITEURL.rstrip("/") if settings.SITEURL.startswith("http") else settings.SITEURL
-
- register_event(request, "view_metadata", layer)
- perms_list = permissions_registry.get_perms(instance=layer, user=request.user)
-
- return render(
- request,
- template,
- context={
- "resource": layer,
- "perms_list": perms_list,
- "group": group,
- "SITEURL": site_url,
- "custom_metadata": custom_metadata,
- },
- )
-
-
-def dataset_metadata_upload(request, layername, template="datasets/dataset_metadata_upload.html"):
- try:
- layer = _resolve_dataset(request, layername, "base.change_resourcebase", _PERMISSION_MSG_METADATA)
- except PermissionDenied:
- return HttpResponse(_("Not allowed"), status=403)
- except Exception:
- raise Http404(_("Not found"))
- if not layer:
- raise Http404(_("Not found"))
-
- site_url = settings.SITEURL.rstrip("/") if settings.SITEURL.startswith("http") else settings.SITEURL
- return render(request, template, context={"resource": layer, "layer": layer, "SITEURL": site_url})
-
-
-def dataset_sld_upload(request, layername, template="datasets/dataset_style_upload.html"):
- try:
- layer = _resolve_dataset(request, layername, "base.change_resourcebase", _PERMISSION_MSG_METADATA)
- except PermissionDenied:
- return HttpResponse(_("Not allowed"), status=403)
- except Exception:
- raise Http404(_("Not found"))
- if not layer:
- raise Http404(_("Not found"))
-
- site_url = settings.SITEURL.rstrip("/") if settings.SITEURL.startswith("http") else settings.SITEURL
- return render(request, template, context={"resource": layer, "dataset": layer, "SITEURL": site_url})
-
-
@xframe_options_exempt
def dataset_embed(request, layername):
try:
@@ -701,11 +287,6 @@ def dataset_embed(request, layername):
return TemplateResponse(request, "datasets/dataset_embed.html", context=context_dict)
-@login_required
-def dataset_batch_metadata(request):
- return batch_modify(request, "Dataset")
-
-
def dataset_view_counter(dataset_id, viewer):
_l = Dataset.objects.get(id=dataset_id)
_u = get_user_model().objects.get(username=viewer)
diff --git a/geonode/maps/admin.py b/geonode/maps/admin.py
index f7eeba34056..5e72fa03cae 100644
--- a/geonode/maps/admin.py
+++ b/geonode/maps/admin.py
@@ -24,7 +24,6 @@
from geonode.maps.models import Map, MapLayer
from geonode.base.admin import ResourceBaseAdminForm
-from geonode.base.admin import metadata_batch_edit
class MapLayerInline(admin.TabularInline):
@@ -81,7 +80,6 @@ class MapAdmin(TabbedTranslationAdmin):
)
readonly_fields = ("geographic_bounding_box",)
form = MapAdminForm
- actions = [metadata_batch_edit]
def delete_queryset(self, request, queryset):
"""
diff --git a/geonode/maps/forms.py b/geonode/maps/forms.py
deleted file mode 100644
index 3be536a79ab..00000000000
--- a/geonode/maps/forms.py
+++ /dev/null
@@ -1,45 +0,0 @@
-#########################################################################
-#
-# Copyright (C) 2016 OSGeo
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-#
-#########################################################################
-
-from geonode.maps.models import Map
-from geonode.base.forms import ResourceBaseForm, get_tree_data
-
-
-class MapForm(ResourceBaseForm):
- class Meta(ResourceBaseForm.Meta):
- model = Map
- exclude = ResourceBaseForm.Meta.exclude
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.fields["regions"].choices = get_tree_data()
- for field in self.fields:
- help_text = self.fields[field].help_text
- self.fields[field].help_text = None
- if help_text != "":
- self.fields[field].widget.attrs.update(
- {
- "class": "has-external-popover",
- "data-content": help_text,
- "placeholder": help_text,
- "data-placement": "right",
- "data-container": "body",
- "data-html": "true",
- }
- )
diff --git a/geonode/maps/templates/layouts/map_panels.html b/geonode/maps/templates/layouts/map_panels.html
deleted file mode 100644
index eb73fa8952f..00000000000
--- a/geonode/maps/templates/layouts/map_panels.html
+++ /dev/null
@@ -1,661 +0,0 @@
-{% load i18n %}
-{% load static %}
-{% load floppyforms %}
-{% load contact_roles %}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-{% block body_outer %}
-